All files / lib/flows first.js

78.05% Statements 32/41
55.56% Branches 10/18
76.92% Functions 10/13
88.57% Lines 31/35

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 889x 9x 9x 9x                                   1x 1x 1x   1x 1x 1x 1x 1x 1x 1x     1x               1x 3x   7x   3x 3x   3x 3x                   7x       7x 7x 7x 3x 3x                 1x       4x         9x
const {Readable} = require('stream');
const {Manager} = require('../manager/manager.js');
const uuid = require('uuid');
const {DataWrapper} = require('../wrapper/data_wrapper.js')
 
/**
 * Flow the first message from multiple source streams and discard the rest.
 * 
 * @extends Readable
 */
class FlowFirst extends Readable {
 
    /** 
     * Create a FlowFirst stream
     * @param {object} options Global options
     * @param {string} [options.name] Name for this stream
     * @param {Identify} options.identify Function that returns a message id to match
     * @param {Condition} [options.criteria] Add an extra criteria for message to match. Return false to discard the message.
     * @param {Process} [options.match] How messages are matched according to _options.identify_ function.
     */
    constructor(options) {
        options = {...options, objectMode: true};
        super(options);
        this.options = options;
        /** @property Source streams */
        this._sources = {};
        this._readableState.sync = false;
        this.type = 'FlowFirst';
        Iif(this.options.name) Manager.set(this.type, this);
        Iif(this.options.match && typeof this.options.match == 'function') this._match = this.options.match;
        Eif(this.options.identify && typeof this.options.identify == 'function') this._identify = this.options.identify;
        this._identities={};
 
        //Delete used identities every 5 seconds
        setInterval(()=>{
            Object.keys(this._identities).forEach(key => {
                const ts = Date.now() - 1000;
                if(this._identities[key] < ts) delete this._identities[key];
            });
        }, 5000);
 
 
        this.on('pipe', (src)=>{
            src.on('data', (data)=>{
                //console.log('hay data');
                this._match(data);
            });
            this.on('pause', ()=>{
                src.pause();
            });
            this.on('resume', ()=>{
                src.resume();
            });
        });
    }
 
    _identify(data) {
        throw 'METHOD NOT IMPLEMENTED _identify()';
    }
 
    _criteria(data) {
        return true;
    }
 
    _match(data) {
        Iif(!this._criteria(data)) return;
        const id = this._identify(data);
        if(!this._identities[id]) {
            this._identities[id] = Date.now();
            Iif(!this.push(data)) this.pause();
        }
    }
 
    write(payload) {
 
    }
 
    end() {
        this.push(null);
    }
 
    _read() {
        this.resume();
    }
 
}
 
module.exports.FlowFirst = FlowFirst;