UNPKG

5.96 kBJavaScriptView Raw
1'use strict';
2
3var _Object$setPrototypeO;
4
5function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; }
6
7var finished = require('./end-of-stream');
8
9var kLastResolve = Symbol('lastResolve');
10var kLastReject = Symbol('lastReject');
11var kError = Symbol('error');
12var kEnded = Symbol('ended');
13var kLastPromise = Symbol('lastPromise');
14var kHandlePromise = Symbol('handlePromise');
15var kStream = Symbol('stream');
16
17function createIterResult(value, done) {
18 return {
19 value: value,
20 done: done
21 };
22}
23
24function readAndResolve(iter) {
25 var resolve = iter[kLastResolve];
26
27 if (resolve !== null) {
28 var data = iter[kStream].read(); // we defer if data is null
29 // we can be expecting either 'end' or
30 // 'error'
31
32 if (data !== null) {
33 iter[kLastPromise] = null;
34 iter[kLastResolve] = null;
35 iter[kLastReject] = null;
36 resolve(createIterResult(data, false));
37 }
38 }
39}
40
41function onReadable(iter) {
42 // we wait for the next tick, because it might
43 // emit an error with process.nextTick
44 process.nextTick(readAndResolve, iter);
45}
46
47function wrapForNext(lastPromise, iter) {
48 return function (resolve, reject) {
49 lastPromise.then(function () {
50 if (iter[kEnded]) {
51 resolve(createIterResult(undefined, true));
52 return;
53 }
54
55 iter[kHandlePromise](resolve, reject);
56 }, reject);
57 };
58}
59
60var AsyncIteratorPrototype = Object.getPrototypeOf(function () {});
61var ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf((_Object$setPrototypeO = {
62 get stream() {
63 return this[kStream];
64 },
65
66 next: function next() {
67 var _this = this;
68
69 // if we have detected an error in the meanwhile
70 // reject straight away
71 var error = this[kError];
72
73 if (error !== null) {
74 return Promise.reject(error);
75 }
76
77 if (this[kEnded]) {
78 return Promise.resolve(createIterResult(undefined, true));
79 }
80
81 if (this[kStream].destroyed) {
82 // We need to defer via nextTick because if .destroy(err) is
83 // called, the error will be emitted via nextTick, and
84 // we cannot guarantee that there is no error lingering around
85 // waiting to be emitted.
86 return new Promise(function (resolve, reject) {
87 process.nextTick(function () {
88 if (_this[kError]) {
89 reject(_this[kError]);
90 } else {
91 resolve(createIterResult(undefined, true));
92 }
93 });
94 });
95 } // if we have multiple next() calls
96 // we will wait for the previous Promise to finish
97 // this logic is optimized to support for await loops,
98 // where next() is only called once at a time
99
100
101 var lastPromise = this[kLastPromise];
102 var promise;
103
104 if (lastPromise) {
105 promise = new Promise(wrapForNext(lastPromise, this));
106 } else {
107 // fast path needed to support multiple this.push()
108 // without triggering the next() queue
109 var data = this[kStream].read();
110
111 if (data !== null) {
112 return Promise.resolve(createIterResult(data, false));
113 }
114
115 promise = new Promise(this[kHandlePromise]);
116 }
117
118 this[kLastPromise] = promise;
119 return promise;
120 }
121}, _defineProperty(_Object$setPrototypeO, Symbol.asyncIterator, function () {
122 return this;
123}), _defineProperty(_Object$setPrototypeO, "return", function _return() {
124 var _this2 = this;
125
126 // destroy(err, cb) is a private API
127 // we can guarantee we have that here, because we control the
128 // Readable class this is attached to
129 return new Promise(function (resolve, reject) {
130 _this2[kStream].destroy(null, function (err) {
131 if (err) {
132 reject(err);
133 return;
134 }
135
136 resolve(createIterResult(undefined, true));
137 });
138 });
139}), _Object$setPrototypeO), AsyncIteratorPrototype);
140
141var createReadableStreamAsyncIterator = function createReadableStreamAsyncIterator(stream) {
142 var _Object$create;
143
144 var iterator = Object.create(ReadableStreamAsyncIteratorPrototype, (_Object$create = {}, _defineProperty(_Object$create, kStream, {
145 value: stream,
146 writable: true
147 }), _defineProperty(_Object$create, kLastResolve, {
148 value: null,
149 writable: true
150 }), _defineProperty(_Object$create, kLastReject, {
151 value: null,
152 writable: true
153 }), _defineProperty(_Object$create, kError, {
154 value: null,
155 writable: true
156 }), _defineProperty(_Object$create, kEnded, {
157 value: stream._readableState.endEmitted,
158 writable: true
159 }), _defineProperty(_Object$create, kHandlePromise, {
160 value: function value(resolve, reject) {
161 var data = iterator[kStream].read();
162
163 if (data) {
164 iterator[kLastPromise] = null;
165 iterator[kLastResolve] = null;
166 iterator[kLastReject] = null;
167 resolve(createIterResult(data, false));
168 } else {
169 iterator[kLastResolve] = resolve;
170 iterator[kLastReject] = reject;
171 }
172 },
173 writable: true
174 }), _Object$create));
175 iterator[kLastPromise] = null;
176 finished(stream, function (err) {
177 if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
178 var reject = iterator[kLastReject]; // reject if we are waiting for data in the Promise
179 // returned by next() and store the error
180
181 if (reject !== null) {
182 iterator[kLastPromise] = null;
183 iterator[kLastResolve] = null;
184 iterator[kLastReject] = null;
185 reject(err);
186 }
187
188 iterator[kError] = err;
189 return;
190 }
191
192 var resolve = iterator[kLastResolve];
193
194 if (resolve !== null) {
195 iterator[kLastPromise] = null;
196 iterator[kLastResolve] = null;
197 iterator[kLastReject] = null;
198 resolve(createIterResult(undefined, true));
199 }
200
201 iterator[kEnded] = true;
202 });
203 stream.on('readable', onReadable.bind(null, iterator));
204 return iterator;
205};
206
207module.exports = createReadableStreamAsyncIterator;
\No newline at end of file