node-redis-queue

Overview

This is a very simple queueing wrapper for Redis, intended for communication between separate processes. It comes with two APIs:

  1. Channel -- the push/pop interface

    The process creates an instance of Channel. The sending process uses the Channel instance to push data onto the queue via the push function. The receiving process uses the Channel instance to remove data from the same queue via the pop function, which delivers the data to a callback function which accepts a single data parameter. Some variants of the pop function may have a timeout parameter so they don't block indefinitely.

  2. WorkQueueMgr -- the send/consume interface

    The process creates an instance of WorkQueueMgr, then uses that instance to create one or more instances of WorkQueue, each representing a different queue with a unique name. The sending process uses a WorkQueue instance to send data to the corresponding queue via the send function. The receiving process uses a WorkQueue instance to remove data from the corresponding queue via the consume function, which delivers the data to a callback function that accepts a data parameter and an ack parameter. The latter is a function that the callback calls to indicate that it is complete and ready to accept additional data, if any, from the queue. See the usage examples below and also the worker03 and worker04 files in the demo src or lib directories for examples of how to do this. To achieve greater throughput with consume, one may specify the number of async callback functions to operate in parallel; this feature is referred to as 'arity'. One may also specify a timeout parameter so that consume doesn't block indefinitely.

    consume is different from pop in that a single call to consume can fetch multiple data items from the given queue, while pop must be called repeatedly to fetch items from the queue.
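The difference between pop and consume can be illustrated with a small in-memory model. This is only a sketch: `FakeQueue` is a hypothetical stand-in for a Redis list, it does not block when the queue is empty, and it is not how the library is implemented.

```javascript
// Hypothetical in-memory stand-in for a Redis-backed queue (illustration only).
class FakeQueue {
  constructor() {
    this.items = [];
  }

  // Sender side: append data to the tail of the queue.
  push(data) {
    this.items.push(data);
  }

  // pop: delivers at most one item per call.
  pop(onData) {
    if (this.items.length > 0) onData(this.items.shift());
  }

  // consume: delivers an item together with an ack function.
  // Calling ack signals readiness and triggers delivery of the next item.
  consume(onData) {
    const next = () => {
      if (this.items.length > 0) onData(this.items.shift(), next);
    };
    next();
  }
}

const q = new FakeQueue();
['a', 'b', 'c'].forEach((d) => q.push(d));

const popped = [];
q.pop((d) => popped.push(d)); // one call, one item

const consumed = [];
q.consume((d, ack) => {
  consumed.push(d);
  ack(); // ready for more
}); // one call, remaining items
```

In this model a single pop call yields one item, while a single consume call keeps delivering items for as long as the callback keeps acknowledging them.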

Details of the Channel Interface

The public methods of the Channel class are:

constructor configFilePath -- Takes an optional config file path, which may be overridden by the QUEUE_CONFIG_FILE environment variable.

connect onReadyCB -- Obtains a Redis client connection using the config file information. Calls the given callback when the connection is ready.

attach client -- This is an alternative to calling connect. It attaches to a given Redis client connection.

push queueName, data -- Pushes the given data into the given queue.

pop key, onDataCB -- Accepts a single key (or queue name), blocking indefinitely until data becomes available. Calls the given callback when the data is available. The callback is called with a single data parameter.

popTimeout key, timeout, onDataCB -- Accepts a single key (or queue name) and waits for input on that key. It blocks but times out and emits a timeout event when the specified timeout interval is exceeded without any data becoming available. Calls the given callback when data is available. The callback is called with a single data parameter.

popAny keys..., onDataCB -- Accepts one or more keys (or queue names) and waits for input on any of those, blocking indefinitely until data becomes available. Calls the given callback when data is available. The callback is called with a keyName parameter and a data parameter.

popAnyTimeout keys..., timeout, onDataCB -- Accepts one or more keys (or queue names) and waits for input on any of those. It blocks but times out and emits a timeout event when the specified timeout interval is exceeded without any data becoming available. Calls the given callback when data is available. The callback is called with a keyName parameter and a data parameter.

clear keys..., onClearCB -- Removes the data from the queues specified by the given keys (or queue names). Calls the given callback when the operation is complete.

disconnect -- Quits accepting data and closes the connection.

end -- Closes the connection.

commandQueueLength -- The size of the Redis client's command queue (i.e., commands queued to be sent to Redis).
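As a rough sketch of the popAny calling convention (keys first, callback last, callback receiving keyName and data), the following code reads from two queues through a Channel-like object. `StubChannel` is a hypothetical in-memory stand-in so the example runs without Redis; the queue names are made up, and the assumption that earlier keys are served first is taken from the Architecture Notes rather than from the method description.

```javascript
// Hypothetical stand-in exposing only push and popAny (not the real Channel).
class StubChannel {
  constructor() {
    this.lists = {};
  }

  push(key, data) {
    (this.lists[key] = this.lists[key] || []).push(data);
  }

  popAny(...args) {
    const onData = args.pop(); // last argument is the callback
    for (const key of args) {  // remaining arguments are keys, checked in order
      const list = this.lists[key] || [];
      if (list.length > 0) return onData(key, list.shift());
    }
    // Nothing available: a real blocking pop would wait here; the stub returns.
  }
}

// popAny delivers one item per call, so it is re-issued from inside the callback.
function readAll(channel, keys, log) {
  const next = () =>
    channel.popAny(...keys, (keyName, data) => {
      log.push(`${keyName}=${data}`);
      next();
    });
  next();
}

const channel = new StubChannel();
channel.push('demo:orders', 1);
channel.push('demo:emails', 'x');
channel.push('demo:orders', 2);

const log = [];
readAll(channel, ['demo:orders', 'demo:emails'], log);
```

With a real Channel instance the same `readAll` function would keep waiting for new data instead of stopping when the queues are empty.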

Details of the WorkQueueMgr Interface

The public methods of the WorkQueue class are:

send data -- Pushes data into the associated work queue.

consume onDataCB, arity, timeout -- Consumes data that becomes available in the associated work queue. The arity and timeout parameters are optional and default to 1 and 0 respectively.

clear onClearCB -- Removes the data from the associated queue. Calls the given callback when the operation is complete.

destroy -- Destroys the meta-data about this WorkQueue instance in the associated WorkQueueMgr class.

The public methods of the WorkQueueMgr class are:

constructor configFilePath -- Takes an optional config file path, which may be overridden by the QUEUE_CONFIG_FILE environment variable. Creates an internal Channel instance.

connect onReadyCB -- Obtains a Redis client connection using the config file information. Calls the given callback when the connection is ready.

attach client -- This is an alternative to calling connect. It attaches to a given Redis client connection.

createQueue queueName -- Creates a WorkQueue instance for the given queue name.

send queueName, data -- Pushes the given data into the queue corresponding to the given queue name. This is an alternative to WorkQueue.send and requires no WorkQueue instance.

consume queueName, onDataCB, arity, timeout -- Consumes data that becomes available in the given queue, calling the onDataCB function each time data becomes available. Arity is the number of times the given async callback function is to be executed in parallel. Defaults to 1. Timeout is the number of seconds to wait for data to become available, after which a 'timeout' event is emitted. Defaults to zero, in which case it blocks indefinitely.

clear keys..., onClearCB -- Removes the data from the queues specified by the given keys (or queue names). Calls the given callback when the operation is complete.

disconnect -- Quits accepting data and closes the connection.

end -- Closes the connection.

commandQueueLength -- Returns the size of the Redis client's command queue (i.e., commands queued to be sent to Redis).
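To make the arity parameter of consume more concrete, here is a minimal in-memory model. It is a sketch under stated assumptions, not the library's code: the free function `consume` and the plain array used as a queue are hypothetical, and blocking and timeouts are not modelled. Each of the `arity` slots fetches one item and fetches the next only after its ack is called.

```javascript
// Hypothetical model of consume with arity over a plain array.
function consume(items, onData, arity = 1) {
  // A slot takes one item, hands it to onData, and continues only on ack.
  const fetchNext = () => {
    if (items.length === 0) return;
    const data = items.shift();
    onData(data, fetchNext);
  };
  for (let slot = 0; slot < arity; slot++) fetchNext();
}

// Usage: hold on to the acks to observe how many callbacks are in flight.
const items = [1, 2, 3, 4, 5];
const delivered = [];
const pendingAcks = [];

consume(items, (data, ack) => {
  delivered.push(data);
  pendingAcks.push(ack);
}, 2);

// Two callbacks are now in flight and nothing has been acknowledged.
const deliveredBeforeAck = delivered.slice();

// Acknowledge the first item; its slot then fetches the next one.
pendingAcks.shift()();
```

With arity 2, no more than two unacknowledged items are outstanding at any time in this model.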

Events Emitted by Both Interfaces

'error' -- emitted when an error is reported by Redis

'end' -- emitted when a connection to the Redis server has been lost

'timeout' -- emitted when a timeout occurs on a popTimeout operation, a popAnyTimeout operation, or a consume operation with a timeout specified. The callback provided to on 'timeout' receives two parameters:

  1. keys -- one or more key values or queue names on which the operation was waiting when the timeout occurred.

  2. cancel -- a function that may be called to prevent another outstanding blocking operation from being performed. This is useful, for example, to get a clean exit from a test case.

'drain' -- emitted when the TCP connection to the Redis server has been buffering, but is now writable. This event can be used to stream commands in to Redis and adapt to backpressure: Call commandQueueLength to detect when the length is too much, then use the 'drain' event to resume sending data to the queue or queues.
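The backpressure pattern described for 'drain' might look roughly like the following. This is a sketch that assumes the manager exposes `on`, `send`, and `commandQueueLength` as described in this document. `StubMgr`, its `flush` method, the queue name, and the `highWater` threshold are all hypothetical and exist only so the example runs without Redis.

```javascript
const { EventEmitter } = require('events');

// Hypothetical stand-in for a WorkQueueMgr-like object (illustration only).
class StubMgr extends EventEmitter {
  constructor() {
    super();
    this.pending = 0; // commands not yet written to the server
    this.sent = [];
  }
  send(queueName, data) {
    this.pending++;
    this.sent.push(data);
  }
  commandQueueLength() {
    return this.pending;
  }
  // Simulates the connection catching up and becoming writable again.
  flush() {
    this.pending = 0;
    this.emit('drain');
  }
}

// Send items until the command queue exceeds highWater, then pause.
// The 'drain' listener resumes sending where it left off.
function sendWithBackpressure(mgr, queueName, items, highWater) {
  let i = 0;
  const pump = () => {
    while (i < items.length) {
      if (mgr.commandQueueLength() > highWater) return; // wait for 'drain'
      mgr.send(queueName, items[i++]);
    }
  };
  mgr.on('drain', pump);
  pump();
}

const mgr = new StubMgr();
sendWithBackpressure(mgr, 'demo:jobs', [1, 2, 3, 4, 5], 2);
const sentBeforeDrain = mgr.sent.length; // sending paused at this point
mgr.flush();                             // 'drain' fires and sending resumes
```

A suitable threshold depends on the workload; the value 2 here is chosen only to keep the example small.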

Installation

npm install node-redis-queue --save

Configuration

Sample configuration files may be found in the sample-configs directory. In each config file, the redis_provider type setting specifies the strategy to use. The verbose setting, if true, causes the config file settings to be displayed on startup.

The environment variable QUEUE_CONFIG_FILE specifies which config file is to be used. If not set, it defaults to node-redis-queue/redis-queue-config.json, which selects the local Redis server with no password. If you do nothing, that is what you get.
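The lookup order described above can be sketched as a small helper. `resolveConfigPath` is a hypothetical function written for illustration, not part of the library. The precedence shown (environment variable, then a path passed to the constructor, then the default) is an assumption inferred from the phrase "may be overridden by" in the constructor descriptions.

```javascript
// Default path named in the text; treated here as a plain string.
const DEFAULT_CONFIG = 'node-redis-queue/redis-queue-config.json';

// Hypothetical helper: environment variable first, then explicit path, then default.
function resolveConfigPath(env, explicitPath) {
  return env.QUEUE_CONFIG_FILE || explicitPath || DEFAULT_CONFIG;
}

// Usage with plain objects standing in for process.env.
const fromEnv = resolveConfigPath({ QUEUE_CONFIG_FILE: '/etc/queue.json' }, './my.json');
const fromArg = resolveConfigPath({}, './my.json');
const fallback = resolveConfigPath({}, undefined);
```

In a real process one would pass `process.env` as the first argument.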

Currently implemented strategies are:

redisQueueConfig determines which strategy is used to configure the client. It is easy to add your own strategy.

Usage

CoffeeScript Usage Examples

See the CoffeeScript usage examples here.

JavaScript Usage Examples

See the JavaScript usage examples here.

Running the demos

Instructions for running the demo code may be found here.

Developer Info

For developers who wish to make changes to the code, information on running the test suite and how to use grunt may be found here.

Change Log

View the change log here.

Architecture Notes

The Channel class is a very thin wrapper around existing redis module functions. It delegates all its operations to that module. The Channel class uses different strategies to connect to redis. A config file specifies which strategy to use and also supplies options to redis.

The WorkQueueMgr class serves as a factory for WorkQueue class instances.

The WorkQueueMgr class delegates queue send and consume to the Channel class. It maintains a hash of queues created (@queue), which defines which queues currently are valid. For each queue that is consuming data, it maintains an entry in a hash of callbacks (@consumingCB) and an entry in an ordered list of names of queues currently being consumed (@consumingNames). The ordered list of names represents the priority for consumption of queue data and is used by the consume function as the list of keys to be monitored by qmgr.popAny.
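The bookkeeping described above can be modelled in a few lines. This is a simplified in-memory sketch and not the actual implementation: `MiniMgr`, its `store` field, the `step` method, and the error thrown for unknown queues are all hypothetical. Only the three structures (the queue hash, the callback hash, and the ordered name list) come from the description.

```javascript
// Hypothetical, simplified model of the manager's bookkeeping.
class MiniMgr {
  constructor() {
    this.queues = {};         // valid queue names
    this.consumingCB = {};    // queue name -> callback
    this.consumingNames = []; // queue names, in priority order
    this.store = {};          // in-memory stand-in for Redis lists
  }

  createQueue(name) {
    this.queues[name] = true;
    this.store[name] = this.store[name] || [];
  }

  send(name, data) {
    if (!this.queues[name]) throw new Error(`unknown queue: ${name}`);
    this.store[name].push(data);
  }

  // Registers the callback and appends the name to the priority list.
  consume(name, onData) {
    if (!this.queues[name]) throw new Error(`unknown queue: ${name}`);
    this.consumingCB[name] = onData;
    if (!this.consumingNames.includes(name)) this.consumingNames.push(name);
  }

  // One popAny-like step: the first listed queue that has data is served.
  step() {
    for (const name of this.consumingNames) {
      if (this.store[name].length > 0) {
        this.consumingCB[name](this.store[name].shift());
        return true;
      }
    }
    return false;
  }
}

const mini = new MiniMgr();
mini.createQueue('urgent');
mini.createQueue('bulk');

const order = [];
mini.consume('urgent', (d) => order.push(d)); // registered first: higher priority
mini.consume('bulk', (d) => order.push(d));

mini.send('bulk', 'b1');
mini.send('urgent', 'u1');
mini.send('bulk', 'b2');

while (mini.step()) { /* keep serving until every queue is empty */ }
```

Because 'urgent' was registered first, its item is delivered before the items that were already waiting in 'bulk'.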

The WorkQueue class is a very simple envelope wrapping four applicative functions, effectively delegating all operations to the WorkQueueMgr class.

Historical Note

Part of this work is derived from node-simple-redis-queue v0.9.3 by James Smith and retains the same license. However, the current version bears almost no resemblance to James' project.