UNPKG

1.06 kBJavaScriptView Raw
1var inherits = require('util').inherits;
2var Readable = require('stream').Readable;
3var util = require('./util');
4
5// Stream of operations. Subscribe returns one of these
6function OpStream() {
7 Readable.call(this, {objectMode: true});
8 this.id = null;
9 this.open = true;
10}
11module.exports = OpStream;
12
13inherits(OpStream, Readable);
14
15// This function is for notifying us that the stream is empty and needs data.
16// For now, we'll just ignore the signal and assume the reader reads as fast
17// as we fill it. I could add a buffer in this function, but really I don't
18// think that is any better than the buffer implementation in nodejs streams
19// themselves.
20OpStream.prototype._read = util.doNothing;
21
22OpStream.prototype.pushData = function(data) {
23 // Ignore any messages after unsubscribe
24 if (!this.open) return;
25 // This data gets consumed in Agent#_subscribeToStream
26 this.push(data);
27};
28
29OpStream.prototype.destroy = function() {
30 // Only close stream once
31 if (!this.open) return;
32 this.open = false;
33
34 this.push(null);
35 this.emit('close');
36};