/**
 * @license
 * Copyright (c) 2016 The Polymer Project Authors. All rights reserved.
 * This code may only be used under the BSD style license found at
 * http://polymer.github.io/LICENSE.txt
 * The complete set of authors may be found at
 * http://polymer.github.io/AUTHORS.txt
 * The complete set of contributors may be found at
 * http://polymer.github.io/CONTRIBUTORS.txt
 * Code distributed by Google as part of the polymer project is also
 * subject to an additional IP rights grant found at
 * http://polymer.github.io/PATENTS.txt
 */

import {fs} from 'mz';
import {Deferred} from 'polymer-analyzer/lib/core/utils';
import {PassThrough, Transform, TransformCallback} from 'stream';

import File = require('vinyl');

const multipipe = require('multipipe');

// Polyfill Symbol.asyncIterator on older Node versions so that `for await`
// over our AsyncQueue works.
if (Symbol.asyncIterator === undefined) {
  // tslint:disable-next-line: no-any polyfilling.
  (Symbol as any).asyncIterator = Symbol('asyncIterator');
}

/**
 * Waits for the given ReadableStream to end, resolving with the stream once
 * it has finished.
 */
export function waitFor(stream: NodeJS.ReadableStream):
    Promise<NodeJS.ReadableStream> {
  return new Promise<NodeJS.ReadableStream>((resolve, reject) => {
    // Resolve with the stream itself so the declared return type holds;
    // the 'end' event does not pass a value to its listeners.
    stream.on('end', () => resolve(stream));
    stream.on('error', reject);
  });
}

/**
 * Waits for all of the given ReadableStreams to end.
 */
export function waitForAll(streams: NodeJS.ReadableStream[]):
    Promise<NodeJS.ReadableStream[]> {
  return Promise.all<NodeJS.ReadableStream>(streams.map((s) => waitFor(s)));
}

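// Usage sketch (illustrative, not part of the original module): `source` and
// `transform` are hypothetical streams supplied by the caller. Note that
// 'end' only fires once a stream's data is actually being consumed.
//
//   const piped = source.pipe(transform);
//   piped.resume();  // put the stream into flowing mode so 'end' can fire
//   await waitFor(piped);
//   // Or, for several independent pipelines:
//   await waitForAll([pipedA, pipedB]);
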
/**
 * Returns the string contents of a Vinyl File object, waiting for
 * all chunks if the File is a stream.
 */
export async function getFileContents(file: File): Promise<string> {
  if (file.isBuffer()) {
    return file.contents.toString('utf-8');
  } else if (file.isStream()) {
    const stream = file.contents;
    stream.setEncoding('utf-8');
    const contents: string[] = [];
    stream.on('data', (chunk: string) => contents.push(chunk));

    return new Promise<string>((resolve, reject) => {
      stream.on('end', () => resolve(contents.join('')));
      stream.on('error', reject);
    });
  }
  throw new Error(
      `Unable to get contents of file ${file.path}. ` +
      `It has neither a buffer nor a stream.`);
}

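// Usage sketch (illustrative): works the same for buffer- and stream-backed
// Vinyl files.
//
//   const file = new File({path: 'a.txt', contents: Buffer.from('hello')});
//   const text = await getFileContents(file);  // 'hello'
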
/**
 * Composes multiple streams (or Transforms) into one.
 */
export function compose(streams: NodeJS.ReadWriteStream[]) {
  if (streams && streams.length > 0) {
    return multipipe(streams);
  } else {
    return new PassThrough({objectMode: true});
  }
}

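// Usage sketch (illustrative) for compose(): `htmlMinifier` and `jsMinifier`
// are hypothetical object-mode Transforms. With an empty array, compose()
// returns a no-op PassThrough.
//
//   const optimizer = compose([htmlMinifier, jsMinifier]);
//   sources.pipe(optimizer).pipe(dest);
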
/**
 * An asynchronous queue that is read as an async iterable.
 */
class AsyncQueue<V> implements AsyncIterable<V> {
  private blockedOn: Deferred<IteratorResult<V>>|undefined = undefined;
  backlog: Array<{value: IteratorResult<V>, deferred: Deferred<void>}> = [];
  private _closed = false;
  private _finished = false;

  /**
   * Add the given value onto the queue.
   *
   * The return value of this method resolves once the value has been removed
   * from the queue. Useful for flow control.
   *
   * Must not be called after the queue has been closed.
   */
  async write(value: V) {
    if (this._closed) {
      throw new Error('Wrote to closed writable iterable');
    }
    return this._write({value, done: false});
  }

  /**
   * True once the queue has been closed and all input has been read from it.
   */
  get finished() {
    return this._finished;
  }

  /**
   * Close the queue, indicating that no more values will be written.
   *
   * If this method is not called, a consumer iterating over the values will
   * wait forever.
   *
   * The returned promise resolves once the consumer has been notified of the
   * end of the queue.
   */
  async close() {
    this._closed = true;
    return this._write({done: true} as IteratorResult<V>);
  }

  private async _write(value: IteratorResult<V>) {
    if (this.blockedOn) {
      // A reader is already waiting; hand the value over directly.
      this.blockedOn.resolve(value);
      this.blockedOn = undefined;
    } else {
      // No reader is waiting; queue the value and block the writer until a
      // reader takes it off the backlog.
      const deferred = new Deferred<void>();
      this.backlog.push({value, deferred});
      await deferred.promise;
    }
  }

  /**
   * Iterate over values in the queue. Not intended for multiple readers.
   * In the case where there are multiple readers, some values may be received
   * by multiple readers, but all values will be seen by at least one reader.
   */
  async * [Symbol.asyncIterator](): AsyncIterator<V> {
    while (true) {
      let value;
      const maybeValue = this.backlog.shift();
      if (maybeValue) {
        // Unblock the writer that queued this value.
        maybeValue.deferred.resolve(undefined);
        value = maybeValue.value;
      } else {
        this.blockedOn = new Deferred();
        value = await this.blockedOn.promise;
      }
      if (value.done) {
        this._finished = true;
        // Requeue the end-of-stream marker so any later reader also sees it.
        this._write(value);
        return;
      } else {
        yield value.value;
      }
    }
  }
}

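// Usage sketch (illustrative) for AsyncQueue: a single producer and a single
// consumer; each write resolves only once the consumer takes that value.
//
//   const queue = new AsyncQueue<number>();
//   (async () => {
//     await queue.write(1);
//     await queue.write(2);
//     await queue.close();
//   })();
//   for await (const n of queue) {
//     console.log(n);  // 1, then 2
//   }
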
/**
 * Implements `stream.Transform` via standard async iteration.
 *
 * The main advantage over implementing stream.Transform itself is that correct
 * error handling is built in and easy to get right, simply by using
 * async/await.
 *
 * `In` and `Out` extend `{}` because they may not be `null`.
 */
export abstract class AsyncTransformStream<In extends {}, Out extends {}>
    extends Transform {
  private readonly _inputs = new AsyncQueue<In>();

  /**
   * Implement this method!
   *
   * Read from the given iterator to consume input, yield values to write
   * chunks of your own. You may yield any number of values for each input.
   *
   * Note: currently you *must* completely consume `inputs` and return for this
   * stream to close.
   */
  protected abstract _transformIter(inputs: AsyncIterable<In>):
      AsyncIterable<Out>;

  private _initialized = false;
  private _writingFinished = new Deferred<void>();
  private _initializeOnce() {
    if (this._initialized === false) {
      this._initialized = true;
      const transformDonePromise = (async () => {
        for await (const value of this._transformIter(this._inputs)) {
          // TODO(rictic): if `this.push` returns false, should we wait until
          //     we get a drain event to keep iterating?
          this.push(value);
        }
      })();
      transformDonePromise.then(() => {
        if (this._inputs.finished) {
          this._writingFinished.resolve(undefined);
        } else {
          this.emit(
              'error',
              new Error(
                  `${this.constructor.name}` +
                  ` did not consume all input while transforming.`));
          // Since _transformIter has exited, but not all input was consumed,
          // this._flush won't be called. We need to signal manually that
          // no more output will be written by this stream.
          this.push(null);
        }
      }, (err) => this.emit('error', err));
    }
  }

  /**
   * Don't override.
   *
   * Passes input into this._inputs.
   */
  _transform(
      input: In, _encoding: string,
      callback: (error?: Error, value?: Out) => void) {
    this._initializeOnce();
    this._inputs.write(input).then(() => {
      callback();
    }, (err) => callback(err));
  }

  /**
   * Don't override.
   *
   * Finish writing out the outputs.
   */
  async _flush(callback: TransformCallback) {
    try {
      // We won't get any more inputs. Wait for them all to be processed.
      await this._inputs.close();
      // Wait for all of our output to be written.
      await this._writingFinished.promise;
      callback();
    } catch (e) {
      callback(e);
    }
  }
}

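// Usage sketch (illustrative): a minimal subclass that upper-cases string
// chunks. Remember that `_transformIter` must consume all of `inputs` for the
// stream to close cleanly.
//
//   class UpperCaseStream extends AsyncTransformStream<string, string> {
//     constructor() {
//       super({objectMode: true});
//     }
//
//     protected async *
//     _transformIter(inputs: AsyncIterable<string>): AsyncIterable<string> {
//       for await (const chunk of inputs) {
//         yield chunk.toUpperCase();
//       }
//     }
//   }
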
/**
 * A stream that takes file path strings, and outputs full Vinyl file objects
 * for the file at each location.
 */
export class VinylReaderTransform extends AsyncTransformStream<string, File> {
  constructor() {
    super({objectMode: true});
  }

  protected async *
  _transformIter(paths: AsyncIterable<string>): AsyncIterable<File> {
    for await (const filePath of paths) {
      yield new File({path: filePath, contents: await fs.readFile(filePath)});
    }
  }
}

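// Usage sketch (illustrative): turn a stream of path strings into a stream of
// loaded Vinyl files. `pathStream` is a hypothetical object-mode stream of
// file paths.
//
//   const reader = new VinylReaderTransform();
//   pathStream.pipe(reader).on('data', (file: File) => {
//     console.log(file.path);
//   });
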
export type PipeStream = (NodeJS.ReadableStream|NodeJS.WritableStream|
                          NodeJS.ReadableStream[]|NodeJS.WritableStream[]);

/**
 * pipeStreams() takes in a collection of streams and pipes them together,
 * returning the last stream in the pipeline. Each element in the `streams`
 * array must be either a stream, or an array of streams (see PipeStream).
 * pipeStreams() will then flatten this array before piping them all together.
 */
export function pipeStreams(streams: PipeStream[]): NodeJS.ReadableStream {
  return Array.prototype.concat.apply([], streams)
      .reduce((a: NodeJS.ReadableStream, b: NodeJS.ReadWriteStream) => {
        return a.pipe(b);
      });
}
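
// Usage sketch (illustrative): nested arrays of transforms are flattened and
// piped in order. `sourceStream`, `transformA`, `transformB`, and `sink` are
// hypothetical; `sink` must be a ReadWriteStream for the result to be
// readable.
//
//   const out = pipeStreams([sourceStream, [transformA, transformB], sink]);
//   await waitFor(out);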