1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 | import {fs} from 'mz';
|
16 | import {Deferred} from 'polymer-analyzer/lib/core/utils';
|
17 | import {PassThrough, Transform, TransformCallback} from 'stream';
|
18 |
|
19 | import File = require('vinyl');
|
20 |
|
21 | const multipipe = require('multipipe');
|
22 |
|
// Make sure `Symbol.asyncIterator` exists before any class below uses it as a
// computed method name. Runtimes that already define it are left untouched.
if (typeof Symbol.asyncIterator === 'undefined') {
  Object.assign(Symbol, {asyncIterator: Symbol('asyncIterator')});
}
|
27 |
|
28 |
|
29 |
|
30 |
|
/**
 * Waits for the given ReadableStream to finish.
 *
 * This only attaches listeners; it does not consume the stream itself.
 *
 * @param stream The stream to wait on.
 * @returns A promise that resolves with `stream` once it emits `'end'`, or
 *     rejects with the emitted error if it emits `'error'`.
 */
export function waitFor(stream: NodeJS.ReadableStream):
    Promise<NodeJS.ReadableStream> {
  return new Promise<NodeJS.ReadableStream>((resolve, reject) => {
    // The 'end' event carries no arguments, so resolve with the stream
    // explicitly to honor the declared return type instead of `undefined`.
    stream.on('end', () => resolve(stream));
    stream.on('error', reject);
  });
}
|
38 |
|
/**
 * Waits for every one of the given ReadableStreams.
 *
 * Each stream is handed to `waitFor`, and the resulting promises are combined
 * with `Promise.all`, so the returned promise rejects as soon as any single
 * wait rejects.
 */
export function waitForAll(streams: NodeJS.ReadableStream[]):
    Promise<NodeJS.ReadableStream[]> {
  const pending = streams.map((stream) => waitFor(stream));
  return Promise.all<NodeJS.ReadableStream>(pending);
}
|
46 |
|
/**
 * Reads a Vinyl File object's contents as a UTF-8 string.
 *
 * Buffer-backed files are decoded directly. Stream-backed files are read
 * chunk by chunk and the promise settles once the stream ends or errors.
 *
 * @throws {Error} (as a rejection) if the file is neither a buffer nor a
 *     stream.
 */
export async function getFileContents(file: File): Promise<string> {
  if (file.isBuffer()) {
    return file.contents.toString('utf-8');
  }
  if (!file.isStream()) {
    throw new Error(
        `Unable to get contents of file ${file.path}. ` +
        `It has neither a buffer nor a stream.`);
  }

  const source = file.contents;
  // Ask the stream for decoded strings rather than raw Buffers.
  source.setEncoding('utf-8');
  const pieces: string[] = [];
  source.on('data', (piece: string) => pieces.push(piece));

  return new Promise<string>((resolve, reject) => {
    source.on('end', () => resolve(pieces.join('')));
    source.on('error', reject);
  });
}
|
69 |
|
/**
 * Combines several streams (or Transforms) into a single stream.
 *
 * A non-empty list is handed to `multipipe`. A missing or empty list yields a
 * fresh object-mode `PassThrough`, so callers always get a stream back.
 */
export function compose(streams: NodeJS.ReadWriteStream[]) {
  const hasStreams = Boolean(streams) && streams.length > 0;
  if (!hasStreams) {
    return new PassThrough({objectMode: true});
  }
  return multipipe(streams);
}
|
80 |
|
81 |
|
/**
 * An asynchronous queue whose contents are consumed through async iteration.
 *
 * Writers hand over one value at a time and get back a promise that settles
 * when that value has been taken by the reader, which gives simple
 * backpressure.
 */
class AsyncQueue<V> implements AsyncIterable<V> {
  /** Set while the reader is parked waiting for the next write. */
  private blockedOn: Deferred<IteratorResult<V>>|undefined = undefined;
  /** Written results the reader has not taken yet, oldest first. */
  backlog: Array<{value: IteratorResult<V>, deferred: Deferred<void>}> = [];
  private _closed = false;
  private _finished = false;

  /**
   * Enqueue a value.
   *
   * The returned promise resolves once the value has been taken off the
   * queue, which makes it usable for flow control.
   *
   * Rejects if the queue has already been closed.
   */
  async write(value: V) {
    if (this._closed) {
      throw new Error('Wrote to closed writable iterable');
    }
    return this._write({value, done: false});
  }

  /**
   * True once the reader has received the end-of-queue marker.
   */
  get finished() {
    return this._finished;
  }

  /**
   * Mark the queue as closed: no further values may be written.
   *
   * Without this call a consumer iterating over the queue waits forever.
   *
   * The returned promise resolves once the end-of-queue marker has been
   * handed to the consumer.
   */
  async close() {
    this._closed = true;
    return this._write({done: true} as IteratorResult<V>);
  }

  /**
   * Deliver one iterator result: directly to a parked reader if there is
   * one, otherwise via the backlog, waiting until the reader picks it up.
   */
  private async _write(value: IteratorResult<V>) {
    const waitingReader = this.blockedOn;
    if (waitingReader) {
      waitingReader.resolve(value);
      this.blockedOn = undefined;
      return;
    }
    const taken = new Deferred<void>();
    this.backlog.push({value, deferred: taken});
    await taken.promise;
  }

  /**
   * Iterate over the queued values. Not intended for multiple readers: only
   * one parked reader is tracked at a time.
   */
  async * [Symbol.asyncIterator](): AsyncIterator<V> {
    for (;;) {
      let result: IteratorResult<V>;
      const queued = this.backlog.shift();
      if (queued === undefined) {
        // Nothing buffered: park until the next write arrives.
        this.blockedOn = new Deferred<IteratorResult<V>>();
        result = await this.blockedOn.promise;
      } else {
        // Release the writer that was waiting on this entry.
        queued.deferred.resolve(undefined);
        result = queued.value;
      }

      if (!result.done) {
        yield result.value;
        continue;
      }

      this._finished = true;
      // Put the end marker back so a later iteration also sees the end.
      // The returned promise is intentionally not awaited.
      this._write(result);
      return;
    }
  }
}
|
164 |
|
/**
 * Implements `stream.Transform` via standard async iteration.
 *
 * Subclasses implement `_transformIter` as an async iterable over the input
 * chunks. Whatever it yields is pushed downstream, and errors thrown while
 * iterating are re-emitted as `'error'` events on this stream.
 *
 * `In` and `Out` extend `{}` because they may not be `null`.
 */
export abstract class AsyncTransformStream<In extends {}, Out extends {}>
    extends Transform {
  /** Chunks received by `_transform`, waiting to be read by the subclass. */
  private readonly _inputs = new AsyncQueue<In>();

  /**
   * Implement this method!
   *
   * Read from the given iterable to consume input, and yield values to write
   * chunks of your own. You may yield any number of values for each input.
   *
   * Note: currently you *must* completely consume `inputs` and return for
   * this stream to close.
   */
  protected abstract _transformIter(inputs: AsyncIterable<In>):
      AsyncIterable<Out>;

  private _initialized = false;
  /** Resolved once `_transformIter` has returned after consuming all input. */
  private _writingFinished = new Deferred<void>();

  /**
   * Starts the loop that drains `_transformIter` into this stream's readable
   * side. Safe to call repeatedly; only the first call has any effect.
   */
  private _initializeOnce() {
    if (this._initialized === false) {
      this._initialized = true;
      const transformDonePromise = (async () => {
        for await (const value of this._transformIter(this._inputs)) {
          // The return value of `push` is ignored, so output is not throttled
          // by downstream backpressure.
          this.push(value);
        }
      })();
      transformDonePromise.then(() => {
        if (this._inputs.finished) {
          this._writingFinished.resolve(undefined);
        } else {
          this.emit(
              'error',
              new Error(
                  `${this.constructor.name}` +
                  ` did not consume all input while transforming.`));
          // `_writingFinished` is never resolved on this path, so `_flush`
          // cannot complete; end the readable side explicitly instead.
          this.push(null);
        }
      }, (err) => this.emit('error', err));
    }
  }

  /**
   * Don't override.
   *
   * Passes input into this._inputs. The callback fires once the chunk has
   * been taken off the queue by `_transformIter`.
   */
  _transform(
      input: In, _encoding: string,
      callback: (error?: Error, value?: Out) => void) {
    this._initializeOnce();
    this._inputs.write(input).then(() => {
      callback();
    }, (err) => callback(err));
  }

  /**
   * Don't override.
   *
   * Finish writing out the outputs.
   */
  async _flush(callback: TransformCallback) {
    try {
      // If the stream ended without receiving a single chunk, `_transform`
      // never ran and nothing is reading `_inputs`. Start the consumer here
      // so the `close()` below can be observed instead of waiting forever.
      this._initializeOnce();
      // We won't get any more inputs. Wait for them all to be processed.
      await this._inputs.close();
      // Wait for all of our output to be written.
      await this._writingFinished.promise;
      callback();
    } catch (e) {
      callback(e);
    }
  }
}
|
252 |
|
/**
 * A stream that accepts file path strings and emits, for each one, a Vinyl
 * file object holding that file's contents as read from disk.
 */
export class VinylReaderTransform extends AsyncTransformStream<string, File> {
  constructor() {
    // Chunks are path strings in and File objects out, hence object mode.
    super({objectMode: true});
  }

  /**
   * Reads each incoming path with `fs.readFile` and yields the matching File.
   * Paths are processed one at a time, in arrival order.
   */
  protected async *
      _transformIter(paths: AsyncIterable<string>): AsyncIterable<File> {
    for await (const filePath of paths) {
      const contents = await fs.readFile(filePath);
      yield new File({path: filePath, contents});
    }
  }
}
|
269 |
|
export type PipeStream = (NodeJS.ReadableStream|NodeJS.WritableStream|
                          NodeJS.ReadableStream[]|NodeJS.WritableStream[]);

/**
 * Pipes a collection of streams together and returns the last stream of the
 * resulting pipeline.
 *
 * Each element of `streams` is either a single stream or an array of streams
 * (see PipeStream). The collection is flattened by one level first, then
 * every stream is piped into the one that follows it. A single stream is
 * returned as-is; an empty collection throws a TypeError.
 */
export function pipeStreams(streams: PipeStream[]): NodeJS.ReadableStream {
  // Every stream is treated as pipeable on both ends: all but the last are
  // read from, and all but the first are written to.
  const flattened =
      ([] as Array<NodeJS.ReadableStream|NodeJS.WritableStream>)
          .concat(...streams) as NodeJS.ReadWriteStream[];
  return flattened.reduce(
      (upstream, downstream) => upstream.pipe(downstream));
}
|
285 |
|
\ | No newline at end of file |