1 | import {Transform, derive, tupleid} from 'vega-dataflow';
|
2 | import {inherits} from 'vega-util';
|
3 |
|
4 | /**
|
5 | * Relays a data stream between data processing pipelines.
|
6 | * If the derive parameter is set, this transform will create derived
|
7 | * copies of observed tuples. This provides derived data streams in which
|
8 | * modifications to the tuples do not pollute an upstream data source.
|
9 | * @param {object} params - The parameters for this operator.
|
10 | * @param {number} [params.derive=false] - Boolean flag indicating if
|
11 | * the transform should make derived copies of incoming tuples.
|
12 | * @constructor
|
13 | */
|
14 | export default function Relay(params) {
|
15 | Transform.call(this, null, params);
|
16 | }
|
17 |
|
18 | inherits(Relay, Transform, {
|
19 | transform(_, pulse) {
|
20 | let out, lut;
|
21 |
|
22 | if (this.value) {
|
23 | lut = this.value;
|
24 | } else {
|
25 | out = pulse = pulse.addAll();
|
26 | lut = this.value = {};
|
27 | }
|
28 |
|
29 | if (_.derive) {
|
30 | out = pulse.fork(pulse.NO_SOURCE);
|
31 |
|
32 | pulse.visit(pulse.REM, t => {
|
33 | const id = tupleid(t);
|
34 | out.rem.push(lut[id]);
|
35 | lut[id] = null;
|
36 | });
|
37 |
|
38 | pulse.visit(pulse.ADD, t => {
|
39 | const dt = derive(t);
|
40 | lut[tupleid(t)] = dt;
|
41 | out.add.push(dt);
|
42 | });
|
43 |
|
44 | pulse.visit(pulse.MOD, t => {
|
45 | const dt = lut[tupleid(t)];
|
46 | for (const k in t) {
|
47 | dt[k] = t[k];
|
48 | // down stream writes may overwrite re-derived tuples
|
49 | // conservatively mark all source fields as modified
|
50 | out.modifies(k);
|
51 | }
|
52 | out.mod.push(dt);
|
53 | });
|
54 | }
|
55 |
|
56 | return out;
|
57 | }
|
58 | });
|