UNPKG

2.84 kBJavaScriptView Raw
1import { AsyncIterableX } from './asynciterablex';
2function assertNever(value) {
3 throw new Error(`Unhandled discriminated union member ${value}`);
4}
5class BatchAsyncIterable extends AsyncIterableX {
6 constructor(source) {
7 super();
8 this._source = source;
9 }
10 [Symbol.asyncIterator]() {
11 const it = this._source[Symbol.asyncIterator]();
12 let state = { type: 'batching', values: [] };
13 let ended = null;
14 function consumeNext() {
15 it.next().then(res => {
16 if (res.done) {
17 ended = Promise.resolve({ done: true });
18 if (state.type === 'waiting') {
19 state.resolver.resolve(ended);
20 }
21 }
22 else {
23 if (state.type === 'waiting') {
24 const { resolve } = state.resolver;
25 state = { type: 'batching', values: [] };
26 resolve({ done: res.done, value: [res.value] });
27 }
28 else if (state.type === 'batching') {
29 state.values.push(res.value);
30 }
31 else {
32 assertNever(state);
33 }
34 consumeNext();
35 }
36 }, err => {
37 ended = Promise.reject(err);
38 if (state.type === 'waiting') {
39 const { reject } = state.resolver;
40 reject(err);
41 }
42 });
43 }
44 consumeNext();
45 return {
46 next() {
47 if (state.type === 'batching' && state.values.length > 0) {
48 const { values } = state;
49 state.values = [];
50 return Promise.resolve({ done: false, value: values });
51 }
52 if (ended) {
53 return ended;
54 }
55 if (state.type === 'waiting') {
56 throw new Error('Previous `next()` is still in progress');
57 }
58 return new Promise((resolve, reject) => {
59 state = {
60 type: 'waiting',
61 resolver: { resolve, reject }
62 };
63 });
64 },
65 return(value) {
66 return it.return
67 ? it.return(value).then(() => ({ done: true }))
68 : Promise.resolve({ done: true });
69 }
70 };
71 }
72}
73/**
74 * Returns an async iterable sequence of batches that are collected from the source sequence between
75 * subsequent `next()` calls.
76 */
77export function batch(source) {
78 return new BatchAsyncIterable(source);
79}
80
81//# sourceMappingURL=batch.mjs.map