Chapter 6. Data Access

Like any web server, Node needs access to data stores for persistent storage; without persistence, all you have is a brochure website, which would make using Node pointless. In this chapter, we’ll run through the basic ways to connect to common open source database choices and to store and retrieve data.

The following NoSQL and document stores are increasingly popular for web-facing applications and are easy to use with Node.

CouchDB provides MVCC-based[15] document storage in a JavaScript environment. When documents (records) are added or updated in CouchDB, the entire dataset is saved to storage and older versions of that data marked obsolete. Older versions of the record can still be merged into the newest version, but in every case a whole new version is created and written to contiguous memory for faster read times. CouchDB is said to be “eventually consistent.” In a large, scalable deployment, multiple instances can sometimes serve older, unsynced versions of records to clients with the expectation that any changes to those records will eventually be merged into the master.

One of the nice things about CouchDB is that its API is actually all just HTTP. Because Node is great at interacting with HTTP, this means it is really easy to work with CouchDB. Exploiting this fact, it is possible to perform database operations directly without any additional client libraries.

Example 6-1 shows how to generate a list of databases in the current CouchDB installation. In this case, there is no authentication or administrative permission on the CouchDB server—a decidedly bad idea for a database connected to the Internet, but suitable for demonstration purposes.

A client connection is created with the http library. Nothing distinguishes this connection from any other http connection; because CouchDB is RESTful, no additional communication protocol is needed. Of special note is the request.end() line inside the createServer method. If this line is omitted, the request will hang.

As mentioned earlier, all CouchDB methods are exposed in HTTP calls. Creating and deleting databases, therefore, involves making the appropriate PUT and DELETE statements against the server, as demonstrated in Example 6-2.

Here, /dbname refers to the resource being accessed. Combined with a PUT command, CouchDB is instructed to create a new database called dbname. An HTTP response code of 201 confirms that the database was created.

As shown in Example 6-3, deleting the resource is the reverse of a PUT: the DELETE command. An HTTP response code of 200 confirms the request was completed successfully.

These elements aren’t very useful on their own, but they can be put together to form a very basic (if unfriendly) database manager using the methods shown in Example 6-4.

Example 6-4. A simple CouchDB database creation form

var http = require('http');
var qs = require('querystring');
var url = require('url');

var dbHost = "127.0.0.1";
var dbPort = 5984;

deleteDb = function(res, dbpath) {
  var client = http.createClient(dbPort, dbHost)
  var request = client.request("DELETE", dbpath);
  request.end();

  request.on("response", function(response) {
    response.on("end", function() {
      if ( response.statusCode == 200 ) {
        showDbs(res, "Deleted database.");
      } else {
        showDbs(res, "Could not delete database.");
      }
    });
  });
}

createDb = function(res, dbname) {
  var client = http.createClient(dbPort, dbHost)
  var request = client.request("PUT", "/" + dbname);
  request.end();

  request.on("response", function(response) {
    response.on("end", function() {
      if ( response.statusCode == 201 ) {
        showDbs(res, dbname + " created.");
      } else {
        showDbs(res, "Could not create " + dbname);
      }
    });
  });
}

showDbs = function(res, message) {
  var client = http.createClient(dbPort, dbHost);
  var request = client.request("GET", "/_all_dbs");
  request.end();

  request.on("response", function(response) {
    var responseBody = "";

    response.on("data", function(chunk) {
      responseBody += chunk;
    });

    response.on("end", function() {
      res.writeHead(200, {'Content-Type': 'text/html'});
      res.write("<form method='post'>");
      res.write("New Database Name: <input type='text' name='dbname' />");
      res.write("<input type='submit' />");
      res.write("</form>");
      if ( null != message ) res.write("<h1>" + message + "</h1>");

      res.write("<h1>Active databases:</h1>");
      res.write("<ul>");
      var dblist = JSON.parse(responseBody);
      for ( i = 0; i < dblist.length; i++ ) {
        var dbname = dblist[i];
        res.write("<li><a href='/" + dbname + "'>"+dbname+"</a></li>");
      }
      res.write("</ul>");
      res.end();
    });
  });
};

http.createServer(function (req, res) {
  if ( req.method == 'POST' ) {
    // Parse the request
    var body = '';
    req.on('data', function (data) {
      body += data;
    });
    req.on('end', function () {
      var POST = qs.parse(body);
      var dbname = POST['dbname'];
      if ( null != dbname ) {
        // Create the DB
        createDb(res,dbname);
      } else {
        showDbs(res, "Bad DB name, cannot create database.");
      }
    });
  } else {
    var path = url.parse(req.url).pathname;
    if ( path != "/" ) {
      deleteDb(res,path);
    } else {
      showDbs(res);
    }
  }
}).listen(8080);

Knowing how to work with CouchDB over HTTP is useful, but this approach is verbose. Although it has the advantage of not needing external libraries, most developers opt for higher-level abstraction layers, regardless of how simple their database’s native driver implementation is. In this section, we look at the node-couchdb package, which simplifies the interface between Node and CouchDB.

You can install the drivers for CouchDB using npm:

npm install felix-couchdb

Redis is a memory-centric key-value store with persistence that will feel very familiar if you have experience with key-value caches such as Memcache. Redis is used when performance and scaling are important; in many cases, developers choose to use it as a cache for data retrieved from a relational database such as MySQL, although it is capable of much more.

Beyond its key-value storage capabilities, Redis provides network-accessible shared memory, is a nonblocking event bus, and exposes subscription and publishing capabilities.

As with many of the rest of the database engines, using Redis requires installing the database application as well as the Node drivers to communicate with it.

Redis is available in source form. There isn’t anything to do in the way of configuration; just download and compile per the instructions on the website.

If you are using Windows, you are on your own at the time of this writing because Redis is not supported on Windows. Fortunately, there is a passionate community behind Redis development, and several ports have been made available for both Cygwin and native compilation. The port at https://github.com/dmajkic/redis compiles to a native Windows binary using MinGW.

Hashes are objects that contain multiple keys. Example 6-11 sets a single key at a time.

Example 6-12 shows how to set multiple keys at the same time.

We could accomplish the same thing by providing a more developer-friendly object, rather than breaking it out into a list, as shown in Example 6-13.

Instead of manually supplying each field to Redis, you can pass an entire object into hmset, which will parse the fields and send the correct information to Redis.

Like regular sets, sorted sets do not allow duplicate members. Sorted sets add the concept of weighting, enabling score-based operations on data such as leaderboards, top scores, and content tables.

The producers of the American weight-loss reality show The Biggest Loser are real-world fans of sorted sets. In the 11th season of the series, the contestants were split into three groups based upon their age. On air, they had to perform a crude sorting operation by checking a number printed on everyone’s shirts and then line up in ascending order under the hot sun. If one of the contestants had brought her Node- and Redis-equipped laptop to the competition, she might have made a small program to do the work for them, such as the one in Example 6-16.

The output is:

Young team: Courtney,Jessica,Patrick,Ramon,Vinny
Middle team: Jennifer,John,Sunny,Joe,Antone
Elder team: Becky,Deborah,Mike,Bonnie

Adding members to a sorted set follows a pattern similar to the one for adding members to a normal set, with the addition of a rank. This allows for interesting slicing and dicing, as in this example. Knowing that each team consists of similarly aged individuals, getting three teams from a sorted list is a matter of pulling three equal groups straight out of the set. The number of contestants (14) is not perfectly divisible by 3, so the final group has only 4 members.

Redis supports the publish-subscribe (or pub-sub) messaging pattern, allowing senders (publishers) to issue messages into channels for use by receivers (subscribers) whom they know nothing about (see Example 6-17). Subscribers register their areas of interests (channels), and Redis pushes all relevant messages to them. Publishers do not need to be registered to specific channels, nor do subscribers need to be listening when messages are sent. Redis takes care of the brokering, which allows for a great deal of flexibility, as neither the publisher nor the subscriber needs to be aware of the other.

The output is:

quiet channel: Welcome to quiet channel
quiet channel: You subscribed to 1 channels!
peaceful channel: Welcome to peaceful channel
peaceful channel: You subscribed to 2 channels!
noisy channel: Welcome to noisy channel
noisy channel: You subscribed to 3 channels!

This example tells the story of two clients. One is quiet and thoughtful, while the other broadcasts inane details about its surroundings to anyone who will listen. The pensive client subscribes to three channels: quiet, peaceful, and noisy. The talkative client responds to each subscription by welcoming the newcomer to the channel and counting the number of active subscriptions.

About one second after subscribing, the pensive client unsubscribes from all three channels. When the unsubscribe command detects no more active subscriptions, both clients end their connection to Redis, and the program execution stops.

Redis supports password authentication. To add a password, edit Redis’s configuration file and include a line for requirepass, as shown in Example 6-18.

Once Redis is restarted, it will perform commands only for clients who authenticate using “hidengoseke” as their password (Example 6-19).

The auth command must occur before any other queries are issued. The client will store the password and use it on reconnection attempts.

Notice the lack of usernames and multiple passwords. Redis does not include user management functionality, because of the overhead it would incur. Instead, system administrators are expected to secure their servers using other means, such as port-blocking Redis from the outside world so that only internal, trusted users may access it.

Some “dangerous” commands can be renamed or removed entirely. For example, you may never need to use the CONFIG command. In that case, you can update the configuration file to either change its name to something obscure, or you can fully disable it to protect against unwanted access; both options are shown in Example 6-20.

Because Mongo supplies a JavaScript environment with BSON object storage (a binary adaption of JSON), reading and writing data from Node is extremely efficient. Mongo stores incoming records in memory, so it is ideal in high-write situations. Each new version adds improved clustering, replication, and sharding.

Because incoming records are stored in memory, inserting data into Mongo is nonblocking, making it ideal for logging operations and telemetry data. Mongo supports JavaScript functions inside queries, making it very powerful in read situations, including MapReduce queries.

Using MongoDB’s document-based storage allows you to store child records inside parent records. For example, a blog article and all of its associated comments can be stored inside a single record, allowing for incredibly fast retrieval.

The native MongoDB driver by Christian Kvaleim provides nonblocking access to MongoDB. Previous versions of the module included a C/C++ BSON parser/serializer, which has been deprecated due to improvements in the JavaScript parser/serializer.

The native MongoDB driver is a good choice when you need precise control over your MongoDB connection.

Node has a tremendous base of support for Mongo through its Mongoose library. Compared to the native drivers, Mongoose is an expressive environment that makes models and schemas more intuitive.

When you use MongoDB, you don’t need to define a data schema as you would with a relational database. Whenever requirements change or you need to store a new piece of information, you just save a new record containing the information you need, and you can query against it immediately. You can transform old data to include default or empty values for the new field, but MongoDB does not require that step.

Even though schemas aren’t important to MongoDB, they are useful because they help humans understand the contents of the database and implicit rules for working with domain data. Mongoose is useful because it works using human-readable schemas, providing a clean interface to communicate with the database.

What is a schema? Many programmers tend to think in terms of models that define data structures, but don’t think much about the underlying databases those models represent. A table inside an SQL database needs to be created before you can write data to it, and the fields inside that table probably closely match the fields in your model. The schema—that is, the definition of the model inside the database—is created separately from your program; therefore, the schema predates your data.

MongoDB—as well as the other NoSQL datastores—is often said to be schemaless because it doesn’t require explicitly defined structure for stored data. In reality, MongoDB does have a schema, but it is defined by the data as it gets stored. You may add a new property to your model months after you begin work on your application, but you don’t have to redefine the schema of previously entered information in order to search against the new field.

Example 6-22 illustrates how to define a sample schema for an article database and what information should be stored in each type of model. Once again, Mongo does not enforce schemas, but programmers need to define consistent access patterns in their own programs.

There are still many good reasons to use a traditional database with SQL, and Node interfaces with popular open source choices.

MySQL has become the workhorse of the open source world for good reason: it provides many of the same capabilities as larger commercial databases for free. In its current form, MySQL is performant and feature-rich.

The node-db module provides a native code interface to popular database systems, including MySQL, using a common API that the module exposes to Node. Although node-db supports more than just MySQL, this section focuses on using MySQL in your application code. Since Oracle’s purchase of Sun Microsystems, the future of MySQL and its community has come under much speculation. Some groups advocate moving to a drop-in replacement such as MariaDB or switching to a different relational database management system (RDBMS) entirely. Although MySQL isn’t going away anytime soon, you need to decide for yourself whether it will be the right choice of software for your work.

Sequelize is an object relational mapper (ORM) that takes much of the repetition out of the tasks performed in the preceding sections. You can use Sequelize to define objects shared between the database and your program, then pass data to and from the database using those objects rather than writing a query for every operation. This becomes a major time-saver when you need to perform maintenance or add a new column, and makes overall data management less error-prone. Sequelize supports installation using npm:

npm install sequelize

As the database and example user were already created for the examples in the previous section, it’s time to create an Author entity inside the database (Example 6-28). Sequelize handles the creation for you, so you don’t have to take care of any manual SQL at this point.

The output is:

Executing: CREATE TABLE IF NOT EXISTS `Authors` (`name` VARCHAR(255), `biography`
TEXT, `id` INT NOT NULL auto_increment , `createdAt` DATETIME NOT NULL, `updatedAt`
DATETIME NOT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB;
Author table was created.

In this example, an Author was defined as an entity containing a name field and a biography field. As you can see in the output, Sequelize added an autoincremented primary key column, a createdAt column, and an updatedAt column. This is typical of many ORM solutions, and provides standard hooks by which Sequelize is able to reference and interact with your data.

Sequelize differs from the other libraries shown in this chapter in that it is based on a listener-driven architecture, rather than the callback-driven architecture used elsewhere. This means that you have to listen for both success and failure events after each operation, rather than having errors and success indicators returned with the operation’s results.

Example 6-29 creates two tables with a many-to-many relationship. The order of operation is:

Example 6-29. Saving records and associations using Sequelize

var Sequelize = require('sequelize');

var db = new Sequelize('upandrunning', 'dev', 'dev', {
  host: 'localhost'
});

var Author = db.define('Author', {
  name: Sequelize.STRING,
  biography: Sequelize.TEXT
});

var Book = db.define('Book', {
  name: Sequelize.STRING
});

Author.hasMany(Book);
Book.hasMany(Author);

db.sync().on('success', function() {
  Book.build({
    name: 'Through the Storm'
  }).save().on('success', function(book) {
    console.log('Book saved');
    Author.build({
      name: 'Lynne Spears',
      biography: 'Author and mother of Britney'
    }).save().on('success', function(record) {
      console.log('Author saved.');
      record.setBooks([book]);
      record.save().on('success', function() {
        console.log('Author & Book Relation created');
      });
    });
  }).on('failure', function(error) {
    console.log('Could not save book');
  });
}).on('failure', function(error) {
  console.log('Failed to sync database');
});

To ensure that the entities are set up correctly, we do not create the author until after the book is successfully saved into the database. Likewise, the book is not added to the author until after the author has been successfully saved into the database. This ensures that both the author’s ID and the book’s ID are available for Sequelize to establish the association. The output is:

Executing: CREATE TABLE IF NOT EXISTS `AuthorsBooks` 
           (`BookId` INT , `AuthorId` INT , `createdAt` DATETIME NOT NULL, 
           `updatedAt` DATETIME NOT NULL, 
           PRIMARY KEY (`BookId`, `AuthorId`)) ENGINE=InnoDB;
Executing: CREATE TABLE IF NOT EXISTS `Authors` 
           (`name` VARCHAR(255), `biography` TEXT, 
           `id` INT NOT NULL auto_increment , `createdAt` DATETIME NOT NULL, 
           `updatedAt` DATETIME NOT NULL, PRIMARY KEY (`id`)) 
           ENGINE=InnoDB;
Executing: CREATE TABLE IF NOT EXISTS `Books` 
           (`name` VARCHAR(255), `id` INT NOT NULL auto_increment , 
           `createdAt` DATETIME NOT NULL, `updatedAt` DATETIME NOT NULL, 
           PRIMARY KEY (`id`)) ENGINE=InnoDB;
Executing: CREATE TABLE IF NOT EXISTS `AuthorsBooks` 
           (`BookId` INT , `AuthorId` INT , `createdAt` DATETIME NOT NULL,
           `updatedAt` DATETIME NOT NULL, 
           PRIMARY KEY (`BookId`, `AuthorId`)) ENGINE=InnoDB;
Executing: INSERT INTO `Books` (`name`,`id`,`createdAt`,`updatedAt`)
           VALUES ('Through the Storm',NULL,'2011-12-01 20:51:59',
                      '2011-12-01 20:51:59');
Book saved
Executing: INSERT INTO `Authors` (`name`,`biography`,`id`,`createdAt`,`updatedAt`)
           VALUES ('Lynne Spears','Author and mother of Britney',
                      NULL,'2011-12-01 20:51:59','2011-12-01 20:51:59');
Author saved.
Executing: UPDATE `Authors` SET `name`='Lynne Spears',
           `biography`='Author and mother of Britney',`id`=3,
           `createdAt`='2011-12-01 20:51:59',
           `updatedAt`='2011-12-01 20:51:59' WHERE `id`=3
Author & Book Relation created
Executing: SELECT * FROM `AuthorsBooks` WHERE `AuthorId`=3;
Executing: INSERT INTO `AuthorsBooks` (`AuthorId`,`BookId`,`createdAt`,`updatedAt`)
           VALUES (3,3,'2011-12-01 20:51:59','2011-12-01 20:51:59');

PostgreSQL is an object-oriented RDBMS originating from the University of California, Berkeley. The project was started by professor and project leader Michael Stonebraker as a successor to his earlier Ingres database system, and from 1985 to 1993 the Postgres team released four versions of the software. By the end of the project, the team was overwhelmed by support and feature requests from its growing number of users. After the Berkeley run, open source developers took over the project, replacing the original QUEL language interpreter with an SQL language interpreter and renaming the project to PostgreSQL. Since the first release of PostgreSQL 6.0 in 1997, the database system has gained a reputation as a feature-rich distribution that is especially friendly to users coming from an Oracle background.

When typing the SQL queries by hand, as we have seen, you might find it tempting to throw data values directly into the code through string concatenation, but wise programmers seek out methods that protect against SQL injection attacks. The pg library accepts parameterized queries, which should be leveraged everywhere that you use values taken from external sources (such as forms on websites). Example 6-31 demonstrates an insertion, and Examples 6-32 and 6-33 show updates and deletes, respectively.

The output is:

{ rows: [], command: 'INSERT', rowCount: 1, oid: 0 }

The query command accepts the SQL statement in the first parameter, and an array of values in the second parameter. Whereas the MySQL driver used question marks for the parameter values, PostgreSQL uses numbered parameters. Numbering the parameters gives you a lot of control over how variables are constructed.

Production environments are often composed of multiple resources: web servers, caching servers, and database servers. The database is typically hosted on a separate machine from the web server, allowing horizontal growth of the public-facing website without the need for setting up and configuring complex database clusters. Application developers must therefore be aware of the performance implications in accessing resources and how those access costs affect their site’s performance.

Connection pooling is an important concept in web development because the performance cost of establishing a database connection is relatively high; creating one or more new connections for every request creates an unnecessary burden on a heavily trafficked site and will contribute to weaker performance. The solution is to maintain database connections inside a cache pool after they are no longer needed, so they can be used immediately by the next incoming request.

Many database drivers provide pooling functionality, but that pattern goes against Node’s “one module, one purpose” philosophy. Instead, Node developers should use the generic-pool module in front of their data layer to serve new database connections (see Example 6-34). generic-pool will reuse connections where possible to prevent the overhead of creating new database connections, and the module can be used with any data library.

The output is:

pool mysql - dispense() clients=1 available=0
pool mysql - dispense() - creating obj - count=1
[ { id: 1, user_login: 'mwilson' } ]
pool mysql - timeout: 1319413992199
pool mysql - dispense() clients=0 available=1
pool mysql - availableObjects.length=1
pool mysql - availableObjects.length=1
pool mysql - removeIdle() destroying obj - now:1319413992211 timeout:1319413992199
pool mysql - removeIdle() all objects removed

The pool works through the magic of the create and destroy functions. When a consumer attempts to acquire a connection, the pool will call the create function if no connections have already been opened. If the connection sits idle for too long (an interval indicated in milliseconds by the idleTimeoutMillis attribute), it is destroyed and its memory resources freed.

The beauty of Node’s pool is that any persistent resource can be represented. Databases are a natural fit, but you can just as easily write commands to maintain connections to an outside session cache, or even to hardware interfaces.

We used a mailman analogy earlier to describe Node’s event loop. If the mailman were to arrive at a closed gate, he would be unable to deliver his message; but imagine an elderly and kind groundskeeper was in the process of opening the gate so the mailman could pass through. Being elderly and somewhat frail from his years of service, it takes the groundskeeper some time to clear the way—time during which the mailman is unable to deliver any messages.

This situation is a blocking process, but it is not a permanent state. Evenually the groundskeeper will manage to get the gate open, and the mailman will go about his business. Every house the mailman reaches with a similar gate-opening process will slow down the overall route. In the context of a Node application, this type of block will seriously degrade performance.

In the computer realm, similar situations may be caused by sending a user email during a registration process, by lots of math that needs to be done as a result of user input, or by any situation in which the time it takes to complete a task exceeds a user’s normally expected wait times. Node’s event-driven design handles the majority of these situations for you by using asynchronous functions and callbacks, but when an event is particularly “heavy” to process, it doesn’t make sense to process it inside Node. Node should only take care of handling results and fast operations.

By way of example, consider a generic user registration process. When a user registers herself, the application saves a new record in the database, sends an email to that user, and perhaps records some statistics about the registration process, such as the number of steps completed or amount of time taken. It probably doesn’t make sense to perform all of those actions right away when the user hits the Submit button on your web page. For one thing, the email process could take several seconds (or if you’re unlucky, minutes) to complete, the database call may not need to finish before the user is welcomed, and the statistics are probably separate from your main application flow. In this case, you might choose to generate a message that notifies other parts of your application instead—perhaps running on a different machine entirely—that a user has registered. This is known as a publish-subscribe pattern.

Another example: suppose you have a cluster of machines running Node.js. When a new machine is added to the cluster, it issues a message requesting configuration information. A configuration server responds to the message with a list of configuration information the new machine needs to integrate into the cluster. This is known as a request-reply pattern.

Message queues allow programmers to publish events and move on, enabling improved performance through parallel processing and higher levels of scalability through inter-process communication channels.

RabbitMQ is a message broker that supports the advanced message queueing protocol (AMQP). It is useful in situations where data needs to be communicated between different servers, or between different processes on the same server. Written in Erlang, RabbitMQ is capable of clustering for high availability, and is fairly straightforward to install and begin using.

RabbitMQ communicates using the standardized protocol AMQP. AMQP comes from the financial services industry, where reliable messaging is a matter of life or death. It provides a vendor-neutral and abstract specification for generic (not just financial) middleware messaging and is intended to solve the problem of communicating between different types of systems. AMQP is conceptually similar to email: email messages have specifications for headers and format, but their contents can be anything from text to photos and video. Just as two companies don’t need to run the same email server software to communicate, AMQP allows messaging between different platforms. For example, a publisher written in PHP can send a message to a consumer written in JavaScript.

Example 6-35 shows the most basic elements of RabbitMQ programming.

The output is:

Connected to RabbitMQ
Queue opened
Queue bound
Consumer has subscribed, publishing message.
Message received:
{ hello: 'world' }

The createConnection command opens a connection to the RabbitMQ message broker, which in this case defaults (as per AMQP) to localhost on port 5672. If necessary, this command can be overloaded; for example:

createConnection({host: 'dev.mycompany.com', port: 5555})

Next, a queue and exchange are defined. This step is not strictly required, because AMQP brokers are required to provide a default exchange, but by specifying up-and-running as the exchange name, you insulate your application from other exchanges that could be running on the server. An exchange is an entity that receives messages and passes them forward to attached queues.

The queue doesn’t do anything by itself; it must be bound to an exchange before it will do anything. The command q.bind(e, '#') instructs AMQP to attach the queue named up-and-running-queue to the exchange named up-and-running, and to listen for all messages passed to the exchange (the '#' parameter). You could easily change the # to some specific key to filter out messages.

Once the queue and exchange have been declared, an event is set up for basicConsumeOk, which is an event generated by the AMQP library when a client subscribes to a queue. When that happens, Node will publish a “hello world” message to the exchange under a filtering key of routingKey. In this example, the filter key doesn’t matter, because the queue is bound to all keys (via the bind('#') command), but a central tenet of AMQP is that the publisher is never aware of which subscribers (if any) are connected, so a routing key is supplied in any case.

Finally, the subscribe command is issued. The callback function that is passed as its argument is called every time an eligible message is received by the exchange and passed through to the queue. In this case, the callback causes the program to end, which is good for demonstration purposes, but in “real” applications it’s unlikely you would do this. When the subscribe command is successful, AMQP dispatches the basicConsumeOk event, which triggers the publishing of the “hello world” message and subsequently ends the demonstration program.

Queues are useful when long-running tasks take longer than is acceptable to the user (such as during a web page load) or when the task would otherwise block the application. Using RabbitMQ, is it possible to split tasks among multiple workers and ensure that tasks are completed even if the first worker that handles them dies mid-process (Example 6-36).

This example is a modified version of the straight publish-subscribe example from the previous section, but it is just a publisher, so the event listener for subscribing is gone. In its place is an interval timer that publishes a message to the queue every 1,000 milliseconds (that is, every second). The message contains a count variable that is incremented during each publish. This code can be used to implement a simple worker application. Example 6-37 shows the corresponding client.

The client works by taking a message from the queue, processing it (in this example, sleeping for 5 seconds), and then taking the next message from the queue and repeating. Although there is no “sleep” function in Node, you can fake it with a blocking loop, as done here.

There is a problem. Recall that the publisher posts a message to the queue every second. Because the client takes 5 seconds to process each message, it will very quickly get far behind the publisher. The solution? Open another window and run a second client, and now the messages are processed twice as fast. It’s still not quick enough to handle the volume produced by the publisher, but adding more clients can further spread the load and keep the unprocessed messages from falling behind. This setup is referred to as worker queues.

Worker queues function by round-robining the message publishing between clients connected to a named queue. The {ack:true} parameter to the subscribe command instructs AMQP to wait for the user to acknowledge that the processing has been completed for a message. The shift method provides that acknowledgment by shifting the message off the queue and removing it from service. This way, if the worker happens to die while processing a message, the RabbitMQ broker will send the message to the next available client. There is no timeout; as long as the client is connected, the message will be removed from play. Only when the client disconnects without acknowledging a message will it be sent to the next client.



[15] MVCC stands for multi-version concurrency control.