1 | var util = require('util');
|
2 | var EventEmitter = require('events').EventEmitter;
|
3 | var ObjectStream = require('zetta-streams').ObjectStream;
|
4 | var EventStreamsParser = require('zetta-events-stream-protocol').Parser;
|
5 | var StreamTopic = require('zetta-events-stream-protocol').StreamTopic;
|
6 | var buildDeviceActions = require('./api_formats/siren/device.siren').buildActions;
|
7 | var deviceFormatter = require('./api_formats/siren/device.siren');
|
8 | var JSCompiler = require('caql-js-compiler');
|
9 |
|
10 |
|
11 |
|
/**
 * Wraps a WebSocket-like object and exposes it as an EventEmitter.
 *
 * When `streamEnabled` is truthy an EventStreamsParser is created and the
 * socket answers the parser's `error`, `ping`, `subscribe` and `unsubscribe`
 * events, replying over `ws.send` with JSON messages and emitting
 * `subscribe` / `unsubscribe` for every accepted request. Otherwise the
 * socket is bound to the `query` list supplied by the caller.
 *
 * @param {Object} ws - Underlying socket; its `send` method is used for every reply.
 * @param {*} query - A query or an array of queries (ignored in stream mode).
 * @param {boolean} [streamEnabled] - Turns on the stream protocol handling.
 */
var EventSocket = module.exports = function(ws, query, streamEnabled) {
  EventEmitter.call(this);
  this.ws = ws;
  this.query = [];
  // Compiled stream queries, keyed by the raw query text.
  this._queryCache = {};
  this._subscriptions = [];
  this._subscriptionIndex = 0;
  this.streamEnabled = !!(streamEnabled);

  if (streamEnabled) {
    var self = this;
    var hasOwn = Object.prototype.hasOwnProperty;

    // Serialises a protocol message and writes it to the socket.
    var sendJson = function(payload) {
      self.ws.send(JSON.stringify(payload));
    };

    // Sends a protocol error reply.
    var sendError = function(code, topic, message) {
      sendJson({
        type: 'error',
        code: code,
        timestamp: new Date().getTime(),
        topic: topic,
        message: message
      });
    };

    this._parser = new EventStreamsParser();

    this._parser.on('error', function(err, original) {
      // `typeof null` is 'object', so null must be ruled out before reading `.topic`.
      var hasTopicSource = (original !== null && typeof original === 'object');
      sendError(
        (err.name === 'InvalidTypeError') ? 405 : 400,
        hasTopicSource ? original.topic : null,
        err.message
      );
    });

    this._parser.on('ping', function(msg) {
      sendJson({
        type: 'pong',
        timestamp: new Date().getTime(),
        data: msg.data
      });
    });

    this._parser.on('subscribe', function(msg) {
      var topic = new StreamTopic();
      try {
        topic.parse(msg.topic);
      } catch (err) {
        sendError(400, msg.topic, err.message);
        return;
      }

      if (topic.pubsubIdentifier() === '') {
        sendError(400, msg.topic, 'Topic must have server and specific topic. Specific topic missing.');
        return;
      }

      var queryText = topic.streamQuery;
      // Only own properties count as cache hits; otherwise query text such as
      // "constructor" or "toString" would match Object.prototype members and
      // bypass compilation (and with it, validation) entirely.
      var cached = hasOwn.call(self._queryCache, queryText) ? self._queryCache[queryText] : undefined;
      if (queryText && !cached) {
        try {
          var compiler = new JSCompiler();
          self._queryCache[queryText] = compiler.compile(queryText);
        } catch (err) {
          sendError(400, msg.topic, err.message);
          return;
        }
      }

      var subscription = {
        subscriptionId: ++self._subscriptionIndex,
        topic: topic,
        limit: msg.limit
      };
      self._subscriptions.push(subscription);

      sendJson({
        type: 'subscribe-ack',
        timestamp: new Date().getTime(),
        topic: msg.topic,
        subscriptionId: subscription.subscriptionId
      });
      self.emit('subscribe', subscription);
    });

    this._parser.on('unsubscribe', function(msg) {
      self._unsubscribe(msg.subscriptionId, function(err, subscription) {
        // Only announce subscriptions that were actually removed.
        if (subscription) {
          self.emit('unsubscribe', subscription);
        }
      });
    });
  } else {
    this.query = Array.isArray(query) ? query : [query];
  }

  this.init();
};
util.inherits(EventSocket, EventEmitter);
|
123 |
|
/**
 * Removes a subscription by id and reports the outcome over the socket.
 *
 * On success an `unsubscribe-ack` message is sent and `cb(null, subscription)`
 * is invoked. When no subscription has the given id, an `error` message with
 * code 405 is sent and `cb(err)` is invoked with no subscription, so callers
 * waiting on the callback are always notified.
 *
 * @param {*} subscriptionId - Id to remove; compared with strict equality.
 * @param {function(?Error, Object=)} [cb] - Optional node-style callback.
 */
EventSocket.prototype._unsubscribe = function(subscriptionId, cb) {
  var self = this;
  var hasCallback = (typeof cb === 'function');

  var foundIdx = -1;
  for (var i = 0; i < self._subscriptions.length; i++) {
    if (self._subscriptions[i].subscriptionId === subscriptionId) {
      foundIdx = i;
      break;
    }
  }

  if (foundIdx < 0) {
    var err = new Error('Unable to unsubscribe from invalid subscriptionId');
    self.ws.send(JSON.stringify({
      type: 'error',
      code: 405,
      timestamp: new Date().getTime(),
      message: err.message
    }));
    // Previously the callback was silently dropped on this path.
    if (hasCallback) {
      cb(err);
    }
    return;
  }

  var subscription = self._subscriptions.splice(foundIdx, 1)[0];
  self.ws.send(JSON.stringify({
    type: 'unsubscribe-ack',
    timestamp: new Date().getTime(),
    subscriptionId: subscription.subscriptionId
  }));

  if (hasCallback) {
    cb(null, subscription);
  }
};
|
157 |
|
/**
 * Formats an outgoing message and writes it to the underlying WebSocket.
 *
 * Non-null, non-Buffer objects are post-processed and then serialised with
 * JSON.stringify. The payload is `data.data` in stream mode and `data`
 * itself otherwise:
 *  - a payload with `transitions` gets an `actions` list built from them
 *    (the payload is modified in place and `transitions` is removed);
 *  - otherwise, a message with a truthy `query` has its payload replaced by
 *    the formatted `device`;
 *  - on topics starting with `_peer/`, a payload carrying a `peer` object is
 *    replaced by that peer's properties (plus `error`, when present).
 * Buffers, null and primitives are forwarded to `ws.send` untouched. If
 * serialisation fails the error is logged and nothing is sent.
 *
 * Arguments after `data` are passed through to `ws.send`. When the last
 * argument is not a function, a callback is appended so that an error
 * reported through the `ws.send` callback is logged.
 *
 * @param {string} topic - Topic the message belongs to.
 * @param {*} data - Message to send.
 */
EventSocket.prototype.send = function(topic, data) {
  // `typeof null` is 'object', so null has to be excluded explicitly.
  if (data !== null && typeof data === 'object' && !Buffer.isBuffer(data)) {
    var tmpData = (this.streamEnabled) ? data.data : data;

    // A stream message may carry a null/undefined payload; there is nothing
    // to format in that case and property access on it would throw.
    if (tmpData !== null && tmpData !== undefined) {
      if (tmpData['transitions']) {
        // Expose the device transitions as actions.
        tmpData.actions = buildDeviceActions(tmpData.properties.id, this.ws._env, this.ws._loader, tmpData.transitions);
        delete tmpData.transitions;
        if (this.streamEnabled) {
          data.data = tmpData;
        } else {
          data = tmpData;
        }
      } else if (data['query']) {
        // Query results carry a device model that has to be formatted.
        tmpData = deviceFormatter({ loader: this.ws._loader, env: this.ws._env, model: tmpData.device });
        if (this.streamEnabled) {
          data.data = tmpData;
        } else {
          data = tmpData;
        }
      }

      // Peer events: send the peer's properties instead of the peer object.
      if (topic.indexOf('_peer/') === 0 &&
          tmpData !== null && tmpData !== undefined &&
          tmpData.peer !== null && typeof tmpData.peer === 'object') {
        var properties = tmpData.peer.properties();
        if (tmpData.error) {
          properties.error = tmpData.error;
        }

        if (this.streamEnabled) {
          data.data = properties;
        } else {
          data = ObjectStream.format(topic, properties);
        }
      }
    }

    try {
      data = JSON.stringify(data);
    } catch (err) {
      console.error('ws JSON.stringify ', err);
      return;
    }
  }

  // Drop `topic`; place the (possibly serialised) data explicitly rather than
  // relying on sloppy-mode aliasing between `data` and `arguments[1]`.
  var args = Array.prototype.slice.call(arguments, 1);
  if (args.length > 0) {
    args[0] = data;
  }

  // Add a callback when the caller did not supply one.
  if (typeof args[args.length - 1] !== 'function') {
    args.push(function(err) {
      if (err) {
        console.error('ws send error:', err);
      }
    });
  }

  this.ws.send.apply(this.ws, args);
};
|
213 |
|
/**
 * Notifies listeners that the socket is finished by emitting `close`.
 */
EventSocket.prototype.onClose = function() {
  var socket = this;
  socket.emit('close');
};
|
217 |
|
/**
 * Attaches the `message`, `close` and `error` listeners to the underlying
 * WebSocket.
 *
 * Incoming messages are handed to the stream parser only when stream mode
 * is enabled; otherwise they are ignored. A socket error is logged and then
 * handled like a close.
 */
EventSocket.prototype.init = function() {
  var socket = this;

  var handleMessage = function(buffer) {
    if (!socket.streamEnabled) {
      return;
    }
    socket._parser.add(buffer);
  };

  var handleError = function(err) {
    console.error('ws error:', err);
    socket.onClose();
  };

  this.ws.on('message', handleMessage);
  this.ws.on('close', this.onClose.bind(this));
  this.ws.on('error', handleError);
};
|