1 | _ = require 'lodash'
|
2 | Table = require 'cli-table'
|
3 | {EventEmitter} = require 'events'
|
4 | rabbit = require 'amqp'
|
5 | log4js = require 'log4js'
|
6 |
|
# Serialise `obj` as compact single-line JSON (no indentation), for embedding in log text.
# Returns whatever JSON.stringify returns for `obj`.
json = (obj) ->
  JSON.stringify obj
|
8 |
|
# Write the rendered `table` through `logger`, either immediately or after `delay` ms.
#
# logger - object whose logging methods are looked up by name (logger[level])
# table  - object with toString(); its `_logged` flag is set to true once written
# level  - name of the logger method to call (default 'trace')
# delay  - milliseconds to wait before writing; a falsy value writes synchronously
#
# A delayed write is skipped when the table has already been written by the time
# the timer fires; an immediate write always goes through.
#
# Returns the setTimeout handle when delayed, otherwise the logger call's result.
logTable = (logger, table, level='trace', delay=false) ->
  writeOnce = ->
    unless table._logged and delay
      table._logged = true
      logger[level] '\n%s', table.toString()

  return writeOnce() unless delay
  setTimeout writeOnce, delay
|
19 |
|
# Wrapper around a single RabbitMQ exchange reached through the `amqp` client.
#
# Construction starts connecting right away. Until the exchange is open, `push`
# and `subscribe` are shadowed by instance-level stubs that only record their
# arguments; the recorded calls are replayed against the prototype methods once
# the exchange's open callback runs.
#
# Instance properties set by the constructor:
#   exchangeName  - name passed to the constructor
#   logger        - log4js logger named after the exchange
#   connection    - object returned by rabbit.createConnection
#   exchange      - object returned by connection.exchange (set once the connection is ready)
#   exchangeReady - Promise resolved with `exchange` after the queued calls were replayed
#   queues        - opened queues keyed by queue name
#   info          - cli-table with one status row per stage
#                   (0 connection, 1 exchange, 2 queue, 3 push, 4 subscribe)
#   timestamps    - Date.now() marks used to compute the durations shown in `info`
module.exports = class Exchange

  # exchangeName - non-empty string naming the exchange
  # amqpConfig   - passed unchanged to rabbit.createConnection
  # opts         - optional; opts.logLevel sets the logger level (default 'trace')
  #
  # NOTE(review): the `.should` assertions and the `.bold` / `.yellow` / `.grey` /
  # `.red` / `.green` string properties used throughout this class are not provided
  # by anything required in the visible part of this file; presumably another module
  # installs them on the prototypes — confirm.
  constructor: (@exchangeName, amqpConfig, opts={}) ->
    # Validate before creating the logger so a bad name fails without first
    # fetching (and changing the level of) a logger for it.
    @exchangeName.should.be.a 'string', 'argument exchangeName is required'
    @exchangeName.length.should.not.equal 0, 'argument exchangeName may not be empty'

    logger = @logger = log4js.getLogger @exchangeName
    @logger.setLevel opts.logLevel or 'trace'

    logger.debug 'connecting to amqp', amqpConfig

    connection = @connection = rabbit.createConnection amqpConfig

    # Calls made before the exchange is open are parked here and replayed later.
    @_pushQueue = []
    @_subQueue = []

    # Opened queues by name, plus in-flight open requests so that concurrent
    # getQueue calls for the same name share a single connection.queue call.
    @queues = {}
    @_pendingQueues = {}

    _info = @info = new Table {
      head: ['', 'RabbitMQ Connection Info']
      style: compact: true
    }

    _info.push(
      ["connection", "connecting to #{ json(amqpConfig).bold }".yellow]
      ["exchange", "pending".grey]
      ["queue", "pending".grey]
      ["push", "not ready".red]
      ["subscribe", "not ready".red]
    )

    _timestamps = @timestamps =
      init: Date.now()
      connection: Date.now()

    # Instance-level stubs shadowing the prototype methods until the exchange is open.
    # `arguments` is stored as-is and splatted back into the real method on replay.
    @push = ->
      logger.debug 'calling push (queued)'
      @_pushQueue.push arguments

    @subscribe = ->
      logger.debug 'calling subscribe (queued)'
      @_subQueue.push arguments

    logger.debug 'connecting to exchange', @exchangeName
    logTable logger, _info, 'fatal', 2000

    # NOTE(review): no 'error' listener is attached to the connection here, so this
    # promise can only reject when the replay block below throws.
    @exchangeReady = new Promise (resolve, reject) =>
      connection.once 'ready', =>
        now = Date.now()
        duration = now - _timestamps.connection
        _timestamps.connection = now

        logger.debug 'CONNECTION ..... OK\t'.bold.green
        logger.trace 'connection ready. Connecting to exchange...'

        _info[0] = [
          "connection"
          "connected to #{ json(amqpConfig).bold } in #{ String(duration).bold }ms"
        ]

        _info[1] = [
          "exchange"
          "connecting to #{ @exchangeName.bold }".yellow
        ]

        _timestamps.exchange = Date.now()
        logTable logger, _info, 'fatal', 200

        @exchange = connection.exchange @exchangeName, {}, =>
          now = Date.now()
          duration = now - _timestamps.exchange
          _timestamps.exchange = now

          _info[1] = [
            "exchange"
            "connected to #{ @exchangeName.bold } in #{ String(duration).bold}ms"
          ]

          try
            logger.debug 'EXCHANGE ....... OK\t'.bold.green
            logger.trace 'exchange [%s] connected!', @exchangeName

            # Remove the queuing stubs so the prototype's push/subscribe become visible.
            delete @push
            delete @subscribe

            sinceInit = String now - _timestamps.init
            _info[3] = ['push', "ready in #{ sinceInit.bold }ms, queued: #{@_pushQueue.length}"]
            _info[4] = ['subscribe', "ready in #{ sinceInit.bold }ms, queued: #{@_subQueue.length}"]

            # Replay parked calls: subscriptions first, then publishes.
            @subscribe params... for params in @_subQueue
            @push params... for params in @_pushQueue

            delete @_pushQueue
            delete @_subQueue

            resolve @exchange
            logTable logger, _info, 'info', 100

          catch ex
            logger.warn 'error exchange', ex
            reject ex

  # Open the queue called `queueName`, or return it from the cache.
  #
  # Queues are requested with {durable: true, autoDelete: false}. Status row 2 of
  # `info` is updated while connecting and again once connected, after which the
  # table is logged at 'info' level.
  #
  # Calls for a name that is already being opened share the in-flight request
  # instead of issuing another connection.queue call.
  #
  # Returns a Promise resolving to the object returned by connection.queue; it
  # rejects only if the post-open bookkeeping throws.
  getQueue: (queueName) ->
    {logger, info: _info, timestamps: _timestamps} = @

    logger.debug "getting queue #{ queueName.bold }"

    opts = {durable: true, autoDelete: false}

    if @queues[queueName]
      logger.debug 'got queue %s from cached', queueName.bold
      return Promise.resolve @queues[queueName]

    pending = @_pendingQueues
    if pending[queueName]
      logger.debug 'queue %s is already being opened, sharing the request', queueName.bold
      return pending[queueName]

    return pending[queueName] = new Promise (resolve, reject) =>
      logger.debug 'inside promise'

      _info[2] = [
        'queue'
        "connecting to queue #{ queueName } with options #{ json(opts).bold }".yellow
      ]

      _timestamps.queue = Date.now()
      queue = @connection.queue queueName, opts, =>
        # The request has completed either way; later calls use @queues or retry.
        delete pending[queueName]

        now = Date.now()
        duration = now - _timestamps.queue
        _timestamps.queue = now
        _info[2] = [
          'queue'
          "connected to queue #{ queueName.bold } with options #{ json(opts).bold } in #{ String(duration).bold}ms"
        ]

        try
          logger.debug 'QUEUE .......... OK'.bold.green
          logger.trace 'queue %s connected!', queueName

          logTable logger, _info, 'info'

          @queues[queueName] = queue
          resolve queue

        catch ex
          logger.warn 'queue error', ex
          reject ex

  # Publish `message` on the exchange under routing key `key`, attaching `headers`.
  # Messages are published with deliveryMode 2 (persistent delivery in AMQP).
  # Returns whatever exchange.publish returns.
  #
  # While the exchange is not yet open this method is shadowed by the queuing stub
  # installed in the constructor.
  push: (message, headers, key)->
    @logger.trace 'calling push (immediate)', key
    @exchange.publish key, message, {
      headers
      deliveryMode: 2
    }

  # Subscribe `handler` to the queue called `queueName` with {ack: false}.
  #
  # `handler` is invoked with the same arguments the queue delivers:
  # (message, headers, deliveryInfo, messageObj). Each delivery is logged first.
  #
  # Returns a Promise resolving to whatever queue.subscribe returns.
  subscribe: (queueName, handler)->
    logger = @logger
    logger.trace 'calling subscribe (immediate)'

    @getQueue(queueName).then (queue)->
      logger.trace "queue.subscribe {ack: false}, (message, headers, deliveryInfo, messageObj)"
      queue.subscribe {ack: false}, (message, headers, deliveryInfo, messageObj) ->
        logger.debug 'received message', message.event
        logger.debug ' > event=%s', message.event
        logger.trace ' > ', message, 'retry=', deliveryInfo.redelivered
        handler arguments...

  # Bind the queue called `queueName` to this exchange for routing `pattern`.
  #
  # Waits for both the exchange and the queue, then calls queue.bind. The returned
  # Promise resolves with queue.bind's return value; it does not wait for the bind
  # callback, which only logs.
  bindQueue: (queueName, pattern)->
    logger = @logger

    logger.trace 'bindQueue queueName=%s pattern=%s', queueName.bold, pattern.bold
    Promise
      .all([@exchangeReady, @getQueue queueName])
      .then ([ex, queue]) ->
        logger.debug 'ex and queue connected'
        queue.bind ex, pattern, ->
          logger.debug 'queue [%s] bound to pattern %s', queueName, pattern

  # Remove the binding of the queue called `queueName` to this exchange for `pattern`.
  #
  # Mirrors bindQueue: waits for the exchange and the queue, then calls queue.unbind.
  # The returned Promise resolves with queue.unbind's return value; it does not wait
  # for the unbind callback, which only logs.
  unbindQueue: (queueName, pattern)->
    logger = @logger

    logger.trace 'unbindQueue queueName=%s pattern=%s', queueName.bold, pattern.bold
    Promise
      .all([@exchangeReady, @getQueue queueName])
      .then ([ex, queue]) ->
        logger.debug 'ex and queue connected'
        queue.unbind ex, pattern, ->
          logger.debug 'queue [%s] unbound from pattern %s', queueName, pattern
|