UNPKG

3.35 kBtext/coffeescriptView Raw
1
2debug = require('debug')('msgflo:direct')
3EventEmitter = require('events').EventEmitter
4
5interfaces = require './interfaces'
6routing = require './routing'
7
8brokers = {}
9
10
11class Client extends interfaces.MessagingClient
12 constructor: (@address, @options) ->
13# console.log 'client', @address
14 @broker = null
15
16 ## Broker connection management
17 connect: (callback) ->
18 debug 'client connect'
19 @broker = brokers[@address]
20 return callback null
21 disconnect: (callback) ->
22 debug 'client disconnect'
23 @broker = null
24 return callback null
25
26 _assertBroker: (callback) ->
27 err = new Error "no broker connected #{@address}" if not @broker
28 return callback err if err
29
30 ## Manipulating queues
31 createQueue: (type, queueName, options, callback) ->
32 if not callback
33 callback = options
34 options = {}
35
36# console.log 'client create queue', queueName
37 @_assertBroker callback
38 @broker.createQueue type, queueName, callback
39
40 removeQueue: (type, queueName, callback) ->
41 @_assertBroker callback
42 @broker.removeQueue type, queueName, callback
43
44 ## Sending/Receiving messages
45 sendTo: (type, queueName, message, callback) ->
46 debug 'client sendTo', type, queueName
47 @_assertBroker callback
48 @broker.sendTo type, queueName, message, callback
49
50 subscribeToQueue: (queueName, handler, callback) ->
51 @_assertBroker callback
52 @broker.subscribeToQueue queueName, handler, callback
53
54 ## ACK/NACK messages
55 ackMessage: (message) ->
56 return
57 nackMessage: (message) ->
58 return
59
60 # Participant discovery
61 registerParticipant: (part, callback) ->
62 @createQueue '', 'fbp', (err) =>
63 msg =
64 protocol: 'discovery'
65 command: 'participant'
66 payload: part
67 @sendTo 'outqueue', 'fbp', msg, callback
68
69class Queue extends EventEmitter
70 constructor: () ->
71
72 send: (msg) ->
73 @_emitSend msg
74
75 _emitSend: (msg) ->
76 @emit 'message', msg
77
78class MessageBroker extends interfaces.MessageBroker
79 constructor: (@address) ->
80 routing.binderMixin this
81 @queues = {}
82# console.log 'broker', @address
83
84 ## Broker connection management
85 connect: (callback) ->
86 debug 'broker connect'
87 brokers[@address] = this
88 return callback null
89 disconnect: (callback) ->
90 debug 'broker disconnect'
91 delete brokers[@address]
92 return callback null
93
94 ## Manipulating queues
95 createQueue: (type, queueName, callback) ->
96 @queues[queueName] = new Queue if not @queues[queueName]?
97 return callback null
98
99 removeQueue: (type, queueName, callback) ->
100 delete @queues[queueName]
101 return callback null
102
103 ## Sending/Receiving messages
104 sendTo: (type, queueName, message, callback) ->
105 debug 'broker sendTo', queueName
106 @queues[queueName].send message
107 return callback null
108
109 subscribeToQueue: (queueName, handler, callback) ->
110 @queues[queueName] = new Queue if not @queues[queueName]?
111 @queues[queueName].on 'message', (data) ->
112 out =
113 direct: null
114 data: data
115 return handler out
116 return callback null
117
118 ## ACK/NACK messages
119 ackMessage: (message) ->
120 return
121 nackMessage: (message) ->
122 return
123
124 subscribeParticipantChange: (handler) ->
125 @createQueue '', 'fbp', (err) =>
126 @subscribeToQueue 'fbp', handler, () ->
127
128exports.MessageBroker = MessageBroker
129exports.Client = Client
130