UNPKG

7.16 kBtext/coffeescriptView Raw
1
2debug = require('debug')('msgflo:amqp')
3async = require 'async'
4interfaces = require './interfaces'
5
6try
7 amqp = require 'amqplib/callback_api'
8catch e
9 amqp = e
10
11
12class Client extends interfaces.MessagingClient
13 constructor: (@address, @options={}) ->
14 @connection = null
15 @channel = null
16 @options.prefetch = 2 if not @options.prefetch?
17
18 ## Broker connection management
19 connect: (callback) ->
20 debug 'connect', @address
21 if amqp.message
22 return callback amqp
23
24 amqp.connect @address, (err, conn) =>
25 debug 'connected', err
26 return callback err if err
27 @connection = conn
28 conn.createChannel (err, ch) =>
29 debug 'channel created', err
30 return callback err if err
31 @channel = ch
32 debug 'setting prefetch', @options.prefetch
33 @channel.prefetch @options.prefetch
34 @channel.on 'close', () ->
35 debug 'channel closed'
36 @channel.on 'error', (err) ->
37 throw err if err
38 return callback null
39
40 disconnect: (callback) ->
41 debug 'disconnect'
42 return callback null if not @connection
43 return callback null if not @channel
44 @channel.close (err) =>
45 debug 'channel closed', err
46 @channel = null
47 @connection.close (err) =>
48 debug 'connection closed'
49 @connection = null
50 return callback err
51
52 ## Manipulating queues
53 createQueue: (type, queueName, options, callback) ->
54 if not callback
55 callback = options
56 options = {}
57
58 debug 'create queue', type, queueName, options
59 queueOptions =
60 deadLetterExchange: 'dead-'+queueName # if not existing, messages will be dropped
61 exchangeOptions = {}
62 exchangeName = queueName
63
64 if options.persistent? and not options.persistent
65 queueOptions.durable = false
66 queueOptions.autoDelete = true
67 exchangeOptions.durable = false
68 exchangeOptions.autoDelete = true
69
70 if type == 'inqueue'
71 @channel.assertQueue queueName, queueOptions, (err) =>
72 # HACK: to make inqueue==outqueue work without binding.
73 # Has side-effect of creating an implicit exchange.
74 # Better than implicit queue, since a queue holds messages forever if noone is subscribed
75 @channel.assertExchange exchangeName, 'fanout', exchangeOptions, (err) =>
76 return callback err if err
77 @channel.bindQueue exchangeName, queueName, '', {}, callback
78 else
79 @channel.assertExchange exchangeName, 'fanout', exchangeOptions, callback
80
81 removeQueue: (type, queueName, callback) ->
82 debug 'remove queue', type, queueName
83 if type == 'inqueue'
84 @channel.deleteQueue queueName, {}, callback
85 else
86 exchangeName = queueName
87 @channel.deleteExchange exchangeName, {}, callback
88
89 ## Sending/Receiving messages
90 sendTo: (type, name, message, callback) ->
91 return callback new Error 'msgflo.amqp.sendTo(): Not connected' if not @channel
92 # queue must exists
93 data = new Buffer JSON.stringify message
94 showLimit = 80
95 dataShow = if data.length > showLimit then data.slice(0, showLimit)+'...' else data
96 debug 'sendTo', type, name, dataShow
97 if type == 'inqueue'
98 # direct to queue
99 exchange = ''
100 routingKey = name
101 else
102 # to fanout exchange
103 exchange = name
104 routingKey = ''
105 @channel.publish exchange, routingKey, data
106 return callback null
107
108
109 subscribeToQueue: (queueName, handler, callback) ->
110 return callback new Error 'msgflo.amqp.subscribeToQueue(): Not connected' if not @channel
111 debug 'subscribe', queueName
112 # queue must exists
113 deserialize = (message) =>
114 debug 'receive on queue', queueName, message.fields.deliveryTag
115 data = null
116 try
117 data = JSON.parse message.content.toString()
118 catch e
119 data = message.content.toString()
120 out =
121 amqp: message
122 data: data
123 return handler out
124 @channel.consume queueName, deserialize
125 debug 'subscribed', queueName
126 return callback null
127
128 ## ACK/NACK messages
129 ackMessage: (message) ->
130 return if not @channel
131 fields = message.amqp.fields
132 debug 'ACK', fields.routingKey, fields.deliveryTag
133 # NOTE: server will only give us new message after this
134 @channel.ack message.amqp, false
135 nackMessage: (message) ->
136 return if not @channel
137 fields = message.amqp.fields
138 debug 'NACK', fields.routingKey, fields.deliveryTag
139 @channel.nack message.amqp, false, false
140
141 # Participant registration
142 registerParticipant: (part, callback) ->
143 msg =
144 protocol: 'discovery'
145 command: 'participant'
146 payload: part
147 @channel.assertQueue 'fbp'
148 data = new Buffer JSON.stringify msg
149 @channel.sendToQueue 'fbp', data
150 return callback null
151
152class MessageBroker extends Client
153 constructor: (address, options) ->
154 super address, options
155
156 addBinding: (binding, callback) ->
157 # TODO: support roundrobin type
158 debug 'Broker.addBinding', binding
159 if binding.type == 'pubsub'
160 @channel.bindQueue binding.tgt, binding.src, '', {}, callback
161 else if binding.type == 'roundrobin'
162 pattern = ''
163 bindSrcTgt = (callback) =>
164 # TODO: avoid creating the direct exchange?
165 debug 'binding src to tgt', binding.src, binding.tgt
166 directExchange = 'out-'+binding.src
167 directOptions = {}
168 @channel.assertExchange directExchange, 'direct', directOptions, (err) =>
169 return callback err if err
170 # bind input
171 @channel.bindExchange directExchange, binding.src, pattern, (err), =>
172 return callback err if err
173 # bind output
174 @channel.bindQueue binding.tgt, directExchange, pattern, {}, (err) =>
175 return callback err
176
177 bindDeadLetter = (callback) =>
178 # Setup the deadletter exchange, bind to deadletter queue
179 debug 'binding deadletter queue', binding.deadletter, binding.tgt
180 deadLetterExchange = 'dead-'+binding.tgt
181 deadLetterOptions = {}
182 @channel.assertExchange deadLetterExchange, 'fanout', deadLetterOptions, (err) =>
183 return callback err if err
184 @channel.bindQueue binding.deadletter, deadLetterExchange, pattern, {}, callback
185
186 steps = []
187 steps.push bindSrcTgt if binding.src and binding.tgt
188 steps.push bindDeadLetter if binding.deadletter and binding.tgt
189 async.series steps, callback
190
191 else
192 return callback new Error 'Unsupported binding type: '+binding.type
193 removeBinding: (binding, callback) ->
194 # FIXME: implement
195 return callback null
196 listBindings: (from, callback) ->
197 return callback null, []
198
199 # Participant registration
200 subscribeParticipantChange: (handler) ->
201 deserialize = (message) =>
202 debug 'receive on fbp', message.fields.deliveryTag
203 data = null
204 try
205 data = JSON.parse message.content.toString()
206 catch e
207 debug 'JSON exception:', e
208 out =
209 amqp: message
210 data: data
211 return handler out
212
213 @channel.assertQueue 'fbp'
214 @channel.consume 'fbp', deserialize
215
216exports.Client = Client
217exports.MessageBroker = MessageBroker