UNPKG

47.1 kBJavaScriptView Raw
1#!/usr/bin/env node
2
3
4/* eslint-disable global-require */
5/* eslint-disable no-console */
6
/**
 * Dedent a multi-line template string.
 *
 * The padding is whatever whitespace leads the text once a single leading
 * blank line has been dropped. Every occurrence of a newline followed by that
 * padding is replaced by a newline followed by the trimmed indent; the result
 * is trimmed and prefixed with `s_indent`.
 *
 * @param {string} s_text - text to dedent
 * @param {string} [s_indent=''] - prefix for the returned text
 * @returns {string} the dedented text
 */
const gobble = (s_text, s_indent='') => {
	// ignore one leading blank line when detecting the padding
	const s_body = s_text.replace(/^([ \t]*\n)/, '');
	const m_lead = /^(\s+)/.exec(s_body);

	// no leading whitespace; nothing to dedent
	if(!m_lead) return s_indent+s_text.trim();

	// strip the detected padding from the start of each subsequent line
	const r_padding = new RegExp(`\\n${m_lead[1]}`, 'g');
	const s_dedented = s_text.replace(r_padding, '\n'+s_indent.trim());

	return s_indent+s_dedented.trim();
};
16
17const fs =require('fs');
18const path = require('path');
19const yargs = require('yargs');
20const mk_yargs = require('yargs/yargs');
21const graphy = require('graphy');
22const factory = require('@graphy-dev/core.data.factory');
23const stream = require('@graphy-dev/core.iso.stream');
24const dataset_tree = require('@graphy-dev/memory.dataset.fast');
25
26const parse_filter = require('./quad-expression.js').parse;
27const expression_handler = require('./expression-handler.js');
28
/**
 * Replacement `pipe` method that is assigned onto quad stream instances.
 *
 * Depending on the destination it inserts an adapter stream between `this`
 * and the destination:
 *  - destination not in object mode: `stream.quads_to_json()`
 *  - destination flagged `isGraphyWritable`: `stream.quads_to_writable()`
 *  - otherwise: no adapter, pipe straight through
 *
 * The pipe options (e.g. `{end: false}`) are forwarded to the pipe call that
 * reaches the destination. Previously they were dropped, so callers asking
 * for `end: false` had their destination ended anyway.
 *
 * @param {Writable} ds_out - destination stream
 * @param {object} [gc_pipe] - options as accepted by `Readable#pipe`
 * @returns {Writable} the destination stream
 */
const F_ADAPT_STREAM = function(ds_out, gc_pipe) {
	// this override shadows `pipe` on the instance; reach the class implementation
	const f_super_pipe = this.constructor.prototype.pipe;

	let ds_adapter;

	// non-object mode
	if(!ds_out._writableState.objectMode) {
		// transform to JSON
		ds_adapter = stream.quads_to_json();
	}
	// yes object mode and graphy writable
	else if(ds_out.isGraphyWritable) {
		// transform to writable data events
		ds_adapter = stream.quads_to_writable();
	}
	// forward as-is to super
	else {
		return f_super_pipe.call(this, ds_out, gc_pipe);
	}

	// source always ends its private adapter
	f_super_pipe.call(this, ds_adapter);

	// caller's options govern the hop into the real destination
	return ds_adapter.pipe(ds_out, gc_pipe);
};
53
/**
 * Pass inputs through untouched, except that each stream's `pipe` is replaced
 * by the adapting pipe.
 *
 * @param {Array} a_inputs - input streams
 * @returns {Array} a new array holding the same stream objects
 */
const bypass = (a_inputs) => {
	const a_outputs = [];

	for(const ds_input of a_inputs) {
		// intercept pipe
		ds_input.pipe = F_ADAPT_STREAM;
		a_outputs.push(ds_input);
	}

	return a_outputs;
};
60
/**
 * Create one processing step per input stream and pipe the input into it.
 *
 * @param {Array} a_inputs - input streams
 * @param {Function} f_map - called with each input; returns the step stream
 * @returns {Array} whatever each `ds_input.pipe(step)` call returned
 */
const map_streams = (a_inputs, f_map) => {
	const a_steps = [];

	for(const ds_input of a_inputs) {
		const ds_step = f_map(ds_input);

		// intercept pipe on the step so its own downstream gets adapted
		ds_step.pipe = F_ADAPT_STREAM;

		// pipe input to step and keep the result
		a_steps.push(ds_input.pipe(ds_step));
	}

	return a_steps;
};
70
/**
 * Turn a user-supplied term into a factory term.
 *
 * @param {*} z_term - a c1 string, or a term object
 * @param {object} h_prefixes - prefix map passed along to `factory.c1`
 * @returns {*} result of `factory.c1` for strings, `factory.fromTerm` otherwise
 */
const warp_term = (z_term, h_prefixes) => ('string' === typeof z_term)
	// c1 string
	? factory.c1(z_term, h_prefixes)
	// normalize term
	: factory.fromTerm(z_term);
81
/**
 * Interpret one value returned by a user's 'transform' callback and push the
 * quads it denotes onto the transform stream.
 *
 * Accepted shapes:
 *  - `[]`: nothing to emit
 *  - array whose first element is an array: each element is interpreted recursively
 *  - array of 3 or 4 terms: a single triple/quad (terms go through `warp_term`)
 *  - string: read as TriG via `graphy.content.trig.read`
 *  - object with `subject`, `predicate` and `object`: a quad
 *  - any other iterable: each yielded value is pushed as-is
 *
 * Anything else terminates the process via `exit`.
 *
 * @param {*} z_item - truthy value returned by the callback
 * @param {object} h_prefixes - current prefix map
 * @param {object} ds_transform - stream to `push` quads onto
 * @param {Function} fke_transform - completion callback; invoked exactly once
 */
const interpret_item = (z_item, h_prefixes, ds_transform, fke_transform) => {
	// array
	if(Array.isArray(z_item)) {
		// zero-length, skip
		if(!z_item.length) return fke_transform();

		// first object is also array
		if(Array.isArray(z_item[0])) {
			let nl_subs = z_item.length;

			let c_resolves = 0;

			// complete only once every sub-item has reported completion
			for(let z_sub of z_item) {
				interpret_item(z_sub, h_prefixes, ds_transform, () => { // eslint-disable-line no-loop-func
					if(++c_resolves === nl_subs) {
						fke_transform();
					}
				});
			}

			// do not consume transform synchronously
			return;
		}
		// triple/quad
		else if(3 === z_item.length || 4 === z_item.length) {
			let a_terms = z_item.map(z => warp_term(z, h_prefixes));

			ds_transform.push(factory.quad(...a_terms));
		}
		// wrong number of terms; previously dropped without any notice
		else {
			warn(`The callback function supplied to the 'transform' command returned an array of ${z_item.length} terms; expected 3 (triple) or 4 (quad). The item was ignored.`);
		}
	}
	// string (trig)
	else if('string' === typeof z_item) {
		graphy.content.trig.read({
			input: {
				string: z_item,
			},

			// the handler sits beside `data`/`eof` at the top level of the config;
			// it was previously nested inside `input`
			error(e_read) {
				warn(`The 'transform' command threw an Error while trying to read the returned TriG string: '${z_item}'\n\nThe reader reported: ${e_read.stack}`);

				// done
				fke_transform();
			},

			data(g_quad_read) {
				ds_transform.push(g_quad_read);
			},

			eof() {
				// done
				fke_transform();
			},
		});

		// do not consume transform synchronously
		return;
	}
	// quad
	else if(z_item.subject && z_item.predicate && z_item.object) {
		ds_transform.push(factory.fromQuad(z_item));
	}
	// iterable
	else if(z_item[Symbol.iterator]) {
		for(let g_quad_it of z_item) {
			ds_transform.push(g_quad_it);
		}
	}
	// other
	else {
		exit(`The callback function supplied to the 'transform' command returned an invalid quad value: '${z_item}'`);
	}

	// done
	fke_transform();
};
157
/**
 * N-to-1 dataset reduction: load every input into its own dataset tree, then
 * fold the trees left-to-right with the named tree method.
 *
 * Inputs are consumed one after another; each tree must emit 'finish' before
 * the next input is piped.
 *
 * @param {object} g_argv - parsed arguments; `strict` disables canonicalization
 * @param {Array} a_inputs - input quad streams
 * @param {Function} fe_command - error callback (not used here)
 * @param {string} s_operation - name of the tree method to apply pairwise
 * @returns {Promise<Array>} single-element array holding the resulting tree
 */
const dataset_N1QQ = async(g_argv, a_inputs, fe_command, s_operation) => {
	const b_strict = g_argv.strict;

	// one tree per input, all created up front
	const a_trees = a_inputs.map(() => dataset_tree());

	// accumulator; starts as the first tree
	let k_result = a_trees[0];

	for(const [i_input, ds_input] of a_inputs.entries()) {
		let k_tree = a_trees[i_input];

		// load the input and wait until the tree has taken all of it
		ds_input.pipe(k_tree);
		await k_tree.until('finish');

		// canonicalize unless strict
		if(!b_strict) {
			k_tree = k_tree.canonicalize();
		}

		// first tree seeds the accumulator; the rest are folded in
		k_result = (0 === i_input)? k_tree: k_result[s_operation](k_tree);
	}

	// return readable tree
	return [k_result];
};
195
/**
 * 2-to-1 dataset operation yielding a dataset: load both inputs into trees
 * and, once both have finished, apply the named tree method `a[op](b)`.
 *
 * @param {object} g_argv - parsed arguments; `strict` disables canonicalization
 * @param {Array} a_inputs - the two input quad streams
 * @param {Function} fe_command - error callback (not used here)
 * @param {string} s_operation - name of the tree method to call on tree a
 * @returns {Promise<Array>} single-element array holding the operation result
 */
const dataset_21QQ = (g_argv, a_inputs, fe_command, s_operation) => new Promise((fk_resolve) => {
	const b_strict = g_argv.strict;

	// slot 0 is tree a, slot 1 is tree b
	const a_trees = [];
	const a_finished = [false, false];

	const prepare = (i_slot) => {
		const i_other = 1 - i_slot;

		a_trees[i_slot] = dataset_tree();
		a_trees[i_slot].on('finish', () => {
			// canonicalize
			if(!b_strict) a_trees[i_slot] = a_trees[i_slot].canonicalize();

			// this side is finished now
			a_finished[i_slot] = true;

			// other side already finished; perform the operation
			if(a_finished[i_other]) {
				fk_resolve([a_trees[0][s_operation](a_trees[1])]);
			}
		});
	};

	// wait for a, then for b
	prepare(0);
	prepare(1);

	// ref both input streams
	const [ds_input_a, ds_input_b] = a_inputs;

	// pipe each to its tree
	ds_input_a.pipe(a_trees[0]);
	ds_input_b.pipe(a_trees[1]);
});
236
/**
 * 2-to-1 dataset operation yielding a plain result value: load both inputs
 * into trees and, once both have finished, wrap `a[op](b)` in an AnswerSource.
 *
 * @param {object} g_argv - parsed arguments; `strict` disables canonicalization
 * @param {Array} a_inputs - the two input quad streams
 * @param {Function} fe_command - error callback (not used here)
 * @param {string} s_operation - name of the tree method to call on tree a
 * @returns {Promise<Array>} single-element array holding an AnswerSource
 */
const dataset_21QR = (g_argv, a_inputs, fe_command, s_operation) => new Promise((fk_resolve) => {
	const b_strict = g_argv.strict;

	// slot 0 is tree a, slot 1 is tree b
	const a_trees = [];
	const a_finished = [false, false];

	const prepare = (i_slot) => {
		const i_other = 1 - i_slot;

		a_trees[i_slot] = dataset_tree();
		a_trees[i_slot].on('finish', () => {
			// canonicalize
			if(!b_strict) a_trees[i_slot] = a_trees[i_slot].canonicalize();

			// this side is finished now
			a_finished[i_slot] = true;

			// other side already finished; perform the operation
			if(a_finished[i_other]) {
				fk_resolve([new AnswerSource(a_trees[0][s_operation](a_trees[1]))]);
			}
		});
	};

	// wait for a, then for b
	prepare(0);
	prepare(1);

	// ref both input streams
	const [ds_input_a, ds_input_b] = a_inputs;

	// pipe each to its tree
	ds_input_a.pipe(a_trees[0]);
	ds_input_b.pipe(a_trees[1]);
});
277
/**
 * Object-mode readable that emits a single datum and then ends.
 *
 * When piped into a destination that is not in object mode, the datum is
 * emitted as one line of JSON text instead of as the raw value.
 */
class AnswerSource extends require('stream').Readable {
	/**
	 * @param {*} w_datum - the value to emit
	 */
	constructor(w_datum) {
		super({
			objectMode: true,
		});

		this._w_datum = w_datum;
	}

	/**
	 * Intercept pipe to choose the output representation.
	 *
	 * @param {Writable} ds_dst - destination stream
	 * @param {object} [gc_pipe] - options as accepted by `Readable#pipe`;
	 *   forwarded to super (previously dropped)
	 * @returns {Writable} the destination stream
	 */
	pipe(ds_dst, gc_pipe) {
		// string out
		if(!ds_dst._writableState.objectMode) {
			// change read mode; push as JSON
			this._read = () => {
				this.push(JSON.stringify(this._w_datum)+'\n', 'utf8');
				this.push(null);
			};
		}

		// forward to super
		return super.pipe(ds_dst, gc_pipe);
	}

	// push object
	_read() {
		this.push(this._w_datum);
		this.push(null);
	}
}
308
/**
 * Print a warning with a stack trace to stderr.
 *
 * The trace comes from a fresh Error; its first frame (this function) is
 * removed and the leading 'Error:' label is replaced by 'Warning:'.
 *
 * @param {string} s_message - warning text
 */
const warn = (s_message) => {
	const s_stack = new Error(s_message).stack;

	// drop the first 'at ...' frame
	const s_trimmed = s_stack.replace(/\n\s+at [^\n]*\n/, '\n');

	console.warn(s_trimmed.replace(/^Error:/, 'Warning:'));
};
314
/**
 * Report a fatal problem on stderr and terminate the process with status 1.
 *
 * @param {string} s_exit - message to print before exiting
 */
const exit = (s_exit) => {
	// tell the user why
	console.error(s_exit);

	// non-zero status signals failure to the calling shell
	const xc_failure = 1;
	process.exit(xc_failure);
};
319
320
// Signature labels assigned to each command's `type` field.
// Suffix convention: <arity><input><output>, e.g. NNQS = N-to-N, Quads in, Strings out.

// N-to-N (map)
const S_TRANSFORM_TYPE_NNSQ = 'Transform Type: N-to-N (map); (...Strings) --> [...Quads]';
const S_TRANSFORM_TYPE_NNQS = 'Transform Type: N-to-N (map); (...Quads) --> [...Strings]';
const S_TRANSFORM_TYPE_NNQQ = 'Transform Type: N-to-N (map); (...Quads) --> [...Quads]';
const S_TRANSFORM_TYPE_NNQRB = 'Transform Type: N-to-N (map); (...Quads) --> [...ResultValues<Boolean>]';
const S_TRANSFORM_TYPE_NNQRN = 'Transform Type: N-to-N (map); (...Quads) --> [...ResultValues<Number>]';

// 2-to-1 (join)
const S_TRANSFORM_TYPE_21QQ = 'Transform Type: 2-to-1 (join); (Quads, Quads) --> [Quads]';
const S_TRANSFORM_TYPE_21QRB = 'Transform Type: 2-to-1 (join); (Quads, Quads) --> [ResultValues<Boolean>]';
const S_TRANSFORM_TYPE_21QRN = 'Transform Type: 2-to-1 (join); (Quads, Quads) --> [ResultValues<Number>]';

// N-to-1 (reduce)
const S_TRANSFORM_TYPE_N1QQ = 'Transform Type: N-to-1 (reduce); (...Quads) --> [Quads]';
const S_TRANSFORM_TYPE_N1AA = 'Transform Type: N-to-1 (reduce); (...Any) --> [Any]';

// Headings assigned to each command's `category` field
const S_CATEGORY_IO = 'Input/Output Commands:';
const S_CATEGORY_DATA = 'Quad Manipulation Commands:';
const S_CATEGORY_STREAM = 'Stream Control Commands:';
const S_CATEGORY_SET = 'Dataset Commands:';
const S_CATEGORY_STATS = 'Statistics Commands:';

// Shown in the description of every command that evaluates user-supplied JavaScript
const S_WARN_JAVASCRIPT = `WARNING: the '-j' / '--js' / '--javascript' option evals the given code. DO NOT allow user input into this option as it will grant them arbitrary code execution with whatever privileges the process is running under.`;

// Option definitions for the dataset (set operation) commands
const G_OPTIONS_DATASET = {
	strict: {
		type: 'boolean',
		describe: 'if true, forgoes canonicalization before the set operation',
	},
};
346
/**
 * Per-format shortcut flags to spread into a command's options.
 *
 * Currently yields no flags: the boolean `nt`, `nq`, `ttl` and `trig` options
 * (mutually conflicting, each described via `f_verb(<format name>)`) are
 * disabled, so callers that spread the result add nothing.
 *
 * @param {Function} f_verb - maps a format name to a description (unused while the flags are disabled)
 * @returns {object} a fresh, empty options object
 */
const content_type_flags = (f_verb) => { // eslint-disable-line no-unused-vars
	const h_flags = {};

	// disabled flags, kept for reference:
	//   nt   -- alias: ntriples, n-triples; conflicts: ttl, nq, trig;  f_verb('N-Triples')
	//   nq   -- alias: nquads, n-quads;     conflicts: nt, ttl, trig;  f_verb('N-Quads')
	//   ttl  -- alias: turtle;              conflicts: nt, nq, trig;   f_verb(' Turtle')
	//   trig --                             conflicts: nt, nq, ttl;    f_verb('TriG')

	return h_flags;
};
375
376// commands
377let h_commands = { // eslint-disable-next-line quote-props
378/*
379------ Input/Output --------
380*/
381
382 read: {
383 type: S_TRANSFORM_TYPE_NNSQ,
384 category: S_CATEGORY_IO,
385 overview: 'Deserialize RDF content',
386 description: [
387 'Read RDF content, i.e., deserialize it, from 1 or more inputs',
388 ],
389 options: {
390 c: {
391 type: 'string',
392 alias: ['content-type'],
393 default: 'trig',
394 describe: 'either an RDF Content-Type or format selector',
395 group: 'Content Selector Options:',
396 },
397 r: {
398 type: 'boolean',
399 alias: ['relax'],
400 default: undefined, // eslint-disable-line no-undefined
401 describe: 'relax validation of tokens within the RDF document',
402 },
403 b: {
404 type: 'string',
405 alias: ['base', 'base-uri', 'base-iri'],
406 describe: 'set a base URI on the document',
407 },
408
409 ...content_type_flags(s => `read from ${s}`),
410 },
411 examples: [
412 `read -c nt`,
413 `read -c n-triples`,
414 `read -c 'application/n-triples'`,
415 `read -c ttl`,
416 `read -c turtle`,
417 `read -c 'text/turtle'`,
418 ],
419
420 command(g_argv, a_inputs, fe_command) {
421 // select reader
422 let f_reader = graphy.content(g_argv['content-type']).read;
423
424 let gc_read = {
425 relax: g_argv.relax || false,
426 };
427
428 // 'base-uri' => 'baseUri'
429 if(g_argv['base-uri']) {
430 gc_read.baseUri = g_argv['base-uri'];
431 }
432
433 return map_streams(a_inputs, () => f_reader({
434 ...gc_read,
435
436 error: e => fe_command(e),
437 }));
438 },
439 },
440
441 scribe: {
442 type: S_TRANSFORM_TYPE_NNQS,
443 category: S_CATEGORY_IO,
444 overview: 'Serialize RDF content fast',
445 description: [
446 'Scribe RDF content, i.e., serialize it, fast (and possibly ugly) using the given content-type.',
447 ],
448 options: {
449 c: {
450 type: 'string',
451 alias: ['content-type'],
452 default: 'trig',
453 describe: 'either an RDF Content-Type or format selector',
454 group: 'Content Selector Options:',
455 },
456
457 ...content_type_flags(s => `scribe to ${s}`),
458 },
459 examples: [
460 `scribe -c nt`,
461 `scribe -c n-triples`,
462 `scribe -c 'application/n-triples'`,
463 `scribe -c ttl`,
464 `scribe -c turtle`,
465 `scribe -c 'text/turtle'`,
466 ],
467
468 command(g_argv, a_inputs, fe_command, _f_scriber=null) {
469 // select scriber
470 let f_scriber = _f_scriber || graphy.content(g_argv['content-type']).scribe;
471
472 // map input(s) to writer(s)
473 return a_inputs.map(ds_input => ds_input.pipe(f_scriber({
474 error: e => fe_command(e),
475 })));
476 },
477 },
478
479 write: {
480 type: S_TRANSFORM_TYPE_NNQS,
481 category: S_CATEGORY_IO,
482 overview: 'Serialize RDF content in style (pretty-printing)',
483 description: [
484 'Write RDF content, i.e., serialize it, in style (pretty-print) using the given content-type.',
485 ],
486 options() {
487 let s_group_style = 'Style Options:';
488 let s_group_list = 'List Structure Options:';
489
490 return {
491 c: {
492 type: 'string',
493 alias: ['content-type'],
494 describe: `either an RDF Content-Type or format selector (defaults to 'trig')`,
495 group: 'Content Selector Options:',
496 },
497 i: {
498 type: 'string',
499 alias: ['indent'],
500 // default: '\\t', // eslint-disable-line no-undefined
501 describe: `sets the whitespace string to use for indentation. Writers use '\\t' by default`,
502 group: s_group_style,
503 },
504 g: {
505 // type: 'string',
506 alias: ['graph-keyword'],
507 describe: `sets the style to use when serializing the optional 'GRAPH' keyword in TriG. Writers omit this keyword by default.
508 Passing 'true' or empty with this flag on is shorthand for the all-caps 'GRAPH' keyword`.replace(/\n\s*/g, ' '),
509 group: s_group_style,
510 },
511 s: {
512 type: 'boolean',
513 alias: ['simplify-default-graph'],
514 describe: 'if enabled, omits serializing the surrounding optional graph block for the default graph in TriG.',
515 group: s_group_style,
516 },
517 f: {
518 type: 'string',
519 alias: ['first'],
520 describe: `c1 string: sets the predicate to use for the 'first' relation when serializing list structures`,
521 group: s_group_list,
522 },
523 r: {
524 type: 'string',
525 alias: ['rest'],
526 describe: `c1 string: sets the predicate to use for the 'rest' relation when serializing list structures`,
527 group: s_group_list,
528 },
529 n: {
530 type: 'string',
531 alias: ['nil'],
532 describe: `c1 string: sets the predicate to use for the 'nil' relation when serializing list structures`,
533 group: s_group_list,
534 },
535
536 ...content_type_flags(s => `write to ${s}`),
537 };
538 },
539 examples: [
540 `write -c nt`,
541 `write -c n-triples`,
542 `write -c 'application/n-triples'`,
543 `write -c ttl`,
544 `write -c turtle`,
545 `write -c 'text/turtle'`,
546 ],
547
548 command(g_argv, a_inputs, fe_command, _f_writer=null) {
549 // default write config
550 let gc_write = {};
551
552 // extend style options
553 let g_style = gc_write.style || {};
554
555 // content-type selector
556 let s_selector = g_argv['content-type'];
557
558 // no selector specified
559 if(!s_selector) {
560 // default to trig
561 s_selector = 'trig';
562
563 // set simplify default graph so that it might also turtle-compatible
564 g_style.simplify_default_graph = true;
565 }
566
567 // select writer
568 let f_writer = _f_writer || graphy.content(s_selector).write;
569
570 // style options
571 {
572 // indent
573 if(g_argv.indent) g_style.indent = g_argv.indent;
574
575 // graph keyword
576 if(g_argv['graph-keyword']) {
577 let z_graph_keyword = g_argv['graph-keyword'];
578 if('boolean' === typeof z_graph_keyword) {
579 g_style.graph_keyword = z_graph_keyword;
580 }
581 else if(/^true$/i.test(z_graph_keyword)) {
582 g_style.graph_keyword = true;
583 }
584 else if(/^false$/i.test(z_graph_keyword)) {
585 g_style.graph_keyword = false;
586 }
587 else if(/^graph$/i.test(z_graph_keyword)) {
588 g_style.graph_keyword = z_graph_keyword;
589 }
590 else {
591 return fe_command(`The 'write' command reported an invalid value given to the 'graph-keyword' option: '${z_graph_keyword}'`);
592 }
593 }
594
595 // simplify default graph
596 if(g_argv['simplify-default-graph']) g_style.simplify_default_graph = g_argv['simplify-default-graph'];
597 }
598
599 // extend list options
600 let g_lists = gc_write.lists || {};
601 {
602 // first
603 if(g_argv.first) g_lists.first = g_argv.first;
604
605 // rest
606 if(g_argv.rest) g_lists.rest = g_argv.rest;
607
608 // nil
609 if(g_argv.nil) g_lists.nil = g_argv.nil;
610 }
611
612 // map input(s) to writer(s)
613 return a_inputs.map(ds_input => ds_input.pipe(f_writer({
614 ...gc_write,
615
616 style: g_style,
617
618 lists: g_lists,
619
620 error: e => fe_command(e),
621 })));
622 },
623 },
624
625/*
626------ Quad-Level --------
627*/
628
629 skip: {
630 type: S_TRANSFORM_TYPE_NNQQ,
631 category: S_CATEGORY_DATA,
632 overview: 'Skip over some amount of quads in the stream(s)',
633 description: [
634 'Skip over some amount of data (quads by default) for each input stream before piping the remainder as usual.',
635 ],
636 syntax: '[size=1]',
637 positionals: {
638 size: {
639 type: 'number',
640 describe: 'the number of things to skip',
641 },
642 },
643 options: {
644 q: {
645 type: 'boolean',
646 alias: ['quads', 't', 'triples'],
647 describe: 'skip the given number of quads',
648 conflicts: ['s'],
649 },
650
651 s: {
652 type: 'boolean',
653 alias: ['subjects'],
654 describe: 'skip quads until the given number of distinct subjects have been encountered',
655 conflicts: ['q'],
656 },
657
658 // m: {
659 // type: 'number',
660 // alias: ['multiply'],
661 // },
662
663 // d: {
664 // type: 'number',
665 // alias: ['divisions'],
666 // describe: 'rather than counting numbers, use equal divisions of the given size',
667 // example: gobble(`
668 // Skip the first third of data: skip -d 3
669 // `),
670 // },
671
672 // r: {
673 // type: 'number',
674 // alias: ['ratio'],
675 // describe: 'rather than counting numbers, use equal divisions of the given size',
676 // example: gobble(`
677 // Skip the first twothird of data: skip -r '2/3'
678 // `),
679 // },
680 },
681
682 command(g_argv, a_inputs, fe_command) {
683 // size argument
684 let [
685 n_skip=1,
686 ] = g_argv._;
687
688 // count subjects
689 if(g_argv.subjects) {
690 return map_streams(a_inputs, () => {
691 let c_subjects = 0;
692 let kt_prev = null;
693
694 return new stream.Transform.QuadsToOther({
695 error: e => fe_command(e),
696
697 transform(g_quad, s_encoding, fke_transform) {
698 // reached length
699 if(!g_quad.subject.equals(kt_prev) && ++c_subjects > n_skip) {
700 // start pushing
701 this.push(g_quad);
702 }
703
704 // save subject
705 kt_prev = g_quad.subject;
706
707 // done
708 fke_transform();
709 },
710 });
711 });
712 }
713 // count quads
714 else {
715 return map_streams(a_inputs, () => {
716 let c_quads = 0;
717
718 return new stream.Transform.QuadsToOther({
719 error: e => fe_command(e),
720
721 transform(g_quad, s_encoding, fke_transform) {
722 // reached length
723 if(++c_quads > n_skip) {
724 // start pushing
725 this.push(g_quad);
726 }
727
728 // done
729 fke_transform();
730 },
731 });
732 });
733 }
734 },
735 },
736
737 head: {
738 type: S_TRANSFORM_TYPE_NNQQ,
739 category: S_CATEGORY_DATA,
740 overview: 'Limit number of quads from top of stream(s)',
741 description: [
742 'Limit the number of quads that pass through by counting from the top of the stream.',
743 ],
744 syntax: '[size=1]',
745 positionals: {
746 size: {
747 type: 'number',
748 describe: 'the number of things to emit',
749 },
750 },
751 options: {
752 q: {
753 type: 'boolean',
754 alias: ['quads', 't', 'triples'],
755 describe: 'emit only the given number of quads from the top of a stream',
756 conflicts: ['s'],
757 },
758
759 s: {
760 type: 'boolean',
761 alias: ['subjects'],
762 describe: 'emit quads until the given number of distinct subjects have been encountered from the top of a stream',
763 conflicts: ['q'],
764 },
765 },
766
767 command(g_argv, a_inputs, fe_command) {
768 // size argument
769 let [
770 n_head=1,
771 ] = g_argv._;
772
773 // count subjects
774 if(g_argv.subjects) {
775 return map_streams(a_inputs, (ds_input) => {
776 let c_subjects = 0;
777 let kt_prev = null;
778
779 return new stream.Transform.QuadsToOther({
780 error: e => fe_command(e),
781
782 transform(g_quad, s_encoding, fke_transform) {
783 // under limit
784 if(g_quad.subject.equals(kt_prev) || ++c_subjects <= n_head) {
785 this.push(g_quad);
786 }
787 // hit limit
788 else {
789 // push eof
790 this.push(null);
791
792 // destroy source
793 ds_input.destroy();
794 }
795
796 // save subject
797 kt_prev = g_quad.subject;
798
799 // done
800 fke_transform();
801 },
802 });
803 });
804 }
805 // count quads
806 else {
807 return map_streams(a_inputs, (ds_input) => {
808 let c_quads = 0;
809
810 return new stream.Transform.QuadsToOther({
811 error: e => fe_command(e),
812
813 transform(g_quad, s_encoding, fke_transform) {
814 // under limit
815 if(++c_quads <= n_head) {
816 this.push(g_quad);
817 }
818 // hit limit
819 else {
820 // push eof
821 this.push(null);
822
823 // destroy source
824 ds_input.destroy();
825 }
826
827 // done
828 fke_transform();
829 },
830 });
831 });
832 }
833 },
834 },
835
836 tail: {
837 type: S_TRANSFORM_TYPE_NNQQ,
838 category: S_CATEGORY_DATA,
839 overview: 'Limit number of quads from bottom of stream(s)',
840 description: [
841 'Limit the number of quads that pass through by counting from the bottom of the stream.',
842 'WARNING: quads must be buffered in memory until the end of the stream is reached. Specifying a large number of quads or subjects might therefore incur lots of memory.',
843 ],
844 syntax: '<size>',
845 positionals: {
846 size: {
847 type: 'number',
848 describe: 'the number of things to emit',
849 },
850 },
851 options: {
852 q: {
853 type: 'boolean',
854 alias: ['quads', 't', 'triples'],
855 describe: 'emit only the given number of quads from the bottom of a stream',
856 conflicts: ['s'],
857 },
858
859 s: {
860 type: 'boolean',
861 alias: ['subjects'],
862 describe: 'emit quads contained by the given number of distinct subjects from the bottom of a stream',
863 conflicts: ['q'],
864 },
865 },
866
867 command(g_argv, a_inputs, fe_command) {
868 // size argument
869 let [
870 n_tail=1,
871 ] = g_argv._;
872
873 // count subjects
874 if(g_argv.subjects) {
875 return map_streams(a_inputs, () => {
876 let c_subjects = 0;
877 let kt_prev = null;
878 let a_batch = null;
879 let a_fifo = [];
880
881 return new stream.Transform.QuadsToOther({
882 error: e => fe_command(e),
883
884 transform(g_quad, s_encoding, fke_transform) {
885 // different subject
886 if(!g_quad.subject.equals(kt_prev)) {
887 // reset batch
888 a_batch = [];
889
890 // push batch to fifo
891 a_fifo.push(a_batch);
892
893 // hit limit
894 if(++c_subjects > n_tail) {
895 a_fifo.shift();
896 }
897 }
898
899 // save subject
900 kt_prev = g_quad.subject;
901
902 // add quad to batch
903 a_batch.push(g_quad);
904
905 // done
906 fke_transform();
907 },
908
909 flush(fk_flush) {
910 // push queue
911 for(let a_quads of a_fifo) {
912 for(let g_quad of a_quads) {
913 this.push(g_quad);
914 }
915 }
916
917 // free to GC
918 a_fifo.length = 0;
919 a_batch.length = 0;
920
921 // done
922 fk_flush();
923 },
924 });
925 });
926 }
927 // count quads
928 else {
929 return map_streams(a_inputs, () => {
930 let c_quads = 0;
931 let a_fifo = [];
932
933 return new stream.Transform.QuadsToOther({
934 error: e => fe_command(e),
935
936 transform(g_quad, s_encoding, fke_transform) {
937 // under limit
938 if(++c_quads <= n_tail) {
939 a_fifo.push(g_quad);
940 }
941 // hit limit
942 else {
943 // shift off bottom
944 a_fifo.shift();
945
946 // push to top
947 a_fifo.push(g_quad);
948 }
949
950 // done
951 fke_transform();
952 },
953
954 flush(fk_flush) {
955 // push queue
956 for(let g_quad of a_fifo) {
957 this.push(g_quad);
958 }
959
960 // free to GC
961 a_fifo.length = 0;
962
963 // done
964 fk_flush();
965 },
966 });
967 });
968 }
969 },
970 },
971
972 filter: {
973 type: S_TRANSFORM_TYPE_N1QQ,
974 category: S_CATEGORY_DATA,
975 overview: 'Filter quads in the stream(s) via expression',
976 description: [
977 'Filter quads by using either a Quad Filter Expression or a bit of JavaScript.',
978 'For documentation on the Quad Filter Expression syntax, see: https://graphy.link/quad-filter-expression',
979 S_WARN_JAVASCRIPT,
980 ],
981 options: {
982 x: {
983 type: 'string',
984 alias: ['expression'],
985 describe: 'filter quads using the given quad filter expression',
986 conflicts: ['j'],
987 },
988
989 j: {
990 type: 'string',
991 alias: ['js', 'javascript'],
992 describe: 'filter quads using the given JavaScript expression which will be evaluated as a callback function passed the quad and current prefix map as arguments',
993 conflicts: ['x'],
994 },
995
996 v: {
997 type: 'boolean',
998 alias: ['verbose'],
999 describe: 'prints the compiled quad filter expression to stderr',
1000 },
1001 },
1002
1003 examples: [
1004 [
1005 `Filter quads equivalent to the triple pattern: '?s rdf:type dbo:Plant'`,
1006 `filter -x '; a; dbo:Plant'`,
1007 ],
1008 [
1009 `Filter quads equivalent to the SPARQL fragment: 'dbr:Banana ?p ?o. filter(!isLiteral(?o))'`,
1010 `filter -x 'dbr:Banana;; !{literal}'`,
1011 ],
1012 [
1013 `Filter quads equivalent to the SPARQL fragment: '?s ?p ?o. filter(?o > 10e3)]`,
1014 `filter --js 'g => g.object.number > 10e3'`,
1015 ],
1016 [
1017 `Filter quads equivalent to the SPARQL fragment: '?s ?p ?o. filter(strStarts(str(?s), str(?o)))'`,
1018 `filter --js 'g => g.object.value.startsWith(g.subject.value)'`,
1019 ],
1020 // [
1021 // `Filter quads equivalent to the SPARQL fragment: '?s ?p ?o. filter(strStarts(str(?s), str(?o)))'`,
1022 // `filter --js '(g, h) => g.subject.concise(h).startsWith("db")'`,
1023 // ],
1024 ],
1025
1026 command(g_argv, a_inputs, fe_command) {
1027 // quad filter expression
1028 if(g_argv.expression) {
1029 let g_parse = parse_filter(g_argv.expression);
1030
1031 let sj_eval = expression_handler.prepare(g_parse);
1032
1033 if(g_argv.verbose) {
1034 console.warn(`The compiled quad filter expression from 'transform' command: () => {\n${sj_eval.replace(/^|\n/g, '\n\t')}\n}\n`);
1035 }
1036
1037 let f_filter = new Function('factory', 'stream', sj_eval); // eslint-disable-line no-new-func
1038
1039 return map_streams(a_inputs, () => f_filter(factory, stream));
1040 }
1041 // javascript expression
1042 else if(g_argv.javascript) {
1043 let f_build = new Function('factory', /* syntax: js */ `return (${g_argv.javascript}) || null;`); // eslint-disable-line no-new-func
1044
1045 let f_filter = f_build(factory);
1046
1047 // filter exists
1048 if(f_filter) {
1049 // invalid type
1050 if('function' !== typeof f_filter) {
1051 exit(`The 'filter' command expects -j/--javascript expression to evaluate to a function, instead found '${typeof f_filter}'`);
1052 }
1053
1054 return map_streams(a_inputs, () => {
1055 let h_prefixes = {};
1056
1057 return new stream.Transform.QuadsToOther({
1058 error: e => fe_command(e),
1059
1060 prefix(si_prefix, p_iri) {
1061 h_prefixes[si_prefix] = p_iri;
1062 },
1063
1064 transform(g_quad, s_encoding, fke_transform) {
1065 if(f_filter(g_quad, h_prefixes)) {
1066 return fke_transform(null, g_quad);
1067 }
1068
1069 fke_transform();
1070 },
1071 });
1072 });
1073 }
1074 }
1075
1076 // neither used (bypass filter)
1077 warn(`The 'filter' command was not used and is being ignored.`);
1078 return bypass(a_inputs);
1079 },
1080 },
1081
1082 // transform
1083 transform: {
1084 type: S_TRANSFORM_TYPE_N1QQ,
1085 category: S_CATEGORY_DATA,
1086 overview: 'Apply a custom transform function to each quad in the stream(s)',
1087 description: [
1088 '$1',
1089 S_WARN_JAVASCRIPT,
1090 ],
1091 options: {
1092 j: {
1093 type: 'string',
1094 alias: ['js', 'javascript'],
1095 describe: 'transform quads using the given JavaScript expression which will be evaluated as a callback function passed the quad and current prefix map as arguments',
1096 demandOption: true,
1097 example: [
1098 `transform -j 'g => [g.object, g.predicate, g.subject]'`,
1099 `transform -j 'g => ({
1100 [factory.blankNode()]: {
1101 a: 'rdf:Statement',
1102 'rdf:subject': g.subject,
1103 'rdf:predicate': g.predicate,
1104 'rdf:object': g.object,
1105 },
1106 })'`,
1107 ].join('\n'),
1108 },
1109 },
1110
1111 command(g_argv, a_inputs, fe_command) {
1112 // javascript expression
1113 if(g_argv.javascript) {
1114 let f_build = new Function('factory', 'c3', 'c4', /* syntax: js */ `return (${g_argv.javascript}) || null;`); // eslint-disable-line no-new-func
1115
1116 let f_transform = f_build(factory, factory.c3, factory.c4);
1117
1118 // transform exists
1119 if(f_transform) {
1120 // invalid type
1121 if('function' !== typeof f_transform) {
1122 exit(`The 'filter' command expects -j/--javascript expression to evaluate to a function, instead found '${typeof f_filter}'`);
1123 }
1124
1125 return map_streams(a_inputs, () => {
1126 let h_prefixes = {};
1127
1128 return new stream.Transform.QuadsToOther({
1129 error: e => fe_command(e),
1130
1131 prefix(si_prefix, p_iri) {
1132 h_prefixes[si_prefix] = p_iri;
1133 },
1134
1135 transform(g_quad, s_encoding, fke_transform) {
1136 // alias quad property access
1137 g_quad.s = g_quad.subject;
1138 g_quad.p = g_quad.predicate;
1139 g_quad.o = g_quad.object;
1140 g_quad.g = g_quad.graph;
1141
1142 // try to apply transform callback
1143 let z_item;
1144 try {
1145 z_item = f_transform(g_quad, h_prefixes);
1146 }
1147 catch(e_transform) {
1148 warn(`The 'transform' command threw an Error while applying the given callback function:\n${e_transform.stack}`);
1149 return fke_transform();
1150 }
1151
1152 // item was returned
1153 if(z_item) {
1154 return interpret_item(z_item, h_prefixes, this, fke_transform);
1155 }
1156
1157 // done
1158 fke_transform();
1159 },
1160 });
1161 });
1162 }
1163 }
1164
1165 // nothing used (bypass filter)
1166 warn(`The 'transform' command was not used and is being ignored.`);
1167 return bypass(a_inputs);
1168 },
1169 },
1170
1171/*
1172------ Stream Control --------
1173*/
1174
1175 concat: {
1176 type: S_TRANSFORM_TYPE_N1AA,
1177 category: S_CATEGORY_STREAM,
1178 overview: 'Join stream data in order via concatentation',
1179 description: [
1180 'Concatenate quads from all input streams in order.',
1181 ],
1182 options: {},
1183
1184 command(g_argv, a_inputs, fe_command) {
1185 let nl_inputs = a_inputs.length;
1186
1187 // single input, bypass passthrough
1188 if(1 === nl_inputs) return a_inputs;
1189
1190 // input index
1191 let i_input = 0;
1192
1193 // single output stream
1194 let ds_out = new stream.PassThrough();
1195
1196 // stream consumer
1197 let f_next = () => {
1198 // done consuming inputs; end output stream
1199 if(i_input >= nl_inputs) return ds_out.end();
1200
1201 // next input
1202 let ds_input = a_inputs[i_input++];
1203
1204 // once it ends; consume next input
1205 ds_input.on('end', f_next);
1206
1207 // catch stream errors
1208 ds_input.on('error', fe_command);
1209
1210 // pipe to passthrough
1211 ds_input.pipe(ds_out, {end:false});
1212 };
1213
1214 // start concatenating
1215 f_next();
1216
1217 // return stream
1218 return [ds_out];
1219
1220 // return [new stream.Readable({
1221 // objectMode: true,
1222
1223 // read() {
1224 // // while there are inputs
1225 // for(; i_input<nl_inputs; i_input++) {
1226 // // ref input
1227// let ds_input = a_inputs[i_input];
1228
1229 // // read chunk from input and push to output
1230 // let w_chunk;
1231 // while((w_chunk = ds_input.read()) && this.push(w_chunk)) {
1232 // ; // eslint-disable-line no-empty
1233// }
1234
1235 // // input not fully consumed; try again next read
1236 // if(!w_chunk.readableEnded) break;
1237 // }
1238 // },
1239 // })];
1240 },
1241 },
1242
1243 merge: {
1244 type: S_TRANSFORM_TYPE_N1AA,
1245 category: S_CATEGORY_STREAM,
1246 overview: `Join stream data on a 'first come, first serve' basis`,
1247 description: [
1248 'Merge quads from all input streams without order.',
1249 ],
1250 options: {},
1251
1252 command(g_argv, a_inputs, fe_command) {
1253 let nl_inputs = a_inputs.length;
1254
1255 // single input, bypass passthrough
1256 if(1 === nl_inputs) return a_inputs;
1257
1258 // input index
1259 let i_input = 0;
1260
1261 // single output stream
1262 let ds_out = new stream.PassThrough();
1263
1264 // stream consumer
1265 let f_next = () => {
1266 // done consuming inputs; end output stream
1267 if(i_input >= nl_inputs) return ds_out.end();
1268
1269 // next input
1270 let ds_input = a_inputs[i_input++];
1271
1272 // once it ends; consume next input
1273 ds_input.on('end', f_next);
1274
1275 // catch stream errors
1276 ds_input.on('error', fe_command);
1277
1278 // pipe to passthrough
1279 ds_input.pipe(ds_out, {end:false});
1280 };
1281
1282 // start concatenating
1283 f_next();
1284
1285 // return stream
1286 return [ds_out];
1287 },
1288 },
1289
1290/*
1291------ Dataset --------
1292*/
1293
1294 tree: {
1295 type: S_TRANSFORM_TYPE_NNQQ,
1296 category: S_CATEGORY_SET,
1297 overview: 'Put all quads into a tree data structure to remove duplicates',
1298 description: [
1299 '$1',
1300 ],
1301 options: {},
1302
1303 command(g_argv, a_inputs, fe_command) {
1304 return map_streams(a_inputs, () => dataset_tree());
1305 },
1306 },
1307
1308 canonical: {
1309 alias: 'canonicalize',
1310 type: S_TRANSFORM_TYPE_NNQQ,
1311 category: S_CATEGORY_SET,
1312 overview: 'Canonicalize a set of quads using RDF Dataset Normalization Algorithm (URDNA2015)',
1313 description: [
1314 '$1',
1315 ],
1316 options: {},
1317
1318 command(g_argv, a_inputs, fe_command) {
1319 return map_streams(a_inputs, () => dataset_tree({canonicalize:true}));
1320 },
1321 },
1322
1323 union: {
1324 type: S_TRANSFORM_TYPE_N1QQ,
1325 category: S_CATEGORY_SET,
1326 overview: 'Compute the set union of 1 or more inputs',
1327 description: [
1328 '$1',
1329 ],
1330 options: {
1331 ...G_OPTIONS_DATASET,
1332 },
1333
1334 command(g_argv, a_inputs, fe_command) {
1335 return dataset_N1QQ(g_argv, a_inputs, fe_command, 'union');
1336 },
1337 },
1338
1339 intersect: {
1340 alias: 'intersection',
1341 type: S_TRANSFORM_TYPE_N1QQ,
1342 category: S_CATEGORY_SET,
1343 overview: 'Compute the set intersection of 1 or more inputs',
1344 description: [
1345 '$1',
1346 ],
1347 options: {
1348 ...G_OPTIONS_DATASET,
1349 },
1350
1351 command(g_argv, a_inputs, fe_command) {
1352 return dataset_N1QQ(g_argv, a_inputs, fe_command, 'intersection');
1353 },
1354 },
1355
1356 diff: {
1357 alias: 'difference',
1358 type: S_TRANSFORM_TYPE_21QQ,
1359 category: S_CATEGORY_SET,
1360 overview: 'Compute the set difference between 2 inputs',
1361 description: [
1362 '$1',
1363 ],
1364 options: {
1365 ...G_OPTIONS_DATASET,
1366 },
1367
1368 command(g_argv, a_inputs, fe_command) {
1369 return dataset_21QQ(g_argv, a_inputs, fe_command, 'difference');
1370 },
1371 },
1372
1373 minus: {
1374 alias: ['subtract', 'subtraction'],
1375 type: S_TRANSFORM_TYPE_21QQ,
1376 category: S_CATEGORY_SET,
1377 overview: 'Subtract the second input from the first: A - B',
1378 description: [
1379 '$1',
1380 ],
1381 options: {
1382 ...G_OPTIONS_DATASET,
1383 },
1384
1385 command(g_argv, a_inputs, fe_command) {
1386 return dataset_21QQ(g_argv, a_inputs, fe_command, 'minus');
1387 },
1388 },
1389
1390 equals: {
1391 alias: 'equal',
1392 type: S_TRANSFORM_TYPE_21QRB,
1393 category: S_CATEGORY_SET,
1394 overview: 'Test if 2 inputs are equivalent',
1395 description: [
1396 '$1',
1397 ],
1398 options: {
1399 ...G_OPTIONS_DATASET,
1400 },
1401
1402 command(g_argv, a_inputs, fe_command) {
1403 return dataset_21QR(g_argv, a_inputs, fe_command, 'equals');
1404 },
1405 },
1406
1407 disjoint: {
1408 type: S_TRANSFORM_TYPE_21QRB,
1409 category: S_CATEGORY_SET,
1410 overview: 'Test if 2 inputs are completely disjoint from one another',
1411 description: [
1412 '$1',
1413 ],
1414 options: {
1415 ...G_OPTIONS_DATASET,
1416 },
1417
1418 command(g_argv, a_inputs, fe_command) {
1419 return dataset_21QR(g_argv, a_inputs, fe_command, 'disjoint');
1420 },
1421 },
1422
1423 contains: {
1424 alias: 'contain',
1425 type: S_TRANSFORM_TYPE_21QRB,
1426 category: S_CATEGORY_SET,
1427 overview: 'Test if the first input completely contains the second',
1428 description: [
1429 '$1',
1430 ],
1431 options: {
1432 ...G_OPTIONS_DATASET,
1433 },
1434
1435 command(g_argv, a_inputs, fe_command) {
1436 return dataset_21QR(g_argv, a_inputs, fe_command, 'contains');
1437 },
1438 },
1439
1440
1441/*
1442------ Statistics --------
1443*/
1444
1445 count: {
1446 type: S_TRANSFORM_TYPE_NNQRN,
1447 category: S_CATEGORY_STATS,
1448 overview: 'Count the number of events',
1449 description: [
1450 'Count the number of events in each steam',
1451 ],
1452
1453 command(g_argv, a_inputs, fe_command) {
1454 return a_inputs.map(ds_input => new Promise((fk_resolve) => {
1455 let c_items = 0;
1456
1457 ds_input.on('data', () => {
1458 c_items += 1;
1459 });
1460
1461 ds_input.on('error', fe_command);
1462
1463 ds_input.on('end', () => {
1464 fk_resolve(new AnswerSource(c_items));
1465 });
1466 }));
1467 },
1468 },
1469
1470 distinct: {
1471 type: S_TRANSFORM_TYPE_NNQRN,
1472 category: S_CATEGORY_STATS,
1473 overview: 'Count the number of distinct things',
1474 description: [
1475 'Count the number of distinct things, such as quads, triples, subjects, etc.',
1476 ],
1477 options() {
1478 let h_options = {
1479 q: {
1480 alias: ['quads'],
1481 type: 'boolean',
1482 describe: 'count the number of distinct quads',
1483 },
1484 t: {
1485 alias: ['triples'],
1486 type: 'boolean',
1487 describe: 'count the number of distinct triples by ignoring the graph component',
1488 },
1489 s: {
1490 alias: ['subjects'],
1491 type: 'boolean',
1492 describe: 'count the number of distinct subjects',
1493 },
1494 p: {
1495 alias: ['predicates'],
1496 type: 'boolean',
1497 describe: 'count the number of distinct predicates',
1498 },
1499 o: {
1500 alias: ['objects'],
1501 type: 'boolean',
1502 describe: 'count the number of distinct objects',
1503 },
1504 g: {
1505 alias: ['graphs'],
1506 type: 'boolean',
1507 describe: 'count the number of distinct graphs',
1508 },
1509 // l: {
1510 // alias: ['literals'],
1511 // type: 'boolean',
1512 // describe: 'count the number of distinct literals',
1513 // },
1514 };
1515
1516 let a_others = Object.keys(h_options);
1517 for(let [si_option, g_option] of Object.entries(h_options)) {
1518 let as_conflicts = new Set(a_others);
1519 as_conflicts.delete(si_option);
1520 g_option.conflicts = [...as_conflicts];
1521 }
1522
1523 return h_options;
1524 },
1525
1526 command(g_argv, a_inputs, fe_command) {
1527 // quad component
1528 let s_component = null;
1529 {
1530 if(g_argv.subjects) s_component = 'subject';
1531 if(g_argv.predicates) s_component = 'predicate';
1532 if(g_argv.objects) s_component = 'object';
1533 if(g_argv.graphs) s_component = 'graph';
1534 }
1535
1536 // distinct number of a certain component
1537 if(s_component) {
1538 return a_inputs.map(ds_input => new Promise((fk_resolve) => {
1539 // term set
1540 let as_terms = new Set();
1541
1542 // simply count
1543 ds_input.on('data', (g_quad) => {
1544 // concise term
1545 let sc1_term = g_quad[s_component].concise();
1546
1547 // add to set
1548 as_terms.add(sc1_term);
1549 });
1550
1551 // error handling
1552 ds_input.on('error', fe_command);
1553
1554 // once it ends
1555 ds_input.on('end', () => {
1556 fk_resolve(new AnswerSource(as_terms.size));
1557 });
1558 }));
1559 }
1560 // distinct number of triples
1561 else if(g_argv.triples) {
1562 return a_inputs.map(async(ds_input) => {
1563 // remove graph component
1564 let ds_explode = new stream.Transform.QuadsToOther({
1565 transform(g_quad) {
1566 return factory.quad(g_quad.subject, g_quad.predicate, g_quad.object);
1567 },
1568 });
1569
1570 // create dataset
1571 let k_dataset = dataset_tree();
1572
1573 // create pipeline
1574 ds_input.pipe(ds_explode)
1575 .pipe(k_dataset);
1576
1577 // await dataset finish
1578 await k_dataset.until('finish');
1579
1580 // return size
1581 return new AnswerSource(k_dataset.size);
1582 });
1583 }
1584 // distinct number of quads
1585 else {
1586 return a_inputs.map(async(ds_input) => {
1587 let k_dataset = dataset_tree();
1588
1589 ds_input.pipe(k_dataset);
1590
1591 await k_dataset.until('finish');
1592
1593 return new AnswerSource(k_dataset.size);
1594 });
1595 }
1596 },
1597 },
1598
1599 // boilerplate: {
1600 // type: S_TRANSFORM_TYPE_N1QQ,
1601 // overview: '',
1602 // description: [
	// 		'Some description',
1604 // ],
1605// options: {},
1606
1607 // command(g_argv, a_inputs, fe_command) {
1608 // return map_streams(a_inputs, () => new Transform({
1609 // error: e => fe_command(e),
1610 // }));
1611 // },
1612 // },
1613
1614};
1615
// alias name => name of the command it stands for
let h_aliases = {};

// length of the longest command name; overview lines pad names to this width
let n_width_column = Math.max(0, ...Object.keys(h_commands).map(s => s.length));

// commands grouped by category, each with its list of preformatted overview lines
let h_categories = {};
Object.entries(h_commands).forEach(([si_command, g_command]) => {
	let s_category = g_command.category;

	// first command seen in a category creates that category's record
	if(!h_categories[s_category]) {
		h_categories[s_category] = {
			commands: [],
			overview: [],
		};
	}

	let g_category = h_categories[s_category];
	g_category.commands.push(g_command);

	// text appended to the overview line; stays empty for commands without an alias
	let s_suffix = '';

	let z_alias = g_command.alias;
	if(!z_alias) {
		g_command.aliases = [];
	}
	else {
		// `alias` is either a list of names or a single name
		let b_list = Array.isArray(z_alias);
		let a_names = b_list? z_alias: [z_alias];

		s_suffix = b_list
			? ` [aliases: ${z_alias.join(', ')}]`
			: ` [alias: ${z_alias}]`;

		// register each alias
		for(let s_alias of a_names) {
			h_aliases[s_alias] = si_command;
		}

		// normalized form: always an array
		g_command.aliases = a_names;
	}

	g_category.overview.push(` ${si_command.padEnd(n_width_column, ' ')} ${g_command.overview}${s_suffix}`);
});
1661
// width for wrapped help output: 10 columns short of the terminal, but never below 80
let n_width_terminal = Math.max(80, yargs.terminalWidth()-10);

// command-line arguments with the runtime and script paths dropped
let a_argv = process.argv.slice(2);
let n_args = a_argv.length;

// nothing to do without arguments
if(0 === a_argv.length) {
	exit('no arguments given');
}
1673
// inputs: readable streams for the files listed after `--inputs`
let a_inputs = [];

// pipeline: one series of arguments (command name followed by its args) per command
let a_pipeline = [];
{
	// series currently being collected
	let a_series = [];

	for(let i_argv=0; i_argv<n_args; i_argv++) {
		let s_arg = a_argv[i_argv];

		// after first arg
		if(i_argv) {
			// internal pipe, or its shorthand
			if('--pipe' === s_arg || '/' === s_arg) {
				a_pipeline.push(a_series);

				// pipe is the final argument; there is nothing to pipe into
				if(i_argv === n_args-1) {
					exit('--pipe' === s_arg
						? `was expecting pipe destination after --pipe: ${a_argv}`
						: `was expecting pipe destination after internal pipe character '/': ${a_argv}`);
				}

				// start collecting the next command
				a_series = [];
				continue;
			}
			// inputs follow
			else if('--inputs' === s_arg) {
				// convert to readable streams; everything after this flag is a file path
				a_inputs.push(...a_argv.slice(i_argv+1).map(p => fs.createReadStream(p)));
				break;
			}
		}
		// first arg, main option
		else if('-h' === s_arg || '--help' === s_arg) {
			// command overview
			let s_overview = '';
			for(let [s_category, g_category] of Object.entries(h_categories)) {
				s_overview += `${s_category}\n${g_category.overview.join('\n')}\n\n`;
			}

			// eslint-disable-next-line no-console
			console.log(`\nUsage: graphy [OPTIONS] COMMAND [ / COMMAND]* [--inputs FILES...]\n\n`
				+`Tip: Use the internal pipe operator ' / ' to string together a series of commands.\n\n`
				+s_overview
				+gobble(`
					Options:
					-e, --examples Print some examples and exit
					-h, --help Print this help message and exit
					-v, --version Print the version info and exit
				`)+'\n\n'
				+`\nRun 'graphy COMMAND --help' for more information on a command.\n`
				+`\nRun 'graphy --examples' to see some examples.\n`);

			process.exit(0);
		}
		// version
		else if('-v' === s_arg || '--version' === s_arg) {
			// eslint-disable-next-line no-console
			console.log(require(path.join(__dirname, './package.json')).version);

			process.exit(0);
		}
		// examples
		else if('-e' === s_arg || '--examples' === s_arg) {
			console.log(gobble(`
				Examples:
				1) Count the number of distinct triples in a Turtle file:

				graphy read -c ttl / distinct --triples < input.ttl

				2) Count the distinct number of subjects that are of type dbo:Place in an N-Quads file:

				graphy read -c nq / filter -x '; a; dbo:Place' / distinct --subjects < input.nq

				3) Compute the difference between two RDF datasets 'a.ttl' and 'b.ttl':

				graphy read / diff / write --inputs a.ttl b.ttl > diff.trig

				4) Compute the canonicalized union of a bunch of RDF datasets in the 'data/' directory:

				graphy read / union / write --inputs data/*.{nt,nq,ttl,trig} > output.trig

				5) Extract the middle third quads of a Turtle file:

				graphy read -c ttl / divide --into 3 --select 2 / write -c ttl < in.ttl > part-2.ttl

				6) Find all owl:sameAs triples where the object is a node and different from
				the subject, then swap the subject and object:

				graphy read / filter -x '!$object; owl:sameAs; {node}' / transform -j \\
				't => [t.o, t.p, t.s]' / write -c ttl < input.ttl > output.ttl

			`)+'\n');

			process.exit(0);
		}

		// anything else belongs to the current command
		a_series.push(s_arg);
	}

	// last series is not followed by a pipe; add it if it holds anything
	if(a_series.length) {
		a_pipeline.push(a_series);
	}
}

// empty command list
if(!a_pipeline.length) {
	exit('no commands given');
}
1791
// run the pipeline: feed the starting inputs through each command in turn, then
// pipe the single remaining output to stdout
(async() => {
	// failure handler; accepts either an Error or a plain message string
	let fe_command = (z_error) => {
		let e_command = 'string' === typeof z_error? new Error(z_error): z_error;
		exit(e_command.message);
	};

	// own-property test, so that inherited keys such as 'constructor' are never
	// mistaken for a command or an alias
	let has_own = (h_object, s_key) => Object.prototype.hasOwnProperty.call(h_object, s_key);

	// starting inputs default to stdin if no explicit inputs given
	let a_prev = a_inputs.length? a_inputs: [process.stdin];

	// each series in pipeline
	for(let a_series of a_pipeline) {
		// start with command string
		let s_command = a_series[0];

		// command not found
		if(!has_own(h_commands, s_command)) {
			// command alias
			if(has_own(h_aliases, s_command)) {
				s_command = h_aliases[s_command];
			}
			// no such command
			else {
				exit(`no such command '${s_command}'`);
			}
		}

		try {
			// ref command
			let g_command = h_commands[s_command];

			// options are given either as a table or as a function that builds one
			let g_options = 'function' === typeof g_command.options
				? g_command.options()
				: (g_command.options || {});

			// description section; a line consisting of '$1' stands for the overview text
			let a_describes = g_command.description;
			let s_describe = '';
			if(a_describes.length) {
				s_describe = '\nDescription:'+a_describes
					.map(s => ' '+s.replace(/^\$1$/, g_command.overview+'.'))
					.join('\n\n');
			}

			// one usage line for the command name and one for each of its aliases
			let s_usage = [s_command, ...g_command.aliases]
				.reduce((s_out, s, i) => `${s_out}\n${i? 'Or': 'Usage'}: $0${S_TRANSFORM_TYPE_NNSQ === g_command.type? '': ' [...]'} ${s}${g_command.syntax? ' '+g_command.syntax: ''} [OPTIONS] [ / COMMAND]*`, '');

			// transform type, followed by the positional arguments if there are any
			let s_positionals = '\n'+g_command.type+'\n';
			if(g_command.positionals) {
				let n_width_positionals = Object.entries(g_command.positionals)
					.reduce((n, [s]) => Math.max(n, s.length), 10);

				s_positionals += '\nArguments:';
				for(let [si_pos, g_pos] of Object.entries(g_command.positionals)) {
					s_positionals += `\n ${si_pos.padEnd(n_width_positionals, ' ')} [${g_pos.type}] ${g_pos.describe}`;
				}
			}

			// examples section; each example is a string or a [first line, second line] pair
			let s_examples = '';
			if(g_command.examples && g_command.examples.length) {
				s_examples = `Examples:\n`;

				let a_egs = g_command.examples;
				for(let i_eg=0, nl_egs=a_egs.length; i_eg<nl_egs; i_eg++) {
					let z_eg = a_egs[i_eg];

					s_examples += ` ${i_eg+1}) `;
					if('string' === typeof z_eg) {
						s_examples += z_eg+'\n';
					}
					else {
						s_examples += z_eg[0]+'\n '+z_eg[1]+'\n';
					}
				}
			}

			// build yargs; parses only this command's own arguments
			let g_argv = mk_yargs()
				.strict()
				.usage(s_usage+'\n'+s_describe+'\n'+s_positionals)
				.options(g_options)
				.help()
				.epilog(s_examples)
				.version(false)
				.wrap(n_width_terminal)
				.parse(a_series.slice(1));

			// no inputs
			if(!a_prev.length) {
				return fe_command(`The '${s_command}' command requires at least 1 input stream but 0 were piped in.`);
			}

			// check input cardinality
			switch(g_command.type) {
				case S_TRANSFORM_TYPE_21QRB:
				case S_TRANSFORM_TYPE_21QRN:
				case S_TRANSFORM_TYPE_21QQ: {
					// report what was actually piped into this command, not the CLI inputs
					let nl_prev = a_prev.length;
					if(2 !== nl_prev) {
						return fe_command(`The '${s_command}' command expects exactly 2 input streams but ${1 === nl_prev? 'only 1 was': nl_prev+' were'} piped in.`);
					}

					break;
				}

				default: {
					break;
				}
			}

			// eval command with its args
			let a_curr = await g_command.command(g_argv, a_prev, fe_command);

			// advance inputs; commands may return promises in place of outputs
			a_prev = await Promise.all(a_curr);
		}
		catch(e_command) {
			exit(e_command.message);
		}
	}

	// expect single output
	if(1 !== a_prev.length) {
		exit(`expected a single output stream but last command produces ${a_prev.length} streams`);
	}

	// pipe output to stdout
	a_prev[0].pipe(process.stdout);
})();