1 |
|
2 | debug = require('debug')('msgflo:mqtt')
|
3 | interfaces = require './interfaces'
|
4 | routing = require './routing'
|
5 |
|
6 | try
|
7 | mqtt = require 'mqtt'
|
8 | catch e
|
9 | mqtt = e
|
10 |
|
11 | class Client extends interfaces.MessagingClient
|
12 | constructor: (@address, @options) ->
|
13 | @client = null
|
14 | @subscribers = {}
|
15 |
|
16 |
|
17 | connect: (callback) ->
|
18 | if mqtt.message
|
19 | return callback mqtt
|
20 |
|
21 | @client = mqtt.connect @address
|
22 |
|
23 |
|
24 | @client.on 'reconnect', () =>
|
25 | debug 'reconnect'
|
26 | @client.on 'offline', () =>
|
27 | debug 'offline'
|
28 |
|
29 | @client.on 'error', (err) =>
|
30 | debug 'error', err
|
31 | if callback
|
32 | callback err
|
33 | callback = null
|
34 | return
|
35 | onConnected = (connack) =>
|
36 | debug 'connected'
|
37 | @client.on 'message', (topic, message) =>
|
38 | @_onMessage topic, message
|
39 | if callback
|
40 | callback null
|
41 | callback = null
|
42 | return
|
43 | @client.once 'connect', onConnected
|
44 |
|
45 | disconnect: (callback) ->
|
46 | @client.removeAllListeners 'message'
|
47 | @client.removeAllListeners 'connect'
|
48 | @client.removeAllListeners 'reconnect'
|
49 | @client.removeAllListeners 'offline'
|
50 | @client.removeAllListeners 'error'
|
51 | @subscribers = {}
|
52 | @client.end (err) =>
|
53 | debug 'disconnected'
|
54 | @client = null
|
55 | return callback err
|
56 |
|
57 |
|
58 | createQueue: (type, queueName, options, callback) ->
|
59 | if not callback
|
60 | callback = options
|
61 | options = {}
|
62 |
|
63 |
|
64 | return callback null
|
65 |
|
66 | removeQueue: (type, queueName, callback) ->
|
67 |
|
68 | return callback null
|
69 |
|
70 |
|
71 | sendTo: (type, queueName, message, callback) ->
|
72 | published = (err, granted) =>
|
73 | debug 'published', queueName, err, granted
|
74 | return callback err if err
|
75 | return callback null
|
76 | data = JSON.stringify message
|
77 | debug 'publishing', queueName, data
|
78 | @client.publish queueName, data, published
|
79 |
|
80 | subscribeToQueue: (queueName, handler, callback) ->
|
81 | debug 'subscribing', queueName
|
82 | @client.subscribe queueName, (err) =>
|
83 | debug 'subscribed', queueName, err
|
84 | return callback err if err
|
85 | subs = @subscribers[queueName]
|
86 | if subs then subs.push handler else @subscribers[queueName] = [ handler ]
|
87 | return callback null
|
88 |
|
89 |
|
90 | ackMessage: (message) ->
|
91 | return
|
92 | nackMessage: (message) ->
|
93 | return
|
94 |
|
95 | _onMessage: (topic, message) ->
|
96 | return if not @client
|
97 | return if not Object.keys(@subscribers).length > 0
|
98 |
|
99 | msg = null
|
100 | try
|
101 | msg = JSON.parse message.toString()
|
102 | catch e
|
103 | debug 'failed to parse discovery message', e
|
104 | msg = message.toString()
|
105 | handlers = @subscribers[topic]
|
106 |
|
107 | debug 'message', handlers.length, msg != null
|
108 | return if not msg or not handlers
|
109 | out =
|
110 | data: msg
|
111 | mqtt: message
|
112 | for handler in handlers
|
113 | handler out
|
114 |
|
115 | registerParticipant: (part, callback) ->
|
116 | msg =
|
117 | protocol: 'discovery'
|
118 | command: 'participant'
|
119 | payload: part
|
120 | @sendTo 'inqueue', 'fbp', msg, callback
|
121 |
|
122 | class MessageBroker extends Client
|
123 | constructor: (address, options) ->
|
124 | super address, options
|
125 | routing.binderMixin this
|
126 |
|
127 |
|
128 | subscribeParticipantChange: (handler) ->
|
129 | @createQueue '', 'fbp', (err) =>
|
130 | @subscribeToQueue 'fbp', handler, () ->
|
131 |
|
132 | exports.Client = Client
|
133 | exports.MessageBroker = MessageBroker
|