UNPKG

9.94 kBPlain TextView Raw
1import Fiber = require('../fibers');
2import Promise = require('bluebird');
3import _ = require('lodash');
4import types = require('../types');
5import Config = require('./config');
6import FiberMgr = require('./fiberManager');
7import RunContext = require('./runContext');
8import Semaphore = require('./semaphore');
9import AsyncIterator = require('./asyncIterator');
10import defer = require('./defer');
11import await = require('../await/index');
12export = makeAsyncFunc;
13
14
15/** Function for creating a specific variant of the async function. */
16function makeAsyncFunc(config: Config): types.AsyncFunction {
17
18 // Validate the specified configuration
19 config.validate();
20
21 // Create an async function tailored to the given options.
22 var result: types.AsyncFunction = <any> function async(bodyFunc: Function) {
23
24 // Create a semaphore for limiting top-level concurrency, if specified in options.
25 var semaphore = config.maxConcurrency ? new Semaphore(config.maxConcurrency) : Semaphore.unlimited;
26
27 // Choose and run the appropriate function factory based on whether the result should be iterable.
28 var makeFunc = config.isIterable ? makeAsyncIterator : makeAsyncNonIterator;
29 var result: Function = makeFunc(bodyFunc, config, semaphore);
30
31 // Ensure the suspendable function's arity matches that of the function it wraps.
32 var arity = bodyFunc.length;
33 if (config.acceptsCallback) ++arity;
34 result = makeFuncWithArity(result, arity);
35 return result;
36 };
37
38 // Add the mod() function, and return the result.
39 result.mod = makeModFunc(config);
40 return result;
41}
42
43
44/** Function for creating iterable suspendable functions. */
45function makeAsyncIterator(bodyFunc: Function, config: Config, semaphore: Semaphore) {
46
47 // Return a function that returns an iterator.
48 return function iterable(): any {
49
50 // Capture the initial arguments used to start the iterator, as an array.
51 var startupArgs = new Array(arguments.length + 1); // Reserve 0th arg for the yield function.
52 for (var i = 0, len = arguments.length; i < len; ++i) startupArgs[i + 1] = arguments[i];
53
54 // Create a yield() function tailored for this iterator.
55 var yield_ = expr => {
56
57 // Ensure this function is executing inside a fiber.
58 if (!Fiber.current) {
59 throw new Error(
60 'await functions, yield functions, and value-returning suspendable ' +
61 'functions may only be called from inside a suspendable function. '
62 );
63 }
64
65 // Notify waiters of the next result, then suspend the iterator.
66 if (runContext.callback) runContext.callback(null, { value: expr, done: false });
67 if (runContext.resolver) runContext.resolver.resolve({ value: expr, done: false });
68 Fiber.yield();
69 }
70
71 // Insert the yield function as the first argument when starting the iterator.
72 startupArgs[0] = yield_;
73
74 // Create the iterator.
75 var runContext = new RunContext(bodyFunc, this, startupArgs);
76 var iterator = new AsyncIterator(runContext, semaphore, config.returnValue, config.acceptsCallback);
77
78 // Wrap the given bodyFunc to properly complete the iteration.
79 runContext.wrapped = function () {
80 var len = arguments.length, args=new Array(len);
81 for (var i = 0; i < len; ++i) args[i] = arguments[i];
82 bodyFunc.apply(this, args);
83 iterator.destroy();
84 return { done: true };
85 };
86
87 // Return the iterator.
88 return iterator;
89 };
90}
91
92
93/** Function for creating non-iterable suspendable functions. */
94function makeAsyncNonIterator(bodyFunc: Function, config: Config, semaphore: Semaphore) {
95
96 // Return a function that executes fn in a fiber and returns a promise of fn's result.
97 return function nonIterable(): any {
98
99 // Get all the arguments passed in, as an array.
100 var argsAsArray = new Array(arguments.length);
101 for (var i = 0; i < argsAsArray.length; ++i) argsAsArray[i] = arguments[i];
102
103 // Remove concurrency restrictions for nested calls, to avoid race conditions.
104 if (FiberMgr.isExecutingInFiber()) this._semaphore = Semaphore.unlimited;
105
106 // Configure the run context.
107 var runContext = new RunContext(bodyFunc, this, argsAsArray, () => semaphore.leave());
108 if (config.returnValue !== Config.NONE) {
109 var resolver = defer();
110 runContext.resolver = resolver;
111 }
112 if (config.acceptsCallback && argsAsArray.length && _.isFunction(argsAsArray[argsAsArray.length - 1])) {
113 var callback = argsAsArray.pop();
114 runContext.callback = callback;
115 }
116
117 // Execute bodyFunc to completion in a coroutine. For thunks, this is a lazy operation.
118 if (config.returnValue === Config.THUNK) {
119 var thunk: types.Thunk<any> = (done?) => {
120 if (done) resolver.promise.then(val => done(null, val), err => done(err));
121 semaphore.enter(() => FiberMgr.create().run(runContext));
122 };
123 } else {
124 semaphore.enter(() => FiberMgr.create().run(runContext));
125 }
126
127 // Return the appropriate value.
128 switch (config.returnValue) {
129 case Config.PROMISE: return resolver.promise;
130 case Config.THUNK: return thunk;
131 case Config.RESULT: return await (resolver.promise);
132 case Config.NONE: return;
133 }
134 };
135}
136
137
138/** Returns a function that directly proxies the given function, whilst reporting the given arity. */
139function makeFuncWithArity(fn: Function, arity: number) {
140
141 // Need to handle each arity individually, but the body never changes.
142 switch (arity) {
143 case 0: return function f0() {var i,l=arguments.length,r=new Array(l);for(i=0;i<l;++i)r[i]=arguments[i];return fn.apply(this,r)}
144 case 1: return function f1(a) {var i,l=arguments.length,r=new Array(l);for(i=0;i<l;++i)r[i]=arguments[i];return fn.apply(this,r)}
145 case 2: return function f2(a,b) {var i,l=arguments.length,r=new Array(l);for(i=0;i<l;++i)r[i]=arguments[i];return fn.apply(this,r)}
146 case 3: return function f3(a,b,c) {var i,l=arguments.length,r=new Array(l);for(i=0;i<l;++i)r[i]=arguments[i];return fn.apply(this,r)}
147 case 4: return function f4(a,b,c,d) {var i,l=arguments.length,r=new Array(l);for(i=0;i<l;++i)r[i]=arguments[i];return fn.apply(this,r)}
148 case 5: return function f5(a,b,c,d,e) {var i,l=arguments.length,r=new Array(l);for(i=0;i<l;++i)r[i]=arguments[i];return fn.apply(this,r)}
149 case 6: return function f6(a,b,c,d,e,f) {var i,l=arguments.length,r=new Array(l);for(i=0;i<l;++i)r[i]=arguments[i];return fn.apply(this,r)}
150 case 7: return function f7(a,b,c,d,e,f,g) {var i,l=arguments.length,r=new Array(l);for(i=0;i<l;++i)r[i]=arguments[i];return fn.apply(this,r)}
151 case 8: return function f8(a,b,c,d,e,f,g,h) {var i,l=arguments.length,r=new Array(l);for(i=0;i<l;++i)r[i]=arguments[i];return fn.apply(this,r)}
152 case 9: return function f9(a,b,c,d,e,f,g,h,_i) {var i,l=arguments.length,r=new Array(l);for(i=0;i<l;++i)r[i]=arguments[i];return fn.apply(this,r)}
153 default: return fn; // Bail out if arity is crazy high.
154 }
155}
156
157
158function makeModFunc(config: Config) {
159 return (options: any, maxConcurrency?: number) => {
160 if (_.isString(options)) {
161
162 // This way of specifying options is useful for TypeScript users, as they get better type information.
163 // JavaScript users can use this too, but providing an options hash is more useful in that case.
164 var rt, cb, it;
165 switch(options) {
166 case 'returns: promise, callback: false, iterable: false': rt = 'promise'; cb = false; it = false; break;
167 case 'returns: thunk, callback: false, iterable: false': rt = 'thunk'; cb = false; it = false; break;
168 case 'returns: result, callback: false, iterable: false': rt = 'result'; cb = false; it = false; break;
169 case 'returns: promise, callback: true, iterable: false': rt = 'promise'; cb = true; it = false; break;
170 case 'returns: thunk, callback: true, iterable: false': rt = 'thunk'; cb = true; it = false; break;
171 case 'returns: result, callback: true, iterable: false': rt = 'result'; cb = true; it = false; break;
172 case 'returns: none, callback: true, iterable: false': rt = 'none'; cb = true; it = false; break;
173 case 'returns: promise, callback: false, iterable: true': rt = 'promise'; cb = false; it = true; break;
174 case 'returns: thunk, callback: false, iterable: true': rt = 'thunk'; cb = false; it = true; break;
175 case 'returns: result, callback: false, iterable: true': rt = 'result'; cb = false; it = true; break;
176 case 'returns: promise, callback: true, iterable: true': rt = 'promise'; cb = true; it = true; break;
177 case 'returns: thunk, callback: true, iterable: true': rt = 'thunk'; cb = true; it = true; break;
178 case 'returns: result, callback: true, iterable: true': rt = 'result'; cb = true; it = true; break;
179 case 'returns: none, callback: true, iterable: true': rt = 'none'; cb = true; it = true; break;
180 }
181 options = { returnValue: rt, acceptsCallback: cb, isIterable: it, maxConcurrency: maxConcurrency };
182 }
183 var newConfig = new Config(_.defaults({}, options, config));
184 return makeAsyncFunc(newConfig);
185 };
186}