1 | import {Transform, ingest, tupleid} from 'vega-dataflow';
|
2 | import {error, fastmap, inherits, isArray} from 'vega-util';
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
/**
 * DataJoin transform constructor.
 * Delegates to the Transform base constructor, handing it a null initial
 * value together with the supplied parameters.
 * @constructor
 * @param {object} params - The parameters for this operator; forwarded
 *   unchanged to Transform.
 */
export default function DataJoin(params) {
  // Start with no value; the transform method creates the join index lazily.
  var initialValue = null;
  Transform.call(this, initialValue, params);
}
|
14 |
|
// Link DataJoin to Transform via the vega-util inherits helper and keep the
// returned object so methods can be attached to it below.
const prototype = inherits(DataJoin, Transform);
|
16 |
|
/**
 * Default item factory, used when no custom item generator is supplied.
 * @returns {*} The result of passing a new, empty object literal to ingest.
 */
function defaultItemCreate() {
  var blank = {};
  return ingest(blank);
}
|
20 |
|
/**
 * Reads the exit flag of an item.
 * @param {object} t - The item to inspect.
 * @returns {*} The item's `exit` property, returned as-is (not coerced).
 */
function isExit(t) {
  var flag = t.exit;
  return flag;
}
|
24 |
|
/**
 * Joins the tuples of an incoming pulse against a keyed index of items.
 *
 * Each added tuple is matched to an item by key: a missing item is created
 * with the item generator, an exited item is revived and re-emitted as an
 * add, and a live item is emitted as a mod. Modified tuples re-point their
 * item's datum, and removed tuples flag their item as exited.
 *
 * @param {object} _ - Operator parameters. Reads `_.item` (item generator,
 *   defaults to defaultItemCreate), `_.key` (key accessor, defaults to
 *   tupleid) and `_.clean` (when truthy, schedules index cleaning once the
 *   count of exited entries exceeds the dataflow's cleanThreshold).
 * @param {object} pulse - The input pulse to process.
 * @returns {object} A fork of the input pulse whose add/mod/rem arrays hold
 *   the joined items.
 * @throws Raises via error() if an index already exists and either the key
 *   parameter or the key's fields were modified.
 */
prototype.transform = function(_, pulse) {
  const df = pulse.dataflow;
  const out = pulse.fork(pulse.NO_SOURCE | pulse.NO_FIELDS);
  const item = _.item || defaultItemCreate;
  const key = _.key || tupleid;
  let map = this.value;

  // Clear an array-valued encode entry on the fork so it is not carried on
  // the output pulse.
  // NOTE(review): presumably this stops transient encode requests from
  // propagating downstream -- confirm against the encode operator.
  if (isArray(out.encode)) {
    out.encode = null;
  }

  // Once the index exists it is keyed by the old key values, so a changed
  // key function or changed key fields cannot be reconciled.
  if (map && (_.modified('key') || pulse.modified(key))) {
    error('DataJoin does not support modified key function or fields.');
  }

  // First run: process the pulse returned by addAll() and build the index.
  if (!map) {
    pulse = pulse.addAll();
    this.value = map = fastmap().test(isExit);
    map.lookup = t => map.get(key(t));
  }

  pulse.visit(pulse.ADD, t => {
    const k = key(t);
    let x = map.get(k);

    if (x) {
      if (x.exit) {
        // Revive a previously exited item: it no longer counts as empty and
        // is emitted as an add.
        map.empty--;
        out.add.push(x);
      } else {
        out.mod.push(x);
      }
    } else {
      map.set(k, (x = item(t)));
      out.add.push(x);
    }

    x.datum = t;
    x.exit = false;
  });

  pulse.visit(pulse.MOD, t => {
    const x = map.get(key(t));

    if (x) {
      x.datum = t;
      out.mod.push(x);
    }
  });

  pulse.visit(pulse.REM, t => {
    const x = map.get(key(t));

    // The lookup can come back empty (the ADD and MOD visitors already allow
    // for that); skip such tuples instead of throwing on x.datum. An item is
    // only exited when the removed tuple is the one it currently points to.
    if (x && t === x.datum && !x.exit) {
      out.rem.push(x);
      x.exit = true;
      ++map.empty;
    }
  });

  if (pulse.changed(pulse.ADD_MOD)) out.modifies('datum');

  // Defer cleaning of the index until after the current dataflow run.
  if (_.clean && map.empty > df.cleanThreshold) df.runAfter(map.clean);

  return out;
};
|