1 |
|
2 | debug = require('debug')('msgflo:direct')
|
3 | EventEmitter = require('events').EventEmitter
|
4 |
|
5 | interfaces = require './interfaces'
|
6 | routing = require './routing'
|
7 |
|
8 | brokers = {}
|
9 |
|
10 |
|
11 | class Client extends interfaces.MessagingClient
|
12 | constructor: (@address, @options) ->
|
13 |
|
14 | @broker = null
|
15 |
|
16 |
|
17 | connect: (callback) ->
|
18 | debug 'client connect'
|
19 | @broker = brokers[@address]
|
20 | return callback null
|
21 | disconnect: (callback) ->
|
22 | debug 'client disconnect'
|
23 | @broker = null
|
24 | return callback null
|
25 |
|
26 | _assertBroker: (callback) ->
|
27 | err = new Error "no broker connected #{@address}" if not @broker
|
28 | return callback err if err
|
29 |
|
30 |
|
31 | createQueue: (type, queueName, options, callback) ->
|
32 | if not callback
|
33 | callback = options
|
34 | options = {}
|
35 |
|
36 |
|
37 | @_assertBroker callback
|
38 | @broker.createQueue type, queueName, callback
|
39 |
|
40 | removeQueue: (type, queueName, callback) ->
|
41 | @_assertBroker callback
|
42 | @broker.removeQueue type, queueName, callback
|
43 |
|
44 |
|
45 | sendTo: (type, queueName, message, callback) ->
|
46 | debug 'client sendTo', type, queueName
|
47 | @_assertBroker callback
|
48 | @broker.sendTo type, queueName, message, callback
|
49 |
|
50 | subscribeToQueue: (queueName, handler, callback) ->
|
51 | @_assertBroker callback
|
52 | @broker.subscribeToQueue queueName, handler, callback
|
53 |
|
54 |
|
55 | ackMessage: (message) ->
|
56 | return
|
57 | nackMessage: (message) ->
|
58 | return
|
59 |
|
60 |
|
61 | registerParticipant: (part, callback) ->
|
62 | @createQueue '', 'fbp', (err) =>
|
63 | msg =
|
64 | protocol: 'discovery'
|
65 | command: 'participant'
|
66 | payload: part
|
67 | @sendTo 'outqueue', 'fbp', msg, callback
|
68 |
|
69 | class Queue extends EventEmitter
|
70 | constructor: () ->
|
71 |
|
72 | send: (msg) ->
|
73 | @_emitSend msg
|
74 |
|
75 | _emitSend: (msg) ->
|
76 | @emit 'message', msg
|
77 |
|
78 | class MessageBroker extends interfaces.MessageBroker
|
79 | constructor: (@address) ->
|
80 | routing.binderMixin this
|
81 | @queues = {}
|
82 |
|
83 |
|
84 |
|
85 | connect: (callback) ->
|
86 | debug 'broker connect'
|
87 | brokers[@address] = this
|
88 | return callback null
|
89 | disconnect: (callback) ->
|
90 | debug 'broker disconnect'
|
91 | delete brokers[@address]
|
92 | return callback null
|
93 |
|
94 |
|
95 | createQueue: (type, queueName, callback) ->
|
96 | @queues[queueName] = new Queue if not @queues[queueName]?
|
97 | return callback null
|
98 |
|
99 | removeQueue: (type, queueName, callback) ->
|
100 | delete @queues[queueName]
|
101 | return callback null
|
102 |
|
103 |
|
104 | sendTo: (type, queueName, message, callback) ->
|
105 | debug 'broker sendTo', queueName
|
106 | @queues[queueName].send message
|
107 | return callback null
|
108 |
|
109 | subscribeToQueue: (queueName, handler, callback) ->
|
110 | @queues[queueName] = new Queue if not @queues[queueName]?
|
111 | @queues[queueName].on 'message', (data) ->
|
112 | out =
|
113 | direct: null
|
114 | data: data
|
115 | return handler out
|
116 | return callback null
|
117 |
|
118 |
|
119 | ackMessage: (message) ->
|
120 | return
|
121 | nackMessage: (message) ->
|
122 | return
|
123 |
|
124 | subscribeParticipantChange: (handler) ->
|
125 | @createQueue '', 'fbp', (err) =>
|
126 | @subscribeToQueue 'fbp', handler, () ->
|
127 |
|
128 | exports.MessageBroker = MessageBroker
|
129 | exports.Client = Client
|
130 |
|