1 | var Duplex = require('stream').Duplex;
|
2 | var inherits = require('util').inherits;
|
3 | var logger = require('./logger');
|
4 | var util = require('./util');
|
5 |
|
/**
 * Socket-like object whose outgoing messages are exposed through an
 * object-mode stream (`this.stream`).
 *
 * `readyState` starts at 0; `_open()` moves it to 1 and `close()` to 3.
 */
function StreamSocket() {
  var socket = this;

  // State is initialised before the stream is created, matching the order
  // in which the two properties have always been assigned.
  socket.readyState = 0;
  socket.stream = new ServerStream(socket);
}
|
// The module's value is the StreamSocket constructor itself.
module.exports = StreamSocket;
|
11 |
|
/**
 * Moves the socket from its initial state (0) to open (1) and fires
 * `onopen`. Does nothing when the socket is in any other state, so it is
 * safe to call more than once.
 */
StreamSocket.prototype._open = function() {
  var isUnopened = this.readyState === 0;
  if (isUnopened) {
    // The state is updated before the handler runs so that `onopen`
    // already observes an open socket.
    this.readyState = 1;
    this.onopen();
  }
};
|
/**
 * Closes the socket: marks it closed (readyState 3), pushes `null` into the
 * stream to end its readable side, then fires `onclose`.
 *
 * @param {*} [reason] Passed to `onclose`; any falsy value is replaced by
 *   the string 'closed'.
 */
StreamSocket.prototype.close = function(reason) {
  var alreadyClosed = this.readyState === 3;
  if (!alreadyClosed) {
    this.readyState = 3;

    // Pushing null signals that no more data will be read from the stream.
    this.stream.push(null);

    var closeReason = reason ? reason : 'closed';
    this.onclose(closeReason);
  }
};
|
/**
 * Sends a message through the socket by parsing it and pushing the
 * resulting value into the readable side of the stream.
 *
 * @param {string} data JSON text; `JSON.parse` throws if it is malformed.
 */
StreamSocket.prototype.send = function(data) {
  // The stream carries parsed values, not the JSON text.
  var message = JSON.parse(data);
  this.stream.push(message);
};
|
// Default event handlers. Each one starts out as `util.doNothing`, so the
// socket can call its handlers unconditionally until a user overrides them.
['onmessage', 'onclose', 'onerror', 'onopen'].forEach(function(handlerName) {
  StreamSocket.prototype[handlerName] = util.doNothing;
});
|
32 |
|
33 |
|
/**
 * Object-mode Duplex stream bound to a socket.
 *
 * The stream keeps a reference to the socket and closes it with the reason
 * 'stopped' whenever the stream emits 'error' or 'finish'.
 *
 * @param {Object} socket Object exposing `close(reason)`; stored as
 *   `this.socket`.
 */
function ServerStream(socket) {
  var streamOptions = {objectMode: true};
  Duplex.call(this, streamOptions);

  this.socket = socket;

  // Shared by both listeners below: either event stops the socket.
  function stopSocket() {
    socket.close('stopped');
  }

  this.on('error', function onStreamError(error) {
    logger.warn('ShareDB client message stream error', error);
    stopSocket();
  });

  // 'finish' fires once the writable side has ended and flushed.
  this.on('finish', stopSocket);
}
|
// Make ServerStream inherit from Duplex. This must run before the prototype
// assignments below so that they land on the final prototype object.
inherits(ServerStream, Duplex);

// Flag set on every ServerStream instance via the prototype.
ServerStream.prototype.isServer = true;

// Reading is push-driven: StreamSocket#send() and StreamSocket#close() call
// stream.push() directly, so _read is given util.doNothing (presumably a
// no-op — confirm in ./util).
ServerStream.prototype._read = util.doNothing;
|
55 |
|
/**
 * Writable side of the stream: hands each written chunk to the socket's
 * `onmessage` handler as `{data: <JSON string>}` on the next tick.
 *
 * Chunks written while the socket is not open (readyState !== 1) are
 * dropped, but the write is still acknowledged. A `_write` implementation
 * must always invoke `callback`; otherwise the first dropped chunk stalls
 * the writable side for good, so later writes are never processed and
 * 'finish' can never fire.
 *
 * @param {*} chunk Value written to the stream; serialised with
 *   `JSON.stringify` before delivery.
 * @param {string} encoding Unused (the stream is in object mode).
 * @param {Function} callback Invoked once the chunk has been delivered or
 *   dropped.
 */
ServerStream.prototype._write = function(chunk, encoding, callback) {
  var socket = this.socket;
  process.nextTick(function() {
    // The state is checked at delivery time, not at write time, because the
    // socket may have been closed between the write and this tick.
    if (socket.readyState === 1) {
      socket.onmessage({data: JSON.stringify(chunk)});
    }
    callback();
  });
};
|