UNPKG

5.41 kBJavaScriptView Raw
1'use strict';
2
3var assert = require('assert');
4var util = require('util');
5var EventEmitter = require('events');
6var _ = require('lodash');
7var protocol = require('./protocol');
8var Message = protocol.Message;
9var KeyedMessage = protocol.KeyedMessage;
10var ProduceRequest = protocol.ProduceRequest;
11var partitioner = require('./partitioner');
12var DefaultPartitioner = partitioner.DefaultPartitioner;
13var RandomPartitioner = partitioner.RandomPartitioner;
14var CyclicPartitioner = partitioner.CyclicPartitioner;
15var KeyedPartitioner = partitioner.KeyedPartitioner;
16var CustomPartitioner = partitioner.CustomPartitioner;
17
18var PARTITIONER_TYPES = {
19 default: 0,
20 random: 1,
21 cyclic: 2,
22 keyed: 3,
23 custom: 4
24};
25
26var PARTITIONER_MAP = {
27 0: DefaultPartitioner,
28 1: RandomPartitioner,
29 2: CyclicPartitioner,
30 3: KeyedPartitioner,
31 4: CustomPartitioner
32};
33
34var DEFAULTS = {
35 requireAcks: 1,
36 ackTimeoutMs: 100
37};
38
39/**
40 * Provides common functionality for a kafka producer
41 *
42 * @param {Client} client A kafka client object to use for the producer
43 * @param {Object} [options] An object containing configuration options
44 * @param {Number} [options.requireAcks=1] Configuration for when to consider a message as acknowledged.
45 * <li>0 = No ack required</li>
46 * <li>1 = Leader ack required</li>
47 * <li>-1 = All in sync replicas ack required</li>
48 *
49 * @param {Number} [options.ackTimeoutMs=100] The amount of time in milliseconds to wait for all acks before considered
50 * the message as errored
51 * @param {Number} [defaultPartitionType] The default partitioner type
52 * @param {Object} [customPartitioner] a custom partitinoer to use of the form: function (partitions, key)
53 * @constructor
54 */
55function BaseProducer (client, options, defaultPartitionerType, customPartitioner) {
56 EventEmitter.call(this);
57 options = options || {};
58
59 this.ready = false;
60 this.client = client;
61
62 this.requireAcks = options.requireAcks === undefined ? DEFAULTS.requireAcks : options.requireAcks;
63 this.ackTimeoutMs = options.ackTimeoutMs === undefined ? DEFAULTS.ackTimeoutMs : options.ackTimeoutMs;
64
65 if (customPartitioner !== undefined && options.partitionerType !== PARTITIONER_TYPES.custom) {
66 throw new Error('Partitioner Type must be custom if providing a customPartitioner.');
67 } else if (customPartitioner === undefined && options.partitionerType === PARTITIONER_TYPES.custom) {
68 throw new Error('No customer partitioner defined');
69 }
70
71 var partitionerType = PARTITIONER_MAP[options.partitionerType] || PARTITIONER_MAP[defaultPartitionerType];
72
73 // eslint-disable-next-line
74 this.partitioner = new partitionerType(customPartitioner);
75
76 this.connect();
77}
78
79util.inherits(BaseProducer, EventEmitter);
80
81BaseProducer.prototype.connect = function () {
82 // emiter...
83 var self = this;
84 this.ready = this.client.ready;
85 if (this.ready) self.emit('ready');
86 this.client.on('ready', function () {
87 if (!self.ready) {
88 self.ready = true;
89 self.emit('ready');
90 }
91 });
92 this.client.on('brokersChanged', function () {
93 let topics = Object.keys(this.topicMetadata);
94 this.refreshMetadata(topics, function (error) {
95 if (error) {
96 self.emit('error', error);
97 }
98 });
99 });
100 this.client.on('error', function (err) {
101 self.emit('error', err);
102 });
103 this.client.on('close', function () {});
104};
105
106/**
107 * Sends a new message or array of messages to a topic/partition
108 * This will use the
109 *
110 * @see Client#sendProduceRequest for a more low level way to send messages to kafka
111 *
112 * @param {Array.<BaseProducer~sendPayload>} payloads An array of topic payloads
113 * @param {BaseProducer~sendCallback} cb A function to call once the send has completed
114 */
115BaseProducer.prototype.send = function (payloads, cb) {
116 var client = this.client;
117 var requireAcks = this.requireAcks;
118 var ackTimeoutMs = this.ackTimeoutMs;
119
120 client.sendProduceRequest(this.buildPayloads(payloads, client.topicMetadata), requireAcks, ackTimeoutMs, cb);
121};
122
123BaseProducer.prototype.buildPayloads = function (payloads, topicMetadata) {
124 const topicPartitionRequests = Object.create(null);
125 payloads.forEach(p => {
126 p.partition = p.hasOwnProperty('partition')
127 ? p.partition
128 : this.partitioner.getPartition(_.map(topicMetadata[p.topic], 'partition'), p.key);
129 p.attributes = p.hasOwnProperty('attributes') ? p.attributes : 0;
130 let messages = _.isArray(p.messages) ? p.messages : [p.messages];
131
132 messages = messages.map(function (message) {
133 if (message instanceof KeyedMessage) {
134 return message;
135 }
136 return new Message(0, 0, p.key, message, p.timestamp || Date.now());
137 });
138
139 let key = p.topic + p.partition;
140 let request = topicPartitionRequests[key];
141
142 if (request == null) {
143 topicPartitionRequests[key] = new ProduceRequest(p.topic, p.partition, messages, p.attributes);
144 } else {
145 assert(request.attributes === p.attributes);
146 Array.prototype.push.apply(request.messages, messages);
147 }
148 });
149 return _.values(topicPartitionRequests);
150};
151
152BaseProducer.prototype.createTopics = function (topics, async, cb) {
153 if (!this.ready) {
154 return cb(new Error('Producer not ready!'));
155 }
156
157 this.client.createTopics(topics, async, cb);
158};
159
160BaseProducer.prototype.close = function (cb) {
161 this.client.close(cb);
162};
163
164BaseProducer.PARTITIONER_TYPES = PARTITIONER_TYPES;
165
166module.exports = BaseProducer;