UNPKG

1.71 kBJavaScriptView Raw
1var Duplex = require('stream').Duplex;
2var inherits = require('util').inherits;
3var logger = require('./logger');
4var util = require('./util');
5
/**
 * Socket-like object whose traffic is carried by an in-process object stream
 * instead of a network connection.
 *
 * A new instance starts with `readyState` 0 and owns a `ServerStream`
 * (exposed as `stream`) that is constructed with a reference back to this
 * socket.
 *
 * NOTE(review): the readyState values used in this file (0, 1, 3) look like
 * the WebSocket CONNECTING / OPEN / CLOSED numbering - confirm.
 *
 * @constructor
 */
function StreamSocket() {
  var socket = this;

  // Initial state; `_open()` only acts while readyState is still 0.
  socket.readyState = 0;
  socket.stream = new ServerStream(socket);
}
10module.exports = StreamSocket;
11
/**
 * Marks the socket as open and notifies the `onopen` handler.
 *
 * Only has an effect while `readyState` is 0; in any other state the call is
 * ignored, so the handler fires at most once per socket.
 */
StreamSocket.prototype._open = function() {
  if (this.readyState === 0) {
    // Update the state first so the handler already observes readyState 1.
    this.readyState = 1;
    this.onopen();
  }
};
/**
 * Closes the socket: sets `readyState` to 3, ends the readable side of the
 * stream and calls `onclose`.
 *
 * Calling it again once `readyState` is 3 does nothing.
 *
 * @param {string} [reason] Passed to `onclose`; any falsy value is replaced
 *   by 'closed'.
 */
StreamSocket.prototype.close = function(reason) {
  if (this.readyState !== 3) {
    this.readyState = 3;

    // Pushing null signals that no more data will be read from the stream,
    // which makes it emit 'end'.
    this.stream.push(null);

    var closeReason = reason ? reason : 'closed';
    this.onclose(closeReason);
  }
};
/**
 * Sends a message by pushing it onto the readable side of the stream.
 *
 * @param {string} data JSON text; it is parsed and the resulting value (not
 *   the string) is what gets pushed onto the stream.
 * @throws {SyntaxError} If `data` is not valid JSON (raised by `JSON.parse`).
 */
StreamSocket.prototype.send = function(data) {
  var message = JSON.parse(data);
  this.stream.push(message);
};
// Default event handlers. Each one is `util.doNothing` (presumably a no-op,
// see ./util) until the owner of the socket assigns its own function, which
// lets the rest of this file call the handlers unconditionally.
['onmessage', 'onclose', 'onerror', 'onopen'].forEach(function(handlerName) {
  StreamSocket.prototype[handlerName] = util.doNothing;
});
32
33
/**
 * Object-mode Duplex stream paired with a socket.
 *
 * A stream 'error' is logged as a warning and closes the socket; the
 * writable side finishing also closes the socket. Both use the reason
 * 'stopped'.
 *
 * @constructor
 * @param {Object} socket The socket this stream belongs to; its `close`
 *   method is called from the event handlers registered here.
 */
function ServerStream(socket) {
  Duplex.call(this, {objectMode: true});

  this.socket = socket;

  function closeAsStopped() {
    socket.close('stopped');
  }

  this.on('error', function(error) {
    logger.warn('ShareDB client message stream error', error);
    closeAsStopped();
  });

  // 'finish' means the server ended the writable stream. Triggered by
  // calling stream.end() in agent.close()
  this.on('finish', function() {
    closeAsStopped();
  });
}
inherits(ServerStream, Duplex);
51
// Flag shared by every ServerStream instance.
// NOTE(review): nothing in this file reads it; presumably consumers use it to
// tell this in-process stream apart from other streams - confirm.
ServerStream.prototype.isServer = true;

// Readable side: nothing to do when the stream asks for data, because data
// only arrives when StreamSocket#send pushes it.
ServerStream.prototype._read = util.doNothing;
55
/**
 * Writable side: delivers a chunk written by the server to the socket's
 * `onmessage` handler as `{data: <JSON string>}`.
 *
 * Delivery happens on the next tick. If the socket is not open
 * (`readyState` is not 1) at that point, the chunk is dropped.
 *
 * @param {*} chunk Value to deliver; it is serialized with `JSON.stringify`.
 * @param {string} encoding Unused (the stream runs in object mode).
 * @param {Function} callback Stream callback signalling that this write has
 *   been handled.
 */
ServerStream.prototype._write = function(chunk, encoding, callback) {
  var socket = this.socket;
  process.nextTick(function() {
    if (socket.readyState === 1) {
      socket.onmessage({data: JSON.stringify(chunk)});
    }
    // Always complete the write, even when the chunk was dropped. A Writable
    // stops processing further writes (and never emits 'finish') until the
    // callback of the current write has been called, so skipping it for a
    // not-yet-open or closed socket would stall the stream permanently.
    callback();
  });
};