UNPKG

3.52 kBtext/coffeescriptView Raw
1
2debug = require('debug')('msgflo:mqtt')
3interfaces = require './interfaces'
4routing = require './routing'
5
6try
7 mqtt = require 'mqtt'
8catch e
9 mqtt = e
10
11class Client extends interfaces.MessagingClient
12 constructor: (@address, @options) ->
13 @client = null
14 @subscribers = {} # queueName -> [handler1, ...]
15
16 ## Broker connection management
17 connect: (callback) ->
18 if mqtt.message
19 return callback mqtt
20
21 @client = mqtt.connect @address
22
23 # debug
24 @client.on 'reconnect', () =>
25 debug 'reconnect'
26 @client.on 'offline', () =>
27 debug 'offline'
28
29 @client.on 'error', (err) =>
30 debug 'error', err
31 if callback
32 callback err
33 callback = null
34 return
35 onConnected = (connack) =>
36 debug 'connected'
37 @client.on 'message', (topic, message) =>
38 @_onMessage topic, message
39 if callback
40 callback null
41 callback = null
42 return
43 @client.once 'connect', onConnected
44
45 disconnect: (callback) ->
46 @client.removeAllListeners 'message'
47 @client.removeAllListeners 'connect'
48 @client.removeAllListeners 'reconnect'
49 @client.removeAllListeners 'offline'
50 @client.removeAllListeners 'error'
51 @subscribers = {}
52 @client.end (err) =>
53 debug 'disconnected'
54 @client = null
55 return callback err
56
57 ## Manipulating queues
58 createQueue: (type, queueName, options, callback) ->
59 if not callback
60 callback = options
61 options = {}
62
63 # Noop, in MQTT one can send messages on 'topics' at any time
64 return callback null
65
66 removeQueue: (type, queueName, callback) ->
67 # Noop, in MQTT one can send messages on 'topics' at any time
68 return callback null
69
70 ## Sending/Receiving messages
71 sendTo: (type, queueName, message, callback) ->
72 published = (err, granted) =>
73 debug 'published', queueName, err, granted
74 return callback err if err
75 return callback null
76 data = JSON.stringify message
77 debug 'publishing', queueName, data
78 @client.publish queueName, data, published
79
80 subscribeToQueue: (queueName, handler, callback) ->
81 debug 'subscribing', queueName
82 @client.subscribe queueName, (err) =>
83 debug 'subscribed', queueName, err
84 return callback err if err
85 subs = @subscribers[queueName]
86 if subs then subs.push handler else @subscribers[queueName] = [ handler ]
87 return callback null
88
89 ## ACK/NACK messages
90 ackMessage: (message) ->
91 return
92 nackMessage: (message) ->
93 return
94
95 _onMessage: (topic, message) ->
96 return if not @client
97 return if not Object.keys(@subscribers).length > 0
98
99 msg = null
100 try
101 msg = JSON.parse message.toString()
102 catch e
103 debug 'failed to parse discovery message', e
104 msg = message.toString()
105 handlers = @subscribers[topic]
106
107 debug 'message', handlers.length, msg != null
108 return if not msg or not handlers
109 out =
110 data: msg
111 mqtt: message
112 for handler in handlers
113 handler out
114
115 registerParticipant: (part, callback) ->
116 msg =
117 protocol: 'discovery'
118 command: 'participant'
119 payload: part
120 @sendTo 'inqueue', 'fbp', msg, callback
121
122class MessageBroker extends Client
123 constructor: (address, options) ->
124 super address, options
125 routing.binderMixin this
126
127 # Participant registration
128 subscribeParticipantChange: (handler) ->
129 @createQueue '', 'fbp', (err) =>
130 @subscribeToQueue 'fbp', handler, () ->
131
132exports.Client = Client
133exports.MessageBroker = MessageBroker