UNPKG

1.56 kBJavaScriptView Raw
1import {Transform, derive, tupleid} from 'vega-dataflow';
2import {inherits} from 'vega-util';
3
/**
 * Relays a data stream between data processing pipelines.
 * If the derive parameter is set, this transform will create derived
 * copies of observed tuples. This provides derived data streams in which
 * modifications to the tuples do not pollute an upstream data source.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {boolean} [params.derive=false] - Boolean flag indicating if
 *   the transform should make derived copies of incoming tuples.
 */
export default function Relay(params) {
  // Delegate to the base Transform constructor: null is passed as its
  // first argument and the parameters are forwarded unchanged.
  Transform.call(this, null, params);
}
17
inherits(Relay, Transform, {
  /**
   * Relays the incoming pulse, optionally re-emitting its tuples as
   * derived copies.
   *
   * The operator value (this.value) is an object mapping the id of each
   * source tuple to its derived copy. It is created on the first
   * evaluation; on that evaluation the incoming pulse is also replaced by
   * the result of pulse.addAll().
   *
   * @param {object} _ - Parameter values; only `_.derive` is read here.
   * @param {object} pulse - The incoming pulse.
   * @return {object|undefined} With `_.derive` set, a fork of the pulse
   *   carrying the derived tuples. Otherwise the addAll() pulse on the
   *   first evaluation and undefined on later evaluations.
   *   NOTE(review): presumably the dataflow treats an undefined return as
   *   "propagate the input pulse" - confirm against vega-dataflow.
   */
  transform(_, pulse) {
    let result;
    let copies = this.value;

    if (!copies) {
      // First evaluation: switch to the addAll() pulse and start with an
      // empty id -> derived tuple lookup table.
      pulse = pulse.addAll();
      result = pulse;
      copies = this.value = {};
    }

    if (!_.derive) {
      // No derivation requested: nothing else to do.
      return result;
    }

    result = pulse.fork(pulse.NO_SOURCE);

    // Removed tuples: emit the stored copy as removed and clear its slot.
    pulse.visit(pulse.REM, source => {
      const key = tupleid(source);
      result.rem.push(copies[key]);
      copies[key] = null;
    });

    // Added tuples: derive a copy, remember it by source id, emit as add.
    pulse.visit(pulse.ADD, source => {
      const copy = derive(source);
      copies[tupleid(source)] = copy;
      result.add.push(copy);
    });

    // Modified tuples: refresh the stored copy from its source tuple.
    pulse.visit(pulse.MOD, source => {
      const copy = copies[tupleid(source)];
      for (const field in source) {
        copy[field] = source[field];
        // down stream writes may overwrite re-derived tuples
        // conservatively mark all source fields as modified
        result.modifies(field);
      }
      result.mod.push(copy);
    });

    return result;
  }
});