UNPKG

5.02 kBJavaScriptView Raw
1"use strict";
2var _a;
3Object.defineProperty(exports, "__esModule", { value: true });
4exports.transformFrom = exports.TransformFromAsyncIterable = void 0;
5const process_1 = require("process");
6const stream_1 = require("stream");
/**
 * In Node.js v14 and earlier, the `finish` event is fired before the callback passed to the
 * `transform._flush()` method is called. This bug has been fixed in Node.js v15.
 *
 * The flag is `true` unless the major version parsed from `process.version` is 15 or greater,
 * so a version string that cannot be parsed is treated as affected.
 * @see https://github.com/nodejs/node/issues/34274
 * @see https://github.com/nodejs/node/pull/34314
 */
const HAS_FLUSH_BUG = (() => {
    const majorMatch = /^v(\d+)/.exec(process_1.version);
    const major = majorMatch ? Number(majorMatch[1]) : NaN;
    // `NaN >= 15` is false, which keeps an unparsable version on the "has the bug" side.
    return !(major >= 15);
})();
/**
 * Stream option names that must not be forwarded to the `Transform` constructor.
 * `TransformFromAsyncIterable` strips these from the caller's options before calling `super()`.
 * Frozen because the list is shared, read-only configuration.
 */
const DISALLOW_OPTION_NAMES = Object.freeze([
    'construct',
    'read',
    'write',
    'writev',
    'final',
    'destroy',
    'transform',
    'flush',
]);
/**
 * Returns a shallow copy of `obj` without the listed properties.
 *
 * @param {object|null|undefined} obj - Source object; `null` and `undefined` are returned as-is.
 * @param {ReadonlyArray<string>} props - Property names to leave out of the copy.
 * @returns {object|null|undefined} A new object built from the own enumerable string-keyed
 *   entries of `obj` whose names are not in `props`, or `obj` itself when it is nullish.
 */
function removeProp(obj, props) {
    if (obj === null || obj === undefined) {
        return obj;
    }
    const keptEntries = Object.entries(obj).filter(([name]) => !props.includes(name));
    return Object.fromEntries(keptEntries);
}
/**
 * A `Transform` stream whose transformation is written as a function over async iterables.
 *
 * Every chunk written to the stream is handed to `transformFn` as a `{ chunk, encoding }`
 * object through an async iterable; every value produced by the iterable that `transformFn`
 * returns is pushed to the readable side.
 */
class TransformFromAsyncIterable extends stream_1.Transform {
    /**
     * @param {Function} transformFn - Called once, synchronously, with the source async iterable.
     *   Its return value is consumed with `for await`.
     * @param {object} [opts] - Stream options. The names in `DISALLOW_OPTION_NAMES` are removed
     *   before the rest is passed to the `Transform` constructor.
     */
    constructor(transformFn, opts) {
        super(removeProp(opts, DISALLOW_OPTION_NAMES));
        // Instance state is defined with explicit descriptors (own, enumerable, writable).
        const defineField = (name, value) => {
            Object.defineProperty(this, name, {
                enumerable: true,
                configurable: true,
                writable: true,
                value,
            });
        };
        // Pending `_transform` / `_flush` callback, if any.
        defineField('transformCallback', undefined);
        // Set once the output iterable has completed or failed.
        defineField('isFinished', false);
        // Resolver of the promise the source generator is currently waiting on, if any.
        defineField('receiveData', undefined);
        // Items that arrived while the source generator was not waiting.
        defineField('receivedDataList', []);
        const source = this.createSource();
        const result = transformFn(source);
        (async () => {
            for await (const chunk of result) {
                // NOTE(review): the return value of push() is ignored here.
                this.push(chunk);
            }
        })()
            .then(() => this.finish())
            // finish() treats a falsy argument as success, so a falsy rejection value
            // (e.g. `throw undefined`) is wrapped to keep it from ending the stream silently.
            .catch(error => this.finish(error || new Error(`transformFn failed with a non-error value: ${String(error)}`)));
    }
    /**
     * Forwards a written chunk to the source iterable. The write callback is held back and is
     * invoked later by `createSource()` or `finish()`.
     */
    _transform(chunk, encoding, callback) {
        if (this.isFinished) {
            // The output iterable is already done; the chunk is dropped.
            callback();
        }
        else {
            this.transformCallback = callback;
            this.emitToSource({ chunk, encoding });
        }
    }
    /**
     * Signals end-of-input to the source iterable. The flush callback is held back until the
     * output iterable finishes.
     */
    _flush(callback) {
        if (this.isFinished) {
            callback();
        }
        else {
            this.transformCallback = HAS_FLUSH_BUG
                ? this.emitFinishEventAfterCallback(callback)
                : callback;
            this.emitToSource({ done: true });
        }
    }
    /**
     * Marks the stream as finished.
     * With an error: delivers it through the pending callback, or destroys the stream with it
     * when no callback is pending. Without an error: ends the readable side and releases the
     * pending callback.
     */
    finish(error) {
        this.isFinished = true;
        if (error) {
            if (!this.callTransformCallback(error)) {
                this.destroy(error);
            }
        }
        else {
            this.push(null);
            this.callTransformCallback();
        }
    }
    /**
     * Invokes the pending callback at most once.
     * @returns {boolean} `true` when a callback was pending and has been called.
     */
    callTransformCallback(...args) {
        const { transformCallback } = this;
        if (transformCallback) {
            // Clear before calling so a re-entrant call cannot invoke it twice.
            this.transformCallback = undefined;
            transformCallback(...args);
            return true;
        }
        return false;
    }
    /**
     * Async generator handed to `transformFn`. Yields `{ chunk, encoding }` objects until an
     * item with a truthy `done` arrives.
     */
    async *createSource() {
        while (true) {
            let data = this.receivedDataList.shift();
            if (data === undefined || data === null) {
                // Nothing queued: register a resolver, then release the pending write
                // callback so the writable side can deliver the next chunk.
                data = await new Promise(resolve => {
                    this.receiveData = resolve;
                    this.callTransformCallback();
                });
            }
            if (data.done) {
                break;
            }
            const { done: _, ...chunkData } = data;
            yield chunkData;
        }
    }
    /**
     * Hands an item to the source generator: directly when it is waiting, otherwise via the queue.
     */
    emitToSource(data) {
        const { receiveData } = this;
        if (receiveData) {
            this.receiveData = undefined;
            receiveData(data);
        }
        else {
            this.receivedDataList.push(data);
        }
    }
    /**
     * Workaround used when `HAS_FLUSH_BUG` is set: holds back `finish` events until the flush
     * callback has run.
     * @returns {Function} Replacement flush callback that calls `flushCallback`, restores
     *   `emit`, and re-emits the held `finish` events unless an error was passed.
     */
    emitFinishEventAfterCallback(flushCallback) {
        const finishEventName = 'finish';
        const finishEventList = [];
        // Shadow `emit` with an own property that captures `finish` and forwards the rest.
        this.emit = (event, ...args) => {
            if (finishEventName === event) {
                finishEventList.push(args);
                return false;
            }
            return super.emit(event, ...args);
        };
        return (...args) => {
            flushCallback(...args);
            // Remove the own-property override so the inherited `emit` is used again.
            delete this.emit;
            const [error] = args;
            if (!error) {
                for (const heldArgs of finishEventList) {
                    this.emit(finishEventName, ...heldArgs);
                }
            }
        };
    }
}
154exports.TransformFromAsyncIterable = TransformFromAsyncIterable;
/**
 * Factory form of `new TransformFromAsyncIterable(transformFn, options)`.
 *
 * @param {Function} transformFn - Passed through as the first constructor argument.
 * @param {object} [options] - Passed through as the second constructor argument.
 * @returns {TransformFromAsyncIterable} The newly created stream.
 */
function transformFrom(transformFn, options) {
    return new TransformFromAsyncIterable(transformFn, options);
}
158exports.transformFrom = transformFrom;
159//# sourceMappingURL=index.js.map
\No newline at end of file