UNPKG

3.32 kBtext/coffeescriptView Raw
1
2debug = require('debug')('msgflo:mqtt')
3interfaces = require './interfaces'
4routing = require './routing'
5
6try
7 mqtt = require 'mqtt'
8catch e
9 mqtt = e
10
# MQTT implementation of the messaging client interface.
#
# "Queues" map directly onto MQTT topics: there is nothing to create or
# remove on the broker, and every message is published as a JSON string.
class Client extends interfaces.MessagingClient
  # address: passed as-is to mqtt.connect()
  # options: stored on the instance; not used by any method in this class
  constructor: (@address, @options) ->
    @client = null
    @subscribers = {} # queueName -> [handler1, ...]

  ## Broker connection management

  # Connect to the broker at @address.
  # callback(err) is called once, on the first successful connection, or
  # immediately with the load error if the 'mqtt' module is unavailable.
  connect: (callback) ->
    # If require('mqtt') failed at load time, `mqtt` holds the thrown error
    if mqtt.message
      return callback mqtt

    @client = mqtt.connect @address
    @client.on 'error', (err) =>
      debug 'error', err
    @client.on 'reconnect', () =>
      debug 'reconnect'
    @client.on 'offline', () =>
      debug 'offline'
    # The 'connect' event carries the CONNACK packet, not an error, so it
    # must not be forwarded as the first (error) argument of the callback.
    onConnected = (connack) =>
      debug 'connected'
      @client.on 'message', (topic, message) =>
        @_onMessage topic, message
      return callback null
    @client.once 'connect', onConnected

  # Drop all listeners and subscriptions, then close the connection.
  # callback(err) is called when the underlying client has ended.
  disconnect: (callback) ->
    # Nothing to tear down if never connected (or already disconnected)
    return callback null if not @client

    for eventName in ['message', 'connect', 'reconnect', 'offline', 'error']
      @client.removeAllListeners eventName
    @subscribers = {}
    @client.end (err) =>
      debug 'disconnected'
      @client = null
      return callback err

  ## Manipulating queues

  # options is optional: createQueue(type, queueName, callback) is accepted.
  createQueue: (type, queueName, options, callback) ->
    if not callback
      callback = options
      options = {}

    # Noop, in MQTT one can send messages on 'topics' at any time
    return callback null

  removeQueue: (type, queueName, callback) ->
    # Noop, in MQTT one can send messages on 'topics' at any time
    return callback null

  ## Sending/Receiving messages

  # Publish `message` (serialized with JSON.stringify) on topic `queueName`.
  # `type` is accepted for interface compatibility and is not used here.
  # callback(err) is called when the publish completes.
  sendTo: (type, queueName, message, callback) ->
    published = (err, granted) =>
      debug 'published', queueName, err, granted
      return callback err if err
      return callback null
    data = JSON.stringify message
    debug 'publishing', queueName, data
    @client.publish queueName, data, published

  # Subscribe to topic `queueName` and register `handler` for its messages.
  # The handler is only registered once the broker subscription succeeded.
  subscribeToQueue: (queueName, handler, callback) ->
    debug 'subscribing', queueName
    @client.subscribe queueName, (err) =>
      debug 'subscribed', queueName, err
      return callback err if err
      subs = @subscribers[queueName]
      if subs then subs.push handler else @subscribers[queueName] = [ handler ]
      return callback null

  ## ACK/NACK messages
  # Both are no-ops in this client.
  ackMessage: (message) ->
    return
  nackMessage: (message) ->
    return

  # Dispatch an incoming MQTT message to the handlers registered for `topic`.
  # Handlers receive { data, mqtt }: data is the JSON-decoded payload (or the
  # raw string when the payload is not valid JSON), mqtt is the raw payload.
  _onMessage: (topic, message) ->
    return if not @client
    return if Object.keys(@subscribers).length is 0

    text = message.toString()
    try
      msg = JSON.parse text
    catch e
      # Not JSON: deliver the payload as a plain string
      msg = text
    handlers = @subscribers[topic]

    # handlers is undefined for topics without a registered handler, so it
    # must not be dereferenced before the guard below.
    debug 'message', handlers?.length, msg != null
    # NOTE: falsy payloads (empty string, 0, false, null) are dropped here
    return if not msg or not handlers
    out =
      data: msg
      mqtt: message
    for handler in handlers
      handler out

  # Announce participant `part` by publishing a discovery message on 'fbp'.
  registerParticipant: (part, callback) ->
    msg =
      protocol: 'discovery'
      command: 'participant'
      payload: part
    @sendTo 'inqueue', 'fbp', msg, callback
110
# Broker-side flavour of the MQTT client.
# On construction the instance is handed to routing.binderMixin
# (presumably this adds binding-related methods - confirm in ./routing).
class MessageBroker extends Client
  constructor: (address, options) ->
    super address, options
    routing.binderMixin @

  # Participant registration
  # Calls `handler` for every message received on the 'fbp' topic.
  # The outcome of the subscription itself is not reported to the caller.
  subscribeParticipantChange: (handler) ->
    noop = ->
    onQueueCreated = (err) =>
      @subscribeToQueue 'fbp', handler, noop
    @createQueue '', 'fbp', onQueueCreated
120
# Public API of this module
exports.MessageBroker = MessageBroker
exports.Client = Client