Class: HighLevelProducer

new HighLevelProducer(conf, topicConf)

Producer class for sending messages to Kafka in a higher-level fashion.

This is the main entry point for writing data to Kafka if you want more functionality than librdkafka supports out of the box. You configure it like any other client, with a global configuration and a default topic configuration.

Once you instantiate this object, you need to connect it first. This allows you to get the metadata and make sure the connection can be made before you depend on it. After that, problems with the connection will be surfaced by poll, which runs automatically when a transaction is made on the object.

This has a few restrictions, so it is not for free!

1. You may not define opaque tokens. The higher-level producer is powered by opaque tokens.
2. Every message ack will dispatch an event on the node thread.
3. A ref counter is used to determine whether there are outgoing produces.

This will return the new object you should use instead when doing your produce calls.

Parameters:

Name Type Description
conf object Key value pairs to configure the producer
topicConf object Key value pairs to create a default topic configuration
Source:
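
The lifecycle described above (construct, connect, wait until ready) can be sketched roughly as follows. This is an illustration under assumptions, not the library's own example: it assumes the class is exported as HighLevelProducer by the node-rdkafka module, which is passed in through the hypothetical `kafka` argument so the helper can also be run against a stand-in. The `metadata.broker.list` key and the broker address are illustrative configuration values that do not appear on this page.

```javascript
// Sketch only: `kafka` is assumed to be the module returned by
// require('node-rdkafka'); createProducer is a hypothetical helper.
function createProducer(kafka, brokerList, onReady) {
  // Global configuration first, default topic configuration second.
  const producer = new kafka.HighLevelProducer(
    { 'metadata.broker.list': brokerList },
    {}
  );

  // The 'ready' event is emitted once the client has connected.
  producer.on('ready', function () {
    onReady(producer);
  });

  // The producer must be connected before it is used.
  producer.connect({});
  return producer;
}

// Example call (needs a reachable broker):
// createProducer(require('node-rdkafka'), 'localhost:9092', function (p) {
//   console.log('connected:', p.isConnected());
// });
```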

_disconnect

Save the base disconnect method here so we can overwrite it and add a flush

Inherited From:
Source:

_metadata :Client~Metadata

Metadata object. Starts out empty but will be filled with information after the initial connect.

Type:
Inherited From:
Source:

_modifiedProduce(topic, partition, message, key, timestamp, headers, callback) → {boolean}

Produce a message to Kafka asynchronously. This is the main method of this class. Once the message has been sent off and you receive your callback, the delivery assurances are those provided by your ack level.

Parameters:

Name Type Description
topic string The topic name to produce to.
partition number | null The partition number to produce to.
message Buffer | null The message to produce.
key string The key associated with the message.
timestamp number | null Timestamp to send with the message.
headers object A list of custom key value pairs that provide message metadata.
callback function Callback to call when the delivery report is received.
Source:
Throws:

- Throws a librdkafka error if it failed.
Type
LibrdKafkaError
Returns:
- returns an error if it failed, or true if not
Type
boolean
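
A minimal sketch of producing with a delivery callback, using the parameter order listed above. The helper name sendMessage is hypothetical. It assumes that calling produce on a HighLevelProducer instance dispatches to this method, and that the callback is invoked as callback(err, offset); neither detail is stated on this page.

```javascript
// Sketch only: sendMessage is a hypothetical helper. It assumes the
// delivery callback is invoked as callback(err, offset).
function sendMessage(producer, topic, key, text, headers, done) {
  producer.produce(
    topic,
    null,               // partition: null, no explicit partition chosen
    Buffer.from(text),  // message payload as a Buffer
    key,
    Date.now(),         // timestamp in milliseconds
    headers,            // passed through unchanged; may be undefined
    function (err, offset) {
      if (err) {
        done(err);
        return;
      }
      // The guarantees at this point depend on the configured ack level.
      done(null, offset);
    }
  );
}
```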

abortTransaction(timeout, cb) → {Producer}

Aborts the ongoing transaction.

Parameters:

Name Type Description
timeout number Number of milliseconds to try to abort, defaults to 5 seconds
cb function Callback to return when operation is completed
Inherited From:
Source:
Returns:
- returns itself.
Type
Producer

beginTransaction() → {Producer}

Begin a transaction. 'initTransactions' must have been called successfully (once) before this function is called.

Inherited From:
Source:
Returns:
- returns itself.
Type
Producer

commitTransaction(timeout, cb) → {Producer}

Commit the current transaction (as started with 'beginTransaction').

Parameters:

Name Type Description
timeout number Number of milliseconds to try to commit before giving up, defaults to 5 seconds
cb function Callback to return when operation is completed
Inherited From:
Source:
Returns:
- returns itself.
Type
Producer
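
One way to choose between commitTransaction and abortTransaction once the work inside a transaction has finished is sketched below. The helper finishTransaction is hypothetical, and the sketch assumes both callbacks receive an error (or a falsy value) as their first argument, which this page does not spell out.

```javascript
// Sketch only: finishTransaction is a hypothetical helper. It assumes the
// callbacks of commitTransaction/abortTransaction are error-first.
function finishTransaction(producer, workError, timeoutMs, done) {
  if (workError) {
    // Something failed while the transaction was open: roll it back and
    // report the abort error if there is one, else the original error.
    producer.abortTransaction(timeoutMs, function (abortErr) {
      done(abortErr || workError);
    });
    return;
  }
  // Everything succeeded: commit what was produced in the transaction.
  producer.commitTransaction(timeoutMs, function (commitErr) {
    done(commitErr || null);
  });
}
```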

connect(metadataOptions, cb) → {Client}

Connect to the broker and receive its metadata. Establishes the client connection to a broker and fetches that broker's metadata.

Parameters:

Name Type Description
metadataOptions object Options to be sent to the metadata.
Properties
Name Type Description
topic string Topic to fetch metadata for. Empty string is treated as empty.
allTopics boolean Fetch metadata for all topics, not just the ones we know about.
timeout int The timeout, in ms, to allow for fetching metadata. Defaults to 30000ms
cb Client~connectionCallback Callback that indicates we are done connecting.
Inherited From:
Source:
Returns:
- Returns itself.
Type
Client

connectedTime() → {number}

Find out how long we have been connected to Kafka.

Inherited From:
Source:
Returns:
- Milliseconds since the connection has been established.
Type
number

disconnect(timeout, cb)

Disconnect the producer. Flushes everything in the internal librdkafka producer buffer, then disconnects.

Parameters:

Name Type Description
timeout number Number of milliseconds to try to flush before giving up, defaults to 5 seconds.
cb function The callback to fire when the disconnect has completed.
Inherited From:
Source:
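
A small shutdown helper built on disconnect might look like this. It is a sketch: the name shutdown is hypothetical, and the callback is assumed to be error-first. Because disconnect flushes the internal buffer before disconnecting, no separate flush call is made here.

```javascript
// Sketch only: shutdown is a hypothetical helper. It assumes the
// disconnect callback receives an error (or nothing) as first argument.
function shutdown(producer, flushTimeoutMs, done) {
  if (!producer.isConnected()) {
    // Nothing to flush or close.
    done(null);
    return;
  }
  // disconnect() flushes the producer buffer first, then disconnects.
  producer.disconnect(flushTimeoutMs, function (err) {
    done(err || null);
  });
}
```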

flush(timeout, callback) → {Producer}

Flush the producer. Flushes everything in the internal librdkafka producer buffer. This is usually done before disconnecting.

Parameters:

Name Type Description
timeout number Number of milliseconds to try to flush before giving up.
callback function Callback to fire when the flush is done.
Inherited From:
Source:
Returns:
- returns itself.
Type
Producer

getClient() → {Connection}

Get the native Kafka client. You probably shouldn't use this, but if you want to execute methods directly on the C++ wrapper, you can do it here.

Inherited From:
Source:
See:
  • connection.cc
Returns:
- The native Kafka client.
Type
Connection

getLastError() → {LibrdKafkaError}

Get the last error emitted if it exists.

Inherited From:
Source:
Returns:
- Returns the LibrdKafkaError or null if one hasn't been thrown.
Type
LibrdKafkaError

getMetadata(metadataOptions, cb)

Get client metadata. Note: using a metadataOptions.topic parameter has a potential side-effect. A Topic object will be created, if it did not exist yet, with default options and it will be cached by librdkafka. A subsequent call to create the topic object with specific options (e.g. acks) will return the previous instance and the specific options will be silently ignored. To avoid this side effect, the topic object can be created with the expected options before requesting metadata, or the metadata request can be performed for all topics (by omitting metadataOptions.topic).

Parameters:

Name Type Description
metadataOptions object Metadata options to pass to the client.
Properties
Name Type Description
topic string Topic string for which to fetch metadata
timeout number Max time, in ms, to try to fetch metadata before timing out. Defaults to 3000.
cb Client~metadataCallback Callback to fire with the metadata.
Inherited From:
Source:
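
The side effect described above can be avoided by omitting metadataOptions.topic, as in this sketch. The helper listTopicNames is hypothetical, and the shape of the metadata object (a `topics` array whose entries have a `name` field) and the error-first callback are assumptions not confirmed by this page.

```javascript
// Sketch only: listTopicNames is a hypothetical helper. It assumes the
// callback is error-first and that metadata looks like
// { topics: [{ name: '...' }, ...] }.
function listTopicNames(producer, timeoutMs, done) {
  // No `topic` option is set, so no default Topic object is created
  // and cached as a side effect of this request.
  producer.getMetadata({ timeout: timeoutMs }, function (err, metadata) {
    if (err) {
      done(err);
      return;
    }
    const names = metadata.topics.map(function (t) {
      return t.name;
    });
    done(null, names);
  });
}
```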

initTransactions(timeout, cb) → {Producer}

Initialize transactions. This is only performed once per transactional producer.

Parameters:

Name Type Description
timeout number Number of milliseconds to try to initialize before giving up, defaults to 5 seconds.
cb function Callback to return when operation is completed
Inherited From:
Source:
Returns:
- returns itself.
Type
Producer

isConnected() → {boolean}

Whether or not we are connected to Kafka.

Inherited From:
Source:
Returns:
- Whether we are connected.
Type
boolean

offsetsForTimes(toppars, timeout, cb)

Query offsets for times from the broker. This function makes a call to the broker to get the offsets for times specified.

Parameters:

Name Type Description
toppars Array.<TopicPartition> Array of topic partitions. The offset in these should instead refer to a timestamp you want offsets for
timeout number Number of ms to wait to receive a response.
cb Client~offsetsForTimesCallback Callback to fire with the filled in offsets.
Inherited From:
Source:
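
Because the offset field of each entry in toppars carries a timestamp rather than an offset, building the query can be sketched like this. Both helpers are hypothetical, and representing a TopicPartition as a plain object with `topic`, `partition` and `offset` fields is an assumption.

```javascript
// Sketch only: buildTimeQuery and findOffsetsAt are hypothetical helpers.
// A TopicPartition is assumed to be a plain { topic, partition, offset }
// object.
function buildTimeQuery(topic, partitions, timestampMs) {
  return partitions.map(function (partition) {
    // `offset` holds the timestamp to look up, not a real offset.
    return { topic: topic, partition: partition, offset: timestampMs };
  });
}

function findOffsetsAt(producer, topic, partitions, timestampMs, timeoutMs, done) {
  const toppars = buildTimeQuery(topic, partitions, timestampMs);
  // The callback is handed through and fires with the filled-in offsets.
  producer.offsetsForTimes(toppars, timeoutMs, done);
}
```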

poll() → {Producer}

Poll for events. We need to run poll in order to learn about new events that have occurred. This is no longer done automatically when we produce, so we need to run it manually or set the producer to poll automatically.

Inherited From:
Source:
Returns:
- returns itself.
Type
Producer

produce(topic, partition, message, key, timestamp, opaque, headers) → {boolean}

Produce a message to Kafka synchronously. This is the method mainly used in this class. Use it to produce a message to Kafka. When this is sent off, there is no guarantee it is delivered. If you need guaranteed delivery, change your *acks* settings, or use delivery reports.

Parameters:

Name Type Description
topic string The topic name to produce to.
partition number | null The partition number to produce to.
message Buffer | null The message to produce.
key string The key associated with the message.
timestamp number | null Timestamp to send with the message.
opaque object An object you want passed along with this message, if provided.
headers object A list of custom key value pairs that provide message metadata.
Overrides:
Source:
Throws:

- Throws a librdkafka error if it failed.
Type
LibrdKafkaError
Returns:
- returns an error if it failed, or true if not
Type
boolean

queryWatermarkOffsets(topic, partition, timeout, cb)

Query offsets from the broker. This function makes a call to the broker to get the current low (oldest/beginning) and high (newest/end) offsets for a topic partition.

Parameters:

Name Type Description
topic string Topic to receive offsets from.
partition number Partition of the provided topic to receive offsets from.
timeout number Number of ms to wait to receive a response.
cb Client~watermarkOffsetsCallback Callback to fire with the offsets.
Inherited From:
Source:
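
As an illustration, the low and high watermarks can be combined into the span of offsets currently held by a partition. The helper offsetSpan is hypothetical, and the field names `lowOffset` and `highOffset` on the callback result, as well as the error-first callback, are assumptions not given on this page.

```javascript
// Sketch only: offsetSpan is a hypothetical helper. It assumes the
// callback is error-first and yields { lowOffset, highOffset }.
function offsetSpan(producer, topic, partition, timeoutMs, done) {
  producer.queryWatermarkOffsets(topic, partition, timeoutMs, function (err, offsets) {
    if (err) {
      done(err);
      return;
    }
    // high (newest/end) minus low (oldest/beginning).
    done(null, offsets.highOffset - offsets.lowOffset);
  });
}
```

The span is an upper bound on the number of messages in the partition rather than an exact count, since not every offset necessarily holds a message.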

sendOffsetsToTransaction(offsets, consumer, timeout, cb) → {Producer}

Send the current offsets of the consumer to the ongoing transaction.

Parameters:

Name Type Description
offsets number Offsets to send as part of the next commit
consumer Consumer An instance of the consumer
timeout number Number of milliseconds to try to send offsets, defaults to 5 seconds
cb function Callback to return when operation is completed
Inherited From:
Source:
Returns:
- returns itself.
Type
Producer

setKeySerializer()

Set the key serializer. This allows the key passed to the produce call to differ from the key actually produced to Kafka. Good if, for example, you want to serialize it to a particular format.

Source:

setPollInterval(interval) → {Producer}

Set automatic polling for events. We need to run poll in order to learn about new events that have occurred. If you would like this done on an interval, with disconnects and reconnections managed, you can do it here.

Parameters:

Name Type Description
interval number Interval, in milliseconds, to poll
Inherited From:
Source:
Returns:
- returns itself.
Type
Producer
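
A guarded call to setPollInterval could be sketched as follows. The helper enableAutoPoll and its validation rule are hypothetical additions; only the setPollInterval(interval) call itself comes from this page.

```javascript
// Sketch only: enableAutoPoll is a hypothetical helper that validates
// the interval before handing it to setPollInterval.
function enableAutoPoll(producer, intervalMs) {
  if (!Number.isInteger(intervalMs) || intervalMs <= 0) {
    throw new RangeError('intervalMs must be a positive integer');
  }
  // Returns the producer itself, so calls can be chained.
  return producer.setPollInterval(intervalMs);
}

// Example: enableAutoPoll(producer, 100) polls every 100 ms.
```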

setValueSerializer()

Set the value serializer. This allows the value passed to the produce call to differ from the value actually produced to Kafka. Good if, for example, you want to serialize it to a particular format.

Source:
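
A sketch of a pair of serializers follows. The signatures above list no parameters, so it is an assumption here that setKeySerializer and setValueSerializer each accept a single function that maps the value given to produce onto the value actually sent. All three helper names are hypothetical.

```javascript
// Sketch only: assumes each setter takes one serializer function.
// jsonValueSerializer, stringKeySerializer and installSerializers are
// hypothetical names.
function jsonValueSerializer(value) {
  // Encode any JSON-compatible value as a UTF-8 Buffer.
  return Buffer.from(JSON.stringify(value));
}

function stringKeySerializer(key) {
  // Keep a missing key as null; otherwise coerce it to a string.
  return key == null ? null : String(key);
}

function installSerializers(producer) {
  producer.setKeySerializer(stringKeySerializer);
  producer.setValueSerializer(jsonValueSerializer);
}
```

With these installed, a produce call could be handed a plain object as its message and a number as its key, leaving the conversion to the serializers.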

Events

disconnected

Disconnect event. Called after disconnection is finished.

Type:

  • object
Properties:
Name Type Description
connectionOpened date when the connection was opened.
Inherited From:
Source:

ready

Ready event. Called when the Client connects successfully

Type:

  • object
Properties:
Name Type Description
name string the name of the broker.
Inherited From:
Source:
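
Subscribing to both events can be sketched as below, using the properties listed above (name for ready, connectionOpened for disconnected). The helper watchConnection is hypothetical, and the sketch assumes the producer exposes the usual Node.js `on(eventName, listener)` method.

```javascript
// Sketch only: watchConnection is a hypothetical helper. It assumes the
// producer behaves like a Node.js EventEmitter.
function watchConnection(producer, log) {
  producer.on('ready', function (info) {
    // info.name: the name of the broker.
    log('ready: ' + info.name);
  });
  producer.on('disconnected', function (info) {
    // info.connectionOpened: when the connection was opened.
    log('disconnected; opened at ' + info.connectionOpened);
  });
}

// Example: watchConnection(producer, console.log);
```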