UNPKG

7.04 kBJavaScriptView Raw
1var util = require('util');
2var EventEmitter = require('events').EventEmitter;
3var ObjectStream = require('zetta-streams').ObjectStream;
4var EventStreamsParser = require('zetta-events-stream-protocol').Parser;
5var StreamTopic = require('zetta-events-stream-protocol').StreamTopic;
6var buildDeviceActions = require('./api_formats/siren/device.siren').buildActions;
7var deviceFormatter = require('./api_formats/siren/device.siren');
8var JSCompiler = require('caql-js-compiler');
9
//Flag to indicate that we expect data back on the websocket
11//Tracking subscriptions
/**
 * Wraps a websocket and exposes it as an EventEmitter.
 *
 * In stream mode (`options.streamEnabled` truthy) an event-stream parser is
 * created and its `ping`, `subscribe`, `unsubscribe` and `error` events are
 * answered directly on the websocket; successful subscribe/unsubscribe
 * requests are also re-emitted on this object as 'subscribe' / 'unsubscribe'
 * with the subscription record. Otherwise `query` is stored (always as an
 * array) on `this.query`.
 *
 * @param {Object} ws - websocket-like object; `send(string)` is used here and
 *   `init()` is invoked at the end of construction.
 * @param {Object|Object[]} query - one query or a list of queries; only used
 *   when stream mode is off.
 * @param {Object} [options]
 * @param {boolean} [options.streamEnabled] - turn on the event-stream parser.
 * @param {boolean} [options.filterMultiple] - stored as a boolean flag on the
 *   instance; it is not otherwise read in this constructor.
 */
var EventSocket = module.exports = function(ws, query, options) {
  EventEmitter.call(this);

  // Accept both a missing and an explicit `null` options argument.
  if (options === undefined || options === null) {
    options = {};
  }

  this.ws = ws;
  this.query = [];
  this._queryCache = {};

  // list of event streams
  this._subscriptions = [];
  this._subscriptionIndex = 0;

  // Flags
  this.streamEnabled = !!(options.streamEnabled);
  this.filterMultiple = !!(options.filterMultiple);

  // Returns true when `msg` was already recorded; otherwise records it and
  // returns false.
  this.hasBeenSent = function(msg) {
    return this._sendBuffer.add(msg);
  };

  // Bounded most-recent-first list of messages; membership is checked with
  // indexOf, i.e. by identity for objects.
  this._sendBuffer = {
    add: function(msg) {
      if (this._buffer.indexOf(msg) >= 0) {
        return true;
      }

      // unshift returns the new length; drop the oldest entry once over max
      if (this._buffer.unshift(msg) > this.max) {
        this._buffer.pop();
      }

      return false;
    },
    max: 50,
    _buffer: []
  };

  // only setup parser when using event stream
  if (this.streamEnabled) {
    var self = this;

    // Single place that builds and sends an error frame, so every error
    // response has the same shape and key order.
    var sendError = function(code, topic, message) {
      var errorMsg = {
        type: 'error',
        code: code,
        timestamp: new Date().getTime(),
        topic: topic,
        message: message
      };
      self.ws.send(JSON.stringify(errorMsg));
    };

    this._parser = new EventStreamsParser();

    this._parser.on('error', function(err, original) {
      // typeof null === 'object', so null must be excluded explicitly before
      // reading `.topic`.
      var hasOriginal = (original !== null && typeof original === 'object');
      var code = (err.name === 'InvalidTypeError') ? 405 : 400;
      sendError(code, hasOriginal ? original.topic : null, err.message);
    });

    this._parser.on('ping', function(msg) {
      var pong = {
        type: 'pong',
        timestamp: new Date().getTime(),
        data: msg.data
      };
      self.ws.send(JSON.stringify(pong));
    });

    this._parser.on('subscribe', function(msg) {
      var topic = new StreamTopic();
      try {
        topic.parse(msg.topic);
      } catch (err) {
        sendError(400, msg.topic, err.message);
        return;
      }

      if (topic.pubsubIdentifier() === '') {
        sendError(400, msg.topic, 'Topic must have server and specific topic. Specific topic missing.');
        return;
      }

      // Compile each distinct stream query once and keep it in the cache.
      if (topic.streamQuery && !self._queryCache[topic.streamQuery]) {
        try {
          var compiler = new JSCompiler();
          var compiled = compiler.compile(topic.streamQuery);
          self._queryCache[topic.streamQuery] = compiled;
        } catch (err) {
          sendError(400, msg.topic, err.message);
          return;
        }
      }

      var subscription = { subscriptionId: ++self._subscriptionIndex, topic: topic, limit: msg.limit };
      self._subscriptions.push(subscription);

      var ack = {
        type: 'subscribe-ack',
        timestamp: new Date().getTime(),
        topic: msg.topic,
        subscriptionId: subscription.subscriptionId
      };
      self.ws.send(JSON.stringify(ack));
      self.emit('subscribe', subscription);
    });

    this._parser.on('unsubscribe', function(msg) {
      self._unsubscribe(msg.subscriptionId, function(err, subscription) {
        if (subscription) {
          self.emit('unsubscribe', subscription);
        }
      });
    });
  } else {
    if (!Array.isArray(query)) {
      query = [query];
    }
    this.query = query; // contains .topic, .name
  }

  this.init();
};
util.inherits(EventSocket, EventEmitter);
151
/**
 * Removes the subscription with the given id and acknowledges it on the
 * websocket.
 *
 * On success an 'unsubscribe-ack' frame is sent and `cb(null, subscription)`
 * is called. When no subscription has that id, an 'error' frame (code 405) is
 * sent and `cb(error)` is called so the caller can observe the failure.
 *
 * @param {*} subscriptionId - id to remove; compared with `===`.
 * @param {function(Error|null, Object=)} [cb] - optional completion callback.
 */
EventSocket.prototype._unsubscribe = function(subscriptionId, cb) {
  var self = this;
  var hasCallback = (typeof cb === 'function');

  var foundIdx = -1;
  self._subscriptions.some(function(subscription, idx) {
    if (subscription.subscriptionId === subscriptionId) {
      foundIdx = idx;
      return true; // stop scanning at the first match
    }
    return false;
  });

  if (foundIdx < 0) {
    var error = new Error('Unable to unsubscribe from invalid subscriptionId');
    var errorMsg = {
      type: 'error',
      code: 405,
      timestamp: new Date().getTime(),
      message: error.message
    };
    self.ws.send(JSON.stringify(errorMsg));
    if (hasCallback) {
      cb(error);
    }
    return;
  }

  var subscription = self._subscriptions.splice(foundIdx, 1)[0];
  var ack = {
    type: 'unsubscribe-ack',
    timestamp: new Date().getTime(),
    subscriptionId: subscription.subscriptionId
  };

  self.ws.send(JSON.stringify(ack));
  if (hasCallback) {
    cb(null, subscription);
  }
};
185
/**
 * Formats `data` for the client and writes it to the underlying websocket.
 *
 * Buffers and non-object values are passed to `ws.send` untouched. Objects
 * are post-processed and then JSON-serialized:
 *   - a payload with `transitions` gets an `actions` list built from them and
 *     loses the `transitions` property (the payload object is mutated);
 *   - a payload with `query` is replaced by the formatted device model;
 *   - for topics starting with `_peer/` whose payload has a `peer` object, the
 *     payload becomes `peer.properties()` (plus `error` when present).
 * In stream mode the payload is `data.data` and the envelope `data` is what
 * gets serialized; otherwise `data` itself is the payload.
 *
 * Any arguments after `data` are forwarded to `ws.send`. If the last one is
 * not a function, a no-op callback is appended.
 *
 * @param {string} topic - topic the data belongs to; not sent as an argument.
 * @param {Buffer|Object|*} data - value to send.
 */
EventSocket.prototype.send = function(topic, data) {
  if (!Buffer.isBuffer(data) && typeof data === 'object') {
    // typeof null === 'object', so `data` itself may be null here.
    var payload = (this.streamEnabled && data !== null) ? data.data : data;

    // Only inspect the payload when there is one; a null payload or a stream
    // envelope without `data` is serialized as-is.
    if (payload !== null && payload !== undefined) {
      if (payload['transitions']) {
        // format device logs; `payload` is mutated in place, so `data`
        // (or `data.data`) already refers to the updated object
        payload.actions = buildDeviceActions(payload.properties.id, this.ws._env, this.ws._loader, payload.transitions);
        delete payload.transitions;
      } else if (payload['query']) {
        // format device queries
        payload = deviceFormatter({ loader: this.ws._loader, env: this.ws._env, model: payload.device });
        if (this.streamEnabled) {
          data.data = payload;
        } else {
          data = payload;
        }
      }

      // used for _peer/connect _peer/disconnect
      if (topic.indexOf('_peer/') === 0 && payload.peer !== null && typeof payload.peer === 'object') {
        var properties = payload.peer.properties();
        if (payload.error) {
          properties.error = payload.error;
        }

        if (this.streamEnabled) {
          data.data = properties;
        } else {
          data = ObjectStream.format(topic, properties);
        }
      }
    }

    try {
      data = JSON.stringify(data);
    } catch (err) {
      console.error('ws JSON.stringify ', err);
      return;
    }
  }

  // Drop `topic` and put the (possibly serialized) data in explicitly rather
  // than relying on `arguments` mirroring the reassigned parameter, which
  // only happens in sloppy mode.
  var args = Array.prototype.slice.call(arguments, 1);
  if (args.length > 0) {
    args[0] = data;
  }

  // add callback to args list if it does not have one
  // NOTE(review): the no-op is presumably there so send failures are reported
  // to a callback instead of being raised by the socket -- confirm against ws.
  if (typeof args[args.length - 1] !== 'function') {
    args.push(function(err) { });
  }

  this.ws.send.apply(this.ws, args);
};
241
/**
 * Signals listeners that the socket is done by emitting 'close' on this
 * EventSocket. Takes no arguments and returns nothing.
 */
EventSocket.prototype.onClose = function() {
  var eventName = 'close';
  this.emit(eventName);
};
245
/**
 * Attaches this EventSocket's handlers to the underlying websocket:
 *   - 'message': fed to the stream parser, but only when stream mode is on;
 *   - 'close':   forwarded to onClose (bound to this instance);
 *   - 'error':   logged to stderr, then treated like a close.
 */
EventSocket.prototype.init = function() {
  var socket = this;

  function handleMessage(buffer) {
    if (!socket.streamEnabled) {
      return;
    }
    socket._parser.add(buffer);
  }

  function handleError(err) {
    console.error('ws error:', err);
    socket.onClose();
  }

  socket.ws.on('message', handleMessage);
  socket.ws.on('close', socket.onClose.bind(socket));
  socket.ws.on('error', handleError);
};