UNPKG

msgflo

Version:

Polyglot FBP runtime based on message queues

133 lines (116 loc) 3.94 kB
amqp = require 'amqplib/callback_api' debug = require('debug')('msgflo:amqp') interfaces = require './interfaces' class Client constructor: (@address, @options={}) -> @connection = null @channel = null @options.prefetch = 2 if not @options.prefetch? ## Broker connection management connect: (callback) -> debug 'connect' amqp.connect @address, (err, conn) => debug 'connected', err return callback err if err @connection = conn conn.createChannel (err, ch) => debug 'channel created', err return callback err if err @channel = ch debug 'setting prefetch', @options.prefetch @channel.prefetch @options.prefetch @channel.on 'close', () -> debug 'channel closed' @channel.on 'error', (err) -> throw err if err return callback null disconnect: (callback) -> @channel.close (err) => debug 'close', err return callback err ## Manipulating queues createQueue: (type, queueName, callback) -> options = {} if type == 'inqueue' @channel.assertQueue queueName, options, callback else @channel.assertExchange queueName, 'fanout', options, (err) => return callback err if err # HACK: to make inqueue==outqueue work: @channel.assertQueue queueName, options, (err) => @channel.bindQueue queueName, queueName, '', {}, callback removeQueue: (type, queueName, callback) -> # FIXME: do something here? return callback null ## Sending/Receiving messages sendToQueue: (exchangeName, message, callback) -> # queue must exists debug 'send', exchangeName data = new Buffer JSON.stringify message routingKey = '' # ignored for fan-out exchanges @channel.publish exchangeName, routingKey, data, (err) -> throw err if err return callback null subscribeToQueue: (queueName, handler, callback) -> debug 'subscribe', queueName # queue must exists deserialize = (message) => debug 'receive on queue', queueName, message.fields.deliveryTag data = null try data = JSON.parse message.content.toString() catch e debug 'JSON exception:', e out = amqp: message data: data return handler out @channel.consume queueName, deserialize debug 'subscribed', queueName return callback null ## ACK/NACK messages ackMessage: (message) -> fields = message.amqp.fields debug 'ACK', fields.routingKey, fields.deliveryTag # NOTE: server will only give us new message after this @channel.ack message.amqp, false nackMessage: (message) -> fields = message.amqp.fields debug 'NACK', fields.routingKey, fields.deliveryTag @channel.nack message.amqp, false # Participant registration registerParticipant: (part, callback) -> msg = protocol: 'discovery' command: 'participant' payload: part @channel.assertQueue 'fbp' data = new Buffer JSON.stringify msg @channel.sendToQueue 'fbp', data return callback null class MessageBroker extends Client constructor: (address, options) -> super address, options bindQueue: (from, to, callback) -> debug 'bind', from, to @channel.bindQueue to, from, '', {}, callback unbindQueue: (from, to, callback) -> return callback null listBindings: (from, callback) -> return callback null # Participant registration subscribeParticipantChange: (handler) -> deserialize = (message) => debug 'receive on fbp', message.fields.deliveryTag data = null try data = JSON.parse message.content.toString() catch e debug 'JSON exception:', e out = amqp: message data: data return handler out @channel.assertQueue 'fbp' @channel.consume 'fbp', deserialize exports.Client = Client exports.MessageBroker = MessageBroker