UNPKG

2.89 kBJavaScriptView Raw
1'use strict'
2
3var handleClient
4var websocket = require('websocket-stream')
5var WebSocketServer = require('ws').Server
6var Connection = require('mqtt-connection')
7var http = require('http')
8
/**
 * Translate an MQTT topic filter into an anchored regular expression.
 *
 * Every `+` becomes `[^/]+` (one non-empty topic level) and every `#`
 * becomes `.+`; all other regular-expression metacharacters in the filter
 * are escaped so they only match themselves.
 *
 * @param {string} topic - topic filter taken from a SUBSCRIBE packet
 * @returns {RegExp} expression that must match a topic from start to end
 */
function topicToRegExp (topic) {
  var pattern = topic
    // Escape everything that is special in a RegExp except the MQTT wildcards.
    .replace(/[.*?^${}()|[\]\\]/g, '\\$&')
    .replace(/\+/g, '[^/]+')
    .replace(/#/g, '.+')

  return new RegExp('^' + pattern + '$')
}

/**
 * Wire a minimal in-memory broker onto one client connection.
 *
 * Must be invoked with `this` bound to the object that owns the shared
 * `clients` map (created on first use). Connected clients are stored there
 * by client id; each incoming PUBLISH is acknowledged according to its QoS
 * and forwarded to every stored client that has a matching subscription.
 *
 * @param {Object} client - connection object exposing `on()` plus the packet
 *   writers used below (`connack`, `puback`, `pubrec`, `pubrel`, `pubcomp`,
 *   `suback`, `unsuback`, `pingresp`, `publish`)
 */
function handleClient (client) {
  var self = this
  var registeredId
  var isRegistered = false

  if (!self.clients) {
    self.clients = {}
  }

  // Make sure a SUBSCRIBE that arrives before CONNECT cannot crash the handler.
  client.subscriptions = []

  client.on('connect', function (packet) {
    // The client id 'invalid' is answered with return code 2, anything else
    // with 0.
    client.connack({returnCode: packet.clientId === 'invalid' ? 2 : 0})

    registeredId = packet.clientId
    isRegistered = true
    self.clients[registeredId] = client
    client.subscriptions = []
  })

  // Drop the registry entry once the connection goes away so the map does not
  // keep dead clients around.
  // NOTE(review): assumes the connection emits 'close' like a Node stream.
  client.on('close', function () {
    // Only remove the entry if it still points at this connection; a newer
    // connection may have re-used the same client id.
    if (isRegistered && self.clients[registeredId] === client) {
      delete self.clients[registeredId]
    }
  })

  client.on('publish', function (packet) {
    var id, target

    function matchesTopic (filter) {
      return filter.test(packet.topic)
    }

    // Acknowledge towards the publisher according to the requested QoS.
    switch (packet.qos) {
      case 0:
        break
      case 1:
        client.puback(packet)
        break
      case 2:
        client.pubrec(packet)
        break
    }

    // Fan the message out to every registered client with a matching filter.
    for (id in self.clients) {
      target = self.clients[id]

      if (!target.subscriptions.some(matchesTopic)) {
        continue
      }

      try {
        target.publish({topic: packet.topic, payload: packet.payload})
      } catch (error) {
        // A client we cannot write to is dropped from the registry.
        delete self.clients[id]
      }
    }
  })

  client.on('pubrel', function (packet) {
    client.pubcomp(packet)
  })

  client.on('pubrec', function (packet) {
    client.pubrel(packet)
  })

  client.on('pubcomp', function () {
    // Nothing to be done
  })

  client.on('subscribe', function (packet) {
    var granted = []

    packet.subscriptions.forEach(function (subscription) {
      // Every requested QoS is granted as asked.
      granted.push(subscription.qos)
      client.subscriptions.push(topicToRegExp(subscription.topic))
    })

    client.suback({messageId: packet.messageId, granted: granted})
  })

  client.on('unsubscribe', function (packet) {
    // Only acknowledged; previously stored filters are not removed.
    client.unsuback(packet)
  })

  client.on('pingreq', function () {
    client.pingresp()
  })
}
99
/**
 * Create an HTTP server with a WebSocket endpoint that speaks MQTT, start
 * listening and return the HTTP server.
 *
 * WebSocket connections whose negotiated sub-protocol is neither `mqtt` nor
 * `mqttv3.1` are closed immediately; all others are wrapped in an MQTT
 * connection and handed to `handleClient` with the HTTP server as `this`.
 * Plain HTTP requests are answered with 404.
 *
 * @param {number|string} [startPort] - port passed straight to `listen`
 * @param {Function} [done] - passed to `listen` as its callback
 *   NOTE(review): per Node's documentation the listen callback is invoked
 *   without arguments once listening; confirm whether callers expect an
 *   error argument here.
 * @returns {Object} the HTTP server
 */
function start (startPort, done) {
  var httpServer = http.createServer()
  var socketServer = new WebSocketServer({server: httpServer})

  socketServer.on('connection', function (socket) {
    var speaksMqtt = socket.protocol === 'mqtt' || socket.protocol === 'mqttv3.1'

    if (!speaksMqtt) {
      return socket.close()
    }

    handleClient.call(httpServer, new Connection(websocket(socket)))
  })

  httpServer.listen(startPort, done)

  // Anything that is not a WebSocket upgrade is not served here.
  httpServer.on('request', function (request, response) {
    response.statusCode = 404
    response.end('Not Found')
  })

  return httpServer
}
123
// When executed directly (not required as a module), start the server on the
// port taken from PORT, falling back to ZUUL_PORT.
if (require.main === module) {
  var port = process.env.PORT || process.env.ZUUL_PORT

  start(port, function (err) {
    if (err) {
      console.error(err)
    } else {
      console.log('tunnelled server started on port', port)
    }
  })
}
132}