UNPKG

6.57 kBJavaScriptView Raw
1import { publish } from './operators/publish';
2import { fromDOMStream } from './fromdomstream';
3import { AsyncIterableX } from './asynciterablex';
/**
 * Copies bytes from `source` into `target`, starting at `targetByteOffset`
 * within `target`.
 *
 * The number of bytes copied is clamped to the smallest of: the requested
 * `sourceByteLength`, the room left in `target` after `targetByteOffset`, and
 * the bytes available in `source`'s backing buffer from `source.byteOffset`.
 * When nothing fits (for example the offset is at or past the end of
 * `target`) no bytes are copied and 0 is returned.
 *
 * @param target ArrayBuffer view to write into.
 * @param source ArrayBuffer view to read from.
 * @param targetByteOffset Byte offset inside `target` at which to start writing.
 * @param sourceByteLength Maximum number of bytes to read from `source`.
 * @returns The number of bytes actually copied.
 * @ignore
 */
function memcpy(target, source, targetByteOffset = 0, sourceByteLength = source.byteLength) {
    const targetByteLength = target.byteLength;
    // Room left in the target once the write offset is taken into account.
    // Without this clamp `dst.set()` throws a RangeError whenever the source
    // is longer than the space remaining after `targetByteOffset`.
    const targetRoom = targetByteLength - targetByteOffset;
    const sourceAvailable = source.buffer.byteLength - source.byteOffset;
    const byteCount = Math.min(sourceByteLength, targetRoom, sourceAvailable);
    // Written as a negated `>` so that NaN is also treated as "nothing to copy".
    if (!(byteCount > 0)) {
        return 0;
    }
    const dst = new Uint8Array(target.buffer, target.byteOffset, targetByteLength);
    const src = new Uint8Array(source.buffer, source.byteOffset, byteCount);
    dst.set(src, targetByteOffset);
    return src.byteLength;
}
/**
 * Base class for the underlying-source objects in this module. It holds the
 * wrapped source in `_source` and implements `cancel()` for subclasses.
 */
class AbstractUnderlyingSource {
    /**
     * @param _source The object to pull values from; `cancel()` calls its
     * optional `return()` method.
     */
    constructor(_source) {
        this._source = _source;
    }
    /**
     * Calls the wrapped source's `return()` method (when it has one) and then
     * drops the reference to the source.
     *
     * The reference is released in a `finally` block so that it is cleared
     * even when `return()` throws or rejects; the error still propagates to
     * the caller.
     */
    async cancel() {
        const source = this._source;
        try {
            if (source && source.return) {
                await source.return();
            }
        }
        finally {
            this._source = null;
        }
    }
}
/**
 * Underlying source that forwards each value produced by the wrapped source's
 * `next()` method to the stream controller.
 */
class UnderlyingAsyncIterableDefaultSource extends AbstractUnderlyingSource {
    constructor(source) {
        super(source);
    }
    /**
     * Requests one value from the wrapped source, passing the controller's
     * `desiredSize` to `next()`. A produced value is enqueued; when the source
     * is gone or reports `done`, the controller is closed instead.
     *
     * @param controller The stream controller to enqueue into or close.
     * @returns Whatever `controller.enqueue()` returns, or `undefined` after closing.
     */
    // eslint-disable-next-line consistent-return
    async pull(controller) {
        const iterator = this._source;
        if (!iterator) {
            controller.close();
            return undefined;
        }
        const result = await iterator.next(controller.desiredSize);
        if (result.done) {
            controller.close();
            return undefined;
        }
        return controller.enqueue(result.value);
    }
}
/**
 * Underlying source of type `'bytes'`. When the controller exposes a BYOB
 * request, the request's view is handed to the wrapped source's `next()`;
 * otherwise pulling is delegated to an `UnderlyingAsyncIterableDefaultSource`
 * wrapping the same source.
 */
class UnderlyingAsyncIterableByteSource extends AbstractUnderlyingSource {
    /**
     * @param reader The source to pull from.
     * @param opts Optional settings; only `autoAllocateChunkSize` is read.
     */
    constructor(reader, opts = {}) {
        super(reader);
        this.type = 'bytes';
        this.autoAllocateChunkSize = opts.autoAllocateChunkSize;
        this.fallbackDefaultSource = new UnderlyingAsyncIterableDefaultSource(reader);
    }
    /**
     * Satisfies one pull from the stream.
     *
     * - A yielded number is treated as the count of bytes the source wrote
     *   into the BYOB view itself.
     * - A yielded ArrayBuffer view sharing the BYOB view's buffer is passed
     *   back with `respondWithNewView()`.
     * - Any other yielded ArrayBuffer view is copied into the front of the
     *   BYOB view; bytes that do not fit are enqueued for later reads rather
     *   than dropped.
     * - In every other case (source gone, source done, unrecognised value)
     *   the stream is closed.
     *
     * @param controller The byte stream controller.
     */
    // eslint-disable-next-line consistent-return
    async pull(controller) {
        // Capture the request once; it is still needed after `close()` below.
        const byobRequest = controller.byobRequest;
        if (!byobRequest) {
            return await this.fallbackDefaultSource.pull(controller);
        }
        if (this._source) {
            const { view } = byobRequest;
            const { done, value } = await this._source.next(view);
            if (!done) {
                // Did the source write into the BYOB view itself,
                // then yield us the `bytesWritten` value? If so,
                // pass that along
                if (typeof value === 'number') {
                    return byobRequest.respond(value);
                }
                // otherwise if the source is only producing buffers
                // but doesn't expect to be given one, we should copy
                // the produced buffer into the front of the BYOB view
                if (ArrayBuffer.isView(value)) {
                    if (value.buffer === view.buffer) {
                        return byobRequest.respondWithNewView(value);
                    }
                    const bytesWritten = memcpy(view, value);
                    byobRequest.respond(bytesWritten);
                    const leftover = value.byteLength - bytesWritten;
                    if (leftover > 0) {
                        // The produced buffer was larger than the BYOB view.
                        // Queue a private copy of the remainder so the bytes
                        // are delivered by subsequent reads (enqueue takes
                        // ownership of the chunk's buffer, hence the copy).
                        controller.enqueue(new Uint8Array(value.buffer, value.byteOffset + bytesWritten, leftover).slice());
                    }
                    return undefined;
                }
            }
        }
        controller.close();
        // Per the Streams standard, a BYOB request that is outstanding when
        // the stream closes must be answered with `respond(0)`; without it a
        // pending BYOB read never settles.
        byobRequest.respond(0);
    }
}
// Factory for ReadableStream subclasses that also implement the
// AsyncIterable protocol. The subclasses are not declared at module scope:
// ReadableStream is a browser-only API, and closure-compiler won't compile
// the module if they are defined statically. They are therefore created
// lazily, on the first call to the factory.
/** @ignore */
const asyncIterableReadableStream = (() => {
    // Populated by `defineClassesThenInstantiate` on the first call
    let ByteStreamClass;
    let DefaultStreamClass;
    // Steady-state path: pick the subclass matching the kind of underlying
    // source and instantiate it
    function instantiate(source, opts) {
        if (source instanceof UnderlyingAsyncIterableByteSource) {
            return new ByteStreamClass(source, opts);
        }
        return new DefaultStreamClass(source, opts);
    }
    // First-call path: define the subclasses with their
    // [Symbol.asyncIterator]() methods, swap the factory pointer over to
    // `instantiate`, then build the first stream
    function defineClassesThenInstantiate(source, opts) {
        class AsyncIterableDefaultReadableStream extends ReadableStream {
            [Symbol.asyncIterator]() {
                const iterable = fromDOMStream(this);
                return iterable[Symbol.asyncIterator]();
            }
        }
        class AsyncIterableReadableByteStream extends ReadableStream {
            [Symbol.asyncIterator]() {
                const iterable = fromDOMStream(this, { mode: 'byob' });
                return iterable[Symbol.asyncIterator]();
            }
        }
        ByteStreamClass = AsyncIterableReadableByteStream;
        DefaultStreamClass = AsyncIterableDefaultReadableStream;
        // eslint-disable-next-line @typescript-eslint/no-use-before-define, no-use-before-define
        activeFactory = instantiate;
        return instantiate(source, opts);
    }
    // Shared function pointer invoked by the wrapper closure returned below
    let activeFactory = defineClassesThenInstantiate;
    return (source, opts) => activeFactory(source, opts);
})();
/**
 * Converts an async-iterable stream to a DOM stream.
 *
 * When `options.type` is `'bytes'` the stream is backed by an
 * `UnderlyingAsyncIterableByteSource`, and `options` is handed to that source
 * so that `autoAllocateChunkSize` is honoured; otherwise it is backed by an
 * `UnderlyingAsyncIterableDefaultSource`. In both cases `options` is also
 * forwarded as the second argument of the stream constructor.
 *
 * @param source The async-iterable stream to convert to a DOM stream.
 * @param options The options to apply to the DOM stream.
 * @returns The stream produced by `asyncIterableReadableStream`.
 */
export function toDOMStream(source, options) {
    const iterator = source[Symbol.asyncIterator]();
    if (!options || !('type' in options) || options['type'] !== 'bytes') {
        return asyncIterableReadableStream(new UnderlyingAsyncIterableDefaultSource(iterator), options);
    }
    // Pass `options` to the byte source as well: its constructor reads
    // `autoAllocateChunkSize` from its second argument, which was previously
    // never supplied, so the setting was silently ignored.
    return asyncIterableReadableStream(new UnderlyingAsyncIterableByteSource(iterator, options), options);
}
// Install `tee`, `pipeTo` and `pipeThrough` on AsyncIterableX. Each one
// obtains the instance's DOM stream via `_getDOMStream` and forwards the
// call to the stream method of the same name.
{
    const proto = AsyncIterableX.prototype;
    proto.tee = function () {
        const stream = _getDOMStream(this);
        return stream.tee();
    };
    proto.pipeTo = function (writable, options) {
        const stream = _getDOMStream(this);
        return stream.pipeTo(writable, options);
    };
    proto.pipeThrough = function (duplex, options) {
        const stream = _getDOMStream(this);
        return stream.pipeThrough(duplex, options);
    };
}
/**
 * Returns the DOM stream cached on `self._DOMStream`, creating and caching it
 * on first use by piping `self` through `publish()` and `toDOMStream`.
 *
 * @param self The object whose DOM stream is requested.
 * @returns The cached (or newly created) value of `self._DOMStream`.
 */
function _getDOMStream(self) {
    const cached = self._DOMStream;
    if (cached) {
        return cached;
    }
    const created = self.pipe(publish(), toDOMStream);
    self._DOMStream = created;
    return created;
}
/**
 * Method form of `toDOMStream`: converts `this` to a DOM stream.
 * `options` is forwarded only when it is truthy; otherwise `toDOMStream` is
 * called with `this` alone.
 *
 * @param options The options to apply to the DOM stream.
 * @returns The result of `toDOMStream`.
 */
export function toDOMStreamProto(options) {
    if (options) {
        return toDOMStream(this, options);
    }
    return toDOMStream(this);
}
// Expose the conversion as a method on every AsyncIterableX.
AsyncIterableX.prototype.toDOMStream = toDOMStreamProto;
145
146//# sourceMappingURL=todomstream.mjs.map