1 |
|
2 | debug = require('debug')('msgflo:amqp')
|
3 | async = require 'async'
|
4 | interfaces = require './interfaces'
|
5 |
|
6 | try
|
7 | amqp = require 'amqplib/callback_api'
|
8 | catch e
|
9 | amqp = e
|
10 |
|
11 |
|
12 | class Client extends interfaces.MessagingClient
|
13 | constructor: (@address, @options={}) ->
|
14 | @connection = null
|
15 | @channel = null
|
16 | @options.prefetch = 2 if not @options.prefetch?
|
17 |
|
18 |
|
19 | connect: (callback) ->
|
20 | debug 'connect', @address
|
21 | if amqp.message
|
22 | return callback amqp
|
23 |
|
24 | amqp.connect @address, (err, conn) =>
|
25 | debug 'connected', err
|
26 | return callback err if err
|
27 | @connection = conn
|
28 | conn.createChannel (err, ch) =>
|
29 | debug 'channel created', err
|
30 | return callback err if err
|
31 | @channel = ch
|
32 | debug 'setting prefetch', @options.prefetch
|
33 | @channel.prefetch @options.prefetch
|
34 | @channel.on 'close', () ->
|
35 | debug 'channel closed'
|
36 | @channel.on 'error', (err) ->
|
37 | throw err if err
|
38 | return callback null
|
39 |
|
40 | disconnect: (callback) ->
|
41 | debug 'disconnect'
|
42 | return callback null if not @connection
|
43 | return callback null if not @channel
|
44 | @channel.close (err) =>
|
45 | debug 'channel closed', err
|
46 | @channel = null
|
47 | @connection.close (err) =>
|
48 | debug 'connection closed'
|
49 | @connection = null
|
50 | return callback err
|
51 |
|
52 |
|
53 | createQueue: (type, queueName, options, callback) ->
|
54 | if not callback
|
55 | callback = options
|
56 | options = {}
|
57 |
|
58 | debug 'create queue', type, queueName, options
|
59 | queueOptions =
|
60 | deadLetterExchange: 'dead-'+queueName
|
61 | exchangeOptions = {}
|
62 | exchangeName = queueName
|
63 |
|
64 | if options.persistent? and not options.persistent
|
65 | queueOptions.durable = false
|
66 | queueOptions.autoDelete = true
|
67 | exchangeOptions.durable = false
|
68 | exchangeOptions.autoDelete = true
|
69 |
|
70 | if type == 'inqueue'
|
71 | @channel.assertQueue queueName, queueOptions, (err) =>
|
72 |
|
73 |
|
74 |
|
75 | @channel.assertExchange exchangeName, 'fanout', exchangeOptions, (err) =>
|
76 | return callback err if err
|
77 | @channel.bindQueue exchangeName, queueName, '', {}, callback
|
78 | else
|
79 | @channel.assertExchange exchangeName, 'fanout', exchangeOptions, callback
|
80 |
|
81 | removeQueue: (type, queueName, callback) ->
|
82 | debug 'remove queue', type, queueName
|
83 | if type == 'inqueue'
|
84 | @channel.deleteQueue queueName, {}, callback
|
85 | else
|
86 | exchangeName = queueName
|
87 | @channel.deleteExchange exchangeName, {}, callback
|
88 |
|
89 |
|
90 | sendTo: (type, name, message, callback) ->
|
91 | return callback new Error 'msgflo.amqp.sendTo(): Not connected' if not @channel
|
92 |
|
93 | data = new Buffer JSON.stringify message
|
94 | showLimit = 80
|
95 | dataShow = if data.length > showLimit then data.slice(0, showLimit)+'...' else data
|
96 | debug 'sendTo', type, name, dataShow
|
97 | if type == 'inqueue'
|
98 |
|
99 | exchange = ''
|
100 | routingKey = name
|
101 | else
|
102 |
|
103 | exchange = name
|
104 | routingKey = ''
|
105 | @channel.publish exchange, routingKey, data
|
106 | return callback null
|
107 |
|
108 |
|
109 | subscribeToQueue: (queueName, handler, callback) ->
|
110 | return callback new Error 'msgflo.amqp.subscribeToQueue(): Not connected' if not @channel
|
111 | debug 'subscribe', queueName
|
112 |
|
113 | deserialize = (message) =>
|
114 | debug 'receive on queue', queueName, message.fields.deliveryTag
|
115 | data = null
|
116 | try
|
117 | data = JSON.parse message.content.toString()
|
118 | catch e
|
119 | data = message.content.toString()
|
120 | out =
|
121 | amqp: message
|
122 | data: data
|
123 | return handler out
|
124 | @channel.consume queueName, deserialize
|
125 | debug 'subscribed', queueName
|
126 | return callback null
|
127 |
|
128 |
|
129 | ackMessage: (message) ->
|
130 | return if not @channel
|
131 | fields = message.amqp.fields
|
132 | debug 'ACK', fields.routingKey, fields.deliveryTag
|
133 |
|
134 | @channel.ack message.amqp, false
|
135 | nackMessage: (message) ->
|
136 | return if not @channel
|
137 | fields = message.amqp.fields
|
138 | debug 'NACK', fields.routingKey, fields.deliveryTag
|
139 | @channel.nack message.amqp, false, false
|
140 |
|
141 |
|
142 | registerParticipant: (part, callback) ->
|
143 | msg =
|
144 | protocol: 'discovery'
|
145 | command: 'participant'
|
146 | payload: part
|
147 | @channel.assertQueue 'fbp'
|
148 | data = new Buffer JSON.stringify msg
|
149 | @channel.sendToQueue 'fbp', data
|
150 | return callback null
|
151 |
|
152 | class MessageBroker extends Client
|
153 | constructor: (address, options) ->
|
154 | super address, options
|
155 |
|
156 | addBinding: (binding, callback) ->
|
157 |
|
158 | debug 'Broker.addBinding', binding
|
159 | if binding.type == 'pubsub'
|
160 | @channel.bindQueue binding.tgt, binding.src, '', {}, callback
|
161 | else if binding.type == 'roundrobin'
|
162 | pattern = ''
|
163 | bindSrcTgt = (callback) =>
|
164 |
|
165 | debug 'binding src to tgt', binding.src, binding.tgt
|
166 | directExchange = 'out-'+binding.src
|
167 | directOptions = {}
|
168 | @channel.assertExchange directExchange, 'direct', directOptions, (err) =>
|
169 | return callback err if err
|
170 |
|
171 | @channel.bindExchange directExchange, binding.src, pattern, (err), =>
|
172 | return callback err if err
|
173 |
|
174 | @channel.bindQueue binding.tgt, directExchange, pattern, {}, (err) =>
|
175 | return callback err
|
176 |
|
177 | bindDeadLetter = (callback) =>
|
178 |
|
179 | debug 'binding deadletter queue', binding.deadletter, binding.tgt
|
180 | deadLetterExchange = 'dead-'+binding.tgt
|
181 | deadLetterOptions = {}
|
182 | @channel.assertExchange deadLetterExchange, 'fanout', deadLetterOptions, (err) =>
|
183 | return callback err if err
|
184 | @channel.bindQueue binding.deadletter, deadLetterExchange, pattern, {}, callback
|
185 |
|
186 | steps = []
|
187 | steps.push bindSrcTgt if binding.src and binding.tgt
|
188 | steps.push bindDeadLetter if binding.deadletter and binding.tgt
|
189 | async.series steps, callback
|
190 |
|
191 | else
|
192 | return callback new Error 'Unsupported binding type: '+binding.type
|
193 | removeBinding: (binding, callback) ->
|
194 |
|
195 | return callback null
|
196 | listBindings: (from, callback) ->
|
197 | return callback null, []
|
198 |
|
199 |
|
200 | subscribeParticipantChange: (handler) ->
|
201 | deserialize = (message) =>
|
202 | debug 'receive on fbp', message.fields.deliveryTag
|
203 | data = null
|
204 | try
|
205 | data = JSON.parse message.content.toString()
|
206 | catch e
|
207 | debug 'JSON exception:', e
|
208 | out =
|
209 | amqp: message
|
210 | data: data
|
211 | return handler out
|
212 |
|
213 | @channel.assertQueue 'fbp'
|
214 | @channel.consume 'fbp', deserialize
|
215 |
|
216 | exports.Client = Client
|
217 | exports.MessageBroker = MessageBroker
|