'use strict'; var stream = require('stream'); var util = require('util'); /* Stream Transform Please look at the [project documentation](https://csv.js.org/transform/) for additional information. */ const Transformer = function (options = {}, handler) { this.options = options; if (options.consume === undefined || options.consume === null) { this.options.consume = false; } this.options.objectMode = true; if (options.parallel === undefined || options.parallel === null) { this.options.parallel = 100; } if (options.params === undefined || options.params === null) { options.params = null; } this.handler = handler; stream.Transform.call(this, this.options); this.state = { running: 0, started: 0, finished: 0, paused: false, }; return this; }; util.inherits(Transformer, stream.Transform); Transformer.prototype._transform = function (chunk, _, cb) { this.state.started++; this.state.running++; // Accept additionnal chunks to be processed in parallel if (!this.state.paused && this.state.running < this.options.parallel) { cb(); cb = null; // Cancel further callback execution } try { let l = this.handler.length; if (this.options.params !== null) { l--; } if (l === 1) { // sync const result = this.handler.call(this, chunk, this.options.params); if (result && result.then) { result.then((result) => { this.__done(null, [result], cb); }); result.catch((err) => { this.__done(err); }); } else { this.__done(null, [result], cb); } } else if (l === 2) { // async const callback = (err, ...chunks) => this.__done(err, chunks, cb); this.handler.call(this, chunk, callback, this.options.params); } else { throw Error("Invalid handler arguments"); } return false; } catch (err) { this.__done(err); } }; Transformer.prototype._flush = function (cb) { if (this.state.running === 0) { cb(); } else { this._ending = function () { cb(); }; } }; Transformer.prototype.__done = function (err, chunks, cb) { this.state.running--; if (err) { return this.destroy(err); // return this.emit('error', err); } this.state.finished++; for (let chunk of chunks) { if (typeof chunk === "number") { chunk = `${chunk}`; } // We dont push empty string // See https://nodejs.org/api/stream.html#stream_readable_push if (chunk !== undefined && chunk !== null && chunk !== "") { this.state.paused = !this.push(chunk); } } // Chunk has been processed if (cb) { cb(); } if (this._ending && this.state.running === 0) { this._ending(); } }; const transform = function () { let options = {}; let callback, handler, records; for (let i = 0; i < arguments.length; i++) { const argument = arguments[i]; let type = typeof argument; if (argument === null) { type = "null"; } else if (type === "object" && Array.isArray(argument)) { type = "array"; } if (type === "array") { records = argument; } else if (type === "object") { options = { ...argument }; } else if (type === "function") { if (handler && i === arguments.length - 1) { callback = argument; } else { handler = argument; } } else if (type !== "null") { throw new Error( `Invalid Arguments: got ${JSON.stringify(argument)} at position ${i}`, ); } } const transformer = new Transformer(options, handler); let error = false; if (records) { const writer = function () { for (const record of records) { if (error) break; transformer.write(record); } transformer.end(); }; // Support Deno, Rollup doesnt provide a shim for setImmediate if (typeof setImmediate === "function") { setImmediate(writer); } else { setTimeout(writer, 0); } } if (callback || options.consume) { const result = []; transformer.on("readable", function () { let record; while ((record = transformer.read()) !== null) { if (callback) { result.push(record); } } }); transformer.on("error", function (err) { error = true; if (callback) callback(err); }); transformer.on("end", function () { if (callback && !error) callback(null, result); }); } return transformer; }; exports.Transformer = Transformer; exports.transform = transform;