Source: Store.js

/*-
 *
 *  This file is part of Oracle NoSQL Database
 *  Copyright (C) 2011, 2014 Oracle and/or its affiliates.  All rights reserved.
 *
 * If you have received this file as part of Oracle NoSQL Database the
 * following applies to the work as a whole:
 *
 *   Oracle NoSQL Database server software is free software: you can
 *   redistribute it and/or modify it under the terms of the GNU Affero
 *   General Public License as published by the Free Software Foundation,
 *   version 3.
 *
 *   Oracle NoSQL Database is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 *   Affero General Public License for more details.
 *
 * If you have received this file as part of Oracle NoSQL Database Client or
 * distributed separately the following applies:
 *
 *   Oracle NoSQL Database client software is free software: you can
 *   redistribute it and/or modify it under the terms of the Apache License
 *   as published by the Apache Software Foundation, version 2.0.
 *
 * You should have received a copy of the GNU Affero General Public License
 * and/or the Apache License in the LICENSE file along with Oracle NoSQL
 * Database client or server distribution.  If not, see
 * <http://www.gnu.org/licenses/>
 * or
 * <http://www.apache.org/licenses/LICENSE-2.0>.
 *
 * An active Oracle commercial licensing agreement for this product supersedes
 * these licenses and in such case the license notices, but not the copyright
 * notice, may be removed by you in connection with your distribution that is
 * in accordance with the commercial licensing terms.
 *
 * For more information please contact:
 *
 * berkeleydb-info_us@oracle.com
 *
 */
"use strict";

/*global thrift*/
/*global kvLogger*/
/*global LOG_LEVELS*/
/*global Errors*/
/*global Types*/

var EventEmitter    = require('events').EventEmitter;
var Proxy           = require('./Proxy');
var Iterator        = require('./Iterator');
var ONDBClient      = require('./thrift/ONDB');
var ttypes          = require('./thrift/ondb_types');
var Parse           = require('./Parse').parse;
var Stringify       = require('./Stringify').stringify;
var Readable        = require('./Readable');

module.exports = Store;

util.inherits(Store, EventEmitter);
/**
 * Store constructor
 * @param {Configuration} configuration configuration object
 * @constructor
 */
function Store (    /*Configuration*/ configuration ) {
    kvLogger.info('New Store instance');
    EventEmitter.call(this);
    var self = this;
    var isConnected = false;
    var thriftClient, thriftConnection;
    var proxyConf = configuration.proxyConfiguration;

    function setClient(callback) {
        callback = callback || function(){};
        kvLogger.debug('Set kvClient to Proxy: ' + proxyConf.host);
        var host, port;
        if (proxyConf.startProxy) {
            host = "localhost";
            port = "5010";
        } else {
            var colon = proxyConf.host.indexOf(':');
            if (colon) {
                host = proxyConf.host.substr(0, colon);
                port = proxyConf.host.substr(colon + 1);
            } else {
                callback(new Error(Errors.PROXY_HOST_ERROR));
            }
        }
        thriftConnection = thrift.createConnection(host, port, {
            transport: thrift.TFramedTransport,
            protocol: thrift.TBinaryProtocol
        }).on('error', function (err) {
            kvLogger.debug('Error on thrift Connection' + err);
            callback(err);
            thriftConnection.removeAllListeners('error');
            thriftConnection.removeAllListeners('connect');
        }).on('connect', function (err) {
            kvLogger.debug('Thrift Connection successful');
            thriftClient = thrift.createClient(ONDBClient, thriftConnection);
            callback();
            thriftConnection.removeAllListeners('error');
            thriftConnection.removeAllListeners('connect');
        });
    }

    function verify(callback) {
        kvLogger.debug('Starting verify process');
        kvLogger.debug(configuration);
        callback = callback || function(){};
        var verify =
            new Types.VerifyProperties(
                proxyConf.kvStoreName,
                proxyConf.kvStoreHelperHosts,
                proxyConf.username );

        kvLogger.debug(verify);

        var timeout = setTimeout(function () {
            callback(new Error(Errors.PROXY_TIMEOUT))
        }, 2000);

        thriftClient.verify(verify, function (err) {
            if (err) callback(new Error(err));
            else callback();
            callback = null;
            clearTimeout(timeout);
        });
    }

    /**
     * This methods opens a connection to a kvstore server, it tries to start a thrift proxy if this is not specified
     * on configuration options. This process also calls the 'open' event in the listener protocol or 'error' if an error
     * occurs during opening.
     * @param {function} callback calls this function when the process finishes, will return an error object
     * if the opening process fail.
     * @method
     */
    this.open = function open(  /*function*/ callback, attempt ) {
        kvLogger.info('Store open');
        callback = callback || function(){};
        if (typeof attempt === 'undefined') attempt = 1; else attempt++;

        if (isConnected) {
            kvLogger.error(Errors.ALREADY_CONNECTED);
            callback(new Error(Errors.ALREADY_CONNECTED));
            callback = null;
            self.emit('error', new Error(Errors.ALREADY_CONNECTED));
            return;
        }
        if (attempt > proxyConf.connectionAttempts) {
            kvLogger.error(Errors.CONNECTION_ATTEMPTS);
            callback(new Error(Errors.CONNECTION_ATTEMPTS));
            callback = null;
            self.emit('error', new Error(Errors.CONNECTION_ATTEMPTS));
            return;
        }
        kvLogger.debug('Attempt to open: ' + attempt);

        kvLogger.info('Setting client');
        setClient(function (err) {
            if (err) {
                if (proxyConf.startProxy)
                    Proxy.startProxy(proxyConf, function (err) {
                        self.open(callback, attempt);
                    });
                else {
                    kvLogger.error(Errors.PROXY_CONNECTION_ERROR);
                    callback(new Error(Errors.PROXY_CONNECTION_ERROR));
                    callback = null;
                    self.emit('error', new Error(Errors.PROXY_CONNECTION_ERROR));
                }
            } else {
                verify(function (err) { // verify failed
                    if (err) {
                        kvLogger.error(Errors.PROXY_VERIFY_ERROR);
                        callback(err);
                        callback = null;
                        self.emit('error', err);
                    } else {
                        kvLogger.debug('Store connected to proxy');
                        isConnected = true;
                        callback();
                        callback = null;
                        self.emit('open');
                    }
                });
            }
        });
    };

    /**
     * Closes the connection.
     */
    this.close = function () {
        kvLogger.info('Store close');
        if (isConnected) {
            thriftConnection.end();
            isConnected = false;
            self.emit('close');
        }
    };

    /**
     * Attempt to shutdown the proxy related to the configuration of this Store
     */
    this.shutdownProxy = function (callback) {
        self.close();
        Proxy.stopProxy(proxyConf, callback );
    };


    /**
     * Gets the Row associated with the primary key.
     * @param {String} tableName the table name
     * @param {Object} primaryKey the primary key for a table. It must be a complete primary key, with all fields set.
     * @param {ReadOptions} readOptions non-default options for the operation or null to get default behavior.
     * @param {function} callback a function that is called when the process finish.
     *
     */
    this.get = function (   /*String*/ tableName,
                            /*Object*/ primaryKey,
                            /*ReadOptions*/ readOptions,
                            /*function*/ callback) {
        callback = callback || function(){};
        if (!isConnected) {
            callback(new Error(Errors.NOT_CONNECTED));
            return;
        }
        var _primaryKey;
        if (primaryKey)
            _primaryKey = new ttypes.TRow({ jsonRow: Stringify(primaryKey) });
        thriftClient.get(tableName, _primaryKey, readOptions, function (err, response) {
            if (kvLogger.logLevel >= LOG_LEVELS.DEBUG)   // avoid unnecessary call
                kvLogger.debug('Return from get with err:' + err);
            if (err)      err = new Error(err);  // return an Error object
            if (response) response.currentRow = Parse(response.currentRow.jsonRow);
            callback(err, response);
        });
    };

    /**
     * Returns the rows associated with a partial primary key in an atomic manner. Rows are returned in primary key order.
     * The key used must ontain all of the fields defined for the table's shard key.
     * @param {String} tableName the table name
     * @param {Object} primaryKey the primary key for a table. It must be a complete primary key, with all fields set.
     * @param {FieldRange} fieldRange The FieldRange to be used to restrict the range of the operation.
     * @param {Array} includedTables The list of tables to be included in an operation that returns
     * multiple rows or keys.
     * @param {ReadOptions} readOptions non-default options for the operation or null to get default behavior.
     * @param {function} callback a function that is called when the process finish.
     */
    this.multiGet = function (  /*String*/ tableName,
                                /*Object*/ primaryKey,
                                /*FieldRange*/ fieldRange,
                                /*Array*/ includedTables,
                                /*ReadOptions*/ readOptions,
                                /*function*/ callback ) {
        callback = callback || function(){};
        if (!isConnected) {
            callback(new Error(Errors.NOT_CONNECTED));
            return;
        }
        var _primaryKey;
        if (primaryKey)
            _primaryKey = new ttypes.TRow({ jsonRow: Stringify(primaryKey) });
        thriftClient.multiGet(tableName, _primaryKey, fieldRange, includedTables, readOptions, function (err, response) {
            if (err)    err = new Error(err);  // return an Error object
            if (response) {
                for (var index = 0; index < response.rowsWithMetadata.length; index++) {
                    response.rowsWithMetadata[index].row = Parse(response.rowsWithMetadata[index].jsonRow);
                    delete response.rowsWithMetadata[index].jsonRow;
                }
            }
            callback(err, response);
        });

    };

    /**
     * Return the rows associated with a partial primary key in an atomic manner. Keys are returned in primary key order.
     * The key used must contain all of the fields defined for the table's shard key.
     * @param {String} tableName the table name
     * @param {Object} primaryKey the primary key for a table. It must be a complete primary key, with all fields set.
     * @param {FieldRange} fieldRange The FieldRange to be used to restrict the range of the operation.
     * @param {Array} includedTables The list of tables to be included in an operation that returns
     * multiple rows or keys.
     * the primaryKey parameter is always included as a target.
     * @param {ReadOptions} readOptions non-default options for the operation or null to get default behavior.
     * @param {function} callback a function that is called when the process finish.
     */
    this.multiGetKeys = function (  /*String*/ tableName,
                                    /*Object*/ primaryKey,
                                    /*FieldRange*/ fieldRange,
                                    /*Array*/ includedTables,
                                    /*ReadOptions*/ readOptions,
                                    /*function*/ callback) {
        callback = callback || function(){};
        if (!isConnected) {
            callback(new Error(Errors.NOT_CONNECTED));
            return;
        }
        var _primaryKey;
        if (primaryKey)
            _primaryKey = new ttypes.TRow({ jsonRow: Stringify(primaryKey) });
        thriftClient.multiGetKeys(tableName, _primaryKey, fieldRange, includedTables, readOptions, function (err, response) {
            if (err)    err = new Error(err);  // return an Error object
            if (response) {
                for (var index = 0; index < response.rowsWithMetadata.length; index++) {
                    response.rowsWithMetadata[index].key = Parse(response.rowsWithMetadata[index].jsonRow);
                    delete response.rowsWithMetadata[index].jsonRow;
                }
            }
            callback(err, response);
        });
    };

    /**
     * Puts a row into a table. The row must contain a complete primary key and all required fields.
     * @param {String} tableName the table name
     * @param {Object} row the primary key for a table. It must be a complete primary key, with all fields set.
     * @param {WriteOptions} writeOptions non-default arguments controlling the Durability of the operation,
     * or null to get default behaviour.
     * @param {function} callback a function that is called when the process finish.
     */
    this.put = function (   /*String*/ tableName,
                            /*Object*/ row,
                            /*WriteOptions*/ writeOptions,
                            /*function*/ callback ) {
        callback = callback || function(){};
        if (!isConnected) {
            callback(new Error(Errors.NOT_CONNECTED));
            return;
        }
        var _row = new ttypes.TRow({ jsonRow: Stringify(row) });
        thriftClient.put(tableName, _row, writeOptions, function (err, response) {
            if (kvLogger.logLevel >= LOG_LEVELS.DEBUG)   // avoid unnecessary call
                kvLogger.debug('Return from put with err:' + err);
            if (err)        err = new Error(err);  // return an Error object
            if (response)   response.previousRow = Parse(response.previousRow.jsonRow);
            callback(err, response);
        });
    };

    /**
     * Puts a row into a table, but only if the row does not exist. The row must contain a complete primary key
     * and all required fields.
     * @param {String} tableName the table name
     * @param {Object} row the primary key for a table. It must be a complete primary key, with all fields set.
     * @param {WriteOptions} writeOptions non-default arguments controlling the Durability of the operation,
     * or null to get default behaviour.
     * @param {function} callback a function that is called when the process finish.
     */
    this.putIfAbsent = function (   /*String*/ tableName,
                                    /*Object*/ row,
                                    /*WriteOptions*/ writeOptions,
                                    /*function*/ callback ) {
        callback = callback || function(){};
        if (!isConnected) {
            callback(new Error(Errors.NOT_CONNECTED));
            return;
        }
        var _row = new ttypes.TRow({ jsonRow: Stringify(row) });
        thriftClient.putIfAbsent(tableName, _row, writeOptions, function (err, response) {
            if (kvLogger.logLevel >= LOG_LEVELS.DEBUG)   // avoid unnecessary call
                kvLogger.debug('Return from putIfAbsent with err:' + err);
            if (err)        err = new Error(err);  // return an Error object
            if (response)   response.previousRow = Parse(response.previousRow.jsonRow);
            callback(err, response);
        });
    };

    /**
     * Puts a row into a table, but only if the row already exists. The row must contain a complete primary key
     * and all required fields.
     * @param {String} tableName the table name
     * @param {Object} row the primary key for a table. It must be a complete primary key, with all fields set.
     * @param {WriteOptions} writeOptions non-default arguments controlling the Durability of the operation,
     * or null to get default behaviour.
     * @param {function} callback a function that is called when the process finish.
     */
    this.putIfPresent = function (  /*String*/ tableName,
                                    /*Object*/ row,
                                    /*WriteOptions*/ writeOptions,
                                    /*function*/ callback ) {
        callback = callback || function(){};
        if (!isConnected) {
            callback(new Error(Errors.NOT_CONNECTED));
            return;
        }
        var _row = new ttypes.TRow({ jsonRow: Stringify(row) });
        thriftClient.putIfPresent(tableName, _row, writeOptions, function (err, response) {
            if (kvLogger.logLevel >= LOG_LEVELS.DEBUG)   // avoid unnecessary call
                kvLogger.debug('Return from putIfPresent with err:' + err);
            if (err)        err = new Error(err);  // return an Error object
            if (response)   response.previousRow = Parse(response.previousRow.jsonRow);
            callback(err, response);
        });
    };

    /**
     * Puts a row, but only if the version of the existing row matches the matchVersion argument. Used when
     * updating a value to ensure that it has not changed since it was last read. The row must contain a
     * complete primary key and all required fields.
     * @param {String} tableName the table name
     * @param {Object} row the primary key for a table. It must be a complete primary key, with all fields set.
     * @param {Buffer} matchVersion the version to match.
     * @param {WriteOptions} writeOptions non-default arguments controlling the Durability of the operation,
     * or null to get default behaviour.
     * @param {function} callback a function that is called when the process finish.
     */
    this.putIfVersion = function (  /*String*/ tableName,
                                    /*Object*/ row,
                                    /*Buffer*/ matchVersion,
                                    /*WriteOptions*/ writeOptions,
                                    /*function*/ callback ) {
        callback = callback || function(){};
        if (!isConnected) {
            callback(new Error(Errors.NOT_CONNECTED));
            return;
        }
        var _row = new ttypes.TRow({ jsonRow: Stringify(row) });
        thriftClient.putIfVersion(tableName, _row, matchVersion, writeOptions, function (err, response) {
            if (kvLogger.logLevel >= LOG_LEVELS.DEBUG)   // avoid unnecessary call
                kvLogger.debug('Return from putIfVersion with err:' + err);
            if (err)        err = new Error(err);  // return an Error object
            if (response)   response.previousRow = Parse(response.previousRow.jsonRow);
            callback(err, response);
        });
    };

    /**
     * Deletes a row from a table.
     * @param {String} tableName the table name
     * @param {Object} primaryKey the primary key for a table. It must be a complete primary key, with all fields set.
     * @param {WriteOptions} writeOptions non-default arguments controlling the Durability of the operation,
     * or null to get default behaviour.
     * @param {function} callback a function that is called when the process finish.
     */
    this.deleteRow = function ( /*String*/ tableName,
                                /*Object*/ primaryKey,
                                /*WriteOptions*/ writeOptions,
                                /*function*/ callback ) {
        callback = callback || function(){};
        if (!isConnected) {
            callback(new Error(Errors.NOT_CONNECTED));
            return;
        }
        var _primaryKey;
        if (primaryKey)
            _primaryKey = new ttypes.TRow({ jsonRow: Stringify(primaryKey) });
        thriftClient.deleteRow(tableName, _primaryKey, writeOptions, function (err, response) {
            if (kvLogger.logLevel >= LOG_LEVELS.DEBUG)   // avoid unnecessary call
                kvLogger.debug('Return from deleteRow with err:' + err);
            if (err)        err = new Error(err);  // return an Error object
            if (response)   response.previousRow = Parse(response.previousRow.jsonRow);
            callback(err, response);
        });
    };

    /**
     * Deletes a row from a table but only if its version matches the one specified in matchVersion.
     * @param {String} tableName the table name
     * @param {Object} primaryKey the primary key for a table. It must be a complete primary key, with all fields set.
     * @param {Buffer} matchVersion the version to match.
     * @param {WriteOptions} writeOptions non-default arguments controlling the Durability of the operation,
     * or null to get default behaviour.
     * @param {function} callback a function that is called when the process finish.
     */
    this.deleteRowIfVersion = function (    /*String*/ tableName,
                                            /*Object*/ primaryKey,
                                            /*Buffer*/ matchVersion,
                                            /*WriteOptions*/ writeOptions,
                                            /*function*/ callback ) {
        callback = callback || function(){};
        if (!isConnected) {
            callback(new Error(Errors.NOT_CONNECTED));
            return;
        }
        var _primaryKey;
        if (primaryKey)
            _primaryKey = new ttypes.TRow({ jsonRow: Stringify(primaryKey) });
        thriftClient.deleteRowIfVersion(tableName, _primaryKey, matchVersion, writeOptions, function (err, response) {
            if (kvLogger.logLevel >= LOG_LEVELS.DEBUG)   // avoid unnecessary call
                kvLogger.debug('Return from deleteRowIfVersion with err:' + err);
            if (err)        err = new Error(err);  // return an Error object
            if (response)   response.previousRow = Parse(response.previousRow.jsonRow);
            callback(err, response);
        });
    };

    /**
     * Deletes multiple rows from a table in an atomic operation. The key used may be partial but must contain
     * all of the fields that are in the shard key.
     * @param {String} tableName the table name
     * @param {Object} primaryKey the primary key for a table. It must be a complete primary key, with all fields set.
     * @param {FieldRange} fieldRange The FieldRange to be used to restrict the range of the operation.
     * @param {Array} includedTables The list of tables to be included in an operation that returns
     * multiple rows or keys.
     * @param {WriteOptions} writeOptions non-default arguments controlling the Durability of the operation,
     * or null to get default behaviour.
     * @param {function} callback a function that is called when the process finish.
     */
    this.multiDelete = function (   /*String*/ tableName,
                                    /*Object*/ primaryKey,
                                    /*FieldRange*/ fieldRange,
                                    /*Array*/ includedTables,
                                    /*WriteOptions*/ writeOptions,
                                    /*function*/ callback ) {
        callback = callback || function(){};
        if (!isConnected) {
            callback(new Error(Errors.NOT_CONNECTED));
            return;
        }
        var _primaryKey;
        if (primaryKey)
            _primaryKey = new ttypes.TRow({ jsonRow: Stringify(primaryKey) });
        return thriftClient.multiDelete(tableName, _primaryKey, fieldRange, includedTables, writeOptions, function (err, response) {
            if (kvLogger.logLevel >= LOG_LEVELS.DEBUG)   // avoid unnecessary call
                kvLogger.debug('Return from multiDelete with err:' + err);
            if (err)        err = new Error(err);  // return an Error object
            callback(err, response);
        });

    };

    /**
     * Returns an iterator over the rows associated with a partial primary key.
     * @param {String} tableName the table name
     * @param {Object} primaryKey the primary key for a table. It must be a complete primary key, with all fields set.
     * @param {FieldRange} fieldRange The FieldRange to be used to restrict the range of the operation.
     * @param {Array} includedTables The list of tables to be included in an operation that returns
     * multiple rows or keys.
     * @param {ReadOptions} readOptions non-default options for the operation or null to get default behavior.
     * @param {Direction} direction The Direction for this operation. If the primary key contains a complete shard key
     * both Direction.FORWARD and Direction.REVERSE are allowed.
     * @param {function} callback a function that is called when the process finish.
     */
    this.tableIterator = function tableIterator(    /*String*/ tableName,
                                                    /*Object*/ primaryKey,
                                                    /*FieldRange*/ fieldRange,
                                                    /*Array*/ includedTables,
                                                    /*ReadOptions*/ readOptions,
                                                    /*Direction*/ direction,
                                                    /*function*/ callback ) {
        callback = callback || function(){};
        if (!isConnected) {
            callback(new Error(Errors.NOT_CONNECTED));
            return;
        }
        primaryKey = primaryKey || {};
        var _primaryKey = new ttypes.TRow({ jsonRow: Stringify(primaryKey) });

        thriftClient.tableIterator(
            tableName,
            _primaryKey,
            fieldRange,
            includedTables,
            readOptions,
            direction,
            configuration.iteratorBufferSize,
            function (err, result) {
                if (err) callback(err);
                else callback(null, new Iterator(thriftClient, result));
            });


    };

    /**
     * Returns an iterator over the keys associated with a partial primary key.
     * @param {String} tableName the table name
     * @param {Object} primaryKey the primary key for a table. It must be a complete primary key, with all fields set.
     * @param {FieldRange} fieldRange The FieldRange to be used to restrict the range of the operation.
     * @param {Array} includedTables The list of tables to be included in an operation that returns
     * multiple rows or keys.
     * @param {ReadOptions} readOptions non-default options for the operation or null to get default behavior.
     * @param {Direction} direction The Direction for this operation. If the primary key contains a complete shard key
     * both Direction.FORWARD and Direction.REVERSE are allowed.
     * @param {function} callback a function that is called when the process finish.
     */
    this.tableKeyIterator = function ( /*String*/ tableName,
                                       /*Object*/ primaryKey,
                                       /*FieldRange*/ fieldRange,
                                       /*Array*/ includedTables,
                                       /*ReadOptions*/ readOptions,
                                       /*Direction*/ direction,
                                       /*function*/ callback ) {
        callback = callback || function(){};
        if (!isConnected) {
            callback(new Error(Errors.NOT_CONNECTED));
            return;
        }
        primaryKey = primaryKey || {};
        var _primaryKey = new ttypes.TRow({ jsonRow: Stringify(primaryKey) });

        thriftClient.tableKeyIterator(
            tableName,
            _primaryKey,
            fieldRange,
            includedTables,
            readOptions,
            direction,
            configuration.iteratorBufferSize,
            function (err, result) {
                if (err) callback(err);
                else callback(null, new Iterator(thriftClient, result));
            });
    };

    /**
     * Returns an iterator over the rows associated with an index key. This method requires an additional database read
     * on the server side to get row information for matching rows. Ancestor table rows for matching index rows may be
     * returned as well if specified in the getOptions paramter. Index operations may not specify the return of child
     * table rows.
     * @param {String} tableName the table name
     * @param {String} indexName the index name
     * @param {Object} primaryKey the primary key for a table. It must be a complete primary key, with all fields set.
     * @param {FieldRange} fieldRange The FieldRange to be used to restrict the range of the operation.
     * @param {Array} includedTables The list of tables to be included in an operation that returns
     * multiple rows or keys.
     * @param {ReadOptions} readOptions non-default options for the operation or null to get default behavior.
     * @param {Direction} direction The Direction for this operation. If the primary key contains a complete shard key
     * both Direction.FORWARD and Direction.REVERSE are allowed.
     * @param {function} callback a function that is called when the process finish.
     */
    this.indexIterator = function ( /*String*/ tableName,
                                    /*String*/ indexName,
                                    /*Object*/ primaryKey,
                                    /*FieldRange*/ fieldRange,
                                    /*Array*/ includedTables,
                                    /*ReadOptions*/ readOptions,
                                    /*Direction*/ direction,
                                    /*function*/ callback ) {
        callback = callback || function(){};
        if (!isConnected) {
            callback(new Error(Errors.NOT_CONNECTED));
            return;
        }
        primaryKey = primaryKey || {};
        var _primaryKey = new ttypes.TRow({ jsonRow: Stringify(primaryKey) });

        thriftClient.indexIterator(
            tableName,
            indexName,
            _primaryKey,
            fieldRange,
            includedTables,
            readOptions,
            direction,
            configuration.iteratorBufferSize,
            function (err, result) {
                if (err) callback(err);
                else callback(null, new Iterator(thriftClient, result));
            });
    };

    /**
     * Return the keys for matching rows associated with an index key. The iterator returned only references
     * information directly available from the index. No extra fetch operations are performed. Ancestor table keys
     * for matching index keys may be returned as well if specified in the getOptions paramter. Index operations
     * may not specify the return of child table keys.
     * @param {String} tableName the table name
     * @param {String} indexName the index name
     * @param {Object} primaryKey the primary key for a table. It must be a complete primary key, with all fields set.
     * @param {FieldRange} fieldRange The FieldRange to be used to restrict the range of the operation.
     * @param {Array} includedTables The list of tables to be included in an operation that returns
     * multiple rows or keys.
     * @param {ReadOptions} readOptions non-default options for the operation or null to get default behavior.
     * @param {Direction} direction The Direction for this operation. If the primary key contains a complete shard key
     * both Direction.FORWARD and Direction.REVERSE are allowed.
     * @param {function} callback a function that is called when the process finish.
     */
    this.indexKeyIterator = function (  /*String*/ tableName,
                                        /*String*/ indexName,
                                        /*Object*/ primaryKey,
                                        /*FieldRange*/ fieldRange,
                                        /*Array*/ includedTables,
                                        /*ReadOptions*/ readOptions,
                                        /*Direction*/ direction,
                                        /*function*/ callback ) {
        callback = callback || function(){};
        if (!isConnected) {
            callback(new Error(Errors.NOT_CONNECTED));
            return;
        }
        primaryKey = primaryKey || {};
        var _primaryKey = new ttypes.TRow({ jsonRow: Stringify(primaryKey) });

        thriftClient.indexKeyIterator(
            tableName,
            indexName,
            _primaryKey,
            fieldRange,
            includedTables,
            readOptions,
            direction,
            configuration.iteratorBufferSize,
            function (err, result) {
                if (err) callback(err);
                else callback(null, new Iterator(thriftClient, result));
            });
    };


    /**
     * Refreshes cached information about the tables. This method is
     * required before using any tables that had been modified.
     */
    this.refreshTables = function(callback) {
        callback = callback || function(){};
        if (!isConnected) {
            callback(new Error(Errors.NOT_CONNECTED));
        }
        thriftClient.refreshTables(function(err) {
            callback(err);
        });
    };

    /**
     * Executes a table statement. Currently, table statements
     * can be used to create or modify tables and indices.
     * @param {String} statement the statement to be performed.
     * @param {function} callback a function that is called when the process finish.
     */
    this.execute = function (   /*String*/ statement,
                                /*function*/ callback ) {
        callback = callback || function(){};
        if (!isConnected) {
            callback(new Error(Errors.NOT_CONNECTED));
        }
        thriftClient.executeSync(statement, function(err, result){
            callback(err, result);
        })

    };

    /**
     * Executes a table statement in the server asynchronously.
     * Currently, table statements can be used to create or modify tables and indices.
     * The operation is asynchronous in server side and may not be finished when
     * the method call the callback function.
     *
     * A execution identifier is returned and can be used to get
     * information about the status of the operation.
     *
     * If the statement is for an administrative command, and the store is
     * currently executing an administrative command that is the logical
     * equivalent the action specified by the statement, the method will
     * return a execution identifier that serves as a handle to that
     * operation, rather than starting a new invocation of the command.
     *
     * @param {String} statement the statement to be performed.
     * @param {function} callback a function that is called when the process finish.
     */
    this.executeAsync = function ( /*String*/ statement,
                                   /*function*/ callback ) {
        callback = callback || function(){};
        if (!isConnected) {
            callback(new Error(Errors.NOT_CONNECTED));
        }
        thriftClient.execute(statement, function(err, result){
            callback(err, result);
        })
    };


    /**
     * Attempts to cancel execution of a statement, execution identifier
     * of the statement must be provided. Return false if the
     * statement couldn't be cancelled, possibly because it has already
     * finished. If the statement hasn't succeeded already, and can be stopped,
     * the operation will transition to the FAILED state.
     * @param {Number} statement the statement to be performed.
     * @param {function} callback a function that is called when the process finish.
     */
    this.executeCancel = function ( /*Number*/ planId,
                                    /*Bool*/ interruptIfRunning,
                                    /*function*/ callback ) {
        callback = callback || function(){};
        if (!isConnected) {
            callback(new Error(Errors.NOT_CONNECTED));
        }
        thriftClient.executionFutureCancel(planId, interruptIfRunning, function(err, result){
            callback(err, result);
        });
    };

    /**
     * Callback until the command represented by a execution identifier completes.
     * Returns information about the execution of the statement.
     * This call will result in communication with the kvstore server.
     */
    this.executeCallback = function ( /*Number*/ planId,
                                      /*function*/ callback ) {
        callback = callback || function(){};
        if (!isConnected) {
            callback(new Error(Errors.NOT_CONNECTED));
        }
        thriftClient.executionFutureGet(planId, function(err, result){
            callback(err, result);
        });
    };


    /**
     * Callback until the command represented by a execution identifier completes
     * or the timeout period is exceeded.
     * This call will result in communication with the kvstore server.
     */
    this.executeTimeout = function ( /*Number*/ planId,
                                     /*Number*/ timeout,
                                     /*function*/ callback ) {
        callback = callback || function(){};
        if (!isConnected) {
            callback(new Error(Errors.NOT_CONNECTED));
        }
        thriftClient.executionFutureGetTimeout(planId, timeout, function(err, result){
            callback(err, result);
        });
    };

    /**
     * Returns information about the execution of the statement. If the
     * statement is still executing, this call will result in communication
     * with the kvstore server to obtain up to date status, and the status
     * returned will reflect interim information.
     */
    this.executeStatus = function ( /*Number*/ planId,
                                    /*function*/ callback ) {
        callback = callback || function(){};
        if (!isConnected) {
            callback(new Error(Errors.NOT_CONNECTED));
        }
        thriftClient.executionFutureUpdateStatus(planId, function(err, result){
            callback(err, result);
        });
    };



    /**
     * This method provides an efficient and transactional mechanism for
     * executing a sequence of operations associated with tables that share
     * the same shard key portion of their primary keys.
     */
    this.executeOperations = function ( /*Array*/ operations,
                                        /*WriteOptions*/ writeOptions,
                                        /*function*/ callback ) {
        callback = callback || function(){};
        if (!isConnected) {
            callback(new Error(Errors.NOT_CONNECTED));
        }
        if (typeof operations !== 'array') {
            operations = [operations];
        }
        thriftClient.executeUpdates(operations, writeOptions, function(err, result){
            if (result)
                for(var row in result) {
                    if (result[row].previousRow.jsonRow)
                        result[row].previousRow = Parse(result[row].previousRow.jsonRow);
                }
            callback(err, result);
        });
    };

/**
 * Returns a Readable Stream over the rows associated with a partial primary key.
 * @param {String} tableName the table name
 * @param {Object} primaryKey the primary key for a table. It must be a complete primary key, with all fields set.
 * @param {FieldRange} fieldRange The FieldRange to be used to restrict the range of the operation.
 * @param {Array} includedTables The list of tables to be included in an operation that returns
 * multiple rows or keys.
 * @param {ReadOptions} readOptions non-default options for the operation or null to get default behavior.
 * @param {Direction} direction The Direction for this operation. If the primary key contains a complete shard key
 * both Direction.FORWARD and Direction.REVERSE are allowed.
 * @param {function} callback a function that is called when the process finish.
 */
this.tableStream = function tableStream( /*String*/ tableName,
                                         /*Object*/ primaryKey,
                                         /*FieldRange*/ fieldRange,
                                         /*Array*/ includedTables,
                                         /*ReadOptions*/ readOptions,
                                         /*Direction*/ direction,
                                         /*function*/ callback ) {
    callback = callback || function(){};
    if (!isConnected) {
        callback(new Error(Errors.NOT_CONNECTED));
        return;
    }
    primaryKey = primaryKey || {};
    var _primaryKey = new ttypes.TRow({ jsonRow: Stringify(primaryKey) });

    thriftClient.tableIterator(
        tableName,
        _primaryKey,
        fieldRange,
        includedTables,
        readOptions,
        direction,
        configuration.iteratorBufferSize,
        function (err, result) {
            if (err) callback(err);
            else callback(null, new Readable(thriftClient, result));
        });


};

/**
 * Returns a Readable Stream over the keys associated with a partial primary key.
 * @param {String} tableName the table name
 * @param {Object} primaryKey the primary key for a table. It must be a complete primary key, with all fields set.
 * @param {FieldRange} fieldRange The FieldRange to be used to restrict the range of the operation.
 * @param {Array} includedTables The list of tables to be included in an operation that returns
 * multiple rows or keys.
 * @param {ReadOptions} readOptions non-default options for the operation or null to get default behavior.
 * @param {Direction} direction The Direction for this operation. If the primary key contains a complete shard key
 * both Direction.FORWARD and Direction.REVERSE are allowed.
 * @param {function} callback a function that is called when the process finish.
 */
this.tableKeyStream = function ( /*String*/ tableName,
                                 /*Object*/ primaryKey,
                                 /*FieldRange*/ fieldRange,
                                 /*Array*/ includedTables,
                                 /*ReadOptions*/ readOptions,
                                 /*Direction*/ direction,
                                 /*function*/ callback ) {
    callback = callback || function(){};
    if (!isConnected) {
        callback(new Error(Errors.NOT_CONNECTED));
        return;
    }
    primaryKey = primaryKey || {};
    var _primaryKey = new ttypes.TRow({ jsonRow: Stringify(primaryKey) });

    thriftClient.tableKeyIterator(
        tableName,
        _primaryKey,
        fieldRange,
        includedTables,
        readOptions,
        direction,
        configuration.iteratorBufferSize,
        function (err, result) {
            if (err) callback(err);
            else callback(null, new Readable(thriftClient, result));
        });
};

/**
 * Returns a Readable Stream over the rows associated with an index key. This method requires an additional database read
 * on the server side to get row information for matching rows. Ancestor table rows for matching index rows may be
 * returned as well if specified in the getOptions paramter. Index operations may not specify the return of child
 * table rows.
 * @param {String} tableName the table name
 * @param {String} indexName the index name
 * @param {Object} primaryKey the primary key for a table. It must be a complete primary key, with all fields set.
 * @param {FieldRange} fieldRange The FieldRange to be used to restrict the range of the operation.
 * @param {Array} includedTables The list of tables to be included in an operation that returns
 * multiple rows or keys.
 * @param {ReadOptions} readOptions non-default options for the operation or null to get default behavior.
 * @param {Direction} direction The Direction for this operation. If the primary key contains a complete shard key
 * both Direction.FORWARD and Direction.REVERSE are allowed.
 * @param {function} callback a function that is called when the process finish.
 */
this.indexStream = function ( /*String*/ tableName,
                              /*String*/ indexName,
                              /*Object*/ primaryKey,
                              /*FieldRange*/ fieldRange,
                              /*Array*/ includedTables,
                              /*ReadOptions*/ readOptions,
                              /*Direction*/ direction,
                              /*function*/ callback ) {
    callback = callback || function(){};
    if (!isConnected) {
        callback(new Error(Errors.NOT_CONNECTED));
        return;
    }
    primaryKey = primaryKey || {};
    var _primaryKey = new ttypes.TRow({ jsonRow: Stringify(primaryKey) });

    thriftClient.indexIterator(
        tableName,
        indexName,
        _primaryKey,
        fieldRange,
        includedTables,
        readOptions,
        direction,
        configuration.iteratorBufferSize,
        function (err, result) {
            if (err) callback(err);
            else callback(null, new Readable(thriftClient, result));
        });
};

/**
 * Return a Readable Stream with the keys for matching rows associated with an index key. The Stream returned only references
 * information directly available from the index. No extra fetch operations are performed. Ancestor table keys
 * for matching index keys may be returned as well if specified in the getOptions paramter. Index operations
 * may not specify the return of child table keys.
 * @param {String} tableName the table name
 * @param {String} indexName the index name
 * @param {Object} primaryKey the primary key for a table. It must be a complete primary key, with all fields set.
 * @param {FieldRange} fieldRange The FieldRange to be used to restrict the range of the operation.
 * @param {Array} includedTables The list of tables to be included in an operation that returns
 * multiple rows or keys.
 * @param {ReadOptions} readOptions non-default options for the operation or null to get default behavior.
 * @param {Direction} direction The Direction for this operation. If the primary key contains a complete shard key
 * both Direction.FORWARD and Direction.REVERSE are allowed.
 * @param {function} callback a function that is called when the process finish.
 */
this.indexKeyStream = function ( /*String*/ tableName,
                                 /*String*/ indexName,
                                 /*Object*/ primaryKey,
                                 /*FieldRange*/ fieldRange,
                                 /*Array*/ includedTables,
                                 /*ReadOptions*/ readOptions,
                                 /*Direction*/ direction,
                                 /*function*/ callback ) {
    callback = callback || function(){};
    if (!isConnected) {
        callback(new Error(Errors.NOT_CONNECTED));
        return;
    }
    primaryKey = primaryKey || {};
    var _primaryKey = new ttypes.TRow({ jsonRow: Stringify(primaryKey) });

    thriftClient.indexKeyIterator(
        tableName,
        indexName,
        _primaryKey,
        fieldRange,
        includedTables,
        readOptions,
        direction,
        configuration.iteratorBufferSize,
        function (err, result) {
            if (err) callback(err);
            else callback(null, new Readable(thriftClient, result));
        });
};


}  //EOF