UNPKG

5.83 kBPlain TextView Raw
1import Promise = require('bluebird');
2import _ = require('lodash');
3import types = require('../types');
4import FiberMgr = require('./fiberManager');
5import RunContext = require('./runContext');
6import Semaphore = require('./semaphore');
7import Config = require('./config');
8import defer = require('./defer');
9import await = require('../await/index');
10export = AsyncIterator;
11
12
/**
 * Asynchronous analogue to an ES6 Iterator. Rather than return each value/done
 * result synchronously, next() notifies a promise and/or callback when the
 * next result is ready. How results are delivered is governed by the
 * `returnValue` / `acceptsCallback` settings supplied at construction time.
 */
class AsyncIterator {

    /**
     * Construct a new AsyncIterator instance. This will create a fiber.
     *
     * @param runContext      Context object handed to the fiber on every run; this class
     *                        writes its `callback`, `resolver` and `done` members.
     * @param semaphore       Gate that every fiber run passes through (enter/leave).
     * @param returnValue     One of the Config.* return-value constants
     *                        (PROMISE, THUNK, RESULT or NONE are the ones handled here).
     * @param acceptsCallback Whether callbacks passed to next()/forEach() are honoured.
     */
    constructor(runContext: RunContext, semaphore: Semaphore, returnValue: string, acceptsCallback: boolean) {
        this._runContext = runContext;
        this._semaphore = semaphore;
        this._fiber = FiberMgr.create();
        this._returnValue = returnValue;
        this._acceptsCallback = acceptsCallback;
    }

    /**
     * Fetch the next result from the iterator.
     *
     * @param callback Optional node-style callback; only used when the iterator
     *                 was constructed with acceptsCallback = true.
     * @returns Depends on the configured return value: a promise (PROMISE), a
     *          thunk that starts the step when invoked (THUNK), the awaited
     *          result itself (RESULT), or nothing (NONE).
     */
    next(callback?: (err, result) => void) {

        // Configure the run context.
        if (this._acceptsCallback) {
            this._runContext.callback = callback; // May be null, in which case it won't be used.
        }
        if (this._returnValue !== Config.NONE) {
            var resolver = defer();
            this._runContext.resolver = resolver;
        }

        // Remove concurrency restrictions for nested calls, to avoid race conditions.
        if (FiberMgr.isExecutingInFiber()) this._semaphore = Semaphore.unlimited;

        // Run the fiber until it either yields a value or completes. For thunks, this is a lazy operation.
        if (this._returnValue === Config.THUNK) {
            var thunk: types.Thunk<any> = (done?) => {
                if (done) resolver.promise.then(val => done(null, val), err => done(err));
                this._resume();
            };
        } else {
            this._resume();
        }

        // Return the appropriate value.
        switch (this._returnValue) {
            case Config.PROMISE: return resolver.promise;
            case Config.THUNK: return thunk;
            case Config.RESULT: return await (resolver.promise);
            case Config.NONE: return;
        }
    }

    /**
     * Enumerate the entire iterator, calling callback with each result.
     *
     * @param callback     Invoked with each yielded value, in order.
     * @param doneCallback Optional completion callback, invoked with an error or with
     *                     no arguments; ignored unless acceptsCallback = true.
     * @returns A promise (PROMISE) or a lazy thunk (THUNK) that settles when the
     *          iteration ends; undefined for the other return-value settings.
     */
    forEach(callback: (value) => void, doneCallback?: (err?) => void): any {

        // These functions handle stepping through and finalising the iteration.
        // They are declared first for readability; they only run once run() is called below.

        // Synchronous loop used in RESULT mode, where next() returns the item directly.
        function stepAwaited(next) {
            try {
                while (true) {
                    var item = next();
                    if (item.done) return endOfIteration();
                    callback(item.value);
                }
            }
            catch (err) {
                // Report the failure to any completion listeners, then let it propagate to the caller.
                endOfIteration(err);
                throw err;
            }
        }

        // Node-style continuation used in THUNK mode and when callbacks are accepted.
        function stepCallback(err, result) {
            if (err || result.done) return endOfIteration(err);
            callback(result.value);
            setImmediate(run);
        }

        // Promise continuation used when next() returns a promise.
        function stepResolved(result) {
            if (result.done) return endOfIteration();
            callback(result.value);
            setImmediate(run);
        }

        // Notifies the completion callback and/or settles the completion promise.
        function endOfIteration(err?) {
            if (doneCallback) {
                if (err) doneCallback(err);
                else doneCallback();
            }
            if (doneResolver) {
                if (FiberMgr.isExecutingInFiber()) {
                    runCtx.resolver = doneResolver; // FiberManager will handle it
                } else if (err) {
                    doneResolver.reject(err);
                } else {
                    doneResolver.resolve(null);
                }
            }
        }

        // Create a function that calls next() in an asynchronous loop until the iteration is complete.
        var run, runCtx = this._runContext;
        if (this._returnValue === Config.RESULT) run = () => stepAwaited(() => this.next());
        else if (this._returnValue === Config.THUNK) run = () => this.next()(stepCallback);
        else if (this._acceptsCallback) run = () => this.next(stepCallback);
        else run = () => this.next().then(stepResolved, endOfIteration);

        // Configure the resolver and callback to be invoked at the end of the iteration.
        if (this._returnValue === Config.PROMISE || this._returnValue === Config.THUNK) {
            var doneResolver = defer();
        }
        if (!this._acceptsCallback) doneCallback = null;

        // Execute the entire iteration. For thunks, this is a lazy operation.
        if (this._returnValue === Config.THUNK) {
            var thunk: types.Thunk<any> = (done?) => {
                if (done) doneResolver.promise.then(val => done(null, val), err => done(err));
                run();
            };
        } else {
            run();
        }

        // Return the appropriate value.
        switch (this._returnValue) {
            case Config.PROMISE: return doneResolver.promise;
            case Config.THUNK: return thunk;
            case Config.RESULT: return undefined;
            case Config.NONE: return undefined;
        }
    }

    /** Release resources associated with this object (i.e., the fiber). */
    destroy() {
        this._fiber = null;
    }

    /**
     * Run the fiber through the semaphore for one step.
     *
     * The completion hook is installed BEFORE entering the semaphore. Previously
     * it was assigned afterwards, so if enter() invokes its callback synchronously
     * and the fiber finishes during that first run, the hook would not exist yet
     * and the matching leave() would be skipped.
     * NOTE(review): assumes Semaphore.enter() may call back synchronously; confirm
     * against ./semaphore. Installing the hook first is harmless either way.
     */
    private _resume() {
        this._runContext.done = () => this._semaphore.leave();
        this._semaphore.enter(() => this._fiber.run(this._runContext));
    }

    private _runContext: RunContext;
    private _semaphore: Semaphore;
    private _fiber: Fiber;
    private _returnValue: string;
    private _acceptsCallback: boolean;
}