Tutorial: consumer-per-partition

consumer-per-partition

A consumer that is subscribed to multiple partitions can control the mix of messages consumed from each partition. How this is done is explained here.

The example below simulates a partition 0 which is slow (2s per consume). Other partitions consume at a rate of 0.5s. To use the example, create a topic "test" with two partitions. Produce 500 messages to each of the two partitions. This example does not require an active producer. Run the example to see the result. Run multiple instances to see the rebalancing take effect.

/*
 * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
 *
 * Copyright (c) 2016 Blizzard Entertainment
 *
 * This software may be modified and distributed under the terms
 * of the MIT license.  See the LICENSE.txt file for details.
 */

var Kafka = require('../');

// Global consumer configuration. Auto-commit is disabled so offsets are
// committed explicitly per batch, and rebalance_cb is enabled so the
// consumer emits 'rebalance' events we can react to.
var globalConfig = {
  //'debug': 'all',
  'metadata.broker.list': 'localhost:9092',
  'group.id': 'node-rdkafka-consumer-per-partition-example',
  'enable.auto.commit': false,
  'rebalance_cb': true,
};

// Per-topic configuration.
var topicConfig = {
  'auto.offset.reset': 'earliest', // start from the beginning
};

var consumer = new Kafka.KafkaConsumer(globalConfig, topicConfig);

var topicName = 'test';

// Keep track of which partitions are assigned.
var assignments = [];

//logging debug messages, if debug is enabled
// Forward librdkafka debug logs to the console (only emitted when the
// 'debug' option is enabled in the configuration).
consumer.on('event.log', function(logEntry) {
  console.log(logEntry);
});

//logging all errors
// Log every error notification the consumer emits.
consumer.on('event.error', function(consumerError) {
  console.error('Error from consumer');
  console.error(consumerError);
});

// Once connected: configure non-blocking consumes, subscribe, and start
// a flowing-mode consume loop.
consumer.on('ready', function(readyInfo) {
  console.log('consumer ready: ' + JSON.stringify(readyInfo));

  // Remove the default timeout so each consume() call returns
  // immediately instead of blocking while waiting for messages.
  consumer.setDefaultConsumeTimeout(0);

  consumer.subscribe([topicName]);

  // Kick off a flowing-mode consume loop. It yields no messages (we
  // consume from the partitions directly below), but it is required so
  // that the rebalancing events get served.
  consumer.consume();
});

// Start our own consume loops for all newly assigned partitions
// On every rebalance, start a consume loop for each partition we did not
// already own. Loops for revoked partitions stop on their own (see
// startConsumeMessages).
consumer.on('rebalance', function(err, updatedAssignments) {
  var partitionNumbers = updatedAssignments.map(function(a) {
    return a.partition;
  });
  console.log('rebalancing done, got partitions assigned: ', partitionNumbers);

  // Build a lookup of the partitions we held before this rebalance.
  var previouslyOwned = {};
  assignments.forEach(function(assignment) {
    previouslyOwned[assignment.partition] = true;
  });

  // update global assignments array
  assignments = updatedAssignments;

  // Start a consume loop for every partition that is new to us.
  updatedAssignments.forEach(function(assignment) {
    if (!previouslyOwned[assignment.partition]) {
      startConsumeMessages(assignment.partition);
    }
  });
});

/**
 * Starts a self-scheduling consume loop for a single partition. The loop
 * stops itself as soon as the partition is no longer present in the
 * global `assignments` array (i.e. it was revoked in a rebalance).
 *
 * Fixes over the previous version:
 *  - `err` from consume() was ignored; on error `messages` is undefined
 *    and `messages.forEach` threw, killing the loop. Errors are now
 *    logged and the loop retries on its next tick.
 *  - Every message was committed individually AND the last message was
 *    committed a second time via `messages.pop()`. Committing the last
 *    message of the batch once covers all earlier offsets.
 *
 * @param {number} partition - Partition number to consume from.
 */
function startConsumeMessages(partition) {
  console.log('partition: ' + partition + ' starting to consume');

  function consume() {
    var isPartitionAssigned = assignments.some(function(assignment) {
      return assignment.partition === partition;
    });

    // Partition was revoked in a rebalance: end this loop.
    if (!isPartitionAssigned) {
      console.log('partition: ' + partition + ' stop consuming');
      return;
    }

    // consume per 5 messages
    consumer.consume(5, topicName, partition, callback);
  }

  function callback(err, messages) {
    if (err) {
      // Don't crash the loop on a consume error; log it and retry below.
      console.error('partition: ' + partition + ' consume error');
      console.error(err);
    } else {
      messages.forEach(function(message) {
        // consume the message
        console.log('partition ' + message.partition + ' value ' + message.value.toString());
      });

      // One commit per batch: committing the last message also covers
      // every earlier offset in the batch.
      if (messages.length > 0) {
        consumer.commitMessage(messages[messages.length - 1]);
      }
    }

    // simulate performance: partition 0 is slow (2s), others take 0.5s
    setTimeout(consume, partition === 0 ? 2000 : 500);
  }

  // kick-off recursive consume loop
  consume();
}

// Report when the connection to the brokers has been torn down.
consumer.on('disconnected', function(disconnectInfo) {
  console.log('consumer disconnected. ' + JSON.stringify(disconnectInfo));
});

//starting the consumer
consumer.connect();

// Stop this example after 30 seconds by disconnecting cleanly.
var SHUTDOWN_AFTER_MS = 30000;
setTimeout(function() {
  consumer.disconnect();
}, SHUTDOWN_AFTER_MS);