UNPKG

6.55 kBJavaScriptView Raw
1var util = require('util');
2var EventEmitter = require('events').EventEmitter;
3var ObjectStream = require('zetta-streams').ObjectStream;
4var EventStreamsParser = require('zetta-events-stream-protocol').Parser;
5var StreamTopic = require('zetta-events-stream-protocol').StreamTopic;
6var buildDeviceActions = require('./api_formats/siren/device.siren').buildActions;
7var deviceFormatter = require('./api_formats/siren/device.siren');
8var JSCompiler = require('caql-js-compiler');
9
// Wraps a websocket connection and tracks the event subscriptions opened over it.
//
// ws            - websocket-like object; this code calls ws.send(string) and ws.on(event, fn).
// query         - a single query or an array of queries; stored on `this.query` (always as an
//                 array) only when streamEnabled is falsy.
// streamEnabled - truthy to attach an event-stream protocol parser and answer
//                 ping / subscribe / unsubscribe messages on the websocket.
//
// Emits 'subscribe' and 'unsubscribe' with the affected subscription object.
var EventSocket = module.exports = function(ws, query, streamEnabled) {
  EventEmitter.call(this);
  this.ws = ws;
  this.query = [];
  this._queryCache = {};

  // list of event streams
  this._subscriptions = [];
  this._subscriptionIndex = 0;
  this.streamEnabled = !!(streamEnabled);

  // only setup parser when using event stream
  if (streamEnabled) {
    var self = this;

    // Serializes one protocol frame and writes it to the websocket.
    var reply = function(frame) {
      self.ws.send(JSON.stringify(frame));
    };

    // Sends an 'error' frame; the key order is kept stable for all error replies.
    var replyError = function(code, topic, message) {
      reply({
        type: 'error',
        code: code,
        timestamp: new Date().getTime(),
        topic: topic,
        message: message
      });
    };

    this._parser = new EventStreamsParser();
    this._parser.on('error', function(err, original) {
      // typeof null is 'object', so null has to be ruled out before reading .topic
      var topic = (original !== null && typeof original === 'object') ? original.topic : null;
      replyError((err.name === 'InvalidTypeError') ? 405 : 400, topic, err.message);
    });

    this._parser.on('ping', function(msg) {
      reply({
        type: 'pong',
        timestamp: new Date().getTime(),
        data: msg.data
      });
    });

    this._parser.on('subscribe', function(msg) {
      var topic = new StreamTopic();
      try {
        topic.parse(msg.topic);
      } catch(err) {
        replyError(400, msg.topic, err.message);
        return;
      }

      if (topic.pubsubIdentifier() === '') {
        replyError(400, msg.topic, 'Topic must have server and specific topic. Specific topic missing.');
        return;
      }

      // compile each distinct stream query once and keep the result in the cache
      if (topic.streamQuery && !self._queryCache[topic.streamQuery]) {
        try {
          var compiler = new JSCompiler();
          self._queryCache[topic.streamQuery] = compiler.compile(topic.streamQuery);
        } catch(err) {
          replyError(400, msg.topic, err.message);
          return;
        }
      }

      var subscription = { subscriptionId: ++self._subscriptionIndex, topic: topic, limit: msg.limit };
      self._subscriptions.push(subscription);

      reply({
        type: 'subscribe-ack',
        timestamp: new Date().getTime(),
        topic: msg.topic,
        subscriptionId: subscription.subscriptionId
      });
      self.emit('subscribe', subscription);
    });

    this._parser.on('unsubscribe', function(msg) {
      self._unsubscribe(msg.subscriptionId, function(err, subscription) {
        if (subscription) {
          self.emit('unsubscribe', subscription);
        }
      });
    });
  } else {
    if (!Array.isArray(query)) {
      query = [query];
    }
    this.query = query; // contains .topic, .name
  }

  this.init();
};
util.inherits(EventSocket, EventEmitter);
123
// Removes the subscription with the given id and reports the outcome on the websocket.
//
// subscriptionId - id of the subscription to drop; matched with === against
//                  each entry's `subscriptionId`.
// cb             - optional error-first callback: cb(null, subscription) after a successful
//                  removal, cb(err) when no subscription has that id.
//
// Sends an 'unsubscribe-ack' frame on success and an 'error' frame (code 405) otherwise.
EventSocket.prototype._unsubscribe = function(subscriptionId, cb) {
  var self = this;
  var hasCallback = (typeof cb === 'function');
  var foundIdx = -1;
  self._subscriptions.some(function(subscription, idx) {
    if (subscription.subscriptionId === subscriptionId) {
      foundIdx = idx;
      return true;
    }
    return false;
  });

  if (foundIdx < 0) {
    var err = new Error('Unable to unsubscribe from invalid subscriptionId');
    self.ws.send(JSON.stringify({
      type: 'error',
      code: 405,
      timestamp: new Date().getTime(),
      message: err.message
    }));
    // report the failure to the caller as well, not only to the remote peer
    if (hasCallback) {
      cb(err);
    }
    return;
  }

  var subscription = self._subscriptions.splice(foundIdx, 1)[0];
  self.ws.send(JSON.stringify({
    type: 'unsubscribe-ack',
    timestamp: new Date().getTime(),
    subscriptionId: subscription.subscriptionId
  }));

  if (hasCallback) {
    cb(null, subscription);
  }
};
157
// Formats `data` for `topic` and writes it to the underlying websocket.
//
// topic - topic string; only used to detect `_peer/` topics, it is not passed to ws.send.
// data  - Buffer, object, or any other value. Non-Buffer, non-null objects are formatted
//         (device transitions, device queries, peer events) and JSON-stringified; every other
//         value is handed to ws.send unchanged.
// Any additional arguments are forwarded to ws.send after the data. When the last argument
// is not a function, a callback is appended that logs a send error via console.error.
//
// NOTE: object payloads are modified in place (`transitions` is deleted, `actions` /
// `data.data` are assigned) before being serialized.
EventSocket.prototype.send = function(topic, data) {
  // typeof null is 'object', so null needs its own exclusion
  var isObject = function(value) {
    return value !== null && typeof value === 'object';
  };

  if (!Buffer.isBuffer(data) && isObject(data)) {
    var tmpData = (this.streamEnabled) ? data.data : data;
    // in stream mode `data.data` may be missing or null; skip formatting instead of throwing
    var hasPayload = (tmpData !== null && tmpData !== undefined);

    if (hasPayload && tmpData['transitions']) {
      // format device logs
      tmpData.actions = buildDeviceActions(tmpData.properties.id, this.ws._env, this.ws._loader, tmpData.transitions);
      delete tmpData.transitions;
      if (this.streamEnabled) {
        data.data = tmpData;
      } else {
        data = tmpData;
      }
    } else if (data['query'] && hasPayload) {
      // format device queries
      tmpData = deviceFormatter({ loader: this.ws._loader, env: this.ws._env, model: tmpData.device });
      hasPayload = (tmpData !== null && tmpData !== undefined);
      if (this.streamEnabled) {
        data.data = tmpData;
      } else {
        data = tmpData;
      }
    }

    // used for _peer/connect _peer/disconnect
    if (topic.indexOf('_peer/') === 0 && hasPayload && isObject(tmpData.peer)) {
      var properties = tmpData.peer.properties();
      if (tmpData.error) {
        properties.error = tmpData.error;
      }

      if (this.streamEnabled) {
        data.data = properties;
      } else {
        data = ObjectStream.format(topic, properties);
      }
    }

    try {
      data = JSON.stringify(data);
    } catch (err) {
      console.error('ws JSON.stringify ', err);
      return;
    }
  }

  // Build the ws.send argument list explicitly: reading the reassigned `data` back out of
  // `arguments` only works through sloppy-mode parameter aliasing.
  var args = [data].concat(Array.prototype.slice.call(arguments, 2));

  // add callback to args list if it does not have one
  if (typeof args[args.length - 1] !== 'function') {
    args.push(function(err) {
      if (err) {
        console.error('ws send error:', err);
      }
    });
  }

  this.ws.send.apply(this.ws, args);
};
213
// Notifies listeners that this socket is finished by emitting a 'close' event
// with no payload.
EventSocket.prototype.onClose = function() {
  var closeEvent = 'close';
  this.emit(closeEvent);
};
217
// Attaches the websocket listeners, in this order:
//   'message' - handed to the protocol parser, but only while streamEnabled is set
//   'close'   - forwarded to onClose
//   'error'   - logged with console.error, then treated like a close
EventSocket.prototype.init = function() {
  var socket = this;

  var listeners = [
    ['message', function onMessage(buffer) {
      if (!socket.streamEnabled) {
        return;
      }
      socket._parser.add(buffer);
    }],
    ['close', this.onClose.bind(this)],
    ['error', function onError(err) {
      console.error('ws error:', err);
      socket.onClose();
    }]
  ];

  listeners.forEach(function(entry) {
    socket.ws.on(entry[0], entry[1]);
  });
};