UNPKG

51.2 kBJavaScriptView Raw
1#!/usr/bin/env node
2
3
4/* eslint-disable global-require */
5/* eslint-disable no-console */
6
/**
 * Strip the common leading indentation from a (template-literal) multi-line string.
 *
 * The indentation of the first non-blank line is measured and removed after every
 * newline; the whole result is trimmed and prefixed with `s_indent`. Continuation
 * lines receive only `s_indent.trim()`, not the full indent.
 *
 * @param {string} s_text - text to dedent
 * @param {string} [s_indent=''] - prefix placed in front of the first line
 * @returns {string} the dedented text
 */
const gobble = (s_text, s_indent='') => {
	// indentation of the first line with content (one leading blank line is ignored)
	const m_pad = /^(\s+)/.exec(s_text.replace(/^([ \t]*\n)/, ''));

	// nothing to strip
	if(!m_pad) return s_indent+s_text.trim();

	// remove that indentation following every newline
	const r_pad = new RegExp(`\\n${m_pad[1]}`, 'g');
	const s_dedented = s_text.replace(r_pad, '\n'+s_indent.trim());

	return s_indent+s_dedented.trim();
};
16
17const fs =require('fs');
18const path = require('path');
19const yargs = require('yargs');
20const mk_yargs = require('yargs/yargs');
21const graphy = require('./api.js');
22const factory = require('@graphy/core.data.factory');
23const stream = require('@graphy/core.iso.stream');
24const dataset_tree = require('@graphy/memory.dataset.fast');
25
26const parse_filter = require('./quad-expression.js').parse;
27const expression_handler = require('./expression-handler.js');
28
/**
 * Replacement `pipe()` installed on command output streams (see `bypass` and
 * `map_streams`); it must be invoked with `this` bound to the source stream.
 *
 * - non-object-mode destination: routed through `stream.quads_to_json()`
 * - object-mode destination flagged `isGraphyWritable`: routed through `stream.quads_to_writable()`
 * - any other object-mode destination: piped directly with the class's own `pipe()`
 *
 * @param {Writable} ds_out - destination stream
 * @returns {Writable} whatever the final `pipe()` call returns
 */
const F_ADAPT_STREAM = function(ds_out) {
	const ds_dst = ds_out;

	// the class-level pipe (the instance-level `pipe` property is this very function)
	const f_super_pipe = this.constructor.prototype.pipe;

	// plain object-mode consumer: no adaptation needed
	if(ds_dst._writableState.objectMode && !ds_dst.isGraphyWritable) {
		return f_super_pipe.call(this, ds_dst);
	}

	// pick the adapter: writable data events for graphy writables, JSON for byte streams
	const ds_adapter = ds_dst._writableState.objectMode
		? stream.quads_to_writable()
		: stream.quads_to_json();

	// source -> adapter -> destination
	f_super_pipe.call(this, ds_adapter);
	return ds_adapter.pipe(ds_dst);
};
53
/**
 * Pass the inputs through unchanged except that each one's `pipe()` is replaced
 * by `F_ADAPT_STREAM`, so downstream consumers get adapted output.
 *
 * @param {Array<Readable>} a_inputs - streams to forward
 * @returns {Array<Readable>} a new array holding the same (patched) streams
 */
const bypass = a_inputs => a_inputs.map(ds_input => Object.assign(ds_input, {pipe: F_ADAPT_STREAM}));
60
/**
 * Create one processing step per input via `f_map`, patch the step's `pipe()`
 * with `F_ADAPT_STREAM`, and pipe the input into it.
 *
 * @param {Array<Readable>} a_inputs - source streams
 * @param {Function} f_map - given an input stream, returns the step stream for it
 * @returns {Array} the values returned by each `ds_input.pipe(step)` call
 */
const map_streams = (a_inputs, f_map) => a_inputs.map((ds_input) => {
	// build the step and route its own pipe() through the adapter
	const ds_step = Object.assign(f_map(ds_input), {pipe: F_ADAPT_STREAM});

	// input -> step
	return ds_input.pipe(ds_step);
});
70
/**
 * Turn a user-supplied term into a graphy term: strings are treated as c1
 * (concise) strings expanded with `h_prefixes`; anything else goes through
 * `factory.fromTerm`.
 *
 * @param {string|object} z_term - c1 string or term-like object
 * @param {object} h_prefixes - prefix map for c1 expansion
 * @returns {object} graphy term
 */
const warp_term = (z_term, h_prefixes) => ('string' === typeof z_term
	? factory.c1(z_term, h_prefixes)
	: factory.fromTerm(z_term));
81
/**
 * Interpret a value returned by a user 'transform' callback and push the
 * resulting quad(s) onto `ds_transform`.
 *
 * Accepted shapes:
 *  - null / undefined (e.g., a hole or null entry inside a returned array): skipped
 *  - empty array: skipped
 *  - array whose first element is an array: every element is interpreted
 *    recursively; `fke_transform` is called once all of them have resolved
 *  - array of 3 or 4 terms (c1 strings or term objects): pushed as one quad
 *    (any other length is reported with a warning and ignored)
 *  - string: parsed as TriG; every quad read is pushed
 *  - object with subject/predicate/object: pushed via `factory.fromQuad`
 *  - any other iterable: every yielded item is pushed as-is
 * Anything else terminates the process via `exit()`.
 *
 * @param {*} z_item - value returned by the callback
 * @param {object} h_prefixes - prefix map used to expand c1 strings
 * @param {Transform} ds_transform - stream the quads are pushed to
 * @param {Function} fke_transform - completion callback; called once per item
 */
const interpret_item = (z_item, h_prefixes, ds_transform, fke_transform) => {
	// nothing to emit (previously crashed on `.subject` for null entries inside arrays)
	if(null === z_item || 'undefined' === typeof z_item) return fke_transform();

	// array
	if(Array.isArray(z_item)) {
		// zero-length, skip
		if(!z_item.length) return fke_transform();

		// first object is also array; treat as a list of items
		if(Array.isArray(z_item[0])) {
			const nl_subs = z_item.length;
			let c_resolves = 0;

			for(const z_sub of z_item) {
				interpret_item(z_sub, h_prefixes, ds_transform, () => { // eslint-disable-line no-loop-func
					if(++c_resolves === nl_subs) {
						fke_transform();
					}
				});
			}

			// do not consume transform synchronously
			return;
		}
		// triple/quad
		else if(3 === z_item.length || 4 === z_item.length) {
			const a_terms = z_item.map(z => warp_term(z, h_prefixes));

			ds_transform.push(factory.quad(...a_terms));
		}
		// unsupported arity; report instead of dropping it silently
		else {
			warn(`The callback function supplied to the 'transform' command returned an array of length ${z_item.length}; expected 3 (triple) or 4 (quad) terms, so it is being ignored`);
		}
	}
	// string (trig)
	else if('string' === typeof z_item) {
		// guard against completing twice should the reader fire both 'error' and 'eof'
		let b_done = false;
		const fk_done = () => {
			if(b_done) return;
			b_done = true;
			fke_transform();
		};

		graphy.content.trig.read({
			input: {
				string: z_item,
			},

			data(g_quad_read) {
				ds_transform.push(g_quad_read);
			},

			// reader-level error callback (the 'read' command configures its reader the same way)
			error(e_read) {
				warn(`The 'transform' command threw an Error while trying to read the returned TriG string: '${z_item}'\n\nThe reader reported: ${e_read.stack}`);

				// done
				fk_done();
			},

			eof() {
				// done
				fk_done();
			},
		});

		// do not consume transform synchronously
		return;
	}
	// quad
	else if(z_item.subject && z_item.predicate && z_item.object) {
		ds_transform.push(factory.fromQuad(z_item));
	}
	// iterable
	else if(z_item[Symbol.iterator]) {
		for(const g_quad_it of z_item) {
			ds_transform.push(g_quad_it);
		}
	}
	// other
	else {
		exit(`The callback function supplied to the 'transform' command returned an invalid quad value: '${z_item}'`);
	}

	// done
	fke_transform();
};
157
/**
 * N-to-1 dataset reduction. Each input is loaded into its own dataset tree, one
 * after another. Each tree is canonicalized unless `--strict` is set. The trees
 * are then folded left-to-right with the tree method named by `s_operation`.
 *
 * @param {object} g_argv - parsed arguments; `strict` disables canonicalization
 * @param {Array<Readable>} a_inputs - quad streams to load
 * @param {Function} fe_command - error callback (not used here)
 * @param {string} s_operation - name of the pairwise tree method to apply
 * @returns {Promise<Array>} single-element array holding the resulting tree
 */
const dataset_N1QQ = async(g_argv, a_inputs, fe_command, s_operation) => {
	const b_canonicalize = !g_argv.strict;
	const a_trees = a_inputs.map(() => dataset_tree());

	// accumulator, seeded by the first (possibly canonicalized) tree
	let k_tree_out = a_trees[0];

	for(const [i_input, ds_input] of a_inputs.entries()) {
		let k_tree = a_trees[i_input];

		// load this input completely before starting the next one
		ds_input.pipe(k_tree);
		await k_tree.until('finish');

		if(b_canonicalize) {
			k_tree = k_tree.canonicalize();
			a_trees[i_input] = k_tree;
		}

		// first tree seeds the fold; every later one is combined into it
		k_tree_out = 0 === i_input? k_tree: k_tree_out[s_operation](k_tree);
	}

	return [k_tree_out];
};
195
/**
 * Load exactly two input streams concurrently, each into its own dataset tree.
 * Each tree is canonicalized once loaded, unless `--strict` is set.
 *
 * Shared by `dataset_21QQ` and `dataset_21QR`, which were previously duplicated.
 *
 * @param {object} g_argv - parsed arguments; `strict` disables canonicalization
 * @param {Array<Readable>} a_inputs - the two quad streams [a, b]
 * @returns {Promise<Array>} resolves to [tree_a, tree_b] once both have finished
 */
const dataset_load_pair = (g_argv, a_inputs) => new Promise((fk_resolve) => {
	const b_canonicalize = !g_argv.strict;
	const a_trees = [dataset_tree(), dataset_tree()];
	let c_finished = 0;

	a_trees.forEach((k_tree, i_tree) => {
		k_tree.on('finish', () => {
			// canonicalize
			if(b_canonicalize) a_trees[i_tree] = k_tree.canonicalize();

			// both trees are ready
			if(a_trees.length === ++c_finished) fk_resolve(a_trees);
		});
	});

	// pipe each input stream to its tree
	const [ds_input_a, ds_input_b] = a_inputs;
	ds_input_a.pipe(a_trees[0]);
	ds_input_b.pipe(a_trees[1]);
});

/**
 * 2-to-1 dataset join producing quads: `tree_a[s_operation](tree_b)`.
 * A throw from the operation now rejects the returned promise instead of
 * escaping from a 'finish' event handler.
 *
 * @param {object} g_argv - parsed arguments (`strict`)
 * @param {Array<Readable>} a_inputs - the two quad streams
 * @param {Function} fe_command - error callback (not used here)
 * @param {string} s_operation - name of the tree method to apply
 * @returns {Promise<Array>} single-element array holding the resulting tree
 */
const dataset_21QQ = (g_argv, a_inputs, fe_command, s_operation) => dataset_load_pair(g_argv, a_inputs)
	.then(([k_tree_a, k_tree_b]) => [k_tree_a[s_operation](k_tree_b)]);

/**
 * 2-to-1 dataset join producing a result value: `tree_a[s_operation](tree_b)`,
 * wrapped in an `AnswerSource` stream.
 *
 * @param {object} g_argv - parsed arguments (`strict`)
 * @param {Array<Readable>} a_inputs - the two quad streams
 * @param {Function} fe_command - error callback (not used here)
 * @param {string} s_operation - name of the tree method to apply
 * @returns {Promise<Array>} single-element array holding an AnswerSource
 */
const dataset_21QR = (g_argv, a_inputs, fe_command, s_operation) => dataset_load_pair(g_argv, a_inputs)
	.then(([k_tree_a, k_tree_b]) => [new AnswerSource(k_tree_a[s_operation](k_tree_b))]);
277
/**
 * Object-mode readable that emits a single value, then ends.
 *
 * When piped into a non-object-mode destination, the value is emitted instead as
 * one JSON line (`JSON.stringify(value) + '\n'`).
 */
class AnswerSource extends require('stream').Readable {
	/**
	 * @param {*} w_datum - the single value to emit
	 */
	constructor(w_datum) {
		super({objectMode: true});

		this._w_datum = w_datum;
	}

	/**
	 * Switch to JSON-line output for byte/string destinations, then pipe as usual.
	 *
	 * @param {Writable} ds_dst - destination stream
	 * @returns {Writable} result of `Readable#pipe`
	 */
	pipe(ds_dst) {
		const b_object_dst = ds_dst._writableState.objectMode;

		if(!b_object_dst) {
			this._read = () => {
				const s_line = JSON.stringify(this._w_datum)+'\n';
				this.push(s_line, 'utf8');
				this.push(null);
			};
		}

		return super.pipe(ds_dst);
	}

	// emit the datum as-is, then end
	_read() {
		this.push(this._w_datum);
		this.push(null);
	}
}
308
/**
 * Patch `ds_src.pipe` so that piping does not trigger MaxListenersExceededWarning.
 * This matters when many worker threads attach listeners.
 *
 * During the call both streams get an unlimited listener count. Afterwards their
 * previous configuration is restored exactly:
 *  - `undefined` keeps inheriting `EventEmitter.defaultMaxListeners`
 *  - `0` stays "unlimited"
 * The old `|| 10` fallback clobbered both cases. Restoration happens in a
 * `finally`, so it also runs if `pipe()` throws. Extra `pipe()` arguments (e.g.
 * `{end: false}`) are forwarded instead of being dropped.
 *
 * @param {Readable} ds_src - stream whose `pipe` gets wrapped
 * @returns {Readable} the same stream, for chaining
 */
function ignore_pipe_warnings(ds_src) {
	const f_pipe = ds_src.pipe;

	// override pipe
	ds_src.pipe = function pipe(ds_dst, ...a_args) {
		// raw internal values (not getMaxListeners()) so that "unset" survives the round-trip
		const z_maxl_src = this._maxListeners;
		const z_maxl_dst = ds_dst._maxListeners;

		// pipe without warning because of worker threads
		this.setMaxListeners(Infinity);
		ds_dst.setMaxListeners(Infinity);

		try {
			return f_pipe.call(this, ds_dst, ...a_args);
		}
		finally {
			// reset max listeners
			this._maxListeners = z_maxl_src;
			ds_dst._maxListeners = z_maxl_dst;
		}
	};

	// return self
	return ds_src;
}
336
// when true, warnings are printed with their full, unmodified stack trace
let b_show_stack_trace = false;

/**
 * Print a warning to stderr.
 *
 * The message is taken from an Error's stack. Unless `b_show_stack_trace` is set,
 * the first "at ..." line is removed and the leading "Error:" becomes "Warning:".
 *
 * @param {string} s_message - warning text
 */
const warn = (s_message) => {
	const s_stack = (new Error(s_message)).stack;

	// verbose mode: print untouched
	if(b_show_stack_trace) {
		console.warn(s_stack);
		return;
	}

	console.warn(s_stack
		.replace(/\n\s+at [^\n]*\n/, '\n')
		.replace(/^Error:/, 'Warning:'));
};
348
/**
 * Print a fatal message to stderr and terminate with exit code 1.
 *
 * @param {string} s_exit - message to print
 */
const exit = (s_exit) => {
	const XC_FAILURE = 1;

	console.error(s_exit);
	process.exit(XC_FAILURE);
};
353
354
// ---- help-output category headings ----
const S_CATEGORY_IO = 'Input/Output Commands:';
const S_CATEGORY_DATA = 'Quad Manipulation Commands:';
const S_CATEGORY_STREAM = 'Stream Control Commands:';
const S_CATEGORY_SET = 'Dataset Commands:';
const S_CATEGORY_STATS = 'Statistics Commands:';

// ---- transform-type labels shown in per-command help ----
// naming: <arity><in><out>; NN = map, 21 = join, N1 = reduce;
// S = Strings, Q = Quads, A = Any, RB/RN = ResultValues<Boolean/Number>

// N-to-N (map)
const S_TRANSFORM_TYPE_NNSQ = 'Transform Type: N-to-N (map); (...Strings) --> [...Quads]';
const S_TRANSFORM_TYPE_NNQS = 'Transform Type: N-to-N (map); (...Quads) --> [...Strings]';
const S_TRANSFORM_TYPE_NNQQ = 'Transform Type: N-to-N (map); (...Quads) --> [...Quads]';
const S_TRANSFORM_TYPE_NNQRB = 'Transform Type: N-to-N (map); (...Quads) --> [...ResultValues<Boolean>]';
const S_TRANSFORM_TYPE_NNQRN = 'Transform Type: N-to-N (map); (...Quads) --> [...ResultValues<Number>]';

// 2-to-1 (join)
const S_TRANSFORM_TYPE_21QQ = 'Transform Type: 2-to-1 (join); (Quads, Quads) --> [Quads]';
const S_TRANSFORM_TYPE_21QRB = 'Transform Type: 2-to-1 (join); (Quads, Quads) --> [ResultValues<Boolean>]';
const S_TRANSFORM_TYPE_21QRN = 'Transform Type: 2-to-1 (join); (Quads, Quads) --> [ResultValues<Number>]';

// N-to-1 (reduce)
const S_TRANSFORM_TYPE_N1QQ = 'Transform Type: N-to-1 (reduce); (...Quads) --> [Quads]';
const S_TRANSFORM_TYPE_N1AA = 'Transform Type: N-to-1 (reduce); (...Any) --> [Any]';

// appended to the description of every command that evaluates user JavaScript
const S_WARN_JAVASCRIPT = `WARNING: the '-j' / '--js' / '--javascript' option evals the given code. DO NOT allow user input into this option as it will grant them arbitrary code execution with whatever privileges the process is running under.`;

// yargs options shared by the dataset (set-operation) commands
const G_OPTIONS_DATASET = {
	strict: {
		type: 'boolean',
		describe: 'if true, forgoes canonicalization before the set operation',
	},
};
380
/**
 * Build the per-format boolean shorthand flags shared by the read/scan/scribe/write
 * commands (e.g. `--nt`, `--ttl`).
 *
 * Every flag is currently disabled, so this always returns a fresh, empty object.
 * The intended definitions are kept below for reference.
 *
 * @param {Function} f_verb - maps a format name to the flag's help text
 * @returns {object} yargs option definitions to spread into a command's options
 */
const content_type_flags = (f_verb) => {
	// Disabled definitions (move into the returned object to re-enable):
	//
	// nt: {type: 'boolean', alias: ['ntriples', 'n-triples'], conflicts: ['ttl', 'nq', 'trig'], describe: f_verb('N-Triples')},
	// nq: {type: 'boolean', alias: ['nquads', 'n-quads'], conflicts: ['nt', 'ttl', 'trig'], describe: f_verb('N-Quads')},
	// ttl: {type: 'boolean', alias: ['turtle'], conflicts: ['nt', 'nq', 'trig'], describe: f_verb('Turtle')},
	// trig: {type: 'boolean', conflicts: ['nt', 'nq', 'ttl'], describe: f_verb('TriG')},
	return {};
};
409
410// commands
411let h_commands = { // eslint-disable-next-line quote-props
412/*
413------ Input/Output --------
414*/
415
416 read: {
417 type: S_TRANSFORM_TYPE_NNSQ,
418 category: S_CATEGORY_IO,
419 overview: 'Deserialize RDF content',
420 description: [
421 'Read RDF content, i.e., deserialize it, from 1 or more inputs using a single thread',
422 ],
423 options: {
424 c: {
425 type: 'string',
426 alias: ['content-type'],
427 default: 'trig',
428 describe: 'either an RDF Content-Type or format selector',
429 group: 'Content Selector Options:',
430 },
431 r: {
432 type: 'boolean',
433 alias: ['relax'],
434 default: undefined, // eslint-disable-line no-undefined
435 describe: 'relax validation of tokens within the RDF document',
436 },
437 b: {
438 type: 'string',
439 alias: ['base', 'base-uri', 'base-iri'],
440 describe: 'set a base URI on the document',
441 },
442
443 ...content_type_flags(s => `read from ${s}`),
444 },
445 examples: [
446 `read -c nt`,
447 `read -c n-triples`,
448 `read -c 'application/n-triples'`,
449 `read -c ttl`,
450 `read -c turtle`,
451 `read -c 'text/turtle'`,
452 ],
453
454 command(g_argv, a_inputs, fe_command) {
455 // select reader
456 let f_reader = graphy.content(g_argv['content-type']).read;
457
458 let gc_read = {
459 relax: g_argv.relax || false,
460 };
461
462 // 'base-uri' => 'baseUri'
463 if(g_argv['base-uri']) {
464 gc_read.baseUri = g_argv['base-uri'];
465 }
466
467 return map_streams(a_inputs, () => f_reader({
468 ...gc_read,
469
470 error: e => fe_command(e),
471 }));
472 },
473 },
474
475 scan: {
476 type: S_TRANSFORM_TYPE_NNSQ,
477 category: S_CATEGORY_IO,
478 overview: 'Deserialize RDF content using multiple threads',
479 description: [
480 'Scan RDF content, i.e., deserialize it, from 1 or more inputs using multiple threads',
481 ],
482 options: {
483 c: {
484 type: 'string',
485 alias: ['content-type'],
486 default: 'nt',
487 describe: 'either an RDF Content-Type or format selector',
488 group: 'Content Selector Options:',
489 },
490 r: {
491 type: 'boolean',
492 alias: ['relax'],
493 default: undefined, // eslint-disable-line no-undefined
494 describe: 'relax validation of tokens within the RDF document',
495 group: 'Read Options:',
496 },
497 // b: {
498 // type: 'string',
499 // alias: ['base', 'base-uri', 'base-iri'],
500 // describe: 'set a base URI on the document',
501 // },
502 threads: {
503 type: 'number',
504 describe: 'manually set the total number of threads to use (including the main thread)',
505 group: 'Scan Options:',
506 },
507
508 ...content_type_flags(s => `scan from ${s}`),
509 },
510 examples: [
511 `scan -c nt`,
512 `scan -c n-triples`,
513 `scan -c 'application/n-triples'`,
514 `scan -c nq`,
515 `scan -c nquads`,
516 `scan -c 'application/n-quads'`,
517 ],
518
519 command(g_argv, a_inputs, fe_command, a_rest) {
520 // select scanner
521 let f_scanner = graphy.content(g_argv['content-type']).scan;
522
523 let gc_scan = {
524 relax: g_argv.relax || false,
525 threads: g_argv.threads || 0,
526 };
527
528 // next command
529 let a_series = a_rest.shift();
530 let si_next = a_series? a_series[0]: null;
531
532 // command
533 switch(si_next) {
534 // count task
535 case 'count': {
536 return a_inputs.map(ds_input => new Promise((fk_resolve) => {
537 ds_input.on('error', fe_command);
538
539 f_scanner(ds_input, {
540 ...gc_scan,
541
542 preset: 'count',
543
544 error: fe_command,
545
546 report(c_quads) {
547 fk_resolve(ignore_pipe_warnings(new AnswerSource(c_quads)));
548 },
549 });
550 }));
551 }
552
553 // scribe / ndjson
554 case 'scribe':
555 case null: {
556 return a_inputs.map((ds_input) => {
557 ds_input.on('error', fe_command);
558
559 let ds_output = ignore_pipe_warnings(new stream.PassThrough());
560
561 f_scanner(ds_input, {
562 ...gc_scan,
563
564 preset: si_next || 'ndjson',
565
566 error: fe_command,
567
568 update(z_scribe, i_worker) {
569 ds_output.write(i_worker? Buffer.from(z_scribe).toString(): z_scribe);
570 },
571
572 report() {
573 ds_output.end();
574 },
575 });
576
577 return ds_output;
578 });
579 }
580
581 // other
582 default: {
583 fe_command(`The 'scan' command only supports a limited set of commands to follow it; '${si_next}' is not supported`);
584 }
585 }
586 },
587 },
588
589 scribe: {
590 type: S_TRANSFORM_TYPE_NNQS,
591 category: S_CATEGORY_IO,
592 overview: 'Serialize RDF content fast',
593 description: [
594 'Scribe RDF content, i.e., serialize it, fast (and possibly ugly) using the given content-type.',
595 ],
596 options: {
597 c: {
598 type: 'string',
599 alias: ['content-type'],
600 default: 'trig',
601 describe: 'either an RDF Content-Type or format selector',
602 group: 'Content Selector Options:',
603 },
604
605 ...content_type_flags(s => `scribe to ${s}`),
606 },
607 examples: [
608 `scribe -c nt`,
609 `scribe -c n-triples`,
610 `scribe -c 'application/n-triples'`,
611 `scribe -c ttl`,
612 `scribe -c turtle`,
613 `scribe -c 'text/turtle'`,
614 `scribe -c xml`,
615 ],
616
617 command(g_argv, a_inputs, fe_command) {
618 // select scriber
619 let f_scriber = graphy.content(g_argv['content-type']).scribe;
620
621 // map input(s) to writer(s)
622 return a_inputs.map(ds_input => ds_input.pipe(f_scriber({
623 error: e => fe_command(e),
624 })));
625 },
626 },
627
628 write: {
629 type: S_TRANSFORM_TYPE_NNQS,
630 category: S_CATEGORY_IO,
631 overview: 'Serialize RDF content in style (pretty-printing)',
632 description: [
633 'Write RDF content, i.e., serialize it, in style (pretty-print) using the given content-type.',
634 ],
635 options() {
636 let s_group_style = 'Style Options:';
637 let s_group_list = 'List Structure Options:';
638
639 return {
640 c: {
641 type: 'string',
642 alias: ['content-type'],
643 describe: `either an RDF Content-Type or format selector (defaults to 'trig')`,
644 group: 'Content Selector Options:',
645 },
646 i: {
647 type: 'string',
648 alias: ['indent'],
649 // default: '\\t', // eslint-disable-line no-undefined
650 describe: `sets the whitespace string to use for indentation. Writers use '\\t' by default`,
651 group: s_group_style,
652 },
653 g: {
654 // type: 'string',
655 alias: ['graph-keyword'],
656 describe: `sets the style to use when serializing the optional 'GRAPH' keyword in TriG. Writers omit this keyword by default.
657 Passing 'true' or empty with this flag on is shorthand for the all-caps 'GRAPH' keyword`.replace(/\n\s*/g, ' '),
658 group: s_group_style,
659 },
660 s: {
661 type: 'boolean',
662 alias: ['simplify-default-graph'],
663 describe: 'if enabled, omits serializing the surrounding optional graph block for the default graph in TriG.',
664 group: s_group_style,
665 },
666 f: {
667 type: 'string',
668 alias: ['first'],
669 describe: `c1 string: sets the predicate to use for the 'first' relation when serializing list structures`,
670 group: s_group_list,
671 },
672 r: {
673 type: 'string',
674 alias: ['rest'],
675 describe: `c1 string: sets the predicate to use for the 'rest' relation when serializing list structures`,
676 group: s_group_list,
677 },
678 n: {
679 type: 'string',
680 alias: ['nil'],
681 describe: `c1 string: sets the predicate to use for the 'nil' relation when serializing list structures`,
682 group: s_group_list,
683 },
684
685 ...content_type_flags(s => `write to ${s}`),
686 };
687 },
688 examples: [
689 `write -c nt`,
690 `write -c n-triples`,
691 `write -c 'application/n-triples'`,
692 `write -c ttl`,
693 `write -c turtle`,
694 `write -c 'text/turtle'`,
695 ],
696
697 command(g_argv, a_inputs, fe_command) {
698 // default write config
699 let gc_write = {};
700
701 // extend style options
702 let g_style = gc_write.style || {};
703
704 // content-type selector
705 let s_selector = g_argv['content-type'];
706
707 // no selector specified
708 if(!s_selector) {
709 // default to trig
710 s_selector = 'trig';
711
712 // set simplify default graph so that it might also turtle-compatible
713 g_style.simplify_default_graph = true;
714 }
715
716 // select writer
717 let f_writer = graphy.content(s_selector).write;
718
719 // style options
720 {
721 // indent
722 if(g_argv.indent) g_style.indent = g_argv.indent;
723
724 // graph keyword
725 if(g_argv['graph-keyword']) {
726 let z_graph_keyword = g_argv['graph-keyword'];
727 if('boolean' === typeof z_graph_keyword) {
728 g_style.graph_keyword = z_graph_keyword;
729 }
730 else if(/^true$/i.test(z_graph_keyword)) {
731 g_style.graph_keyword = true;
732 }
733 else if(/^false$/i.test(z_graph_keyword)) {
734 g_style.graph_keyword = false;
735 }
736 else if(/^graph$/i.test(z_graph_keyword)) {
737 g_style.graph_keyword = z_graph_keyword;
738 }
739 else {
740 return fe_command(`The 'write' command reported an invalid value given to the 'graph-keyword' option: '${z_graph_keyword}'`);
741 }
742 }
743
744 // simplify default graph
745 if(g_argv['simplify-default-graph']) g_style.simplify_default_graph = g_argv['simplify-default-graph'];
746 }
747
748 // extend list options
749 let g_lists = gc_write.lists || {};
750 {
751 // first
752 if(g_argv.first) g_lists.first = g_argv.first;
753
754 // rest
755 if(g_argv.rest) g_lists.rest = g_argv.rest;
756
757 // nil
758 if(g_argv.nil) g_lists.nil = g_argv.nil;
759 }
760
761 // map input(s) to writer(s)
762 return a_inputs.map(ds_input => ds_input.pipe(f_writer({
763 ...gc_write,
764
765 style: g_style,
766
767 lists: g_lists,
768
769 error: e => fe_command(e),
770 })));
771 },
772 },
773
774/*
775------ Quad-Level --------
776*/
777
778 skip: {
779 type: S_TRANSFORM_TYPE_NNQQ,
780 category: S_CATEGORY_DATA,
781 overview: 'Skip over some amount of quads in the stream(s)',
782 description: [
783 'Skip over some amount of data (quads by default) for each input stream before piping the remainder as usual.',
784 ],
785 syntax: '[size=1]',
786 positionals: {
787 size: {
788 type: 'number',
789 describe: 'the number of things to skip',
790 },
791 },
792 options: {
793 q: {
794 type: 'boolean',
795 alias: ['quads', 't', 'triples'],
796 describe: 'skip the given number of quads',
797 conflicts: ['s'],
798 },
799
800 s: {
801 type: 'boolean',
802 alias: ['subjects'],
803 describe: 'skip quads until the given number of distinct subjects have been encountered',
804 conflicts: ['q'],
805 },
806
807 // m: {
808 // type: 'number',
809 // alias: ['multiply'],
810 // },
811
812 // d: {
813 // type: 'number',
814 // alias: ['divisions'],
815 // describe: 'rather than counting numbers, use equal divisions of the given size',
816 // example: gobble(`
817 // Skip the first third of data: skip -d 3
818 // `),
819 // },
820
821 // r: {
822 // type: 'number',
823 // alias: ['ratio'],
824 // describe: 'rather than counting numbers, use equal divisions of the given size',
825 // example: gobble(`
826 // Skip the first twothird of data: skip -r '2/3'
827 // `),
828 // },
829 },
830
831 command(g_argv, a_inputs, fe_command) {
832 // size argument
833 let [
834 n_skip=1,
835 ] = g_argv._;
836
837 // count subjects
838 if(g_argv.subjects) {
839 return map_streams(a_inputs, () => {
840 let c_subjects = 0;
841 let kt_prev = null;
842
843 return new stream.Transform.QuadsToOther({
844 error: e => fe_command(e),
845
846 transform(g_quad, s_encoding, fke_transform) {
847 // reached length
848 if(!g_quad.subject.equals(kt_prev) && ++c_subjects > n_skip) {
849 // start pushing
850 this.push(g_quad);
851 }
852
853 // save subject
854 kt_prev = g_quad.subject;
855
856 // done
857 fke_transform();
858 },
859 });
860 });
861 }
862 // count quads
863 else {
864 return map_streams(a_inputs, () => {
865 let c_quads = 0;
866
867 return new stream.Transform.QuadsToOther({
868 error: e => fe_command(e),
869
870 transform(g_quad, s_encoding, fke_transform) {
871 // reached length
872 if(++c_quads > n_skip) {
873 // start pushing
874 this.push(g_quad);
875 }
876
877 // done
878 fke_transform();
879 },
880 });
881 });
882 }
883 },
884 },
885
886 head: {
887 type: S_TRANSFORM_TYPE_NNQQ,
888 category: S_CATEGORY_DATA,
889 overview: 'Limit number of quads from top of stream(s)',
890 description: [
891 'Limit the number of quads that pass through by counting from the top of the stream.',
892 ],
893 syntax: '[size=1]',
894 positionals: {
895 size: {
896 type: 'number',
897 describe: 'the number of things to emit',
898 },
899 },
900 options: {
901 q: {
902 type: 'boolean',
903 alias: ['quads', 't', 'triples'],
904 describe: 'emit only the given number of quads from the top of a stream',
905 conflicts: ['s'],
906 },
907
908 s: {
909 type: 'boolean',
910 alias: ['subjects'],
911 describe: 'emit quads until the given number of distinct subjects have been encountered from the top of a stream',
912 conflicts: ['q'],
913 },
914 },
915
916 command(g_argv, a_inputs, fe_command) {
917 // size argument
918 let [
919 n_head=1,
920 ] = g_argv._;
921
922 // count subjects
923 if(g_argv.subjects) {
924 return map_streams(a_inputs, (ds_input) => {
925 let c_subjects = 0;
926 let kt_prev = null;
927
928 return new stream.Transform.QuadsToOther({
929 error: e => fe_command(e),
930
931 transform(g_quad, s_encoding, fke_transform) {
932 // under limit
933 if(g_quad.subject.equals(kt_prev) || ++c_subjects <= n_head) {
934 this.push(g_quad);
935 }
936 // hit limit
937 else {
938 // push eof
939 this.push(null);
940
941 // destroy source
942 ds_input.destroy();
943 }
944
945 // save subject
946 kt_prev = g_quad.subject;
947
948 // done
949 fke_transform();
950 },
951 });
952 });
953 }
954 // count quads
955 else {
956 return map_streams(a_inputs, (ds_input) => {
957 let c_quads = 0;
958
959 return new stream.Transform.QuadsToOther({
960 error: e => fe_command(e),
961
962 transform(g_quad, s_encoding, fke_transform) {
963 // under limit
964 if(++c_quads <= n_head) {
965 this.push(g_quad);
966 }
967 // hit limit
968 else {
969 // push eof
970 this.push(null);
971
972 // destroy source
973 ds_input.destroy();
974 }
975
976 // done
977 fke_transform();
978 },
979 });
980 });
981 }
982 },
983 },
984
985 tail: {
986 type: S_TRANSFORM_TYPE_NNQQ,
987 category: S_CATEGORY_DATA,
988 overview: 'Limit number of quads from bottom of stream(s)',
989 description: [
990 'Limit the number of quads that pass through by counting from the bottom of the stream.',
991 'WARNING: quads must be buffered in memory until the end of the stream is reached. Specifying a large number of quads or subjects might therefore incur lots of memory.',
992 ],
993 syntax: '[size=1]',
994 positionals: {
995 size: {
996 type: 'number',
997 describe: 'the number of things to emit',
998 },
999 },
1000 options: {
1001 q: {
1002 type: 'boolean',
1003 alias: ['quads', 't', 'triples'],
1004 describe: 'emit only the given number of quads from the bottom of a stream',
1005 conflicts: ['s'],
1006 },
1007
1008 s: {
1009 type: 'boolean',
1010 alias: ['subjects'],
1011 describe: 'emit quads contained by the given number of distinct subjects from the bottom of a stream',
1012 conflicts: ['q'],
1013 },
1014 },
1015
1016 command(g_argv, a_inputs, fe_command) {
1017 // size argument
1018 let [
1019 n_tail=1,
1020 ] = g_argv._;
1021
1022 // count subjects
1023 if(g_argv.subjects) {
1024 return map_streams(a_inputs, () => {
1025 let c_subjects = 0;
1026 let kt_prev = null;
1027 let a_batch = null;
1028 let a_fifo = [];
1029
1030 return new stream.Transform.QuadsToOther({
1031 error: e => fe_command(e),
1032
1033 transform(g_quad, s_encoding, fke_transform) {
1034 // different subject
1035 if(!g_quad.subject.equals(kt_prev)) {
1036 // reset batch
1037 a_batch = [];
1038
1039 // push batch to fifo
1040 a_fifo.push(a_batch);
1041
1042 // hit limit
1043 if(++c_subjects > n_tail) {
1044 a_fifo.shift();
1045 }
1046 }
1047
1048 // save subject
1049 kt_prev = g_quad.subject;
1050
1051 // add quad to batch
1052 a_batch.push(g_quad);
1053
1054 // done
1055 fke_transform();
1056 },
1057
1058 flush(fk_flush) {
1059 // push queue
1060 for(let a_quads of a_fifo) {
1061 for(let g_quad of a_quads) {
1062 this.push(g_quad);
1063 }
1064 }
1065
1066 // free to GC
1067 a_fifo.length = 0;
1068 a_batch.length = 0;
1069
1070 // done
1071 fk_flush();
1072 },
1073 });
1074 });
1075 }
1076 // count quads
1077 else {
1078 return map_streams(a_inputs, () => {
1079 let c_quads = 0;
1080 let a_fifo = [];
1081
1082 return new stream.Transform.QuadsToOther({
1083 error: e => fe_command(e),
1084
1085 transform(g_quad, s_encoding, fke_transform) {
1086 // under limit
1087 if(++c_quads <= n_tail) {
1088 a_fifo.push(g_quad);
1089 }
1090 // hit limit
1091 else {
1092 // shift off bottom
1093 a_fifo.shift();
1094
1095 // push to top
1096 a_fifo.push(g_quad);
1097 }
1098
1099 // done
1100 fke_transform();
1101 },
1102
1103 flush(fk_flush) {
1104 // push queue
1105 for(let g_quad of a_fifo) {
1106 this.push(g_quad);
1107 }
1108
1109 // free to GC
1110 a_fifo.length = 0;
1111
1112 // done
1113 fk_flush();
1114 },
1115 });
1116 });
1117 }
1118 },
1119 },
1120
1121 filter: {
1122 type: S_TRANSFORM_TYPE_NNQQ,
1123 category: S_CATEGORY_DATA,
1124 overview: 'Filter quads in the stream(s) via expression',
1125 description: [
1126 'Filter quads by using either a Quad Filter Expression or a bit of JavaScript.',
1127 'For documentation on the Quad Filter Expression syntax, see: https://graphy.link/quad-filter-expressions',
1128 S_WARN_JAVASCRIPT,
1129 ],
1130 options: {
1131 x: {
1132 type: 'string',
1133 alias: ['expression'],
1134 describe: 'filter quads using the given quad filter expression',
1135 conflicts: ['j'],
1136 },
1137
1138 j: {
1139 type: 'string',
1140 alias: ['js', 'javascript'],
1141 describe: 'filter quads using the given JavaScript expression which will be evaluated as a callback function passed the quad and current prefix map as arguments',
1142 conflicts: ['x'],
1143 },
1144
1145 v: {
1146 type: 'boolean',
1147 alias: ['verbose'],
1148 describe: 'prints the compiled quad filter expression to stderr',
1149 },
1150 },
1151
1152 examples: [
1153 [
1154 `Filter quads equivalent to the triple pattern: '?s rdf:type dbo:Plant'`,
1155 `filter -x '; a; dbo:Plant'`,
1156 ],
1157 [
1158 `Filter quads equivalent to the SPARQL fragment: 'dbr:Banana ?p ?o. filter(!isLiteral(?o))'`,
1159 `filter -x 'dbr:Banana;; !{literal}'`,
1160 ],
1161 [
1162 `Filter quads equivalent to the SPARQL fragment: '?s ?p ?o. filter(?o > 10e3)]`,
1163 `filter --js 'g => g.object.number > 10e3'`,
1164 ],
1165 [
1166 `Filter quads equivalent to the SPARQL fragment: '?s ?p ?o. filter(strStarts(str(?s), str(?o)))'`,
1167 `filter --js 'g => g.object.value.startsWith(g.subject.value)'`,
1168 ],
1169 // [
1170 // `Filter quads equivalent to the SPARQL fragment: '?s ?p ?o. filter(strStarts(str(?s), str(?o)))'`,
1171 // `filter --js '(g, h) => g.subject.concise(h).startsWith("db")'`,
1172 // ],
1173 ],
1174
1175 command(g_argv, a_inputs, fe_command) {
1176 // quad filter expression
1177 if(g_argv.expression) {
1178 let g_parse = parse_filter(g_argv.expression);
1179
1180 let sj_eval = expression_handler.prepare(g_parse);
1181
1182 if(g_argv.verbose) {
1183 console.warn(`The compiled quad filter expression from 'transform' command: () => {\n${sj_eval.replace(/^|\n/g, '\n\t')}\n}\n`);
1184 }
1185
1186 let f_filter = new Function('factory', 'stream', sj_eval); // eslint-disable-line no-new-func
1187
1188 return map_streams(a_inputs, () => f_filter(factory, stream));
1189 }
1190 // javascript expression
1191 else if(g_argv.javascript) {
1192 let f_build = new Function('factory', /* syntax: js */ `return (${g_argv.javascript}) || null;`); // eslint-disable-line no-new-func
1193
1194 let f_filter = f_build(factory);
1195
1196 // filter exists
1197 if(f_filter) {
1198 // invalid type
1199 if('function' !== typeof f_filter) {
1200 exit(`The 'filter' command expects -j/--javascript expression to evaluate to a function, instead found '${typeof f_filter}'`);
1201 }
1202
1203 return map_streams(a_inputs, () => {
1204 let h_prefixes = {};
1205
1206 return new stream.Transform.QuadsToOther({
1207 error: e => fe_command(e),
1208
1209 prefix(si_prefix, p_iri) {
1210 h_prefixes[si_prefix] = p_iri;
1211 },
1212
1213 transform(g_quad, s_encoding, fke_transform) {
1214 if(f_filter(g_quad, h_prefixes)) {
1215 return fke_transform(null, g_quad);
1216 }
1217
1218 fke_transform();
1219 },
1220 });
1221 });
1222 }
1223 }
1224
1225 // neither used (bypass filter)
1226 warn(`The 'filter' command was not used and is being ignored.`);
1227 return bypass(a_inputs);
1228 },
1229 },
1230
1231 // transform
1232 transform: {
1233 type: S_TRANSFORM_TYPE_NNQQ,
1234 category: S_CATEGORY_DATA,
1235 overview: 'Apply a custom transform function to each quad in the stream(s)',
1236 description: [
1237 '$1',
1238 S_WARN_JAVASCRIPT,
1239 ],
1240 options: {
1241 j: {
1242 type: 'string',
1243 alias: ['js', 'javascript'],
1244 describe: 'transform quads using the given JavaScript expression which will be evaluated as a callback function passed the quad and current prefix map as arguments',
1245 demandOption: true,
1246 example: [
1247 `transform -j 'g => [g.object, g.predicate, g.subject]'`,
1248 `transform -j 'g => ({
1249 [factory.blankNode()]: {
1250 a: 'rdf:Statement',
1251 'rdf:subject': g.subject,
1252 'rdf:predicate': g.predicate,
1253 'rdf:object': g.object,
1254 },
1255 })'`,
1256 ].join('\n'),
1257 },
1258 },
1259
1260 command(g_argv, a_inputs, fe_command) {
1261 // javascript expression
1262 if(g_argv.javascript) {
1263 let f_build = new Function('factory', 'c3', 'c4', /* syntax: js */ `return (${g_argv.javascript}) || null;`); // eslint-disable-line no-new-func
1264
1265 let f_transform = f_build(factory, factory.c3, factory.c4);
1266
1267 // transform exists
1268 if(f_transform) {
1269 // invalid type
1270 if('function' !== typeof f_transform) {
1271 exit(`The 'filter' command expects -j/--javascript expression to evaluate to a function, instead found '${typeof f_filter}'`);
1272 }
1273
1274 return map_streams(a_inputs, () => {
1275 let h_prefixes = {};
1276
1277 return new stream.Transform.QuadsToOther({
1278 error: e => fe_command(e),
1279
1280 prefix(si_prefix, p_iri) {
1281 h_prefixes[si_prefix] = p_iri;
1282 },
1283
1284 transform(g_quad, s_encoding, fke_transform) {
1285 // alias quad property access
1286 g_quad.s = g_quad.subject;
1287 g_quad.p = g_quad.predicate;
1288 g_quad.o = g_quad.object;
1289 g_quad.g = g_quad.graph;
1290
1291 // try to apply transform callback
1292 let z_item;
1293 try {
1294 z_item = f_transform(g_quad, h_prefixes);
1295 }
1296 catch(e_transform) {
1297 warn(`The 'transform' command threw an Error while applying the given callback function:\n${e_transform.stack}`);
1298 return fke_transform();
1299 }
1300
1301 // item was returned
1302 if(z_item) {
1303 return interpret_item(z_item, h_prefixes, this, fke_transform);
1304 }
1305
1306 // done
1307 fke_transform();
1308 },
1309 });
1310 });
1311 }
1312 }
1313
1314 // nothing used (bypass filter)
1315 warn(`The 'transform' command was not used and is being ignored.`);
1316 return bypass(a_inputs);
1317 },
1318 },
1319
1320/*
1321------ Stream Control --------
1322*/
1323
1324 concat: {
1325 type: S_TRANSFORM_TYPE_N1AA,
1326 category: S_CATEGORY_STREAM,
1327 overview: 'Join stream data in order via concatentation',
1328 description: [
1329 'Concatenate quads from all input streams in order.',
1330 ],
1331 options: {},
1332
1333 command(g_argv, a_inputs, fe_command) {
1334 let nl_inputs = a_inputs.length;
1335
1336 // single input, bypass passthrough
1337 if(1 === nl_inputs) return a_inputs;
1338
1339 // input index
1340 let i_input = 0;
1341
1342 // single output stream
1343 let ds_out = new stream.PassThrough();
1344
1345 // stream consumer
1346 let f_next = () => {
1347 // done consuming inputs; end output stream
1348 if(i_input >= nl_inputs) return ds_out.end();
1349
1350 // next input
1351 let ds_input = a_inputs[i_input++];
1352
1353 // once it ends; consume next input
1354 ds_input.on('end', f_next);
1355
1356 // catch stream errors
1357 ds_input.on('error', fe_command);
1358
1359 // pipe to passthrough
1360 ds_input.pipe(ds_out, {end:false});
1361 };
1362
1363 // start concatenating
1364 f_next();
1365
1366 // return stream
1367 return [ds_out];
1368
1369 // return [new stream.Readable({
1370 // objectMode: true,
1371
1372 // read() {
1373 // // while there are inputs
1374 // for(; i_input<nl_inputs; i_input++) {
1375 // // ref input
1376// let ds_input = a_inputs[i_input];
1377
1378 // // read chunk from input and push to output
1379 // let w_chunk;
1380 // while((w_chunk = ds_input.read()) && this.push(w_chunk)) {
1381 // ; // eslint-disable-line no-empty
1382// }
1383
1384 // // input not fully consumed; try again next read
1385 // if(!w_chunk.readableEnded) break;
1386 // }
1387 // },
1388 // })];
1389 },
1390 },
1391
1392 merge: {
1393 type: S_TRANSFORM_TYPE_N1AA,
1394 category: S_CATEGORY_STREAM,
1395 overview: `Join stream data on a 'first come, first serve' basis`,
1396 description: [
1397 'Merge quads from all input streams without order.',
1398 ],
1399 options: {},
1400
1401 command(g_argv, a_inputs, fe_command) {
1402 let nl_inputs = a_inputs.length;
1403
1404 // single input, bypass passthrough
1405 if(1 === nl_inputs) return a_inputs;
1406
1407 // input index
1408 let i_input = 0;
1409
1410 // single output stream
1411 let ds_out = new stream.PassThrough();
1412
1413 // stream consumer
1414 let f_next = () => {
1415 // done consuming inputs; end output stream
1416 if(i_input >= nl_inputs) return ds_out.end();
1417
1418 // next input
1419 let ds_input = a_inputs[i_input++];
1420
1421 // once it ends; consume next input
1422 ds_input.on('end', f_next);
1423
1424 // catch stream errors
1425 ds_input.on('error', fe_command);
1426
1427 // pipe to passthrough
1428 ds_input.pipe(ds_out, {end:false});
1429 };
1430
1431 // start concatenating
1432 f_next();
1433
1434 // return stream
1435 return [ds_out];
1436 },
1437 },
1438
1439/*
1440------ Dataset --------
1441*/
1442
1443 tree: {
1444 type: S_TRANSFORM_TYPE_NNQQ,
1445 category: S_CATEGORY_SET,
1446 overview: 'Put all quads into a tree data structure to remove duplicates',
1447 description: [
1448 '$1',
1449 ],
1450 options: {},
1451
1452 command(g_argv, a_inputs, fe_command) {
1453 return a_inputs.map(ds_input => ds_input.pipe(dataset_tree()));
1454 },
1455 },
1456
1457 canonical: {
1458 alias: 'canonicalize',
1459 type: S_TRANSFORM_TYPE_NNQQ,
1460 category: S_CATEGORY_SET,
1461 overview: 'Canonicalize a set of quads using RDF Dataset Normalization Algorithm (URDNA2015)',
1462 description: [
1463 '$1',
1464 ],
1465 options: {},
1466
1467 command(g_argv, a_inputs, fe_command) {
1468 return a_inputs.map(ds_input => ds_input.pipe(dataset_tree({canonicalize:true})));
1469 },
1470 },
1471
1472 union: {
1473 type: S_TRANSFORM_TYPE_N1QQ,
1474 category: S_CATEGORY_SET,
1475 overview: 'Compute the set union of 1 or more inputs',
1476 description: [
1477 '$1',
1478 ],
1479 options: {
1480 ...G_OPTIONS_DATASET,
1481 },
1482
1483 command(g_argv, a_inputs, fe_command) {
1484 return dataset_N1QQ(g_argv, a_inputs, fe_command, 'union');
1485 },
1486 },
1487
1488 intersect: {
1489 alias: 'intersection',
1490 type: S_TRANSFORM_TYPE_N1QQ,
1491 category: S_CATEGORY_SET,
1492 overview: 'Compute the set intersection of 1 or more inputs',
1493 description: [
1494 '$1',
1495 ],
1496 options: {
1497 ...G_OPTIONS_DATASET,
1498 },
1499
1500 command(g_argv, a_inputs, fe_command) {
1501 return dataset_N1QQ(g_argv, a_inputs, fe_command, 'intersection');
1502 },
1503 },
1504
1505 diff: {
1506 alias: 'difference',
1507 type: S_TRANSFORM_TYPE_21QQ,
1508 category: S_CATEGORY_SET,
1509 overview: 'Compute the set difference between 2 inputs',
1510 description: [
1511 '$1',
1512 ],
1513 options: {
1514 ...G_OPTIONS_DATASET,
1515 },
1516
1517 command(g_argv, a_inputs, fe_command) {
1518 return dataset_21QQ(g_argv, a_inputs, fe_command, 'difference');
1519 },
1520 },
1521
1522 minus: {
1523 alias: ['subtract', 'subtraction'],
1524 type: S_TRANSFORM_TYPE_21QQ,
1525 category: S_CATEGORY_SET,
1526 overview: 'Subtract the second input from the first: A - B',
1527 description: [
1528 '$1',
1529 ],
1530 options: {
1531 ...G_OPTIONS_DATASET,
1532 },
1533
1534 command(g_argv, a_inputs, fe_command) {
1535 return dataset_21QQ(g_argv, a_inputs, fe_command, 'minus');
1536 },
1537 },
1538
1539 equals: {
1540 alias: 'equal',
1541 type: S_TRANSFORM_TYPE_21QRB,
1542 category: S_CATEGORY_SET,
1543 overview: 'Test if 2 inputs are equivalent',
1544 description: [
1545 '$1',
1546 ],
1547 options: {
1548 ...G_OPTIONS_DATASET,
1549 },
1550
1551 command(g_argv, a_inputs, fe_command) {
1552 return dataset_21QR(g_argv, a_inputs, fe_command, 'equals');
1553 },
1554 },
1555
1556 disjoint: {
1557 type: S_TRANSFORM_TYPE_21QRB,
1558 category: S_CATEGORY_SET,
1559 overview: 'Test if 2 inputs are completely disjoint from one another',
1560 description: [
1561 '$1',
1562 ],
1563 options: {
1564 ...G_OPTIONS_DATASET,
1565 },
1566
1567 command(g_argv, a_inputs, fe_command) {
1568 return dataset_21QR(g_argv, a_inputs, fe_command, 'disjoint');
1569 },
1570 },
1571
1572 contains: {
1573 alias: 'contain',
1574 type: S_TRANSFORM_TYPE_21QRB,
1575 category: S_CATEGORY_SET,
1576 overview: 'Test if the first input completely contains the second',
1577 description: [
1578 '$1',
1579 ],
1580 options: {
1581 ...G_OPTIONS_DATASET,
1582 },
1583
1584 command(g_argv, a_inputs, fe_command) {
1585 return dataset_21QR(g_argv, a_inputs, fe_command, 'contains');
1586 },
1587 },
1588
1589
1590/*
1591------ Statistics --------
1592*/
1593
1594 count: {
1595 type: S_TRANSFORM_TYPE_NNQRN,
1596 category: S_CATEGORY_STATS,
1597 overview: 'Count the number of events',
1598 description: [
1599 'Count the number of events in each steam',
1600 ],
1601
1602 command(g_argv, a_inputs, fe_command) {
1603 return a_inputs.map(ds_input => new Promise((fk_resolve) => {
1604 let c_items = 0;
1605
1606 ds_input.on('data', () => {
1607 c_items += 1;
1608 });
1609
1610 ds_input.on('error', fe_command);
1611
1612 ds_input.on('end', () => {
1613 fk_resolve(new AnswerSource(c_items));
1614 });
1615 }));
1616 },
1617 },
1618
1619 distinct: {
1620 type: S_TRANSFORM_TYPE_NNQRN,
1621 category: S_CATEGORY_STATS,
1622 overview: 'Count the number of distinct things',
1623 description: [
1624 'Count the number of distinct things, such as quads, triples, subjects, etc.',
1625 ],
1626 options() {
1627 let h_options = {
1628 q: {
1629 alias: ['quads'],
1630 type: 'boolean',
1631 describe: 'count the number of distinct quads',
1632 },
1633 t: {
1634 alias: ['triples'],
1635 type: 'boolean',
1636 describe: 'count the number of distinct triples by ignoring the graph component',
1637 },
1638 s: {
1639 alias: ['subjects'],
1640 type: 'boolean',
1641 describe: 'count the number of distinct subjects',
1642 },
1643 p: {
1644 alias: ['predicates'],
1645 type: 'boolean',
1646 describe: 'count the number of distinct predicates',
1647 },
1648 o: {
1649 alias: ['objects'],
1650 type: 'boolean',
1651 describe: 'count the number of distinct objects',
1652 },
1653 g: {
1654 alias: ['graphs'],
1655 type: 'boolean',
1656 describe: 'count the number of distinct graphs',
1657 },
1658 // l: {
1659 // alias: ['literals'],
1660 // type: 'boolean',
1661 // describe: 'count the number of distinct literals',
1662 // },
1663 };
1664
1665 let a_others = Object.keys(h_options);
1666 for(let [si_option, g_option] of Object.entries(h_options)) {
1667 let as_conflicts = new Set(a_others);
1668 as_conflicts.delete(si_option);
1669 g_option.conflicts = [...as_conflicts];
1670 }
1671
1672 return h_options;
1673 },
1674
1675 command(g_argv, a_inputs, fe_command) {
1676 // quad component
1677 let s_component = null;
1678 {
1679 if(g_argv.subjects) s_component = 'subject';
1680 if(g_argv.predicates) s_component = 'predicate';
1681 if(g_argv.objects) s_component = 'object';
1682 if(g_argv.graphs) s_component = 'graph';
1683 }
1684
1685 // distinct number of a certain component
1686 if(s_component) {
1687 return a_inputs.map(ds_input => new Promise((fk_resolve) => {
1688 // term set
1689 let as_terms = new Set();
1690
1691 // simply count
1692 ds_input.on('data', (g_quad) => {
1693 // concise term
1694 let sc1_term = g_quad[s_component].concise();
1695
1696 // add to set
1697 as_terms.add(sc1_term);
1698 });
1699
1700 // error handling
1701 ds_input.on('error', fe_command);
1702
1703 // once it ends
1704 ds_input.on('end', () => {
1705 fk_resolve(new AnswerSource(as_terms.size));
1706 });
1707 }));
1708 }
1709 // distinct number of triples
1710 else if(g_argv.triples) {
1711 return a_inputs.map(async(ds_input) => {
1712 // remove graph component
1713 let ds_explode = new stream.Transform.QuadsToOther({
1714 transform(g_quad, s_encoding, fke_transform) {
1715 // push quad
1716 this.push(factory.quad(g_quad.subject, g_quad.predicate, g_quad.object));
1717
1718 // done
1719 fke_transform();
1720 },
1721 });
1722
1723 // create dataset
1724 let k_dataset = dataset_tree();
1725
1726 // create pipeline
1727 ds_input.pipe(ds_explode)
1728 .pipe(k_dataset);
1729
1730 // await dataset finish
1731 await k_dataset.until('finish');
1732
1733 // return size
1734 return new AnswerSource(k_dataset.size);
1735 });
1736 }
1737 // distinct number of quads
1738 else {
1739 return a_inputs.map(async(ds_input) => {
1740 let k_dataset = dataset_tree();
1741
1742 ds_input.pipe(k_dataset);
1743
1744 await k_dataset.until('finish');
1745
1746 return new AnswerSource(k_dataset.size);
1747 });
1748 }
1749 },
1750 },
1751
1752 // boilerplate: {
1753 // type: S_TRANSFORM_TYPE_N1QQ,
1754 // overview: '',
1755 // description: [
1756 // 'Some decsription',
1757 // ],
1758// options: {},
1759
1760 // command(g_argv, a_inputs, fe_command) {
1761 // return map_streams(a_inputs, () => new Transform({
1762 // error: e => fe_command(e),
1763 // }));
1764 // },
1765 // },
1766
1767};
1768
// command aliases: alias => canonical command id
let h_aliases = {};

// width of the widest command id; aligns the overview column in help output
let n_width_column = 0;
for(let si_key of Object.keys(h_commands)) {
	n_width_column = Math.max(n_width_column, si_key.length);
}

// group commands by category (categories keep first-seen order)
let h_categories = {};
for(let [si_command, g_command] of Object.entries(h_commands)) {
	let s_category = g_command.category;

	// lazily create the bucket for this category
	if(!h_categories[s_category]) {
		h_categories[s_category] = {
			commands: [],
			overview: [],
		};
	}

	let g_category = h_categories[s_category];
	g_category.commands.push(g_command);

	// normalize `alias` (absent, single string, or array) into `aliases`
	let z_alias = g_command.alias;
	let s_aliases;
	if(!z_alias) {
		g_command.aliases = [];
		s_aliases = '';
	}
	else if(Array.isArray(z_alias)) {
		g_command.aliases = z_alias;
		s_aliases = ` [aliases: ${z_alias.join(', ')}]`;
	}
	else {
		g_command.aliases = [z_alias];
		s_aliases = ` [alias: ${z_alias}]`;
	}

	// register every alias against its canonical command
	for(let s_alias of g_command.aliases) {
		h_aliases[s_alias] = si_command;
	}

	g_category.overview.push(` ${si_command.padEnd(n_width_column, ' ')} ${g_command.overview}${s_aliases}`);
}
1814
// usable terminal width for yargs help wrapping (never narrower than 80 columns)
let n_width_terminal = Math.max(80, yargs.terminalWidth()-10);

// command-line arguments without the node executable and script path
let a_argv = process.argv.slice(2);
let n_args = a_argv.length;

// nothing to do without any arguments
if(0 === n_args) exit('no arguments given');

// explicit input streams (populated by `--inputs FILES...`)
let a_inputs = [];

// pipeline: one argument series per command, in order
let a_pipeline = [];
{
	// arguments collected for the command currently being parsed
	let a_series = [];

	for(let i_argv=0; i_argv<n_args; i_argv++) {
		let s_arg = a_argv[i_argv];

		// after first arg
		if(i_argv) {
			// internal pipe
			if('--pipe' === s_arg) {
				a_pipeline.push(a_series);

				// a pipe must be followed by a command; i_argv never reaches n_args
				// inside this loop, so test for the pipe being the last argument
				if(i_argv === n_args-1) {
					exit(`was expecting pipe destination after --pipe: ${a_argv}`);
				}
				a_series = [];
				continue;
			}
			// shorthand internal pipe
			else if('/' === s_arg) {
				a_pipeline.push(a_series);

				// same check as above for the shorthand form
				if(i_argv === n_args-1) {
					exit(`was expecting pipe destination after internal pipe character '/': ${a_argv}`);
				}
				a_series = [];
				continue;
			}
			// inputs follow
			else if('--inputs' === s_arg) {
				// every remaining arg is a file path; convert to readable streams
				a_inputs.push(...a_argv.slice(i_argv+1).map(p => fs.createReadStream(p)));
				break;
			}
		}
		// first arg, main option
		else if('-h' === s_arg || '--help' === s_arg || 'help' === s_arg) {
			// command overview
			let s_overview = '';
			for(let [s_category, g_category] of Object.entries(h_categories)) {
				s_overview += `${s_category}\n${g_category.overview.join('\n')}\n\n`;
			}

			// eslint-disable-next-line no-console
			console.log(`\nUsage: graphy [OPTIONS] COMMAND [ / COMMAND]* [--inputs FILES...]\n\n`
				+`Tip: Use the internal pipe operator ' / ' to string together a series of commands.\n\n`
				+s_overview
				+gobble(`
					Graphy Commands:
					examples Alias for '--examples'
					help Alias for '--help'
					version Alias for '--version'

					Options:
					-e, --examples Print some examples and exit
					-h, --help Print this help message and exit
					-v, --version Print the version info and exit

					More Options:
					--show-stack-trace Show the stack trace when printing error messages
				`)+'\n\n'
				+`\nRun 'graphy COMMAND --help' for more information on a command.\n`
				+`\nRun 'graphy --examples' to see some examples.\n`,
			);

			process.exit(0);
		}
		// version
		else if('-v' === s_arg || '--version' === s_arg || 'version' === s_arg) {
			// eslint-disable-next-line no-console
			console.log(require(path.join(__dirname, './package.json')).version);

			process.exit(0);
		}
		// examples
		else if('-e' === s_arg || '--examples' === s_arg || 'examples' === s_arg) {
			console.log(gobble(`
				Examples:
				1) Count the number of distinct triples in a Turtle file:

				graphy read -c ttl / distinct --triples < input.ttl

				2) Count the distinct number of subjects that are of type dbo:Place in an N-Quads file:

				graphy read -c nq / filter -x '; a; dbo:Place' / distinct --subjects < input.nq

				3) Compute the difference between two RDF datasets 'a.ttl' and 'b.ttl':

				graphy read / diff / write --inputs a.ttl b.ttl > diff.trig

				4) Compute the canonicalized union of a bunch of RDF datasets in the 'data/' directory:

				graphy read / union / write --inputs data/*.{nt,nq,ttl,trig} > output.trig

				5) Extract the first 2 million quads of a Turtle file:

				graphy read -c ttl / head 2e6 / write -c ttl < in.ttl > view-2M.ttl

				6) Find all owl:sameAs triples where the object is a node and different from
				the subject, then swap the subject and object:

				graphy read / filter -x '!$object; owl:sameAs; {node}' / transform -j \\
				't => [t.o, t.p, t.s]' / write -c ttl < input.ttl > output.ttl

			`)+'\n');

			process.exit(0);
		}
		// print stack trace in error messages
		else if('--show-stack-trace' === s_arg) {
			b_show_stack_trace = true;

			// consume arg
			continue;
		}

		a_series.push(s_arg);
	}

	// flush the final series unless it is empty
	if(a_series.length) {
		a_pipeline.push(a_series);
	}
}
1954
// bail out when argument parsing produced no command series at all
if(0 === a_pipeline.length) exit('no commands given');
1959
(async() => {
	// failure handler: normalizes a message string into an Error, then exits with its stack
	let fe_command = (z_error) => {
		let e_command = 'string' === typeof z_error? new Error(z_error): z_error;
		exit(e_command.stack);
	};

	// own-property test; the `in` operator would also match inherited names such
	// as 'toString' or 'constructor' and treat them as commands/aliases
	let b_has_own = (h_map, si_key) => Object.prototype.hasOwnProperty.call(h_map, si_key);

	// starting inputs default to stdin if no explicit inputs given
	let a_prev = a_inputs.length? a_inputs: [process.stdin];

	// each series in pipeline
	while(a_pipeline.length) {
		let a_series = a_pipeline.shift();

		// start with command string
		let s_command = a_series[0];

		// command not found
		if(!b_has_own(h_commands, s_command)) {
			// command alias
			if(b_has_own(h_aliases, s_command)) {
				s_command = h_aliases[s_command];
			}
			// no such command
			else {
				exit(`no such command '${s_command}'`);
			}
		}

		try {
			// ref command
			let g_command = h_commands[s_command];

			let g_options = 'function' === typeof g_command.options
				? g_command.options()
				: (g_command.options || {});

			// description section of the help text ('$1' expands to the overview)
			let a_descriptions = g_command.description;
			let s_describe = '';
			if(a_descriptions.length) {
				s_describe = '\nDescription:'+a_descriptions
					.map(s => ' '+s.replace(/^\$1$/, g_command.overview+'.'))
					.join('\n\n');
			}

			// one usage line for the command id and one per alias
			let s_usage = [s_command, ...g_command.aliases]
				.reduce((s_out, s, i) => `${s_out}\n${i? 'Or': 'Usage'}: $0${S_TRANSFORM_TYPE_NNSQ === g_command.type? '': ' [...]'} ${s}${g_command.syntax? ' '+g_command.syntax: ''} [OPTIONS] [ / COMMAND]*`, '');

			let s_positionals = '\n'+g_command.type+'\n';
			if(g_command.positionals) {
				let n_width_positionals = Object.entries(g_command.positionals)
					.reduce((n, [s]) => Math.max(n, s.length), 10);

				s_positionals += '\nArguments:';
				for(let [si_pos, g_pos] of Object.entries(g_command.positionals)) {
					s_positionals += `\n ${si_pos.padEnd(n_width_positionals, ' ')} [${g_pos.type}] ${g_pos.describe}`;
				}
			}

			let s_examples = '';
			if(g_command.examples && g_command.examples.length) {
				s_examples = `Examples:\n`;

				let a_egs = g_command.examples;
				for(let i_eg=0, nl_egs=a_egs.length; i_eg<nl_egs; i_eg++) {
					let z_eg = a_egs[i_eg];

					s_examples += ` ${i_eg+1}) `;
					if('string' === typeof z_eg) {
						s_examples += z_eg+'\n';
					}
					else {
						s_examples += z_eg[0]+'\n '+z_eg[1]+'\n';
					}
				}
			}

			// build yargs
			let g_argv = mk_yargs()
				.strict()
				.usage(s_usage+'\n'+s_describe+'\n'+s_positionals)
				.options(g_options)
				.help()
				.epilog(s_examples)
				.version(false)
				.wrap(n_width_terminal)
				.parse(a_series.slice(1));

			// no inputs
			if(!a_prev.length) {
				return fe_command(`The '${s_command}' command requires at least 1 input stream but 0 were piped in.`);
			}

			// check input cardinality
			switch(g_command.type) {
				case S_TRANSFORM_TYPE_21QRB:
				case S_TRANSFORM_TYPE_21QRN:
				case S_TRANSFORM_TYPE_21QQ: {
					if(2 !== a_prev.length) {
						// report the streams actually piped into this command, not the
						// initial --inputs file list
						let nl_inputs = a_prev.length;
						return fe_command(`The '${s_command}' command expects exactly 2 input streams but ${1 === nl_inputs? 'only 1 was': nl_inputs+' were'} piped in.`);
					}

					break;
				}

				default: {
					break;
				}
			}

			// eval command with its args
			let a_curr = await g_command.command(g_argv, a_prev, fe_command, a_pipeline);

			// advance inputs
			a_prev = await Promise.all(a_curr);
		}
		catch(e_command) {
			if(b_show_stack_trace) {
				exit(e_command.stack);
			}
			else {
				exit(e_command.message);
			}
		}
	}

	// expect single output
	if(1 !== a_prev.length) {
		exit(`expected a single output stream but last command produces ${a_prev.length} streams`);
	}

	// pipe output to stdout
	a_prev[0].pipe(process.stdout);
})();