UNPKG

6.31 kBJavaScriptView Raw
1var Client = module.exports
2
3const Promise = require("bluebird")
4const d = require("debug")("raptorjs:client:mqtt")
5const EventEmitter = require("eventemitter3")
6
7Client.create = function (container) {
8
// Relays an event to the container: the event name and every extra
// argument are handed to container.emit() unchanged, with the container
// as the receiver.
var emit = function (...eventArgs) {
  container.emit(...eventArgs)
}
12
13 var opts = container.getConfig()
14
15 var client
16 var instance = {}
17 var emitter = new EventEmitter({})
18
// Best-effort JSON decoding: yields the parsed value when `msg` is valid
// JSON and hands `msg` back untouched otherwise.
var parseJSON = function (msg) {
  var decoded = msg
  try {
    decoded = JSON.parse(msg)
  } catch (ignored) {
    // Not JSON: deliberately fall through and deliver the payload as received.
  }
  return decoded
}
27
28 instance.emitter = emitter
29
// Promise for the connection attempt that is in flight or already completed;
// null while there is nothing to reuse. Every connect() caller shares it, so
// nobody proceeds before the client actually exists.
var connecting = null
// Set once the client reported "connect". Used to spot a connection that was
// later closed through disconnect(), which clears instance.client.
var established = false

// Returns the configured `mqttUrl`, or derives a broker URL from the API
// `url`: ws(s)://host:<api port>/ws in a browser, mqtt(s)://host:1883|8883
// elsewhere. Any protocol other than plain "http:" is treated as secure.
var resolveMqttUrl = function (config) {
  if (config.mqttUrl) {
    return config.mqttUrl
  }
  var url = require("url").parse(config.url)
  // Detect a browser without relying on `this`, which is undefined inside a
  // plain function call when the code runs in strict mode.
  var isBrowser = typeof window !== "undefined" && window.window === window
  var isSecure = url.protocol !== "http:"
  var scheme = (isBrowser ? "ws" : "mqtt") + (isSecure ? "s" : "")
  var portAndPath = isBrowser
    ? (url.port ? url.port : "") + "/ws"
    : (isSecure ? "8883" : "1883")
  return scheme + "://" + url.hostname + ":" + portAndPath
}

// Opens the MQTT connection using the container configuration. The returned
// promise resolves on the first "connect" event and rejects on the first
// "error" event; both one-shot listeners are removed once either fires.
var mqttConnect = function (/*currentUser*/) {
  return new Promise(function (resolve, reject) {
    var config = container.getConfig()
    var mqttUrl = resolveMqttUrl(config)

    // A token, when present, replaces username/password: it is sent as the
    // password with "*" as the username.
    var withCred = "credentials"
    var username = config.username
    var password = config.password
    if (config.token) {
      withCred = "token"
      username = "*"
      password = config.token
    }

    d("Connecting to %s with %s", mqttUrl, withCred)

    // var mqtt = require("mqtt")
    var mqtt = require("mqtt/lib/connect/index.js")
    client = mqtt.connect(mqttUrl, {
      protocolId: "MQTT",
      protocolVersion: 4,
      username: username,
      password: password,
      // Random hex suffix so each connection presents a distinct client id.
      clientId: 'raptorbox_mqttjs_ans_asdfghqwerty_' + Math.random().toString(16).slice(2, 10)
    })
    instance.client = client

    var onError = function (e) {
      d("Connection error", e)

      emit("error", e)

      client.removeListener("connect", onConnect)
      client.removeListener("error", onError)

      reject(e)
    }
    var onConnect = function () {
      d("Connected")

      established = true
      emit("connected")

      client.removeListener("connect", onConnect)
      client.removeListener("error", onError)

      resolve()
    }

    client.on("connect", onConnect)
    client.on("error", onError)

    // Long-lived listeners: forward client errors and incoming messages to
    // the instance emitter for the lifetime of the client.
    client.on("error", function (e) {
      emitter.emit("error", e)
    })

    client.on("message", function (topic, message) {

      var msg = parseJSON(message.toString())

      d("Received message: %j", msg)

      emit("message", {
        topic: topic,
        message: msg
      })

      // Delivered both on the generic "message" event and on an event named
      // after the exact topic the message arrived on.
      emitter.emit("message", msg)
      emitter.emit(topic, msg)
    })

  })
}

/**
 * Logs in through the container's Auth service and then connects to the
 * MQTT broker. Concurrent and repeated calls share a single attempt.
 *
 * A failed attempt (login or connection) clears the cached state so the next
 * call retries, and a connection closed via disconnect() is re-established
 * on the next call.
 *
 * @returns {Promise} resolves (with no value) once the client is connected;
 *   rejects with the login or connection error.
 */
instance.connect = function () {

  // disconnect() clears instance.client; a cached, completed attempt is then
  // stale and must not be handed out again.
  if (established && !instance.client) {
    connecting = null
    established = false
  }

  if (connecting) {
    return connecting
  }

  var attempt = Promise.resolve(container.Auth().login())
    .then(mqttConnect)
    .catch(function (e) {
      // Forget the failed attempt so a later call can retry, then pass the
      // original error on to the caller.
      if (connecting === attempt) {
        connecting = null
      }
      throw e
    })
  connecting = attempt
  return attempt
}
127
/**
 * Publishes a message on a topic, connecting first if necessary.
 *
 * Strings and Buffers are sent as they are; any other value is serialized
 * with JSON.stringify.
 *
 * @param {string} topic - topic to publish on
 * @param {*} msg - payload
 * @param {Object} [opts] - options handed to the MQTT client's publish()
 * @returns {Promise} resolves with the instance emitter once the client
 *   reports the publish as done; rejects with an Error when `topic` is not
 *   a string, or with the connection/publish error.
 */
instance.publish = function (topic, msg, opts) {

  // Validate up front so an invalid call never triggers a login/connection.
  if (typeof topic !== "string") {
    return Promise.reject(new Error("Topic must be a string"))
  }

  return instance.connect().then(function () {

    d("Publishing to %s", topic)

    // `typeof` guard: a bare `Buffer` reference throws where no such global
    // exists (e.g. an unshimmed browser build).
    var isBuffer = typeof Buffer !== "undefined" && msg instanceof Buffer
    var payload = (typeof msg === "string" || isBuffer) ? msg : JSON.stringify(msg)

    return new Promise(function (resolve, reject) {
      client.publish(topic, payload, opts || {}, function (err) {
        if (err) {
          d("Publish failed on %s", topic)
          return reject(err)
        }
        emit("published", topic)
        d("Published")
        resolve(emitter)
      })
    })
  })
}
/**
 * Subscribes to a topic, connecting first if necessary.
 *
 * @param {string} topic - topic to subscribe to
 * @param {Function} [fn] - handler registered on the instance emitter under
 *   the event named `topic`
 * @returns {Promise} resolves with the instance emitter once the client
 *   reports the subscription as done; rejects with the connection or
 *   subscription error, mirroring publish().
 */
instance.subscribe = function (topic, fn) {
  return instance.connect().then(function () {
    d("Subscribing to %s", topic)

    // Register the handler before the request goes out so no message that
    // arrives around the acknowledgement is missed.
    if (fn) emitter.on(topic, fn)

    return new Promise(function (resolve, reject) {
      client.subscribe(topic, function (err) {
        if (err) {
          d("Subscribe failed on %s", topic)
          // Drop the handler again: nothing will be delivered for this topic.
          if (fn) emitter.off(topic, fn)
          return reject(err)
        }
        emit("subscribed", topic)
        d("Subscribed")
        resolve(emitter)
      })
    })
  })
}
/**
 * Unsubscribes from a topic, connecting first if necessary.
 *
 * @param {string} topic - topic to unsubscribe from
 * @param {Function} [fn] - handler to remove from the instance emitter; when
 *   omitted, null is passed to emitter.off() in its place
 * @returns {Promise} resolves with the instance emitter once the client
 *   reports the unsubscription as done; rejects with the connection or
 *   unsubscription error, mirroring publish().
 */
instance.unsubscribe = function (topic, fn) {
  return instance.connect().then(function () {
    d("Unsubscribing from %s", topic)

    emitter.off(topic, fn || null)

    return new Promise(function (resolve, reject) {
      client.unsubscribe(topic, function (err) {
        if (err) {
          d("Unsubscribe failed on %s", topic)
          return reject(err)
        }
        emit("unsubscribed", topic)
        d("Unsubscribed")
        resolve(emitter)
      })
    })
  })
}
// Ends the MQTT client held in instance.client, if there is one.
// `force` is forwarded to the client's end() call (false when omitted).
// Resolves once end() calls back without an error, after clearing
// instance.client; rejects with the error end() reports. Resolves
// immediately when no client is set.
instance.disconnect = function (force) {
  var active = instance.client
  if (!active) {
    return Promise.resolve()
  }
  var forced = force || false
  return new Promise(function (resolve, reject) {
    active.end(forced, function (err) {
      if (err) {
        reject(err)
        return
      }
      instance.client = null
      resolve()
    })
  })
}
189
190 return instance
191}