All files / lib/flows wait.js

88.89% Statements 24/27
62.5% Branches 5/8
100% Functions 8/8
95.65% Lines 22/23

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 579x 9x 9x                             1x 1x 1x 1x 1x 1x 1x   1x 2x   2x 2x   2x 2x           2x       2x       4x 1x       3x         9x
const {Readable} = require('stream');
const {Manager} = require('../manager/manager.js');
const {DataWrapper} = require('../wrapper/data_wrapper.js')
 
/**
 * Wait untill every source ended to emit end event.
 * 
 * @extends Readable
 */
class FlowWait extends Readable {
 
    /** 
     * Create a FlowWait stream
     * @param {object} options Global options.
     * @param {string} [options.name] Name for this stream.
    */
    constructor(options) {
        options = {...options, objectMode: true};
        super(options);
        this.options = options;
        this._sources = [];
        this._readableState.sync = false;
        this.type = 'FlowWait';
        Iif(this.options.name) Manager.set(this.type, this);
 
        this.on('pipe', (src)=>{
            this._sources.push(src);
 
            this.on('pause', ()=>{
                src.pause();
            });
            this.on('resume', ()=>{
                src.resume();
            });
        });
    }
 
    write(payload) {
        Iif(this.goal) {
            // Is goal player
            payload = new DataWrapper(null, this).setParents(payload);
        }
        Iif(!this.push(payload)) this.pause();
    }
 
    end(src) {
        if(this._sources.find(src => !src._readableState.ended)) return;
        this.push(null);
    }
 
    _read() {
        this.resume();
    }
 
}
 
module.exports.FlowWait = FlowWait;