1 | var Client = module.exports
|
2 |
|
3 | const Promise = require("bluebird")
|
4 | const d = require("debug")("raptorjs:client:mqtt")
|
5 | const EventEmitter = require("eventemitter3")
|
6 |
|
7 | Client.create = function (container) {
|
8 |
|
9 | var emit = function() {
|
10 | container.emit.apply(container, arguments)
|
11 | }
|
12 |
|
13 | var opts = container.getConfig()
|
14 |
|
15 | var client
|
16 | var instance = {}
|
17 | var emitter = new EventEmitter({})
|
18 |
|
19 | var parseJSON = function (msg) {
|
20 | try {
|
21 | return JSON.parse(msg)
|
22 | } catch(e) {
|
23 |
|
24 | }
|
25 | return msg
|
26 | }
|
27 |
|
28 | instance.emitter = emitter
|
29 |
|
30 | var mqttConnect = null
|
31 |
|
32 | instance.connect = function () {
|
33 |
|
34 | if(!mqttConnect) {
|
35 |
|
36 | mqttConnect = function (/*currentUser*/) {
|
37 | return new Promise(function (resolve, reject) {
|
38 | const opts = container.getConfig()
|
39 |
|
40 | let mqttUrl = opts.mqttUrl
|
41 | if (!mqttUrl) {
|
42 |
|
43 |
|
44 |
|
45 | var url = require("url").parse(opts.url)
|
46 | const isBrowser = this.window === this
|
47 | const isSecure = url && url.protocol !== "http:"
|
48 |
|
49 | mqttUrl = (isBrowser ? "ws" : "mqtt") + (!isSecure ? "" : "s" )
|
50 | + "://"
|
51 | + url.hostname
|
52 | + ":" + ( isBrowser ? (url.port ? url.port : "") + "/ws" : (isSecure ? "8883" : "1883"))
|
53 | }
|
54 | var withCred = "credentials"
|
55 | var username = opts.username
|
56 | var password = opts.password
|
57 |
|
58 | if (opts.token) {
|
59 | withCred = "token"
|
60 | username = "*"
|
61 | password = opts.token
|
62 | }
|
63 |
|
64 | d("Connecting to %s with %s", mqttUrl, withCred)
|
65 |
|
66 |
|
67 | var mqtt = require("mqtt/lib/connect/index.js")
|
68 | client = mqtt.connect(mqttUrl, {
|
69 | protocolId: "MQTT",
|
70 | protocolVersion: 4,
|
71 | username: username,
|
72 | password: password,
|
73 | clientId: 'raptorbox_mqttjs_ans_asdfghqwerty_' + Math.random().toString(16).substr(2, 8)
|
74 | })
|
75 | instance.client = client
|
76 |
|
77 | var onError = function (e) {
|
78 | d("Connection error", e)
|
79 |
|
80 | emit("error", e)
|
81 |
|
82 | client.removeListener("connect", onConnect)
|
83 | client.removeListener("error", onError)
|
84 | mqttConnect = null
|
85 |
|
86 | reject(e)
|
87 | }
|
88 | var onConnect = function () {
|
89 | d("Connected")
|
90 |
|
91 | emit("connected")
|
92 |
|
93 | client.removeListener("connect", onConnect)
|
94 | client.removeListener("error", onError)
|
95 |
|
96 | resolve()
|
97 | }
|
98 |
|
99 | client.on("connect", onConnect)
|
100 | client.on("error", onError)
|
101 |
|
102 | client.on("error", function (e) {
|
103 | emitter.emit("error", e)
|
104 | })
|
105 |
|
106 | client.on("message", function (topic, message) {
|
107 |
|
108 | var msg = parseJSON(message.toString())
|
109 |
|
110 | d("Received message: %j", msg)
|
111 |
|
112 | emit("message", {
|
113 | topic: topic,
|
114 | message: msg
|
115 | })
|
116 |
|
117 | emitter.emit("message", msg)
|
118 | emitter.emit(topic, msg)
|
119 | })
|
120 |
|
121 | })
|
122 | }
|
123 | return container.Auth().login().then(mqttConnect)
|
124 | }
|
125 | return Promise.resolve(mqttConnect)
|
126 | }
|
127 |
|
128 | instance.publish = function (topic, msg, opts) {
|
129 | return instance.connect().then(function () {
|
130 |
|
131 | if (typeof topic !== "string") {
|
132 | return Promise.reject(new Error("Topic must be a string"))
|
133 | }
|
134 |
|
135 | d("Publishing to %s", topic)
|
136 |
|
137 | opts = opts || {}
|
138 | if (typeof msg !== "string" && (Buffer && !(msg instanceof Buffer))) {
|
139 | msg = JSON.stringify(msg)
|
140 | }
|
141 |
|
142 | return new Promise(function(resolve, reject) {
|
143 | client.publish(topic, msg, opts, function (err) {
|
144 | if(err) {
|
145 | d("Publish failed on %s", topic)
|
146 | return reject(err)
|
147 | }
|
148 | emit("published", topic)
|
149 | d("Published")
|
150 | resolve(emitter)
|
151 | })
|
152 | })
|
153 | })
|
154 | }
|
155 | instance.subscribe = function (topic, fn) {
|
156 | return instance.connect().then(function () {
|
157 | d("Subscribing to %s", topic)
|
158 | client.subscribe(topic, function () {
|
159 | emit("subscribed", topic)
|
160 | d("Subscribed")
|
161 | })
|
162 | if(fn) emitter.on(topic, fn)
|
163 | return Promise.resolve(emitter)
|
164 | })
|
165 | }
|
166 | instance.unsubscribe = function (topic, fn) {
|
167 | return instance.connect().then(function () {
|
168 | d("Unsubscribing from %s", topic)
|
169 | client.unsubscribe(topic, function () {
|
170 | emit("unsubscribed", topic)
|
171 | d("Unsubscribed")
|
172 | })
|
173 | emitter.off(topic, fn || null)
|
174 | return Promise.resolve(emitter)
|
175 | })
|
176 | }
|
177 | instance.disconnect = function (force) {
|
178 | if(!instance.client) {
|
179 | return Promise.resolve()
|
180 | }
|
181 | return new Promise(function(resolve, reject) {
|
182 | instance.client.end(force || false, function(err) {
|
183 | if(err) return reject(err)
|
184 | instance.client = null
|
185 | resolve()
|
186 | })
|
187 | })
|
188 | }
|
189 |
|
190 | return instance
|
191 | }
|