This is a very simple queing wrapper for Redis that is intended for communication between separate processes. It comes with two APIs:
Channel -- the push/pop interface
The process creates an instance of Channel.
The sending process uses the Channel instance to push data onto the queue via the push
function.
The receiving process uses the Channel instance to remove data from the same queue via the pop
function,
which delivers the data to a callback function which accepts a single data parameter. Some variants of the
pop
function may have a timeout parameter so they don't block indefinitely.
WorkQueueMgr -- the send/consume interface
The process creates an instance of WorkQueueMgr.
Then, it uses that instance to create one or more instances of WorkQueue, each representing a different queue having a unique name.
The sending process uses a WorkQueue instance to send data to the corresponding queue via the send
function. The receiving process uses
a WorkQueue instance to remove data from the corresponding queue via the consume
function, which delivers the data to a callback function
which accepts a data parameter and an ack parameter. The latter is a function that is called to indicate that the callback function
is complete and is ready to accept some additional data, if any, in the queue. See the usage examples below and also
the worker03 and worker04 files in the demo src or lib directories for examples of how to do this. To achieve greater throughput with
consume
, one may specify the number of async callback functions to operate in parallel. The later feature is referred to as 'arity'.
One also may specify a timeout parameter so that consume
doesn't block indefinitely.
consume
is different from pop
in that a single call to consume
can fetch multiple data items from the given queue, while pop
must be called repeatedly to fetch items from the queue.
Channel
class are:constructor configFilePath
-- Takes an optional config file path, which may be overridden by the QUEUE_CONFIG_PATH
environment variable.
connect onReadyCB
-- Obtains a Redis client connection using the config file information. Calls the given callback
when the connection is ready.
attach client
-- This is an alternative to calling connect
. It attaches to a given Redis client connection.
push queueName, data
-- Pushes the given data into the given queue.
pop key, onDataCB
-- Accepts a single key (or queue name), blocking indefinitely until data becomes available. Calls
the given callback when the data is available. The callback is called with a single data parameter.
popTimeout key, timeout, onDataCB
-- Accepts a single key (or queue name) and waits for input on that key. It blocks
but times out and emits a timeout
event when the specified timeout interval is exceeded without any data becoming available.
Calls the given callback when data is available. The callback is called with a single data parameter.
popAny keys..., onDataCB
-- Accepts one or more keys (or queue names) and waits for input on any of those, blocking
indefinitely until data becomes available. Calls the given callback when data is available. The callback is called with a
keyName parameter and a data parameter.
popAnyTimeout keys..., timeout, onDataCB
-- Accepts one or more keys (or queue names) and waits for input on any of
those. It blocks but times out and emits a timeout
event when the specified timeout interval is exceeded without any data
becoming available. Calls the given callback when data is available. The callback receives a keyName parameter and a dat
a parameter. The callback is called with a keyName parameter and a data parameter.
clear keys..., onClearCB
-- Removes the data from the queues specified by the given keys (or queue names). Calls the
given callback when the operation is complete.
disconnect
-- Quits accepting data and closes the connection.
end
-- Closes the connection
commandQueueLength
-- The size of the Redis client's command queue (i.e., commands queued to be sent to Redis).
WorkQueue
class are:send data
-- Pushes data into the associated work queue.
consume onDataCB, arity, timeout
-- Consumes data that becomes available in the associated work queue. The arity
and timeout parameters are optional and default to 1 and 0 respectively.
clear onClearCB
-- Removes the data from the associated queue. Calls the given callback when the operation is complete.
destroy
-- Destroys the meta-data about this WorkQueue instance in the associated WorkQueueMgr class.
WorkQueueMgr
class are:constructor configFilePath
-- Takes an optional config file path, which may be overridden by the QUEUE_CONFIG_PATH
environment variable. Creates an internal Channel instance.
connect onReadyCB
-- Obtains a Redis client connection using the config file information. Calls the given callback
when the connection is ready.
attach client
-- This is an alternative to calling connect
. It attaches to a given Redis client connection.
createQueue queueName
-- Creates a WorkQueue instance for the given queue name.
send queueName, data
-- Pushes the given data into the queue corresponding to the given queue name. This is an
alternative to WorkQueue.send and requires no WorkQueue instance.
consume queueName, onDataCB, arity, timeout
-- Consumes data that becomes available in the given queue, calling the
onDataCB function each time data becomes available. Arity is the number of times the given async callback function is to
be executed in parallel. Defaults to 1. Timeout is the number of seconds to wait for data to become available, after which
a 'timeout'
event is emitted. Defaults to zero, in which case it blocks indefinitely.
clear keys..., onClearCB
-- Removes the data from the queues specified by the given keys (or queue names). Calls the
given callback when the operation is complete.
disconnect
-- Quits accepting data and closes the connection.
end
-- Closes the connection
commandQueueLength
-- Returns the size of the Redis client's command queue (i.e., commands queued to be sent to Redis).
'error'
-- emitted when an error is reported by Redis
'end'
-- emitted when a connection to the Redis server has been lost
'timeout'
-- emitted when a timeout occurs on a popTimeout operation, popAnyTimeout operation,
or on a consume operation with a timeout specified. The provided callback provided to
on 'timeout'
receives two parameters:
keys
-- one or more key values or queue names on which
the operation was waiting when the timeout occurred.
cancel
-- a function that may be called to prevent another
outstanding blocking operation from being performed. This is useful, for example, to get a clean exit from a test case.
'drain'
-- emitted when the TCP connection to the Redis server has been buffering, but is now writable.
This event can be used to stream commands in to Redis and adapt to backpressure: Call commandQueueLength
to detect when the length is too much, then use the 'drain'
event to resume sending data to the queue or queues.
npm install node-redis-queue --save
Sample configuration files may be found in the sample-configs directory. In each config file, the redis_provider type setting specifies the strategy to use. The verbose setting, if true, specifies to display the config file settings on startup.
The environment variable QUEUE_CONFIG_FILE specifies which config file is to be used. If not set, it defaults to node-redis-queue/redis-queue-config.json, which specifies to use the local Redis server with no password. If you do nothing, that is what you get.
Currently implemented strategies are:
connStrategyDefaultLocal -- local Redis server, no password
connStrategyCustom -- configurable host, port, and password; defaults to local Redis server, no password
connStrategyHerokuRedisCloud -- host, port, and password specified by REDISCLOUD_URL environment variable; if not set, then defaults to local Redis server, no password
connStrategyBlueMixRedisCloud -- host, port, and password specified by VCAP_SERVICES environment variable; if not set, then defaults to local Redis server, no password
redisQueueConfig determines which strategy is used to configure the client. It is easy to add your own strategy.
See the Coffeescript usage examples here.
See the Javascript usage examples here.
Instructions for running the demo code may be found here.
For developers who wish to make changes to the code, information on running the test
suite and how to use grunt
may be found here;
View the change log here.
The Channel class is a very thin wrapper around existing redis module functions. It delegates all its operations to that module. The Channel class uses different strategies to connect to redis. A config file specifies which strategy to use and also supplies options to redis.
The WorkQueueMgr class serves as a factory for WorkQueue class instances.
The WorkQueueMgr class delegates queue send and consume to the Channel class. It maintains a hash of queues created (@queue), which defines which queues currently are valid. For each queue that is consuming data, it maintains an entry in a hash of callbacks (@consumingCB) and and an entry in an ordered list of names of queues currently being consumed (@consumingNames). The ordered list of names represents the priority for consumption of queue data and is used by the consume function as the list of keys to be monitored by qmgr.popAny.
The WorkQueue class is a very simple envelope wrapping four applicative functions, effectively delegating all operations to WorkQueueMgr class.
Part of this work is derived from node-simple-redis-queue v0.9.3 by James Smith and retains the same license. However, the current version bears almost no resemblance to James' project.