UNPKG

12.3 kBJavaScriptView Raw
1/* skale client side library */
2
3// Copyright 2016 Luca-SAS, licensed under the Apache License 2.0
4
5'use strict';
6
7var net = require('net');
8var util = require('util');
9var stream = require('stream');
10//var trace = require('line-trace');
11var events = require('events');
12var thenify = require('thenify').withCallback;
13var websocket = require('websocket-stream'); // Keep this order for browserify
14
15var minMulticast = 4294901760; // 2^32 - 2^16, 65536 available multicast ids
16
17var streamMaxId = 10000;
18
19function encode(msg) {
20 var str = JSON.stringify(msg), len = Buffer.byteLength(str), buf = new Buffer(len + 8);
21 buf.writeUInt32LE(msg.id, 0, true);
22 buf.writeUInt32LE(len, 4, true);
23 buf.write(str, 8);
24 return buf;
25}
26
27function bencode(chunk, id, streamid) {
28 var header = new Buffer(9);
29 header.writeUInt32LE(id, 0, true);
30 header.writeUInt32LE(chunk.length + 1, 4, true);
31 header.writeInt8(streamid, 8, true);
32 if (chunk.length)
33 return Buffer.concat([header, chunk], chunk.length + 9);
34 return header;
35}
36
37function ToGrid(debug) {
38 stream.Transform.call(this, {objectMode: true});
39 this.debug = debug;
40}
41util.inherits(ToGrid, stream.Transform);
42
43ToGrid.prototype._transform = function (msg, encoding, done) {
44 if (this.debug > 2) console.error('\n# Send:', msg);
45 done(null, encode(msg));
46};
47
48function FromGrid() {
49 stream.Transform.call(this, {objectMode: true});
50 this._buf = null;
51}
52util.inherits(FromGrid, stream.Transform);
53
54FromGrid.prototype._transform = function (chunk, encoding, done) {
55 var len, data, buf, offset = 0;
56
57 if (this._buf) {
58 chunk = Buffer.concat([this._buf, chunk], this._buf.length + chunk.length);
59 this._buf = null;
60 }
61 do {
62 buf = chunk.slice(offset);
63 if (buf.length < 8) {
64 this._buf = buf;
65 break;
66 }
67 len = buf.readUInt32LE(4, true);
68 if (buf.length < 8 + len) {
69 this._buf = buf;
70 break;
71 }
72 data = buf.slice(0, 8 + len);
73 this.push(data);
74 offset += 8 + len;
75 } while (offset < chunk.length);
76 done();
77};
78
79function StreamToGrid(sock, id, sid) {
80 stream.Transform.call(this);
81 this.sock = sock;
82 this.clientId = id;
83 this.clientStreamId = sid;
84}
85util.inherits(StreamToGrid, stream.Transform);
86
87StreamToGrid.prototype._transform = function (chunk, encoding, done) {
88 this.sock.write(bencode(chunk, this.clientId, this.clientStreamId), encoding, done);
89};
90
91StreamToGrid.prototype._flush = function (done) {
92 this.sock.write(bencode('', this.clientId, this.clientStreamId), done);
93};
94
95function Consumer(debug) {
96 if (!(this instanceof Consumer))
97 return new Consumer(debug);
98 stream.Transform.call(this, {objectMode: true});
99 this.subscriber = {};
100 this.streams = [];
101 this.debug = debug;
102}
103util.inherits(Consumer, stream.Transform);
104
105Consumer.prototype._transform = function (chunk, encoding, done) {
106 var streamid = chunk[8];
107 if (this.streams[streamid]) {
108 if (chunk.length == 9) return this.streams[streamid].end(done);
109 return this.streams[streamid].write(chunk.slice(9), encoding, done);
110 }
111
112 var msg = JSON.parse(chunk.slice(8));
113
114 if (msg.ufrom && !this.client.hostId[msg.ufrom])
115 this.client.hostId[msg.ufrom] = msg.from;
116
117 if (this.debug > 2) console.error('\n# Received:', msg);
118 if (msg.cmd == 'reply') {
119 if (this.client.pending[msg.cid]) {
120 this.client.pending[msg.cid](msg.error, msg.data);
121 delete this.client.pending[msg.cid];
122 } else {
123 console.warn('[' + this.client.id + '] unwanted reply:', msg);
124 }
125 done();
126 } else if (this.subscriber[msg.cmd]) {
127 this.subscriber[msg.cmd].write(JSON.stringify(msg.data), done);
128 } else {
129 this.client.emit(msg.cmd, msg);
130 done();
131 }
132};
133
134function PubStream(client, name, dest) {
135 if (!(this instanceof PubStream))
136 return new PubStream(client, name, dest);
137 stream.Transform.call(this, {objectMode: true});
138 this.dest = dest;
139 this.client = client;
140 this.cmd = name;
141 this.pipe(client.output);
142}
143util.inherits(PubStream, stream.Transform);
144
145PubStream.prototype._transform = function (chunk, encoding, done) {
146 if (this.id === undefined) {
147 var self = this;
148 if (self.dest) {
149 this.client.send(0, {cmd: 'id', data: this.dest.uuid}, function (err, res) {
150 if (err || res === undefined)
151 throw new Error('PubStream error: ' + err);
152 self.client.hostId[self.dest.uuid] = self.id = res;
153 done(null, {cmd: self.cmd, id: self.id, data: chunk.toString()});
154 });
155 } else {
156 this.client.send(0, {cmd: 'tid', data: this.cmd}, function (err, res) {
157 if (err || res === undefined)
158 throw new Error('PubStream error: ' + err);
159 self.client.topicId[self.cmd] = self.id = minMulticast + res;
160 done(null, {cmd: self.cmd, id: self.id, data: chunk.toString()});
161 });
162 }
163 } else
164 done(null, {cmd: this.cmd, id: this.id, data: chunk.toString()});
165};
166
167function WriteStream(client, name, dest) {
168 if (!(this instanceof WriteStream))
169 return new WriteStream(client, name);
170 stream.Transform.call(this, {objectMode: true});
171 this.client = client;
172 this.name = name;
173 this.dest = dest;
174 this.ended = false;
175 var self = this;
176
177 self.on('end', function (ignore) {
178 self.ignore = ignore;
179 self.ended = true;
180 self.end();
181 });
182}
183
184util.inherits(WriteStream, stream.Transform);
185
186// Flow control is performed through reply from remote
187WriteStream.prototype._transform = function (chunk, encoding, done) {
188 this.client.send(this.dest, {
189 cmd: 'request',
190 data: {cmd: 'stream', stream: this.name, data: chunk}
191 }, done);
192};
193
194WriteStream.prototype._flush = function(done) {
195 try {
196 this.client.send(this.dest, {
197 cmd: 'request',
198 data: {cmd: 'stream', stream: this.name, data: null, ignore: this.ignore}
199 }, done);
200 } catch (err) { done(); }
201};
202
203function ReadStream (client, name, opt) {
204 stream.Transform.call(this, {objectMode: true});
205 this.client = client;
206 this.name = name;
207 this.opt = opt || {};
208 if (this.opt.text) {
209 this._transform = function (chunk, encoding, done) {
210 done(null, JSON.stringify(chunk) + '\n');
211 };
212 } else {
213 this._transform = function (chunk, encoding, done) {
214 done(null, chunk);
215 };
216 }
217}
218util.inherits(ReadStream, stream.Transform);
219
220function Client(opt, callback) {
221 if (!(this instanceof Client))
222 return new Client(opt, callback);
223 events.EventEmitter.call(this);
224 var inBrowser = (typeof window != 'undefined');
225 opt = opt || {};
226 if (!opt.ws) opt.ws = inBrowser ? true : process.env.SKALE_WS;
227 if (!opt.host && !inBrowser) opt.host = process.env.SKALE_HOST;
228 if (!opt.port && !inBrowser) opt.port = process.env.SKALE_PORT;
229 if (!opt.debug && !inBrowser) opt.debug = process.env.SKALE_DEBUG;
230 if (!opt.access && !inBrowser) opt.access = process.env.SKALE_KEY;
231 opt.host = opt.host || 'localhost';
232 opt.port = opt.port || (opt.ws ? 12348 : 12346);
233 var self = this;
234 this.pending = {};
235 this.hostId = {};
236 this.topicId = {};
237 this.cid = 0;
238 if (opt.ws) {
239 this.sock = websocket('ws://' + opt.host + ':' + opt.port);
240 } else {
241 this.sock = net.connect(opt.port, opt.host);
242 this.sock.setNoDelay();
243 }
244 this.hostname = opt.data.hostname;
245 this.debug = opt.debug;
246 this.input = new FromGrid();
247 this.output = new ToGrid(opt.debug);
248 this.consumer = new Consumer(opt.debug);
249 this.consumer.client = this;
250 this.sock.pipe(this.input).pipe(this.consumer);
251 this.output.pipe(this.sock);
252 this.send(0, {cmd: 'connect', access: opt.access, data: opt.data}, function (err, data) {
253 var i, d;
254 if (err) throw err;
255 if (data) {
256 self.id = data.id;
257 self.uuid = data.uuid;
258 self.emit('connect', data);
259 if (data.devices) { // Cache remote ids
260 for (i = 0; i < data.devices.length; i++) {
261 d = data.devices[i];
262 self.hostId[d.uuid] = d.id;
263 }
264 }
265 }
266 if (callback) callback(err, data);
267 });
268 self.on('notify', function (msg) { // Cache remote id
269 self.hostId[msg.data.uuid] = msg.data.id;
270 });
271 this.sock.on('end', function () {
272 self.emit('close');
273 });
274 this.sock.on('close', function () {
275 self.emit('close');
276 });
277 this.sock.on('error', function (err) {
278 if (inBrowser) return;
279 self.emit('error', err);
280 });
281}
282util.inherits(Client, events.EventEmitter);
283
284Client.prototype._getId = function (uuid, nTry, msg, callback) {
285 var self = this;
286 this.send(0, {cmd: 'id', data: uuid}, function (err, res) {
287 if (res) {
288 msg.id = self.hostId[uuid] = res;
289 msg.from = self.id;
290 self.output.write(msg);
291 } else {
292 if (--nTry < 0) {
293 if (self.pending[msg.cid]) {
294 console.error('_getId failed');
295 self.pending[msg.cid]('_getIDd failed');
296 delete self.pending[msg.cid];
297 }
298 } else {
299 setTimeout(function () {
300 self._getId(uuid, nTry, msg, callback);
301 }, Math.floor(Math.random() * 2000));
302 }
303 }
304 });
305};
306
307Client.prototype.send = thenify(function (uuid, msg, callback) {
308 try {
309 msg.cid = this.cid++;
310 if (callback) this.pending[msg.cid] = callback;
311 msg.from = this.id;
312 if (uuid) {
313 if (this.hostId[uuid]) msg.id = this.hostId[uuid];
314 else return this._getId(uuid, 3, msg, callback);
315 }
316 this.output.write(msg);
317 } catch(err) {
318 throw new Error('send error');
319 }
320});
321
322Client.prototype.devices = thenify(function (o, callback) {
323 var self = this;
324 this.send(0, {cmd: 'devices', data: {query: o}}, function (err, dev) {
325 for (var i in dev)
326 self.hostId[dev[i].uuid] = dev[i].id;
327 callback(err, dev);
328 });
329});
330
331Client.prototype.get = thenify(function (uuid, callback) {
332 this.send(0, {cmd: 'get', data: uuid}, callback);
333});
334
335Client.prototype.notify = function (uuid) {
336 this.output.write({cmd: 'notify', data: uuid});
337};
338
339Client.prototype.subscribe = function (topic) {
340 this.output.write({cmd: 'subscribe', data: topic});
341 return this;
342};
343
344Client.prototype.unsubscribe = function (topic) {
345 this.output.write({cmd: 'unsubscribe', data: topic});
346};
347
348Client.prototype.publish = function (topic, content) {
349 if (!(topic in this.topicId)) {
350 var self = this;
351 this.send(0, {cmd: 'tid', data: topic}, function (err, res) {
352 if (err || res === undefined) return;
353 self.topicId[topic] = res;
354 self.output.write({cmd: topic, id: minMulticast + res, data: content});
355 });
356 } else
357 this.output.write({cmd: topic, id: minMulticast + this.topicId[topic], data: content});
358};
359
360Client.prototype.request = thenify(function (dest, data, callback) {
361 this.send(dest.uuid, {cmd: 'request', ufrom: this.uuid, data: data}, callback);
362});
363
364Client.prototype.reply = function (msg, error, data) {
365 if (msg.cmd !== 'request') {
366 throw new Error('wrong msg: ' + msg);
367 }
368 //console.assert(msg.cmd === 'request');
369 msg.cmd = 'reply';
370 msg.id = msg.from;
371 msg.ufrom = null;
372 msg.data = data;
373 msg.error = error;
374 this.output.write(msg);
375};
376
377Client.prototype.set = function (data) {
378 this.output.write({cmd: 'set', data: data});
379};
380
381Client.prototype.pipe = function (topic, stream) {
382 this.consumer.subscriber[topic] = stream;
383 return stream;
384};
385
386Client.prototype.createReadStream = function (name, opt) {
387 return new ReadStream(this, name, opt);
388};
389
390Client.prototype.createWriteStream = function (name, dest) {
391 return new WriteStream(this, name, dest);
392};
393
394Client.prototype.createStreamTo = function (msg) {
395 return new StreamToGrid(this.sock, msg.from, msg.streamid);
396};
397
398Client.prototype.createStreamFrom = function (client, msg) {
399 var sid, s, self = this;
400 for (sid = 0; sid < streamMaxId; sid++)
401 if (this.consumer.streams[sid] === undefined) break;
402 if (sid == streamMaxId) throw 'no more free streamid';
403 this.consumer.streams[sid] = s = new stream.PassThrough();
404 s.once('end', function () {self.consumer.streams[sid] = undefined;});
405 msg.streamid = sid;
406 this.send(client, msg);
407 return s;
408};
409
410//Client.prototype.createWriteStream = function (name, dest) {
411// return new PubStream(this, name, dest);
412//};
413
414Client.prototype.end = Client.prototype._end = function () {
415 this.output.end({cmd: 'end'});
416};
417
418module.exports = Client;
419module.exports.encode = encode;
420module.exports.bencode = bencode;
421module.exports.FromGrid = FromGrid;
422module.exports.minMulticast = minMulticast;