1 | var util = require('util');
|
2 | var EventEmitter = require('events').EventEmitter;
|
3 | var ObjectStream = require('zetta-streams').ObjectStream;
|
4 | var EventStreamsParser = require('zetta-events-stream-protocol').Parser;
|
5 | var StreamTopic = require('zetta-events-stream-protocol').StreamTopic;
|
6 | var buildDeviceActions = require('./api_formats/siren/device.siren').buildActions;
|
7 | var deviceFormatter = require('./api_formats/siren/device.siren');
|
8 | var JSCompiler = require('caql-js-compiler');
|
9 |
|
10 |
|
11 |
|
/**
 * Wraps a WebSocket-like object and exposes it as an EventEmitter.
 *
 * When `options.streamEnabled` is truthy, an EventStreamsParser is created and
 * the socket answers the parser's 'error', 'ping', 'subscribe' and
 * 'unsubscribe' events itself by writing JSON frames to `ws.send`. It emits
 * 'subscribe' / 'unsubscribe' with the affected subscription object.
 * Otherwise `query` is stored on `this.query`, wrapped in an array if needed.
 *
 * @param {Object} ws - socket object; `ws.send` is used to write frames.
 * @param {*} query - query or array of queries (only used when streaming is disabled).
 * @param {Object} [options]
 * @param {Boolean} [options.streamEnabled] - enable the multiplexed stream protocol.
 * @param {Boolean} [options.filterMultiple] - stored as a boolean on the instance.
 */
var EventSocket = module.exports = function(ws, query, options) {
  EventEmitter.call(this);

  // Treat null like a missing options object instead of crashing on it.
  if (options === undefined || options === null) {
    options = {};
  }

  var self = this;

  this.ws = ws;
  this.query = [];
  this._queryCache = {};

  // Active stream subscriptions and the counter used to mint their ids.
  this._subscriptions = [];
  this._subscriptionIndex = 0;

  this.streamEnabled = !!(options.streamEnabled);
  this.filterMultiple = !!(options.filterMultiple);

  // Returns true when `msg` is already in the send buffer; otherwise records
  // it and returns false.
  this.hasBeenSent = function(msg) {
    return this._sendBuffer.add(msg);
  };
  this._sendBuffer = {
    add: function(msg) {
      if (this._buffer.indexOf(msg) >= 0) {
        return true;
      }

      // Newest entries go to the front; drop the oldest once `max` is exceeded.
      if (this._buffer.unshift(msg) > this.max) {
        this._buffer.pop();
      }

      return false;
    },
    max: 50,
    _buffer: []
  };

  if (!this.streamEnabled) {
    this.query = Array.isArray(query) ? query : [query];
    this.init();
    return;
  }

  var hasOwn = Object.prototype.hasOwnProperty;

  // Writes a protocol error frame to the client.
  function sendError(code, topic, message) {
    self.ws.send(JSON.stringify({
      type: 'error',
      code: code,
      timestamp: new Date().getTime(),
      topic: topic,
      message: message
    }));
  }

  this._parser = new EventStreamsParser();

  this._parser.on('error', function(err, original) {
    // typeof null is 'object', so null has to be excluded explicitly.
    var hasOriginal = (original !== null && typeof original === 'object');
    var code = (err.name === 'InvalidTypeError') ? 405 : 400;
    sendError(code, hasOriginal ? original.topic : null, err.message);
  });

  this._parser.on('ping', function(request) {
    self.ws.send(JSON.stringify({
      type: 'pong',
      timestamp: new Date().getTime(),
      data: request.data
    }));
  });

  this._parser.on('subscribe', function(request) {
    var topic = new StreamTopic();
    try {
      topic.parse(request.topic);
    } catch (err) {
      sendError(400, request.topic, err.message);
      return;
    }

    if (topic.pubsubIdentifier() === '') {
      sendError(400, request.topic, 'Topic must have server and specific topic. Specific topic missing.');
      return;
    }

    // Own-property check: the query text comes from the client, and a plain
    // truthiness lookup would treat names inherited from Object.prototype
    // (e.g. "constructor") as already-compiled queries.
    var streamQuery = topic.streamQuery;
    if (streamQuery && !hasOwn.call(self._queryCache, streamQuery)) {
      try {
        self._queryCache[streamQuery] = new JSCompiler().compile(streamQuery);
      } catch (err) {
        sendError(400, request.topic, err.message);
        return;
      }
    }

    var subscription = {
      subscriptionId: ++self._subscriptionIndex,
      topic: topic,
      limit: request.limit
    };
    self._subscriptions.push(subscription);

    self.ws.send(JSON.stringify({
      type: 'subscribe-ack',
      timestamp: new Date().getTime(),
      topic: request.topic,
      subscriptionId: subscription.subscriptionId
    }));
    self.emit('subscribe', subscription);
  });

  this._parser.on('unsubscribe', function(request) {
    self._unsubscribe(request.subscriptionId, function(err, subscription) {
      if (subscription) {
        self.emit('unsubscribe', subscription);
      }
    });
  });

  this.init();
};
util.inherits(EventSocket, EventEmitter);
|
151 |
|
/**
 * Remove the subscription with the given id and acknowledge it to the client.
 *
 * If no subscription matches, a 405 error frame is sent and `cb` is not
 * called. Otherwise an 'unsubscribe-ack' frame is sent and `cb`, when it is a
 * function, is called with `(null, removedSubscription)`.
 *
 * @param {Number} subscriptionId - id to look up (compared with ===).
 * @param {Function} [cb] - optional callback invoked after a successful removal.
 */
EventSocket.prototype._unsubscribe = function(subscriptionId, cb) {
  var socket = this;
  var subscriptions = socket._subscriptions;
  var position = -1;

  // Locate the first subscription carrying the requested id.
  for (var i = 0; i < subscriptions.length; i++) {
    if (subscriptions[i].subscriptionId === subscriptionId) {
      position = i;
      break;
    }
  }

  if (position === -1) {
    socket.ws.send(JSON.stringify({
      type: 'error',
      code: 405,
      timestamp: new Date().getTime(),
      message: 'Unable to unsubscribe from invalid subscriptionId'
    }));
    return;
  }

  var removed = subscriptions.splice(position, 1)[0];
  var ack = {
    type: 'unsubscribe-ack',
    timestamp: new Date().getTime(),
    subscriptionId: removed.subscriptionId
  };
  socket.ws.send(JSON.stringify(ack));

  if (typeof cb === 'function') {
    cb(null, removed);
  }
};
|
185 |
|
/**
 * Prepare `data` for the wire and hand it to the underlying WebSocket.
 *
 * Buffers and non-object values are forwarded untouched. Any other non-null
 * object is reshaped and JSON-encoded first. The part that gets reshaped is
 * `data.data` when `streamEnabled` is set, `data` itself otherwise:
 *   - a truthy `transitions` property is replaced by `actions` built with
 *     buildDeviceActions (the object is modified in place);
 *   - otherwise a truthy `query` property causes the payload to be replaced by
 *     deviceFormatter's output for `payload.device`;
 *   - for topics starting with '_peer/' and an object `peer`, the payload is
 *     replaced by `peer.properties()` (plus `error` when present).
 * If JSON encoding throws, the error is logged and nothing is sent.
 *
 * Any arguments after `data` are passed through to `ws.send`. When the last
 * argument is not a function, a callback that logs send errors is appended.
 *
 * @param {String} topic - topic the message belongs to.
 * @param {*} data - payload to send.
 */
EventSocket.prototype.send = function(topic, data) {
  var ws = this.ws;
  // Everything after `topic` is forwarded to ws.send.
  var args = Array.prototype.slice.call(arguments, 1);

  if (!Buffer.isBuffer(data) && typeof data === 'object' && data !== null) {
    var streaming = this.streamEnabled;
    var payload = streaming ? data.data : data;

    // In stream mode `data.data` may be missing entirely, so guard against
    // undefined as well as null before touching properties.
    if (payload !== null && payload !== undefined) {
      var replaced = false;

      if (payload.transitions) {
        // Format device logs: swap raw transitions for actions.
        payload.actions = buildDeviceActions(payload.properties.id, ws._env, ws._loader, payload.transitions);
        delete payload.transitions;
      } else if (payload.query) {
        // Format device queries.
        payload = deviceFormatter({ loader: ws._loader, env: ws._env, model: payload.device });
        replaced = true;
      }

      // typeof null is 'object', so a null peer has to be excluded explicitly.
      var isPeerTopic = (typeof topic === 'string' && topic.indexOf('_peer/') === 0);
      if (isPeerTopic && typeof payload.peer === 'object' && payload.peer !== null) {
        var properties = payload.peer.properties();
        if (payload.error) {
          properties.error = payload.error;
        }
        payload = streaming ? properties : ObjectStream.format(topic, properties);
        replaced = true;
      }

      if (replaced) {
        if (streaming) {
          data.data = payload;
        } else {
          data = payload;
        }
      }
    }

    try {
      data = JSON.stringify(data);
    } catch (err) {
      console.error('ws JSON.stringify ', err);
      return;
    }
  }

  // Put the (possibly serialized) data into the argument list explicitly
  // instead of relying on sloppy-mode aliasing between `data` and `arguments`.
  args[0] = data;

  // Add a callback if the caller did not supply one, so send failures are
  // reported to it and logged.
  if (typeof args[args.length - 1] !== 'function') {
    args.push(function(err) {
      if (err) {
        console.error('ws send error:', err);
      }
    });
  }

  ws.send.apply(ws, args);
};
|
243 |
|
/**
 * Notify listeners that the socket has gone away by emitting 'close' on this
 * EventSocket (no arguments are passed to listeners).
 */
EventSocket.prototype.onClose = function() {
  var closeEvent = 'close';
  this.emit(closeEvent);
};
|
247 |
|
/**
 * Attach 'message', 'close' and 'error' listeners to the underlying socket.
 *
 * Incoming messages are fed to the stream parser only when `streamEnabled` is
 * set; otherwise they are ignored. Both a socket 'close' and a socket 'error'
 * end up calling `onClose`; errors are logged first.
 */
EventSocket.prototype.init = function() {
  var socket = this;

  function handleMessage(buffer) {
    if (!socket.streamEnabled) {
      return;
    }
    socket._parser.add(buffer);
  }

  function handleError(err) {
    console.error('ws error:', err);
    socket.onClose();
  }

  socket.ws.on('message', handleMessage);
  socket.ws.on('close', socket.onClose.bind(socket));
  socket.ws.on('error', handleError);
};
|