UNPKG

17 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3var index_1 = require("../index");
4var NO = {};
5var SampleCombineListener = /** @class */ (function () {
6 function SampleCombineListener(i, p) {
7 this.i = i;
8 this.p = p;
9 p.ils[i] = this;
10 }
11 SampleCombineListener.prototype._n = function (t) {
12 var p = this.p;
13 if (p.out === NO)
14 return;
15 p.up(t, this.i);
16 };
17 SampleCombineListener.prototype._e = function (err) {
18 this.p._e(err);
19 };
20 SampleCombineListener.prototype._c = function () {
21 this.p.down(this.i, this);
22 };
23 return SampleCombineListener;
24}());
25exports.SampleCombineListener = SampleCombineListener;
26var SampleCombineOperator = /** @class */ (function () {
27 function SampleCombineOperator(ins, streams) {
28 this.type = 'sampleCombine';
29 this.ins = ins;
30 this.others = streams;
31 this.out = NO;
32 this.ils = [];
33 this.Nn = 0;
34 this.vals = [];
35 }
36 SampleCombineOperator.prototype._start = function (out) {
37 this.out = out;
38 var s = this.others;
39 var n = this.Nn = s.length;
40 var vals = this.vals = new Array(n);
41 for (var i = 0; i < n; i++) {
42 vals[i] = NO;
43 s[i]._add(new SampleCombineListener(i, this));
44 }
45 this.ins._add(this);
46 };
47 SampleCombineOperator.prototype._stop = function () {
48 var s = this.others;
49 var n = s.length;
50 var ils = this.ils;
51 this.ins._remove(this);
52 for (var i = 0; i < n; i++) {
53 s[i]._remove(ils[i]);
54 }
55 this.out = NO;
56 this.vals = [];
57 this.ils = [];
58 };
59 SampleCombineOperator.prototype._n = function (t) {
60 var out = this.out;
61 if (out === NO)
62 return;
63 if (this.Nn > 0)
64 return;
65 out._n([t].concat(this.vals));
66 };
67 SampleCombineOperator.prototype._e = function (err) {
68 var out = this.out;
69 if (out === NO)
70 return;
71 out._e(err);
72 };
73 SampleCombineOperator.prototype._c = function () {
74 var out = this.out;
75 if (out === NO)
76 return;
77 out._c();
78 };
79 SampleCombineOperator.prototype.up = function (t, i) {
80 var v = this.vals[i];
81 if (this.Nn > 0 && v === NO) {
82 this.Nn--;
83 }
84 this.vals[i] = t;
85 };
86 SampleCombineOperator.prototype.down = function (i, l) {
87 this.others[i]._remove(l);
88 };
89 return SampleCombineOperator;
90}());
91exports.SampleCombineOperator = SampleCombineOperator;
92var sampleCombine;
93/**
94 *
95 * Combines a source stream with multiple other streams. The result stream
96 * will emit the latest events from all input streams, but only when the
97 * source stream emits.
98 *
99 * If the source, or any input stream, throws an error, the result stream
100 * will propagate the error. If any input streams end, their final emitted
101 * value will remain in the array of any subsequent events from the result
102 * stream.
103 *
104 * The result stream will only complete upon completion of the source stream.
105 *
106 * Marble diagram:
107 *
108 * ```text
109 * --1----2-----3--------4--- (source)
110 * ----a-----b-----c--d------ (other)
111 * sampleCombine
112 * -------2a----3b-------4d--
113 * ```
114 *
115 * Examples:
116 *
117 * ```js
118 * import sampleCombine from 'xstream/extra/sampleCombine'
119 * import xs from 'xstream'
120 *
121 * const sampler = xs.periodic(1000).take(3)
122 * const other = xs.periodic(100)
123 *
124 * const stream = sampler.compose(sampleCombine(other))
125 *
126 * stream.addListener({
127 * next: i => console.log(i),
128 * error: err => console.error(err),
129 * complete: () => console.log('completed')
130 * })
131 * ```
132 *
133 * ```text
134 * > [0, 8]
135 * > [1, 18]
136 * > [2, 28]
137 * ```
138 *
139 * ```js
140 * import sampleCombine from 'xstream/extra/sampleCombine'
141 * import xs from 'xstream'
142 *
143 * const sampler = xs.periodic(1000).take(3)
144 * const other = xs.periodic(100).take(2)
145 *
146 * const stream = sampler.compose(sampleCombine(other))
147 *
148 * stream.addListener({
149 * next: i => console.log(i),
150 * error: err => console.error(err),
151 * complete: () => console.log('completed')
152 * })
153 * ```
154 *
155 * ```text
156 * > [0, 1]
157 * > [1, 1]
158 * > [2, 1]
159 * ```
160 *
161 * @param {...Stream} streams One or more streams to combine with the sampler
162 * stream.
163 * @return {Stream}
164 */
165sampleCombine = function sampleCombine() {
166 var streams = [];
167 for (var _i = 0; _i < arguments.length; _i++) {
168 streams[_i] = arguments[_i];
169 }
170 return function sampleCombineOperator(sampler) {
171 return new index_1.Stream(new SampleCombineOperator(sampler, streams));
172 };
173};
174exports.default = sampleCombine;
175//# sourceMappingURL=data:application/json;base64,{"version":3,"file":"sampleCombine.js","sourceRoot":"","sources":["../src/extra/sampleCombine.ts"],"names":[],"mappings":";;AAAA,kCAA4D;AAkD5D,IAAM,EAAE,GAAG,EAAE,CAAC;AAEd;IACE,+BAAoB,CAAS,EAAU,CAA6B;QAAhD,MAAC,GAAD,CAAC,CAAQ;QAAU,MAAC,GAAD,CAAC,CAA4B;QAClE,CAAC,CAAC,GAAG,CAAC,CAAC,CAAC,GAAG,IAAI,CAAC;IAClB,CAAC;IAED,kCAAE,GAAF,UAAG,CAAI;QACL,IAAM,CAAC,GAAG,IAAI,CAAC,CAAC,CAAC;QACjB,IAAI,CAAC,CAAC,GAAG,KAAK,EAAE;YAAE,OAAO;QACzB,CAAC,CAAC,EAAE,CAAC,CAAC,EAAE,IAAI,CAAC,CAAC,CAAC,CAAC;IAClB,CAAC;IAED,kCAAE,GAAF,UAAG,GAAQ;QACT,IAAI,CAAC,CAAC,CAAC,EAAE,CAAC,GAAG,CAAC,CAAC;IACjB,CAAC;IAED,kCAAE,GAAF;QACE,IAAI,CAAC,CAAC,CAAC,IAAI,CAAC,IAAI,CAAC,CAAC,EAAE,IAAI,CAAC,CAAC;IAC5B,CAAC;IACH,4BAAC;AAAD,CAAC,AAlBD,IAkBC;AAlBY,sDAAqB;AAoBlC;IASE,+BAAY,GAAc,EAAE,OAA2B;QARhD,SAAI,GAAG,eAAe,CAAC;QAS5B,IAAI,CAAC,GAAG,GAAG,GAAG,CAAC;QACf,IAAI,CAAC,MAAM,GAAG,OAAO,CAAC;QACtB,IAAI,CAAC,GAAG,GAAG,EAAwB,CAAC;QACpC,IAAI,CAAC,GAAG,GAAG,EAAE,CAAC;QACd,IAAI,CAAC,EAAE,GAAG,CAAC,CAAC;QACZ,IAAI,CAAC,IAAI,GAAG,EAAE,CAAC;IACjB,CAAC;IAED,sCAAM,GAAN,UAAO,GAAuB;QAC5B,IAAI,CAAC,GAAG,GAAG,GAAG,CAAC;QACf,IAAM,CAAC,GAAG,IAAI,CAAC,MAAM,CAAC;QACtB,IAAM,CAAC,GAAG,IAAI,CAAC,EAAE,GAAG,CAAC,CAAC,MAAM,CAAC;QAC7B,IAAM,IAAI,GAAG,IAAI,CAAC,IAAI,GAAG,IAAI,KAAK,CAAC,CAAC,CAAC,CAAC;QACtC,KAAK,IAAI,CAAC,GAAG,CAAC,EAAE,CAAC,GAAG,CAAC,EAAE,CAAC,EAAE,EAAE;YAC1B,IAAI,CAAC,CAAC,CAAC,GAAG,EAAE,CAAC;YACb,CAAC,CAAC,CAAC,CAAC,CAAC,IAAI,CAAC,IAAI,qBAAqB,CAAM,CAAC,EAAE,IAAI,CAAC,CAAC,CAAC;SACpD;QACD,IAAI,CAAC,GAAG,CAAC,IAAI,CAAC,IAAI,CAAC,CAAC;IACtB,CAAC;IAED,qCAAK,GAAL;QACE,IAAM,CAAC,GAAG,IAAI,CAAC,MAAM,CAAC;QACtB,IAAM,CAAC,GAAG,CAAC,CAAC,MAAM,CAAC;QACnB,IAAM,GAAG,GAAG,IAAI,CAAC,GAAG,CAAC;QACrB,IAAI,CAAC,GAAG,CAAC,OAAO,CAAC,IAAI,CAAC,CAAC;QACvB,KAAK,IAAI,CAAC,GAAG,CAAC,EAAE,CAAC,GAAG,CAAC,EAAE,CAAC,EAAE,EAAE;YAC1B,CAAC,CAAC,CAAC,CAAC,CAAC,OAAO,CAAC,GAAG,CAAC,CAAC,CAAC,CAAC,CAAC;SACtB;QACD,IAAI,CAAC,GAAG,GAAG,EAAwB,CAAC;QACpC,IAAI,CAAC,IAAI,GAAG,EAAE,CAAC;QACf,IAAI,CAAC,GAAG,GAAG,EAAE,CAAC;IAChB,CAAC;IAED,kCAAE,GAAF,UAAG,CAAI;QACL,IAAM,GAAG,GAAG,IAAI,CAAC,GAAG,CAAC;QACrB,IAAI,GAAG,KAAK,EAAE;YAAE,OAAO;QACvB,IAAI,IAAI,CAAC,EAAE,GAAG,CAAC;YAAE,OAAO;QACxB,GAAG,CAAC,EAAE,EAAE,CAAC,SAAK,IAAI,CAAC,IAAI,EAAE,CAAC;IAC5B,CAAC;IAED,kCAAE,GAAF,UAAG,GAAQ;QACT,IAAM,GAAG,GAAG,IAAI,CAAC,GAAG,CAAC;QACrB,IAAI,GAAG,KAAK,EAAE;YAAE,OAAO;QACvB,GAAG,CAAC,EAAE,CAAC,GAAG,CAAC,CAAC;IACd,CAAC;IAED,kCAAE,GAAF;QACE,IAAM,GAAG,GAAG,IAAI,CAAC,GAAG,CAAC;QACrB,IAAI,GAAG,KAAK,EAAE;YAAE,OAAO;QACvB,GAAG,CAAC,EAAE,EAAE,CAAC;IACX,CAAC;IAED,kCAAE,GAAF,UAAG,CAAM,EAAE,CAAS;QAClB,IAAM,CAAC,GAAG,IAAI,CAAC,IAAI,CAAC,CAAC,CAAC,CAAC;QACvB,IAAI,IAAI,CAAC,EAAE,GAAG,CAAC,IAAI,CAAC,KAAK,EAAE,EAAE;YAC3B,IAAI,CAAC,EAAE,EAAE,CAAC;SACX;QACD,IAAI,CAAC,IAAI,CAAC,CAAC,CAAC,GAAG,CAAC,CAAC;IACnB,CAAC;IAED,oCAAI,GAAJ,UAAK,CAAS,EAAE,CAA6B;QAC3C,IAAI,CAAC,MAAM,CAAC,CAAC,CAAC,CAAC,OAAO,CAAC,CAAC,CAAC,CAAC;IAC5B,CAAC;IACH,4BAAC;AAAD,CAAC,AAzED,IAyEC;AAzEY,sDAAqB;AA2ElC,IAAI,aAAqC,CAAC;AAE1C;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;GAuEG;AACH,aAAa,GAAG;IAAuB,iBAA8B;SAA9B,UAA8B,EAA9B,qBAA8B,EAA9B,IAA8B;QAA9B,4BAA8B;;IACnE,OAAO,+BAA+B,OAAoB;QACxD,OAAO,IAAI,cAAM,CAAa,IAAI,qBAAqB,CAAC,OAAO,EAAE,OAAO,CAAC,CAAC,CAAC;IAC7E,CAAC,CAAC;AACJ,CAA2B,CAAC;AAE5B,kBAAe,aAAa,CAAC","sourcesContent":["import {InternalListener, Operator, Stream} from '../index';\n\nexport interface SampleCombineSignature {\n  (): <T>(s: Stream<T>) => Stream<[T]>;\n  <T1>(s1: Stream<T1>): <T>(s: Stream<T>) => Stream<[T, T1]>;\n  <T1, T2>(\n    s1: Stream<T1>,\n    s2: Stream<T2>): <T>(s: Stream<T>) => Stream<[T, T1, T2]>;\n  <T1, T2, T3>(\n    s1: Stream<T1>,\n    s2: Stream<T2>,\n    s3: Stream<T3>): <T>(s: Stream<T>) => Stream<[T, T1, T2, T3]>;\n  <T1, T2, T3, T4>(\n    s1: Stream<T1>,\n    s2: Stream<T2>,\n    s3: Stream<T3>,\n    s4: Stream<T4>): <T>(s: Stream<T>) => Stream<[T, T1, T2, T3, T4]>;\n  <T1, T2, T3, T4, T5>(\n    s1: Stream<T1>,\n    s2: Stream<T2>,\n    s3: Stream<T3>,\n    s4: Stream<T4>,\n    s5: Stream<T5>): <T>(s: Stream<T>) => Stream<[T, T1, T2, T3, T4, T5]>;\n  <T1, T2, T3, T4, T5, T6>(\n    s1: Stream<T1>,\n    s2: Stream<T2>,\n    s3: Stream<T3>,\n    s4: Stream<T4>,\n    s5: Stream<T5>,\n    s6: Stream<T6>): <T>(s: Stream<T>) => Stream<[T, T1, T2, T3, T4, T5, T6]>;\n  <T1, T2, T3, T4, T5, T6, T7>(\n    s1: Stream<T1>,\n    s2: Stream<T2>,\n    s3: Stream<T3>,\n    s4: Stream<T4>,\n    s5: Stream<T5>,\n    s6: Stream<T6>,\n    s7: Stream<T7>): <T>(s: Stream<T>) => Stream<[T, T1, T2, T3, T4, T5, T6, T7]>;\n  <T1, T2, T3, T4, T5, T6, T7, T8>(\n    s1: Stream<T1>,\n    s2: Stream<T2>,\n    s3: Stream<T3>,\n    s4: Stream<T4>,\n    s5: Stream<T5>,\n    s6: Stream<T6>,\n    s7: Stream<T7>,\n    s8: Stream<T8>): <T>(s: Stream<T>) => Stream<[T, T1, T2, T3, T4, T5, T6, T7, T8]>;\n  (...streams: Array<Stream<any>>): (s: Stream<any>) => Stream<Array<any>>;\n}\n\nconst NO = {};\n\nexport class SampleCombineListener<T> implements InternalListener<T> {\n  constructor(private i: number, private p: SampleCombineOperator<any>) {\n    p.ils[i] = this;\n  }\n\n  _n(t: T): void {\n    const p = this.p;\n    if (p.out === NO) return;\n    p.up(t, this.i);\n  }\n\n  _e(err: any): void {\n    this.p._e(err);\n  }\n\n  _c(): void {\n    this.p.down(this.i, this);\n  }\n}\n\nexport class SampleCombineOperator<T> implements Operator<T, Array<any>> {\n  public type = 'sampleCombine';\n  public ins: Stream<T>;\n  public others: Array<Stream<any>>;\n  public out: Stream<Array<any>>;\n  public ils: Array<SampleCombineListener<any>>;\n  public Nn: number; // *N*umber of streams still to send *n*ext\n  public vals: Array<any>;\n\n  constructor(ins: Stream<T>, streams: Array<Stream<any>>) {\n    this.ins = ins;\n    this.others = streams;\n    this.out = NO as Stream<Array<any>>;\n    this.ils = [];\n    this.Nn = 0;\n    this.vals = [];\n  }\n\n  _start(out: Stream<Array<any>>): void {\n    this.out = out;\n    const s = this.others;\n    const n = this.Nn = s.length;\n    const vals = this.vals = new Array(n);\n    for (let i = 0; i < n; i++) {\n      vals[i] = NO;\n      s[i]._add(new SampleCombineListener<any>(i, this));\n    }\n    this.ins._add(this);\n  }\n\n  _stop(): void {\n    const s = this.others;\n    const n = s.length;\n    const ils = this.ils;\n    this.ins._remove(this);\n    for (let i = 0; i < n; i++) {\n      s[i]._remove(ils[i]);\n    }\n    this.out = NO as Stream<Array<any>>;\n    this.vals = [];\n    this.ils = [];\n  }\n\n  _n(t: T): void {\n    const out = this.out;\n    if (out === NO) return;\n    if (this.Nn > 0) return;\n    out._n([t, ...this.vals]);\n  }\n\n  _e(err: any): void {\n    const out = this.out;\n    if (out === NO) return;\n    out._e(err);\n  }\n\n  _c(): void {\n    const out = this.out;\n    if (out === NO) return;\n    out._c();\n  }\n\n  up(t: any, i: number): void {\n    const v = this.vals[i];\n    if (this.Nn > 0 && v === NO) {\n      this.Nn--;\n    }\n    this.vals[i] = t;\n  }\n\n  down(i: number, l: SampleCombineListener<any>): void {\n    this.others[i]._remove(l);\n  }\n}\n\nlet sampleCombine: SampleCombineSignature;\n\n/**\n *\n * Combines a source stream with multiple other streams. The result stream\n * will emit the latest events from all input streams, but only when the\n * source stream emits.\n *\n * If the source, or any input stream, throws an error, the result stream\n * will propagate the error. If any input streams end, their final emitted\n * value will remain in the array of any subsequent events from the result\n * stream.\n *\n * The result stream will only complete upon completion of the source stream.\n *\n * Marble diagram:\n *\n * ```text\n * --1----2-----3--------4--- (source)\n * ----a-----b-----c--d------ (other)\n *      sampleCombine\n * -------2a----3b-------4d--\n * ```\n *\n * Examples:\n *\n * ```js\n * import sampleCombine from 'xstream/extra/sampleCombine'\n * import xs from 'xstream'\n *\n * const sampler = xs.periodic(1000).take(3)\n * const other = xs.periodic(100)\n *\n * const stream = sampler.compose(sampleCombine(other))\n *\n * stream.addListener({\n *   next: i => console.log(i),\n *   error: err => console.error(err),\n *   complete: () => console.log('completed')\n * })\n * ```\n *\n * ```text\n * > [0, 8]\n * > [1, 18]\n * > [2, 28]\n * ```\n *\n * ```js\n * import sampleCombine from 'xstream/extra/sampleCombine'\n * import xs from 'xstream'\n *\n * const sampler = xs.periodic(1000).take(3)\n * const other = xs.periodic(100).take(2)\n *\n * const stream = sampler.compose(sampleCombine(other))\n *\n * stream.addListener({\n *   next: i => console.log(i),\n *   error: err => console.error(err),\n *   complete: () => console.log('completed')\n * })\n * ```\n *\n * ```text\n * > [0, 1]\n * > [1, 1]\n * > [2, 1]\n * ```\n *\n * @param {...Stream} streams One or more streams to combine with the sampler\n * stream.\n * @return {Stream}\n */\nsampleCombine = function sampleCombine(...streams: Array<Stream<any>>) {\n  return function sampleCombineOperator(sampler: Stream<any>): Stream<Array<any>> {\n    return new Stream<Array<any>>(new SampleCombineOperator(sampler, streams));\n  };\n} as SampleCombineSignature;\n\nexport default sampleCombine;"]}
\No newline at end of file