UNPKG

17.1 kBJavaScriptView Raw
1"use strict";
2// *****************************************************************************
3// Copyright (C) 2020 TypeFox and others.
4//
5// This program and the accompanying materials are made available under the
6// terms of the Eclipse Public License v. 2.0 which is available at
7// http://www.eclipse.org/legal/epl-2.0.
8//
9// This Source Code may also be made available under the following Secondary
10// Licenses when the conditions for such availability set forth in the Eclipse
11// Public License v. 2.0 are satisfied: GNU General Public License, version 2
12// with the GNU Classpath Exception which is available at
13// https://www.gnu.org/software/classpath/license.html.
14//
15// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-only WITH Classpath-exception-2.0
16// *****************************************************************************
17/*---------------------------------------------------------------------------------------------
18 * Copyright (c) Microsoft Corporation. All rights reserved.
19 * Licensed under the MIT License. See License.txt in the project root for license information.
20 *--------------------------------------------------------------------------------------------*/
21Object.defineProperty(exports, "__esModule", { value: true });
22exports.transform = exports.toReadable = exports.toStream = exports.consumeStreamWithLimit = exports.peekStream = exports.consumeStream = exports.peekReadable = exports.consumeReadableWithLimit = exports.consumeReadable = exports.newWriteableStream = exports.isReadableBufferedStream = exports.isReadableStream = exports.Readable = void 0;
23// based on https://github.com/microsoft/vscode/blob/04c36be045a94fee58e5f8992d3e3fd980294a84/src/vs/base/common/stream.ts
24/* eslint-disable max-len */
25/* eslint-disable no-null/no-null */
26/* eslint-disable @typescript-eslint/tslint/config */
27/* eslint-disable @typescript-eslint/no-explicit-any */
28const disposable_1 = require("./disposable");
29var Readable;
30(function (Readable) {
31 function fromString(value) {
32 let done = false;
33 return {
34 read() {
35 if (!done) {
36 done = true;
37 return value;
38 }
39 return null;
40 }
41 };
42 }
43 Readable.fromString = fromString;
44 function toString(readable) {
45 let result = '';
46 let chunk;
47 while ((chunk = readable.read()) != null) {
48 result += chunk;
49 }
50 return result;
51 }
52 Readable.toString = toString;
53})(Readable = exports.Readable || (exports.Readable = {}));
54function isReadableStream(obj) {
55 const candidate = obj;
56 return candidate && [candidate.on, candidate.pause, candidate.resume, candidate.destroy].every(fn => typeof fn === 'function');
57}
58exports.isReadableStream = isReadableStream;
59function isReadableBufferedStream(obj) {
60 const candidate = obj;
61 return candidate && isReadableStream(candidate.stream) && Array.isArray(candidate.buffer) && typeof candidate.ended === 'boolean';
62}
63exports.isReadableBufferedStream = isReadableBufferedStream;
64function newWriteableStream(reducer, options) {
65 return new WriteableStreamImpl(reducer);
66}
67exports.newWriteableStream = newWriteableStream;
68class WriteableStreamImpl {
69 constructor(reducer, options) {
70 this.reducer = reducer;
71 this.options = options;
72 this.state = {
73 flowing: false,
74 ended: false,
75 destroyed: false
76 };
77 this.buffer = {
78 data: [],
79 error: []
80 };
81 this.listeners = {
82 data: [],
83 error: [],
84 end: []
85 };
86 this.pendingWritePromises = [];
87 }
88 pause() {
89 if (this.state.destroyed) {
90 return;
91 }
92 this.state.flowing = false;
93 }
94 resume() {
95 if (this.state.destroyed) {
96 return;
97 }
98 if (!this.state.flowing) {
99 this.state.flowing = true;
100 // emit buffered events
101 this.flowData();
102 this.flowErrors();
103 this.flowEnd();
104 }
105 }
106 write(data) {
107 var _a;
108 if (this.state.destroyed) {
109 return;
110 }
111 // flowing: directly send the data to listeners
112 if (this.state.flowing) {
113 this.listeners.data.forEach(listener => listener(data));
114 }
115 // not yet flowing: buffer data until flowing
116 else {
117 this.buffer.data.push(data);
118 // highWaterMark: if configured, signal back when buffer reached limits
119 if (typeof ((_a = this.options) === null || _a === void 0 ? void 0 : _a.highWaterMark) === 'number' && this.buffer.data.length > this.options.highWaterMark) {
120 return new Promise(resolve => this.pendingWritePromises.push(resolve));
121 }
122 }
123 }
124 error(error) {
125 if (this.state.destroyed) {
126 return;
127 }
128 // flowing: directly send the error to listeners
129 if (this.state.flowing) {
130 this.listeners.error.forEach(listener => listener(error));
131 }
132 // not yet flowing: buffer errors until flowing
133 else {
134 this.buffer.error.push(error);
135 }
136 }
137 end(result) {
138 if (this.state.destroyed) {
139 return;
140 }
141 // end with data or error if provided
142 if (result instanceof Error) {
143 this.error(result);
144 }
145 else if (result) {
146 this.write(result);
147 }
148 // flowing: send end event to listeners
149 if (this.state.flowing) {
150 this.listeners.end.forEach(listener => listener());
151 this.destroy();
152 }
153 // not yet flowing: remember state
154 else {
155 this.state.ended = true;
156 }
157 }
158 on(event, callback) {
159 if (this.state.destroyed) {
160 return;
161 }
162 switch (event) {
163 case 'data':
164 this.listeners.data.push(callback);
165 // switch into flowing mode as soon as the first 'data'
166 // listener is added and we are not yet in flowing mode
167 this.resume();
168 break;
169 case 'end':
170 this.listeners.end.push(callback);
171 // emit 'end' event directly if we are flowing
172 // and the end has already been reached
173 //
174 // finish() when it went through
175 if (this.state.flowing && this.flowEnd()) {
176 this.destroy();
177 }
178 break;
179 case 'error':
180 this.listeners.error.push(callback);
181 // emit buffered 'error' events unless done already
182 // now that we know that we have at least one listener
183 if (this.state.flowing) {
184 this.flowErrors();
185 }
186 break;
187 }
188 }
189 removeListener(event, callback) {
190 if (this.state.destroyed) {
191 return;
192 }
193 let listeners = undefined;
194 switch (event) {
195 case 'data':
196 listeners = this.listeners.data;
197 break;
198 case 'end':
199 listeners = this.listeners.end;
200 break;
201 case 'error':
202 listeners = this.listeners.error;
203 break;
204 }
205 if (listeners) {
206 const index = listeners.indexOf(callback);
207 if (index >= 0) {
208 listeners.splice(index, 1);
209 }
210 }
211 }
212 flowData() {
213 if (this.buffer.data.length > 0) {
214 const fullDataBuffer = this.reducer(this.buffer.data);
215 this.listeners.data.forEach(listener => listener(fullDataBuffer));
216 this.buffer.data.length = 0;
217 // When the buffer is empty, resolve all pending writers
218 const pendingWritePromises = [...this.pendingWritePromises];
219 this.pendingWritePromises.length = 0;
220 pendingWritePromises.forEach(pendingWritePromise => pendingWritePromise());
221 }
222 }
223 flowErrors() {
224 if (this.listeners.error.length > 0) {
225 for (const error of this.buffer.error) {
226 this.listeners.error.forEach(listener => listener(error));
227 }
228 this.buffer.error.length = 0;
229 }
230 }
231 flowEnd() {
232 if (this.state.ended) {
233 this.listeners.end.forEach(listener => listener());
234 return this.listeners.end.length > 0;
235 }
236 return false;
237 }
238 destroy() {
239 if (!this.state.destroyed) {
240 this.state.destroyed = true;
241 this.state.ended = true;
242 this.buffer.data.length = 0;
243 this.buffer.error.length = 0;
244 this.listeners.data.length = 0;
245 this.listeners.error.length = 0;
246 this.listeners.end.length = 0;
247 this.pendingWritePromises.length = 0;
248 }
249 }
250}
251/**
252 * Helper to fully read a T readable into a T.
253 */
254function consumeReadable(readable, reducer) {
255 const chunks = [];
256 let chunk;
257 while ((chunk = readable.read()) !== null) {
258 chunks.push(chunk);
259 }
260 return reducer(chunks);
261}
262exports.consumeReadable = consumeReadable;
263/**
264 * Helper to read a T readable up to a maximum of chunks. If the limit is
265 * reached, will return a readable instead to ensure all data can still
266 * be read.
267 */
268function consumeReadableWithLimit(readable, reducer, maxChunks) {
269 const chunks = [];
270 let chunk = undefined;
271 while ((chunk = readable.read()) !== null && chunks.length < maxChunks) {
272 chunks.push(chunk);
273 }
274 // If the last chunk is null, it means we reached the end of
275 // the readable and return all the data at once
276 if (chunk === null && chunks.length > 0) {
277 return reducer(chunks);
278 }
279 // Otherwise, we still have a chunk, it means we reached the maxChunks
280 // value and as such we return a new Readable that first returns
281 // the existing read chunks and then continues with reading from
282 // the underlying readable.
283 return {
284 read: () => {
285 // First consume chunks from our array
286 if (chunks.length > 0) {
287 return chunks.shift();
288 }
289 // Then ensure to return our last read chunk
290 if (typeof chunk !== 'undefined') {
291 const lastReadChunk = chunk;
292 // explicitly use undefined here to indicate that we consumed
293 // the chunk, which could have either been null or valued.
294 chunk = undefined;
295 return lastReadChunk;
296 }
297 // Finally delegate back to the Readable
298 return readable.read();
299 }
300 };
301}
302exports.consumeReadableWithLimit = consumeReadableWithLimit;
303/**
304 * Helper to read a T readable up to a maximum of chunks. If the limit is
305 * reached, will return a readable instead to ensure all data can still
306 * be read.
307 */
308function peekReadable(readable, reducer, maxChunks) {
309 const chunks = [];
310 let chunk = undefined;
311 while ((chunk = readable.read()) !== null && chunks.length < maxChunks) {
312 chunks.push(chunk);
313 }
314 // If the last chunk is null, it means we reached the end of
315 // the readable and return all the data at once
316 if (chunk === null && chunks.length > 0) {
317 return reducer(chunks);
318 }
319 // Otherwise, we still have a chunk, it means we reached the maxChunks
320 // value and as such we return a new Readable that first returns
321 // the existing read chunks and then continues with reading from
322 // the underlying readable.
323 return {
324 read: () => {
325 // First consume chunks from our array
326 if (chunks.length > 0) {
327 return chunks.shift();
328 }
329 // Then ensure to return our last read chunk
330 if (typeof chunk !== 'undefined') {
331 const lastReadChunk = chunk;
332 // explicitly use undefined here to indicate that we consumed
333 // the chunk, which could have either been null or valued.
334 chunk = undefined;
335 return lastReadChunk;
336 }
337 // Finally delegate back to the Readable
338 return readable.read();
339 }
340 };
341}
342exports.peekReadable = peekReadable;
343/**
344 * Helper to fully read a T stream into a T.
345 */
346function consumeStream(stream, reducer) {
347 return new Promise((resolve, reject) => {
348 const chunks = [];
349 stream.on('data', data => chunks.push(data));
350 stream.on('error', error => reject(error));
351 stream.on('end', () => resolve(reducer(chunks)));
352 });
353}
354exports.consumeStream = consumeStream;
355/**
356 * Helper to peek up to `maxChunks` into a stream. The return type signals if
357 * the stream has ended or not. If not, caller needs to add a `data` listener
358 * to continue reading.
359 */
360function peekStream(stream, maxChunks) {
361 return new Promise((resolve, reject) => {
362 const streamListeners = new disposable_1.DisposableCollection();
363 // Data Listener
364 const buffer = [];
365 const dataListener = (chunk) => {
366 // Add to buffer
367 buffer.push(chunk);
368 // We reached maxChunks and thus need to return
369 if (buffer.length > maxChunks) {
370 // Dispose any listeners and ensure to pause the
371 // stream so that it can be consumed again by caller
372 streamListeners.dispose();
373 stream.pause();
374 return resolve({ stream, buffer, ended: false });
375 }
376 };
377 streamListeners.push(disposable_1.Disposable.create(() => stream.removeListener('data', dataListener)));
378 stream.on('data', dataListener);
379 // Error Listener
380 const errorListener = (error) => reject(error);
381 streamListeners.push(disposable_1.Disposable.create(() => stream.removeListener('error', errorListener)));
382 stream.on('error', errorListener);
383 const endListener = () => resolve({ stream, buffer, ended: true });
384 streamListeners.push(disposable_1.Disposable.create(() => stream.removeListener('end', endListener)));
385 stream.on('end', endListener);
386 });
387}
388exports.peekStream = peekStream;
389/**
390 * Helper to read a T stream up to a maximum of chunks. If the limit is
391 * reached, will return a stream instead to ensure all data can still
392 * be read.
393 */
394function consumeStreamWithLimit(stream, reducer, maxChunks) {
395 return new Promise((resolve, reject) => {
396 const chunks = [];
397 let wrapperStream = undefined;
398 stream.on('data', data => {
399 // If we reach maxChunks, we start to return a stream
400 // and make sure that any data we have already read
401 // is in it as well
402 if (!wrapperStream && chunks.length === maxChunks) {
403 wrapperStream = newWriteableStream(reducer);
404 while (chunks.length) {
405 wrapperStream.write(chunks.shift());
406 }
407 wrapperStream.write(data);
408 return resolve(wrapperStream);
409 }
410 if (wrapperStream) {
411 wrapperStream.write(data);
412 }
413 else {
414 chunks.push(data);
415 }
416 });
417 stream.on('error', error => {
418 if (wrapperStream) {
419 wrapperStream.error(error);
420 }
421 else {
422 return reject(error);
423 }
424 });
425 stream.on('end', () => {
426 if (wrapperStream) {
427 while (chunks.length) {
428 wrapperStream.write(chunks.shift());
429 }
430 wrapperStream.end();
431 }
432 else {
433 return resolve(reducer(chunks));
434 }
435 });
436 });
437}
438exports.consumeStreamWithLimit = consumeStreamWithLimit;
439/**
440 * Helper to create a readable stream from an existing T.
441 */
442function toStream(t, reducer) {
443 const stream = newWriteableStream(reducer);
444 stream.end(t);
445 return stream;
446}
447exports.toStream = toStream;
448/**
449 * Helper to convert a T into a Readable<T>.
450 */
451function toReadable(t) {
452 let consumed = false;
453 return {
454 read: () => {
455 if (consumed) {
456 return null;
457 }
458 consumed = true;
459 return t;
460 }
461 };
462}
463exports.toReadable = toReadable;
464/**
465 * Helper to transform a readable stream into another stream.
466 */
467function transform(stream, transformer, reducer) {
468 const target = newWriteableStream(reducer);
469 stream.on('data', data => target.write(transformer.data(data)));
470 stream.on('end', () => target.end());
471 stream.on('error', error => target.error(transformer.error ? transformer.error(error) : error));
472 return target;
473}
474exports.transform = transform;
475//# sourceMappingURL=stream.js.map
\No newline at end of file