const {Readable} = require('stream');
const {Manager} = require('../manager/manager.js');
const uuid = require('uuid');
const {DataWrapper} = require('../wrapper/data_wrapper.js')
/**
 * Joins messages from multiple piped sources into a single message.
 *
 * Every source piped into this stream is registered under an id. Incoming
 * chunks are collected into pending payload objects keyed by source id, and
 * unless the join method is overridden, a payload is released downstream as
 * soon as it holds exactly one message from each registered source.
 *
 * @extends Readable
 */
class FlowJoin extends Readable {
  /**
   * Create a FlowJoin stream.
   *
   * @param {object} [options] Global options; also forwarded to Readable
   *   with `objectMode` forced to true.
   * @param {string} [options.name] Name for this stream. When set, the
   *   stream is registered through `Manager.set`.
   * @param {function} [options.join] Replacement for the default `_join`
   *   method; it is called with `{id, data}` for every incoming chunk.
   */
  constructor(options) {
    options = {...options, objectMode: true};
    super(options);
    this.options = options;
    // Registered sources, keyed by source id.
    this._sources = {};
    // Pending (incomplete) payloads, oldest first.
    this._payloads = [];
    this._readableState.sync = false;
    this.type = 'FlowJoin';
    if (this.options.name) {
      Manager.set(this.type, this);
    }
    if (typeof this.options.join === 'function') {
      this._join = this.options.join;
    }
    this.on('pipe', (src) => this._register(src));
  }

  /**
   * Register a source that has been piped into this stream and start
   * collecting its data.
   *
   * @param {object} src Source stream; `src.options.name` and
   *   `src.options.port` (both optional) are used to build its id.
   * @private
   */
  _register(src) {
    const srcOptions = src.options || {};
    // NOTE(review): the original expression also tested `this.goal`, but due
    // to operator precedence it only affected the condition and never the
    // chosen id. Confirm whether `goal` was meant to be used as the id.
    let id = srcOptions.name ? srcOptions.name : uuid.v4();
    if (srcOptions.port) {
      id = `${id}/${srcOptions.port}`;
    }
    this._sources[id] = src;
    // Dispatch through `this._join` at call time so a join function supplied
    // via options (or a subclass override) is honoured.
    src.on('data', (data) => {
      this._join({id, data});
    });
    // Propagate flow control to the source.
    this.on('pause', () => {
      src.pause();
    });
    this.on('resume', () => {
      src.resume();
    });
  }

  /**
   * Default join strategy: store the message in the oldest pending payload
   * that has no message from this source yet (creating one if needed) and
   * release the payload once every registered source has contributed.
   *
   * @param {object} data Incoming message.
   * @param {string} data.id Id of the source that produced the message.
   * @param {*} data.data The message itself.
   */
  _join(data) {
    const hasOwn = Object.prototype.hasOwnProperty;
    let index = this._payloads.findIndex(
      (candidate) => !hasOwn.call(candidate, data.id)
    );
    if (index === -1) {
      index = this._payloads.push({}) - 1;
    }
    const pending = this._payloads[index];
    pending[data.id] = data.data;

    // Checked for new payloads too, so a join with a single source emits
    // every message instead of queueing them forever.
    const sourceCount = Object.keys(this._sources).length;
    if (Object.keys(pending).length !== sourceCount) {
      return;
    }

    // Remove the payload that actually completed.
    this._payloads.splice(index, 1);
    let payload = pending;
    if (this.goal) {
      // Is goal player
      payload = new DataWrapper(null, this).setParents(payload);
    }
    // Stop the sources when the internal buffer reports back-pressure.
    if (!this.push(payload)) {
      this.pause();
    }
  }

  /**
   * Readable hook: the consumer wants more data, so resume the sources.
   */
  _read() {
    this.resume();
  }

  /**
   * Intentionally a no-op. Chunks are consumed through the 'data' listener
   * installed when a source is piped in, not through this method.
   *
   * @param {*} payload Ignored.
   */
  write(payload) {
  }

  /**
   * Ends this stream (pushes `null`) once every registered source reports
   * that it has ended; otherwise does nothing.
   *
   * @param {object} [src] Unused.
   */
  end(src) {
    for (const id in this._sources) {
      if (!this._sources[id]._readableState.ended) {
        return;
      }
    }
    this.push(null);
  }
}
module.exports.FlowJoin = FlowJoin;