1 | 'use strict'
|
2 |
|
3 | var handleClient
|
4 | var websocket = require('websocket-stream')
|
5 | var WebSocketServer = require('ws').Server
|
6 | var Connection = require('mqtt-connection')
|
7 | var http = require('http')
|
8 |
|
9 | handleClient = function (client) {
|
10 | var self = this
|
11 |
|
12 | if (!self.clients) {
|
13 | self.clients = {}
|
14 | }
|
15 |
|
16 | client.on('connect', function (packet) {
|
17 | if (packet.clientId === 'invalid') {
|
18 | client.connack({returnCode: 2})
|
19 | } else {
|
20 | client.connack({returnCode: 0})
|
21 | }
|
22 | self.clients[packet.clientId] = client
|
23 | client.subscriptions = []
|
24 | })
|
25 |
|
26 | client.on('publish', function (packet) {
|
27 | var i, k, c, s, publish
|
28 | switch (packet.qos) {
|
29 | case 0:
|
30 | break
|
31 | case 1:
|
32 | client.puback(packet)
|
33 | break
|
34 | case 2:
|
35 | client.pubrec(packet)
|
36 | break
|
37 | }
|
38 |
|
39 | for (k in self.clients) {
|
40 | c = self.clients[k]
|
41 | publish = false
|
42 |
|
43 | for (i = 0; i < c.subscriptions.length; i++) {
|
44 | s = c.subscriptions[i]
|
45 |
|
46 | if (s.test(packet.topic)) {
|
47 | publish = true
|
48 | }
|
49 | }
|
50 |
|
51 | if (publish) {
|
52 | try {
|
53 | c.publish({topic: packet.topic, payload: packet.payload})
|
54 | } catch (error) {
|
55 | delete self.clients[k]
|
56 | }
|
57 | }
|
58 | }
|
59 | })
|
60 |
|
61 | client.on('pubrel', function (packet) {
|
62 | client.pubcomp(packet)
|
63 | })
|
64 |
|
65 | client.on('pubrec', function (packet) {
|
66 | client.pubrel(packet)
|
67 | })
|
68 |
|
69 | client.on('pubcomp', function () {
|
70 |
|
71 | })
|
72 |
|
73 | client.on('subscribe', function (packet) {
|
74 | var qos
|
75 | var topic
|
76 | var reg
|
77 | var granted = []
|
78 |
|
79 | for (var i = 0; i < packet.subscriptions.length; i++) {
|
80 | qos = packet.subscriptions[i].qos
|
81 | topic = packet.subscriptions[i].topic
|
82 | reg = new RegExp(topic.replace('+', '[^/]+').replace('#', '.+') + '$')
|
83 |
|
84 | granted.push(qos)
|
85 | client.subscriptions.push(reg)
|
86 | }
|
87 |
|
88 | client.suback({messageId: packet.messageId, granted: granted})
|
89 | })
|
90 |
|
91 | client.on('unsubscribe', function (packet) {
|
92 | client.unsuback(packet)
|
93 | })
|
94 |
|
95 | client.on('pingreq', function () {
|
96 | client.pingresp()
|
97 | })
|
98 | }
|
99 |
|
100 | function start (startPort, done) {
|
101 | var server = http.createServer()
|
102 | var wss = new WebSocketServer({server: server})
|
103 |
|
104 | wss.on('connection', function (ws) {
|
105 | var stream, connection
|
106 |
|
107 | if (!(ws.protocol === 'mqtt' ||
|
108 | ws.protocol === 'mqttv3.1')) {
|
109 | return ws.close()
|
110 | }
|
111 |
|
112 | stream = websocket(ws)
|
113 | connection = new Connection(stream)
|
114 | handleClient.call(server, connection)
|
115 | })
|
116 | server.listen(startPort, done)
|
117 | server.on('request', function (req, res) {
|
118 | res.statusCode = 404
|
119 | res.end('Not Found')
|
120 | })
|
121 | return server
|
122 | }
|
123 |
|
124 | if (require.main === module) {
|
125 | start(process.env.PORT || process.env.ZUUL_PORT, function (err) {
|
126 | if (err) {
|
127 | console.error(err)
|
128 | return
|
129 | }
|
130 | console.log('tunnelled server started on port', process.env.PORT || process.env.ZUUL_PORT)
|
131 | })
|
132 | }
|