1 | ;
|
2 | Object.defineProperty(exports, "__esModule", { value: true });
|
3 | var index_1 = require("../index");
|
var FCAMIL = /** @class */ (function () {
    /**
     * Listener attached to a single nested stream on behalf of a
     * FlattenConcAMOperator.
     *
     * @param out receiver of the nested stream's values and errors
     * @param op  operator whose `less()` is called when the nested stream ends
     */
    function FCAMIL(out, op) {
        this.out = out;
        this.op = op;
    }
    var proto = FCAMIL.prototype;
    // Values emitted by the nested stream are passed to `out` untouched.
    proto._n = function (value) {
        this.out._n(value);
    };
    // Errors from the nested stream are passed to `out` untouched.
    proto._e = function (error) {
        this.out._e(error);
    };
    // Completion is NOT forwarded to `out`; the operator is notified instead
    // so it can decide what to do with the freed slot.
    proto._c = function () {
        this.op.less();
    };
    return FCAMIL;
}());
|
var FlattenConcAMOperator = /** @class */ (function () {
    /**
     * Operator behind `flattenConcurrentlyAtMost`: listens to an outer stream
     * of nested streams (`ins`) and forwards the events of at most `n` nested
     * streams at a time to `out`. Nested streams that arrive while all `n`
     * slots are taken wait in a first-in-first-out queue.
     *
     * Internal state:
     *  - `_l`   number of nested streams currently listened to
     *  - `_d`   true once the outer stream has completed during this run
     *  - `_seq` nested streams waiting for a free slot
     *
     * @param n   maximum number of nested streams listened to at once
     * @param ins outer stream that emits the nested streams
     */
    function FlattenConcAMOperator(n, ins) {
        this.n = n;
        this.ins = ins;
        this.type = 'flattenConcurrentlyAtMost';
        this.out = null;
        this._l = 0;
        this._d = false;
        this._seq = [];
    }
    /**
     * Begins a run: remembers the output and subscribes to the outer stream.
     *
     * @param out receiver of the flattened events
     */
    FlattenConcAMOperator.prototype._start = function (out) {
        this.out = out;
        // A new run must not inherit the "outer completed" flag of an earlier one.
        this._d = false;
        this.ins._add(this);
    };
    /**
     * Ends a run: unsubscribes from the outer stream and resets ALL run state.
     */
    FlattenConcAMOperator.prototype._stop = function () {
        this.ins._remove(this);
        this._l = 0;
        // Previously left untouched, which made a restarted operator complete
        // its output as soon as its nested streams drained.
        this._d = false;
        this.out = null;
        this._seq = [];
    };
    /**
     * Called by a nested listener when its nested stream completes. Frees one
     * slot, completes the output if nothing is left to do, otherwise starts
     * the next queued nested stream.
     */
    FlattenConcAMOperator.prototype.less = function () {
        var u = this.out;
        // Nested listeners are not detached in `_stop`, so a completion can
        // arrive while stopped. Ignore it: decrementing here would leave `_l`
        // negative and let the next run exceed the limit `n`.
        // NOTE(review): a stale completion that arrives after a restart is
        // still counted; fixing that requires detaching nested listeners on stop.
        if (!u)
            return;
        var seq = this._seq;
        if (--this._l === 0 && seq.length === 0 && this._d) {
            u._c();
        }
        if (this._l < this.n && seq.length > 0) {
            this._n(seq.shift());
        }
    };
    /**
     * Receives a nested stream from the outer stream: starts listening to it
     * if a slot is free, otherwise queues it.
     *
     * @param s nested stream
     */
    FlattenConcAMOperator.prototype._n = function (s) {
        var u = this.out;
        if (!u)
            return;
        if (this._l < this.n) {
            this._l++;
            s._add(new FCAMIL(u, this));
        }
        else {
            this._seq.push(s);
        }
    };
    /**
     * Forwards an error of the outer stream to the output.
     *
     * @param err the error value
     */
    FlattenConcAMOperator.prototype._e = function (err) {
        var u = this.out;
        if (!u)
            return;
        u._e(err);
    };
    /**
     * Records that the outer stream completed; completes the output right away
     * only if no nested stream is active or queued.
     */
    FlattenConcAMOperator.prototype._c = function () {
        var seq = this._seq;
        this._d = true;
        if (this._l === 0 && seq.length === 0) {
            var u = this.out;
            if (!u)
                return;
            u._c();
        }
    };
    return FlattenConcAMOperator;
}());
|
82 | exports.FlattenConcAMOperator = FlattenConcAMOperator;
|
/**
 * Flattens a "stream of streams", imitating several nested streams at the
 * same time, but never more than `n` of them.
 *
 * Given an input stream whose events are themselves streams, the returned
 * operator produces a flat output stream that emits regular events. Each time
 * the input stream emits a nested stream, *flattenConcurrentlyAtMost* checks
 * how many nested streams it is currently connected to. While that number is
 * below the limit, it connects to the new nested stream as well and keeps
 * imitating the ones it was already connected to.
 *
 * Once the limit has been reached, newly emitted nested streams are placed in
 * a queue. Whenever one of the connected nested streams completes, the oldest
 * queued stream is taken out of the queue and connected.
 *
 * The output completes when the input stream has completed and there are no
 * connected nested streams and no queued nested streams left.
 *
 * Marble diagrams:
 *
 * ```text
 * --+--------+---------------
 *   \        \
 *    \       ----1----2---3--|
 *    --a--b----c----|
 *     flattenConcurrentlyAtMost(1)
 * -----a--b----c-1----2---3--|
 * ```
 *
 * ```text
 * --+---+---+-|
 *    \   \   \
 *     \   \   ---fgh----i-----jh--|
 *      \   -----1----2----3--|
 *       ---a--b-----c--|
 *     flattenConcurrentlyAtMost(2)
 * ---------a--b-1---c2--i-3------fgh----i-----jh--|
 * ```
 *
 * @param {number} n Maximum number of nested streams imitated concurrently.
 * @return {Function} Operator that takes the input stream of streams and
 * returns the flattened Stream.
 */
function flattenConcurrentlyAtMost(n) {
    function flattenConcAMOperator(ins) {
        var producer = new FlattenConcAMOperator(n, ins);
        return new index_1.Stream(producer);
    }
    return flattenConcAMOperator;
}
|
133 | exports.default = flattenConcurrentlyAtMost;
|
134 | //# sourceMappingURL=data:application/json;base64,{"version":3,"file":"flattenConcurrentlyAtMost.js","sourceRoot":"","sources":["../src/extra/flattenConcurrentlyAtMost.ts"],"names":[],"mappings":";;AAAA,kCAAuF;AAEvF;IACE,gBAAmB,GAAc,EACb,EAA4B;QAD7B,QAAG,GAAH,GAAG,CAAW;QACb,OAAE,GAAF,EAAE,CAA0B;IAChD,CAAC;IAED,mBAAE,GAAF,UAAG,CAAI;QACL,IAAI,CAAC,GAAG,CAAC,EAAE,CAAC,CAAC,CAAC,CAAC;IACjB,CAAC;IAED,mBAAE,GAAF,UAAG,GAAQ;QACT,IAAI,CAAC,GAAG,CAAC,EAAE,CAAC,GAAG,CAAC,CAAC;IACnB,CAAC;IAED,mBAAE,GAAF;QACE,IAAI,CAAC,EAAE,CAAC,IAAI,EAAE,CAAC;IACjB,CAAC;IACH,aAAC;AAAD,CAAC,AAhBD,IAgBC;AAED;IAOE,+BAAmB,CAAS,EAAS,GAAsB;QAAxC,MAAC,GAAD,CAAC,CAAQ;QAAS,QAAG,GAAH,GAAG,CAAmB;QANpD,SAAI,GAAG,2BAA2B,CAAC;QACnC,QAAG,GAAc,IAAW,CAAC;QAC5B,OAAE,GAAW,CAAC,CAAC;QACf,OAAE,GAAY,KAAK,CAAC;QACpB,SAAI,GAAqB,EAAE,CAAC;IAGpC,CAAC;IAED,sCAAM,GAAN,UAAO,GAAc;QACnB,IAAI,CAAC,GAAG,GAAG,GAAG,CAAC;QACf,IAAI,CAAC,GAAG,CAAC,IAAI,CAAC,IAAI,CAAC,CAAC;IACtB,CAAC;IAED,qCAAK,GAAL;QACE,IAAI,CAAC,GAAG,CAAC,OAAO,CAAC,IAAI,CAAC,CAAC;QACvB,IAAI,CAAC,EAAE,GAAG,CAAC,CAAC;QACZ,IAAI,CAAC,GAAG,GAAG,IAAW,CAAC;QACvB,IAAI,CAAC,IAAI,GAAG,EAAE,CAAC;IACjB,CAAC;IAED,oCAAI,GAAJ;QACE,IAAM,GAAG,GAAG,IAAI,CAAC,IAAI,CAAC;QACtB,IAAI,EAAE,IAAI,CAAC,EAAE,KAAK,CAAC,IAAI,GAAG,CAAC,MAAM,KAAK,CAAC,IAAI,IAAI,CAAC,EAAE,EAAE;YAClD,IAAM,CAAC,GAAG,IAAI,CAAC,GAAG,CAAC;YACnB,IAAI,CAAC,CAAC;gBAAE,OAAO;YACf,CAAC,CAAC,EAAE,EAAE,CAAC;SACR;QACD,IAAI,IAAI,CAAC,EAAE,GAAG,IAAI,CAAC,CAAC,IAAI,GAAG,CAAC,MAAM,GAAG,CAAC,EAAE;YACtC,IAAI,CAAC,EAAE,CAAC,GAAG,CAAC,KAAK,EAAe,CAAC,CAAC;SACnC;IACH,CAAC;IAED,kCAAE,GAAF,UAAG,CAAY;QACb,IAAM,CAAC,GAAG,IAAI,CAAC,GAAG,CAAC;QACnB,IAAI,CAAC,CAAC;YAAE,OAAO;QACf,IAAI,IAAI,CAAC,EAAE,GAAG,IAAI,CAAC,CAAC,EAAE;YACpB,IAAI,CAAC,EAAE,EAAE,CAAC;YACV,CAAC,CAAC,IAAI,CAAC,IAAI,MAAM,CAAC,CAAC,EAAE,IAAI,CAAC,CAAC,CAAC;SAC7B;aAAM;YACL,IAAI,CAAC,IAAI,CAAC,IAAI,CAAC,CAAC,CAAC,CAAC;SACnB;IACH,CAAC;IAED,kCAAE,GAAF,UAAG,GAAQ;QACT,IAAM,CAAC,GAAG,IAAI,CAAC,GAAG,CAAC;QACnB,IAAI,CAAC,CAAC;YAAE,OAAO;QACf,CAAC,CAAC,EAAE,CAAC,GAAG,CAAC,CAAC;IACZ,
CAAC;IAED,kCAAE,GAAF;QACE,IAAM,GAAG,GAAG,IAAI,CAAC,IAAI,CAAC;QACtB,IAAI,CAAC,EAAE,GAAG,IAAI,CAAC;QACf,IAAI,IAAI,CAAC,EAAE,KAAK,CAAC,IAAI,GAAG,CAAC,MAAM,KAAK,CAAC,EAAE;YACrC,IAAM,CAAC,GAAG,IAAI,CAAC,GAAG,CAAC;YACnB,IAAI,CAAC,CAAC;gBAAE,OAAO;YACf,CAAC,CAAC,EAAE,EAAE,CAAC;SACR;IACH,CAAC;IACH,4BAAC;AAAD,CAAC,AA5DD,IA4DC;AA5DY,sDAAqB;AA8DlC;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;GA4CG;AACH,SAAwB,yBAAyB,CAAI,CAAS;IAC5D,OAAO,SAAS,qBAAqB,CAAC,GAAwC;QAC5E,OAAO,IAAI,cAAM,CAAI,IAAI,qBAAqB,CAAC,CAAC,EAAE,GAAG,CAAC,CAAC,CAAC;IAC1D,CAAC,CAAC;AACJ,CAAC;AAJD,4CAIC","sourcesContent":["import { Operator, Stream, MemoryStream, OutSender, InternalListener } from '../index';\n\nclass FCAMIL<T> implements InternalListener<T>, OutSender<T> {\n  constructor(public out: Stream<T>,\n              private op: FlattenConcAMOperator<T>) {\n  }\n\n  _n(t: T) {\n    this.out._n(t);\n  }\n\n  _e(err: any) {\n    this.out._e(err);\n  }\n\n  _c() {\n    this.op.less();\n  }\n}\n\nexport class FlattenConcAMOperator<T> implements Operator<Stream<T>, T> {\n  public type = 'flattenConcurrentlyAtMost';\n  public out: Stream<T> = null as any;\n  private _l: number = 0;\n  private _d: boolean = false;\n  private _seq: Array<Stream<T>> = [];\n\n  constructor(public n: number, public ins: Stream<Stream<T>>) {\n  }\n\n  _start(out: Stream<T>): void {\n    this.out = out;\n    this.ins._add(this);\n  }\n\n  _stop(): void {\n    this.ins._remove(this);\n    this._l = 0;\n    this.out = null as any;\n    this._seq = [];\n  }\n\n  less(): void {\n    const seq = this._seq;\n    if (--this._l === 0 && seq.length === 0 && this._d) {\n      const u = this.out;\n      if (!u) return;\n      u._c();\n    }\n    if (this._l < this.n && seq.length > 0) {\n      this._n(seq.shift() as Stream<T>);\n    }\n  }\n\n  _n(s: Stream<T>) {\n    const u = this.out;\n    if (!u) return;\n    if (this._l < this.n) {\n      this._l++;\n      s._add(new FCAMIL(u, this));\n    } else {\n      this._seq.push(s);\n    }\n  }\n\n  
_e(err: any) {\n    const u = this.out;\n    if (!u) return;\n    u._e(err);\n  }\n\n  _c() {\n    const seq = this._seq;\n    this._d = true;\n    if (this._l === 0 && seq.length === 0) {\n      const u = this.out;\n      if (!u) return;\n      u._c();\n    }\n  }\n}\n\n/**\n * Flattens a \"stream of streams\", handling multiple concurrent nested streams\n * simultaneously, up to some limit `n`.\n *\n * If the input stream is a stream that emits streams, then this operator will\n * return an output stream which is a flat stream: emits regular events. The\n * flattening happens concurrently, up to the configured limit. It works like\n * this: when the input stream emits a nested stream,\n * *flattenConcurrentlyAtMost* will start imitating that nested one. When the\n * next nested stream is emitted on the input stream,\n * *flattenConcurrentlyAtMost* will check to see how many streams it is connected\n * to. If it is connected to a number of streams less than the limit, it will also\n * imitate that new one, but will continue to imitate the previous nested streams\n * as well.\n *\n * If the limit has already been reached, *flattenConcurrentlyAtMost* will put the\n * stream in a queue. 
When any of the streams it is listening to completes, a stream\n * is taken out of the queue and `flattenConcurrentlyAtMost` will connect to it.\n *\n * This process continues until the metastream completes and there are no more\n * connected streams or streams in the queue.\n *\n * Marble diagrams:\n *\n * ```text\n * --+--------+---------------\n *   \\        \\\n *    \\       ----1----2---3--|\n *    --a--b----c----|\n *     flattenConcurrentlyAtMost(1)\n * -----a--b----c-1----2---3--|\n * ```\n *\n * ```text\n * --+---+---+-|\n *    \\   \\   \\\n *     \\   \\   ---fgh----i-----jh--|\n *      \\   -----1----2----3--|\n *       ---a--b-----c--|\n *     flattenConcurrentlyAtMost(2)\n * ---------a--b-1---c2--i-3------fgh----i-----jh--|\n * ```\n *\n * @return {Stream}\n */\nexport default function flattenConcurrentlyAtMost<T>(n: number): (ins: Stream<Stream<T> | MemoryStream<T>>) => Stream<T> {\n  return function flattenConcAMOperator(ins: Stream<Stream<T> | MemoryStream<T>>) {\n    return new Stream<T>(new FlattenConcAMOperator(n, ins));\n  };\n}\n"]} |
\ | No newline at end of file |