All files / lib/rules rules.js

4.04% Statements 4/99
0% Branches 0/80
0% Functions 0/15
4.4% Lines 4/91

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 1939x 9x 9x                                                                                                                                                                                                                                                                                                                                                                                           9x
const {Writable, Readable} = require('stream');
const {Manager} = require('../manager/manager.js');
const { DataWrapper } = require('../wrapper/data_wrapper.js');
 
/**
 * A rule represents a piece of business logic. Streams can be chained by an arbitrary event name, or by resolve/reject functions.
 *
 * Every output "port" (a chained event name, `resolve` or `reject`) is backed by an
 * object-mode Readable that is piped into the registered destinations. When pushing to
 * one of those Readables reports backpressure, this Writable corks itself until that
 * Readable's `read()` is invoked again.
 * @extends Writable
 */
class Rule extends Writable {

    /**
     * Create a Rule stream
     * @param {object} [options] Global options. They are forwarded to the Writable constructor with `objectMode` forced to `true`.
     * @param {string} [options.name] Name for this stream. When set, the instance is registered through `Manager.set`.
     * @param {RuleProcess} options.rule Method that defines the rule logic. It is invoked with the rule instance as `this`.
     *
     * If inside the rule you emit an event with data, it will be sent to streams attached to the corresponding event on chain method.
     *
     * If this function returns exactly _true_, incoming payload will be sent to streams attached in _resolve_ function.
     *
     * If this function returns exactly _false_, incoming payload will be sent to streams attached in _reject_ function.
     *
     * If this function returns _undefined_, no message will be sent to either streams attached to _resolve_ or _reject_.
     *
     * If anything else is returned, or an exception is thrown, that will be sent to streams attached in _reject_ function.
     */
    constructor(options) {
        options = {...options, objectMode: true};
        super(options);
        this.options = options;
        if(typeof this.options.rule === 'function') this._rule = this.options.rule;
        // Port name -> Readable feeding the destinations chained on that event.
        this._chains = {};
        this._resolve_reader = null;
        this._reject_reader = null;
        // Port name -> true while that port's Readable has made this stream cork itself.
        this._corking_readers = {};
        this.type = 'Rule';
        // Set when end() is called while the rule function is still running.
        this._needsEnd = false;
        if(this.options.name) Manager.set(this.type, this);
    }

    /**
     * Build the object-mode Readable that backs one output port.
     * @param {string} key Key used in the corking bookkeeping for this port.
     * @param {*} port Value stored as `options.port` on the created reader.
     * @returns {Readable} the new reader, not yet piped anywhere.
     * @private
     */
    _makePortReader(key, port) {
        const reader = new Readable({
            objectMode: true,
            read: () => {
                // The consumer asks for data again: release the cork this port requested.
                if(this._corking_readers[key]) {
                    this.uncork();
                    delete this._corking_readers[key];
                }
            }
        });
        reader.options = {...(reader.options || {}), name: this.options.name, port};
        // NOTE(review): forcing the internal `sync` flag off presumably lets pushes reach
        // 'data' listeners without waiting for a read() cycle — confirm against Node internals.
        reader._readableState.sync = false;
        return reader;
    }

    /**
     * Push a value to a port reader and cork this stream when the reader reports backpressure.
     * @param {Readable} reader Port reader to push to.
     * @param {string} key Key used in the corking bookkeeping for this port.
     * @param {*} value Value to push.
     * @private
     */
    _pushToPort(reader, key, value) {
        const goOn = reader.push(value);
        // Cork only once per port; the matching uncork happens in the reader's read().
        if(!goOn && !this._corking_readers[key]) {
            this.cork();
            this._corking_readers[key] = true;
        }
    }

    /**
     * Signal end-of-stream (push `null`) on every port reader created so far.
     * @private
     */
    _endPorts() {
        for(const key in this._chains) {
            this._chains[key].push(null);
        }
        this._reject_reader && this._reject_reader.push(null);
        this._resolve_reader && this._resolve_reader.push(null);
    }

    /**
     * Register streams to pipe to when _event_ is fired.
     * @param {string} event event name.
     * @param {Writable|Writable[]} dst stream (or array of streams) to pipe to. Each one that has an `emit` method receives a `chained` event carrying `{event, src}`.
     * @returns {Writable|Rule} _dst_ when a single stream was given, this rule when an array was given.
     */
    chain(event, dst) {
        const key = event + "";
        if(!this._chains[key]) {
            const created = this._makePortReader(key, event);
            this._chains[key] = created;
            // Data emitted on this rule under the event name is forwarded to the port reader.
            this.on(key, (data) => this._pushToPort(this._chains[key], key, data));
            created.on('resume', () => this.uncork());
        }

        const reader = this._chains[key];
        const targets = Array.isArray(dst) ? dst : [dst];
        for(const target of targets) {
            reader.pipe(target);
            target.emit && target.emit('chained', {event, src: this});
        }
        return Array.isArray(dst) ? this : dst;
    }

    /**
     * Register streams to pipe to when _options.rule_ returns _true_
     * @param {Writable|Writable[]} dst stream (or array of streams) to pipe to
     * @returns {Writable|Rule} _dst_ when a single stream was given, this rule when an array was given.
     */
    resolve(dst) {
        if(!this._resolve_reader) {
            this._resolve_reader = this._makePortReader('resolve', 'resolve');
        }

        const targets = Array.isArray(dst) ? dst : [dst];
        for(const target of targets) {
            this._resolve_reader.pipe(target);
        }
        return Array.isArray(dst) ? this : dst;
    }

    /**
     * Register streams to pipe to when _options.rule_ returns anything else __but__ _true_ or _undefined_, or when an exception is thrown.
     * @param {Writable|Writable[]} dst stream (or array of streams) to pipe to
     * @returns {Writable|Rule} _dst_ when a single stream was given, this rule when an array was given.
     */
    reject(dst) {
        if(!this._reject_reader) {
            this._reject_reader = this._makePortReader('reject', 'reject');
        }

        const targets = Array.isArray(dst) ? dst : [dst];
        for(const target of targets) {
            this._reject_reader.pipe(target);
        }
        return Array.isArray(dst) ? this : dst;
    }

    /**
     * Default rule logic, used when no function was supplied as _options.rule_.
     * @param {*} data incoming payload.
     * @throws {string} always. The thrown value is a plain string: `_write` forwards
     * whatever is thrown to the reject port, so consumers receive this exact text.
     */
    _rule(data) {
        throw 'METHOD NOT IMPLEMENTED _rule()';
    }

    /**
     * Writable implementation: run the rule on one payload and route the outcome.
     *
     * - `true`      -> the payload goes to the resolve port
     * - `false`     -> the payload goes to the reject port
     * - `undefined` -> nothing is sent
     * - anything else, or a thrown value -> that value goes to the reject port
     *
     * When `this.goal` is set, the payload is first replaced by `payload.getChild(this)`,
     * and a rejected value that is not already a DataWrapper is wrapped in one.
     * @param {*} payload incoming object.
     * @param {string} encoding unused (object mode).
     * @param {function} cb called once the payload has been routed.
     */
    _write(payload, encoding, cb) {
        if(this.goal) {
            payload = payload.getChild(this);
        }

        let result;
        this._rulling = true;
        try {
            result = this._rule(payload);
        } catch(e) {
            result = e;
        } finally {
            // Always clear the flag, otherwise a throwing rule would make every later end() defer forever.
            this._rulling = false;
        }

        if(result !== undefined) {
            if(result === true) {
                if(this._resolve_reader) {
                    this._pushToPort(this._resolve_reader, 'resolve', payload);
                }
            } else if(this._reject_reader) {
                let rejected = result === false ? payload : result;
                // `!x instanceof Y` parses as `(!x) instanceof Y`; the negation must wrap the whole test.
                if(this.goal && !(rejected instanceof DataWrapper)) rejected = new DataWrapper(rejected, this);
                this._pushToPort(this._reject_reader, 'reject', rejected);
            }
        }

        // An end() requested from inside the rule is honoured only after the result has
        // been routed, so the result is never pushed after end-of-stream.
        if(this._needsEnd) {
            this._endPorts();
        }

        cb();
    }

    /**
     * End every port reader. If called while the rule function is running, the end is
     * deferred until the current payload has been routed.
     *
     * Note: this override takes no arguments and does not call `Writable.prototype.end`,
     * so the underlying Writable's own end sequence is not triggered.
     * @returns {Rule} this rule, mirroring the return value of `Writable.prototype.end`.
     */
    end() {
        if(this._rulling) {
            this._needsEnd = true;
        } else {
            this._endPorts();
        }
        return this;
    }

}
 
 
// Expose the Rule class as the named CommonJS export `Rule`.
module.exports.Rule = Rule;