Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 | 9x 9x 9x 9x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 2x 2x 2x 2x 6x 6x 2x 2x 2x 2x 2x 2x 2x 6x 6x 6x 2x 4x 1x 1x 1x 1x 1x 1x 9x | const {Readable} = require('stream');
const {Manager} = require('../manager/manager.js');
const uuid = require('uuid');
const {DataWrapper} = require('../wrapper/data_wrapper.js')
/**
* Hold flow from multiple source streams until a condition is met. If no method is overridden, it holds the flow until end is signaled on all sources.
*
* @extends Readable
*/
class FlowHold extends Readable {
  /**
   * Create a FlowHold stream.
   *
   * The stream always runs in object mode. Every stream piped into it is
   * registered as a source; its messages are stored until `_check` decides
   * that they can be released downstream.
   *
   * @param {object} [options] Global options (also forwarded to Readable).
   * @param {string} [options.name] Name for this stream. When set, the stream is registered in the Manager.
   * @param {function} [options.hold] Replaces `_hold`: how messages are stored.
   * @param {function} [options.check] Replaces `_check`: decides when to call `_release`.
   * @param {function} [options.release] Replaces `_release`: how messages are released.
   */
  constructor(options) {
    const opts = {...options, objectMode: true};
    super(opts);
    this.options = opts;
    /** @property Source streams, keyed by source id */
    this._sources = {};
    /** @property Stored messages. There is an array for each source */
    this._payloads = {};
    /** @property Raw messages received while `this.goal` is set */
    this._parents = [];
    this._readableState.sync = false;
    this.type = 'FlowHold';
    if (this.options.name) Manager.set(this.type, this);
    // Optional hooks: only functions are accepted as overrides.
    if (typeof this.options.hold === 'function') this._hold = this.options.hold;
    if (typeof this.options.release === 'function') this._release = this.options.release;
    if (typeof this.options.check === 'function') this._check = this.options.check;

    this.on('pipe', (src) => {
      // Use the source name as id when it has one, otherwise a random one.
      // `id` must be reassignable because the port suffix is appended below.
      // NOTE(review): the original expression also involved `this.goal`, but
      // because of operator precedence it only affected sources without a
      // name (which then crashed or were stored under "undefined"); those
      // sources now get a uuid instead. Confirm this matches the intent.
      const srcOptions = src.options;
      const srcName = srcOptions && srcOptions.name;
      let id = srcName ? srcName : uuid.v4();
      if (srcOptions && srcOptions.port) {
        id = `${id}/${srcOptions.port}`;
      }
      this._sources[id] = {src, ended: false};

      src.on('data', (data) => {
        if (this.goal) {
          // Is goal player: keep the raw message so it can be set as parent on release.
          this._parents.push(data);
        }
        this._hold({id, data});
      });
      src.on('end', () => {
        this._sources[id].ended = true;
        this._check();
      });
      // Mirror this stream's flow state on the source.
      this.on('pause', () => {
        src.pause();
      });
      this.on('resume', () => {
        src.resume();
      });
    });
  }

  /**
   * Store a message under the id of the source it came from.
   *
   * @param {object} payload
   * @param {string} payload.id Id of the source stream.
   * @param {*} payload.data Message received from that source.
   */
  _hold(payload) {
    if (!this._payloads[payload.id]) this._payloads[payload.id] = [];
    // Is goal player: payload.data is then expected to be a DataWrapper,
    // so only its inner `data` is stored.
    const stored = this.goal ? payload.data.data : payload.data;
    this._payloads[payload.id].push(stored);
  }

  /**
   * Release the stored messages once every registered source has ended.
   */
  _check() {
    const allEnded = Object.keys(this._sources).every((id) => this._sources[id].ended);
    if (allEnded) this._release();
  }

  /**
   * Push the stored messages downstream as a single chunk and end the stream.
   * When messages from exactly one source are stored, that source's array is
   * pushed directly instead of the id -> array map.
   */
  _release() {
    let payload = this._payloads;
    const ids = Object.keys(payload);
    if (ids.length === 1) {
      payload = payload[ids[0]];
    }
    if (this.goal) {
      // Is goal player
      payload = new DataWrapper(null, this).setParents(this._parents, payload);
    }
    this.push(payload);
    this.push(null);
  }

  /**
   * No-op. Present so this Readable can be used as a pipe destination
   * (pipe() calls write() on its target); messages are collected through
   * the 'data' listener registered on each source instead.
   */
  write(payload) {
  }

  /** No-op. The stream ends itself from `_release`. */
  end() {}

  /** No-op. Data is pushed from `_release`, not on demand. */
  _read() {}
}
module.exports.FlowHold = FlowHold; |