1 |
|
2 | debug = require('debug')('msgflo:mqtt')
|
3 | interfaces = require './interfaces'
|
4 | routing = require './routing'
|
5 |
|
6 | try
|
7 | mqtt = require 'mqtt'
|
8 | catch e
|
9 | mqtt = e
|
10 |
|
11 | class Client extends interfaces.MessagingClient
|
12 | constructor: (@address, @options) ->
|
13 | @client = null
|
14 | @subscribers = {}
|
15 |
|
16 |
|
17 | connect: (callback) ->
|
18 | if mqtt.message
|
19 | return callback mqtt
|
20 |
|
21 | @client = mqtt.connect @address
|
22 | @client.on 'error', (err) =>
|
23 | debug 'error', err
|
24 | @client.on 'reconnect', () =>
|
25 | debug 'reconnect'
|
26 | @client.on 'offline', () =>
|
27 | debug 'offline'
|
28 | onConnected = (err) =>
|
29 | debug 'connected'
|
30 | @client.on 'message', (topic, message) =>
|
31 | @_onMessage topic, message
|
32 | return callback err
|
33 | @client.once 'connect', onConnected
|
34 |
|
35 | disconnect: (callback) ->
|
36 | @client.removeAllListeners 'message'
|
37 | @client.removeAllListeners 'connect'
|
38 | @client.removeAllListeners 'reconnect'
|
39 | @client.removeAllListeners 'offline'
|
40 | @client.removeAllListeners 'error'
|
41 | @subscribers = {}
|
42 | @client.end (err) =>
|
43 | debug 'disconnected'
|
44 | @client = null
|
45 | return callback err
|
46 |
|
47 |
|
48 | createQueue: (type, queueName, options, callback) ->
|
49 | if not callback
|
50 | callback = options
|
51 | options = {}
|
52 |
|
53 |
|
54 | return callback null
|
55 |
|
56 | removeQueue: (type, queueName, callback) ->
|
57 |
|
58 | return callback null
|
59 |
|
60 |
|
61 | sendTo: (type, queueName, message, callback) ->
|
62 | published = (err, granted) =>
|
63 | debug 'published', queueName, err, granted
|
64 | return callback err if err
|
65 | return callback null
|
66 | data = JSON.stringify message
|
67 | debug 'publishing', queueName, data
|
68 | @client.publish queueName, data, published
|
69 |
|
70 | subscribeToQueue: (queueName, handler, callback) ->
|
71 | debug 'subscribing', queueName
|
72 | @client.subscribe queueName, (err) =>
|
73 | debug 'subscribed', queueName, err
|
74 | return callback err if err
|
75 | subs = @subscribers[queueName]
|
76 | if subs then subs.push handler else @subscribers[queueName] = [ handler ]
|
77 | return callback null
|
78 |
|
79 |
|
80 | ackMessage: (message) ->
|
81 | return
|
82 | nackMessage: (message) ->
|
83 | return
|
84 |
|
85 | _onMessage: (topic, message) ->
|
86 | return if not @client
|
87 | return if not Object.keys(@subscribers).length > 0
|
88 |
|
89 | msg = null
|
90 | try
|
91 | msg = JSON.parse message.toString()
|
92 | catch e
|
93 | msg = message.toString()
|
94 | handlers = @subscribers[topic]
|
95 |
|
96 | debug 'message', handlers.length, msg != null
|
97 | return if not msg or not handlers
|
98 | out =
|
99 | data: msg
|
100 | mqtt: message
|
101 | for handler in handlers
|
102 | handler out
|
103 |
|
104 | registerParticipant: (part, callback) ->
|
105 | msg =
|
106 | protocol: 'discovery'
|
107 | command: 'participant'
|
108 | payload: part
|
109 | @sendTo 'inqueue', 'fbp', msg, callback
|
110 |
|
111 | class MessageBroker extends Client
|
112 | constructor: (address, options) ->
|
113 | super address, options
|
114 | routing.binderMixin this
|
115 |
|
116 |
|
117 | subscribeParticipantChange: (handler) ->
|
118 | @createQueue '', 'fbp', (err) =>
|
119 | @subscribeToQueue 'fbp', handler, () ->
|
120 |
|
121 | exports.Client = Client
|
122 | exports.MessageBroker = MessageBroker
|