UNPKG

6.41 kBtext/coffeescriptView Raw
1_ = require 'lodash'
2Table = require 'cli-table'
3{EventEmitter} = require 'events'
4rabbit = require 'amqp'
5log4js = require 'log4js'
6
7json = (obj) -> JSON.stringify obj, null, ''
8
9logTable = (logger, table, level='trace', delay=false)->
10 fn = ->
11 return if table._logged and delay
12 table._logged = true
13 logger[level] '\n%s', table.toString()
14
15 if delay
16 setTimeout fn, delay
17 else
18 fn()
19
20module.exports = class Exchange
21
22 constructor: (@exchangeName, amqpConfig, opts={}) ->
23 logger = @logger = log4js.getLogger @exchangeName
24 logLevel = opts.logLevel or 'trace'
25
26 @logger.setLevel logLevel
27
28
29 @exchangeName.should.be.a 'string', 'argument exchangeName is required'
30 @exchangeName.length.should.not.equal 0, 'argument exchangeName may not be empty'
31 logger.debug 'connecting to amqp', amqpConfig
32
33 connection = @connection = rabbit.createConnection amqpConfig
34
35 @_pushQueue = []
36 @_subQueue = []
37
38
39 _info = @info = new Table {
40 head: ['', 'RabbitMQ Connection Info']
41 style: compact: true
42 }
43
44 _info.push(
45 ["connection", "connecting to #{ json(amqpConfig).bold }".yellow]
46 ["exchange", "pending".grey]
47 ["queue", "pending".grey]
48 ["push", "not ready".red]
49 ["subscribe", "not ready".red]
50 )
51
52 _timestamps = @timestamps =
53 init: Date.now()
54 connection: Date.now()
55
56
57 ## cache
58 @queues = {}
59
60 ## default push (before exchange being connected)
61 ## only push message to queue and wait for exchange to be connected
62 @push = ->
63 logger.debug 'calling push (queued)'
64 @_pushQueue.push arguments
65
66 @subscribe = ->
67 logger.debug 'calling subscribe (queued)'
68 @_subQueue.push arguments
69
70
71 ## START CONNECTING
72 ## ----------------
73
74 logger.debug 'connecting to exchange', @exchangeName
75 logTable logger, _info, 'fatal', 2000
76
77 @exchangeReady = new Promise (resolve, reject)=>
78 connection.once 'ready', =>
79 now = Date.now()
80 duration = now - _timestamps.connection
81 _timestamps.connection = now
82
83 logger.debug 'CONNECTION ..... OK\t'.bold.green
84 logger.trace 'connection ready. Connecting to exchange...'
85
86 _info[0] = [
87 "connection"
88 "connected to #{ json(amqpConfig).bold } in #{ String(duration).bold }ms"
89 ]
90
91 _info[1] = [
92 "exchange"
93 "connecting to #{ @exchangeName.bold }".yellow
94 ]
95
96 _timestamps.exchange = Date.now()
97 logTable logger, _info, 'fatal', 200
98
99 @exchange = connection.exchange @exchangeName, {}, (err) =>
100 now = Date.now()
101 duration = now - _timestamps.exchange
102 _timestamps.exchange = now
103
104 _info[1] = [
105 "exchange"
106 "connected to #{ @exchangeName.bold } in #{ String(duration).bold}ms"
107 ]
108
109 try
110 logger.debug 'EXCHANGE ....... OK\t'.bold.green
111 logger.trace 'exchange [%s] connected!', @exchangeName
112
113 delete @push
114 delete @subscribe
115
116 duration = String now - _timestamps.init
117 _info[3] = ['push', "ready in #{ duration.bold }ms, queued: #{@_pushQueue.length}"]
118 _info[4] = ['subscribe', "ready in #{ duration.bold }ms, queued: #{@_subQueue.length}"]
119
120
121 ## push queued messages
122 @subscribe params... for params in @_subQueue
123 @push params... for params in @_pushQueue
124
125 delete @_pushQueue
126 delete @_subQueue
127
128
129 resolve @exchange
130 logTable logger, _info, 'info', 100
131
132 catch ex
133 logger.warn 'error exchange', ex
134 reject ex
135
136 getQueue: (queueName) ->
137 {logger, info: _info, timestamps: _timestamps} = @
138
139 logger.debug "getting queue #{ queueName.bold }"
140
141 opts = {durable: true, autoDelete: false}
142
143 if @queues[queueName]
144 logger.debug 'got queue %s from cached', queueName.bold
145 return Promise.resolve @queues[queueName]
146
147 return new Promise (resolve, reject) =>
148 logger.debug 'inside promise'
149
150 _info[2] = [
151 'queue'
152 "connecting to queue #{ queueName } with options #{ json(opts).bold }".yellow
153 ]
154
155
156 _timestamps.queue = Date.now()
157 queue = @connection.queue queueName, opts, =>
158 now = Date.now()
159 duration = now - _timestamps.queue
160 _timestamps.queue = now
161 _info[2] = [
162 'queue'
163 "connected to queue #{ queueName.bold } with options #{ json(opts).bold } in #{ String(duration).bold}ms"
164 ]
165
166 try
167 logger.debug 'QUEUE .......... OK'.bold.green
168 logger.trace 'queue %s connected!', queueName
169
170 logTable logger, _info, 'info'
171
172 @queues[queueName] = queue
173 resolve queue
174
175 catch ex
176 logger.warn 'queue error', ex
177 reject ex
178
179 push: (message, headers, key)->
180 @logger.trace 'calling push (immediate)', key
181 @exchange.publish key, message, {
182 headers
183 deliveryMode: 2
184 }
185
186 subscribe: (queueName, handler)->
187 logger = @logger
188 @logger.trace 'calling subscribe (immediate)'
189
190 _timestamps = @timestamps
191 _info = @info
192
193 @getQueue(queueName).then (queue)->
194 logger.trace "queue.subscribe {ack: false}, (message, headers, deliveryInfo, messageObj)"
195 queue.subscribe {ack: false}, (message, headers, deliveryInfo, messageObj) ->
196 logger.debug 'received message', message.event
197 logger.debug ' > event=%s', message.event
198 logger.trace ' > ', message, 'retry=', deliveryInfo.redelivered
199 handler arguments...
200
201
202 bindQueue: (queueName, pattern)->
203 logger = @logger
204
205 logger.trace 'bindQueue queueName=%s pattern=%s', queueName.bold, pattern.bold
206 Promise
207 .all([@exchangeReady, @getQueue queueName])
208 .then ([ex, queue]) ->
209 logger.debug 'ex and queue connected'
210 queue.bind ex, pattern, ->
211 logger.debug 'queue [%s] bound to pattern %s', queueName, pattern
212
213 unbindQueue: (queueName, pattern)->
214 logger = @logger
215
216 logger.trace 'bindQueue queueName=%s pattern=%s', queueName.bold, pattern.bold
217 Promise
218 .all([@exchangeReady, @getQueue queueName])
219 .then ([ex, queue]) ->
220 logger.debug 'ex and queue connected'
221 queue.unbind ex, pattern, ->
222 logger.debug 'queue [%s] bound to pattern %s', queueName, pattern