UNPKG

1.73 kBJavaScriptView Raw
1import {Transform, ingest} from 'vega-dataflow';
2import {array, inherits} from 'vega-util';
3
/**
 * Load and parse data from an external source. Marshals parameter
 * values and then invokes the Dataflow request method.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {string} params.url - The URL to load from.
 * @param {object} params.format - The data format options.
 * @param {*} [params.values] - Inline values to parse instead of
 *   requesting the URL.
 * @param {boolean} [params.async] - If truthy, the URL is requested
 *   without blocking and the result is emitted on a later run.
 */
export default function Load(params) {
  Transform.call(this, [], params);

  // Data delivered by a completed non-blocking request, held here until
  // the next transform run emits it; null when nothing is waiting.
  this._pending = null;
}
16
inherits(Load, Transform, {
  /**
   * Produce an output pulse carrying loaded data. Depending on the
   * parameters this either emits data synchronously, returns a promise
   * that resolves to the output pulse, or returns an `{async: promise}`
   * wrapper for non-blocking loading.
   * @param {object} _ - The operator parameters (url, format, values,
   *   async) along with a `modified(name)` test.
   * @param {object} pulse - The input pulse; its `dataflow` performs
   *   parsing and requests.
   * @return {object|Promise} An output pulse, `pulse.StopPropagation`,
   *   a promise for an output pulse, or an `{async: Promise}` object.
   */
  transform(_, pulse) {
    const df = pulse.dataflow;

    if (this._pending) {
      // an earlier non-blocking request has delivered its data:
      // update state and return pulse
      return output(this, pulse, this._pending);
    }

    // only the async flag changed; there is nothing new to load
    if (stop(_)) return pulse.StopPropagation;

    if (_.values) {
      // parse and ingest values, return output pulse
      return output(this, pulse, df.parse(_.values, _.format));
    } else if (_.async) {
      // return promise for non-blocking async loading
      const p = df.request(_.url, _.format).then(res => {
        // stash the result; it is emitted by the `_pending` branch above
        this._pending = array(res.data);
        // the promise resolves to a callback that touches this operator
        // (presumably so the dataflow re-runs it — confirm against the
        // dataflow's handling of `async` results)
        return df => df.touch(this);
      });
      return {async: p};
    } else {
      // return promise for synchronous loading
      return df.request(_.url, _.format)
        .then(res => output(this, pulse, array(res.data)));
    }
  }
});
45
/**
 * Test whether the transform should halt propagation: the `async`
 * parameter was modified but none of `values`, `url` or `format` were,
 * so there is no new data to load.
 * @param {object} _ - Parameters object exposing `modified(name)`.
 * @return {boolean} Truthy if only the `async` parameter changed.
 */
function stop(_) {
  return _.modified('async') && !(
    _.modified('values') || _.modified('url') || _.modified('format')
  );
}
51
/**
 * Ingest newly loaded data and build the output pulse: the operator's
 * previous value is marked as removed, the new data as added, and the
 * new data becomes both the pulse source and the operator value.
 * @param {object} op - The Load operator; its `value` and `_pending`
 *   properties are updated.
 * @param {object} pulse - The input pulse to fork.
 * @param {Array} data - The loaded data; each entry is passed to `ingest`.
 * @return {object} The forked output pulse.
 */
function output(op, pulse, data) {
  data.forEach(ingest);

  // NOTE(review): this is a bitwise AND of two flag constants; if they are
  // distinct single-bit flags the result is 0 (no flags). Confirm whether
  // a bitwise OR was intended before changing it.
  const out = pulse.fork(pulse.NO_FIELDS & pulse.NO_SOURCE);

  out.rem = op.value;
  op.value = out.source = out.add = data;

  // any pending async result has now been consumed
  op._pending = null;

  if (out.rem.length) out.clean(true);
  return out;
}
60}