UNPKG

1.52 kBJavaScriptView Raw
1import {Transform, derive, tupleid} from 'vega-dataflow';
2import {inherits} from 'vega-util';
3
4/**
5 * Relays a data stream between data processing pipelines.
6 * If the derive parameter is set, this transform will create derived
7 * copies of observed tuples. This provides derived data streams in which
8 * modifications to the tuples do not pollute an upstream data source.
9 * @param {object} params - The parameters for this operator.
10 * @param {number} [params.derive=false] - Boolean flag indicating if
11 * the transform should make derived copies of incoming tuples.
12 * @constructor
13 */
14export default function Relay(params) {
15 Transform.call(this, null, params);
16}
17
18var prototype = inherits(Relay, Transform);
19
20prototype.transform = function(_, pulse) {
21 var out, lut;
22
23 if (this.value) {
24 lut = this.value;
25 } else {
26 out = pulse = pulse.addAll();
27 lut = this.value = {};
28 }
29
30 if (_.derive) {
31 out = pulse.fork(pulse.NO_SOURCE);
32
33 pulse.visit(pulse.REM, t => {
34 var id = tupleid(t);
35 out.rem.push(lut[id]);
36 lut[id] = null;
37 });
38
39 pulse.visit(pulse.ADD, t => {
40 var dt = derive(t);
41 lut[tupleid(t)] = dt;
42 out.add.push(dt);
43 });
44
45 pulse.visit(pulse.MOD, t => {
46 var dt = lut[tupleid(t)], k;
47 for (k in t) {
48 dt[k] = t[k];
49 // down stream writes may overwrite re-derived tuples
50 // conservatively mark all source fields as modified
51 out.modifies(k);
52 }
53 out.mod.push(dt);
54 });
55 }
56
57 return out;
58};