UNPKG

52.8 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.AsyncOrderedQueue = exports.AsyncIterableQueue = exports.AsyncQueue = exports.throttle = exports.cacheFn = exports.memoizeFn = exports.RateLimiter = exports.Pump = exports.Funnel = exports.retryOp = exports.DeferredWorker = exports.Deferred = void 0;
4const assert = require("assert");
5const crypto_1 = require("crypto");
6const error_1 = require("./error");
7const serialize_1 = require("./serialize");
8const shared_1 = require("./shared");
9class Deferred {
10 constructor() {
11 this.promise = new Promise((resolve, reject) => {
12 this.resolve = resolve;
13 this.reject = reject;
14 });
15 }
16}
17exports.Deferred = Deferred;
18class DeferredWorker extends Deferred {
19 constructor(worker, cancel) {
20 super();
21 this.worker = worker;
22 this.cancel = cancel;
23 }
24 async execute() {
25 const cancelMessage = this.cancel?.();
26 if (cancelMessage) {
27 this.reject(new error_1.FaastError({ name: error_1.FaastErrorNames.ECANCEL }, cancelMessage));
28 }
29 else {
30 try {
31 const rv = await this.worker();
32 this.resolve(rv);
33 }
34 catch (err) {
35 this.reject(err);
36 }
37 }
38 }
39}
40exports.DeferredWorker = DeferredWorker;
41function popFirst(set) {
42 let firstElem;
43 for (const elem of set) {
44 firstElem = elem;
45 break;
46 }
47 if (firstElem) {
48 set.delete(firstElem);
49 }
50 return firstElem;
51}
52async function retryOp(retryN, fn) {
53 const retryTest = typeof retryN === "function" ? retryN : (_, i) => i < retryN;
54 for (let i = 0; true; i++) {
55 try {
56 return await fn(i);
57 }
58 catch (err) {
59 if (!retryTest(err, i)) {
60 throw err;
61 }
62 await (0, shared_1.sleep)(Math.min(30 * 1000, 1000 * (1 + Math.random()) * 2 ** i) + Math.random());
63 }
64 }
65}
66exports.retryOp = retryOp;
67class Funnel {
68 constructor(concurrency = 0, shouldRetry) {
69 this.concurrency = concurrency;
70 this.shouldRetry = shouldRetry;
71 this.pendingQueue = new Set();
72 this.executingQueue = new Set();
73 this.processed = 0;
74 this.errors = 0;
75 }
76 push(worker, shouldRetry, cancel) {
77 const retryTest = shouldRetry || this.shouldRetry || 0;
78 const retryWorker = () => retryOp(retryTest, worker);
79 const future = new DeferredWorker(retryWorker, cancel);
80 this.pendingQueue.add(future);
81 setImmediate(() => this.doWork());
82 return future.promise;
83 }
84 clear() {
85 this.pendingQueue.clear();
86 this.executingQueue.clear();
87 }
88 promises() {
89 return [...this.executingQueue, ...this.pendingQueue].map(p => p.promise);
90 }
91 all() {
92 return Promise.all(this.promises().map(p => p.catch(_ => { })));
93 }
94 size() {
95 return this.pendingQueue.size + this.executingQueue.size;
96 }
97 setMaxConcurrency(maxConcurrency) {
98 this.concurrency = maxConcurrency;
99 }
100 getConcurrency() {
101 return this.executingQueue.size;
102 }
103 doWork() {
104 const { pendingQueue } = this;
105 while (pendingQueue.size > 0 &&
106 (!this.concurrency || this.executingQueue.size < this.concurrency)) {
107 const worker = popFirst(pendingQueue);
108 this.executingQueue.add(worker);
109 worker.promise
110 .then(_ => this.processed++)
111 .catch(_ => this.errors++)
112 .then(_ => {
113 this.executingQueue.delete(worker);
114 this.doWork();
115 });
116 worker.execute();
117 }
118 }
119}
120exports.Funnel = Funnel;
121/**
122 * @internal
123 */
124class Pump extends Funnel {
125 constructor(options, worker) {
126 super(options.concurrency);
127 this.options = options;
128 this.worker = worker;
129 this.stopped = false;
130 options.verbose = options.verbose ?? true;
131 }
132 start() {
133 const restart = () => {
134 if (this.stopped) {
135 return;
136 }
137 while (this.executingQueue.size + this.pendingQueue.size < this.concurrency) {
138 this.push(async () => {
139 try {
140 return await this.worker();
141 }
142 catch (err) {
143 this.options.verbose && console.error(err);
144 return;
145 }
146 finally {
147 setImmediate(restart);
148 }
149 });
150 }
151 };
152 this.stopped = false;
153 restart();
154 }
155 stop() {
156 this.stopped = true;
157 }
158 drain() {
159 this.stop();
160 return this.all();
161 }
162 setMaxConcurrency(concurrency) {
163 super.setMaxConcurrency(concurrency);
164 if (!this.stopped) {
165 this.start();
166 }
167 }
168}
169exports.Pump = Pump;
170class RateLimiter {
171 constructor(targetRequestsPerSecond, burst = 1) {
172 this.targetRequestsPerSecond = targetRequestsPerSecond;
173 this.burst = burst;
174 this.lastTick = 0;
175 this.bucket = 0;
176 this.queue = new Set();
177 assert(targetRequestsPerSecond > 0);
178 assert(this.burst >= 1);
179 }
180 push(worker, cancel) {
181 this.updateBucket();
182 if (this.queue.size === 0 && this.bucket <= this.burst - 1) {
183 this.bucket++;
184 return worker();
185 }
186 const future = new DeferredWorker(worker, cancel);
187 this.queue.add(future);
188 if (this.queue.size === 1) {
189 this.drainQueue();
190 }
191 return future.promise;
192 }
193 updateBucket() {
194 const now = Date.now();
195 const secondsElapsed = (now - this.lastTick) / 1000;
196 this.bucket -= secondsElapsed * this.targetRequestsPerSecond;
197 this.bucket = Math.max(this.bucket, 0);
198 this.lastTick = now;
199 }
200 async drainQueue() {
201 const requestAmountToDrain = 1 - (this.burst - this.bucket);
202 const secondsToDrain = requestAmountToDrain / this.targetRequestsPerSecond;
203 if (secondsToDrain > 0) {
204 await (0, shared_1.sleep)(Math.ceil(secondsToDrain * 1000));
205 }
206 this.updateBucket();
207 while (this.bucket <= this.burst - 1) {
208 const next = popFirst(this.queue);
209 if (!next) {
210 break;
211 }
212 this.bucket++;
213 next.execute();
214 }
215 if (this.queue.size > 0) {
216 this.drainQueue();
217 }
218 }
219 clear() {
220 this.queue.clear();
221 }
222}
223exports.RateLimiter = RateLimiter;
224function memoizeFn(fn, cache = new Map()) {
225 return (...args) => {
226 const key = JSON.stringify(args);
227 const prev = cache.get(key);
228 if (prev) {
229 return prev;
230 }
231 const value = fn(...args);
232 cache.set(key, value);
233 return value;
234 };
235}
236exports.memoizeFn = memoizeFn;
237function cacheFn(cache, fn) {
238 return async (...args) => {
239 const key = (0, serialize_1.serialize)(args, true);
240 const hasher = (0, crypto_1.createHash)("sha256");
241 hasher.update(key);
242 const cacheKey = hasher.digest("hex");
243 const prev = await cache.get(cacheKey);
244 if (prev) {
245 const str = prev.toString();
246 if (str === "undefined") {
247 return undefined;
248 }
249 return (0, serialize_1.deserialize)(str);
250 }
251 const value = await fn(...args);
252 await cache.set(cacheKey, (0, serialize_1.serialize)(value, true));
253 return value;
254 };
255}
256exports.cacheFn = cacheFn;
257/**
258 * A decorator for rate limiting, concurrency limiting, retry, memoization, and
259 * on-disk caching. See {@link Limits}.
260 * @remarks
261 * When programming against cloud services, databases, and other resources, it
262 * is often necessary to control the rate of request issuance to avoid
263 * overwhelming the service provider. In many cases the provider has built-in
264 * safeguards against abuse, which automatically fail requests if they are
265 * coming in too fast. Some systems don't have safeguards and precipitously
266 * degrade their service level or fail outright when faced with excessive load.
267 *
268 * With faast.js it becomes very easy to (accidentally) generate requests from
269 * thousands of cloud functions. The `throttle` function can help manage request
270 * flow without resorting to setting up a separate service. This is in keeping
271 * with faast.js' zero-ops philosophy.
272 *
273 * Usage is simple:
274 *
275 * ```typescript
276 * async function operation() { ... }
277 * const throttledOperation = throttle({ concurrency: 10, rate: 5 }, operation);
278 * for(let i = 0; i < 100; i++) {
279 * // at most 10 concurrent executions at a rate of 5 invocations per second.
280 * throttledOperation();
281 * }
282 * ```
283 *
284 * Note that each invocation to `throttle` creates a separate function with a
285 * separate limits. Therefore it is likely that you want to use `throttle` in a
286 * global context, not within a dynamic context:
287 *
288 * ```typescript
289 * async function operation() { ... }
290 * for(let i = 0; i < 100; i++) {
291 * // WRONG - each iteration creates a separate throttled function that's only called once.
292 * const throttledOperation = throttle({ concurrency: 10, rate: 5 }, operation);
293 * throttledOperation();
294 * }
295 * ```
296 *
297 * A better way to use throttle avoids creating a named `operation` function
298 * altogether, ensuring it cannot be accidentally called without throttling:
299 *
300 * ```typescript
301 * const operation = throttle({ concurrency: 10, rate: 5 }, async () => {
302 * ...
303 * });
304 * ```
305 *
306 * Throttle supports functions with arguments automatically infers the correct
307 * type for the returned function:
308 *
309 * ```typescript
310 * // `operation` inferred to have type (str: string) => Promise<string>
311 * const operation = throttle({ concurrency: 10, rate: 5 }, async (str: string) => {
312 * return string;
313 * });
314 * ```
315 *
316 * In addition to limiting concurrency and invocation rate, `throttle` also
317 * supports retrying failed invocations, memoizing calls, and on-disk caching.
318 * See {@link Limits} for details.
319 *
320 * @param limits - see {@link Limits}.
321 * @param fn - The function to throttle. It can take any arguments, but must
322 * return a Promise (which includes `async` functions).
323 * @returns Returns a throttled function with the same signature as the argument
324 * `fn`.
325 * @public
326 */
327function throttle(limits, fn) {
328 const { concurrency, retry, rate, burst, memoize, cache, cancel } = limits;
329 const funnel = new Funnel(concurrency, retry);
330 const cancellationQueue = [() => funnel.clear()];
331 let conditionedFunc;
332 if (rate) {
333 const rateLimiter = new RateLimiter(rate, burst);
334 cancellationQueue.push(() => rateLimiter.clear());
335 conditionedFunc = (...args) => funnel.push(() => rateLimiter.push(() => fn(...args)));
336 }
337 else {
338 conditionedFunc = (...args) => funnel.push(() => fn(...args));
339 }
340 if (cache) {
341 conditionedFunc = cacheFn(cache, conditionedFunc);
342 }
343 if (memoize) {
344 const mcache = new Map();
345 cancellationQueue.push(() => mcache.clear());
346 conditionedFunc = memoizeFn(conditionedFunc, mcache);
347 }
348 cancel?.then(() => cancellationQueue.forEach(cleanupFn => cleanupFn()));
349 return conditionedFunc;
350}
351exports.throttle = throttle;
352function iteratorResult(value) {
353 return Promise.resolve(value).then(v => ({ done: false, value: v }));
354}
355const done = Promise.resolve({ done: true, value: undefined });
356class AsyncQueue {
357 constructor() {
358 this.deferred = [];
359 this.enqueued = [];
360 }
361 enqueue(value) {
362 if (this.deferred.length > 0) {
363 const d = this.deferred.shift();
364 d.resolve(value);
365 }
366 else {
367 this.enqueued.push(Promise.resolve(value));
368 }
369 }
370 next() {
371 if (this.enqueued.length > 0) {
372 return this.enqueued.shift();
373 }
374 const d = new Deferred();
375 this.deferred.push(d);
376 return d.promise;
377 }
378 clear() {
379 this.deferred = [];
380 this.enqueued = [];
381 }
382}
383exports.AsyncQueue = AsyncQueue;
384class AsyncIterableQueue extends AsyncQueue {
385 push(value) {
386 super.enqueue(iteratorResult(value));
387 }
388 done() {
389 super.enqueue(done);
390 }
391 [Symbol.asyncIterator]() {
392 return this;
393 }
394}
395exports.AsyncIterableQueue = AsyncIterableQueue;
396class AsyncOrderedQueue {
397 constructor() {
398 this.queue = new AsyncQueue();
399 this.arrived = new Map();
400 this.current = 0;
401 }
402 push(value, sequence) {
403 this.enqueue(Promise.resolve(value), sequence);
404 }
405 pushImmediate(value) {
406 this.queue.enqueue(value);
407 }
408 enqueue(value, sequence) {
409 if (sequence < this.current) {
410 return;
411 }
412 if (!this.arrived.has(sequence)) {
413 this.arrived.set(sequence, value);
414 }
415 while (this.arrived.has(this.current)) {
416 this.queue.enqueue(this.arrived.get(this.current));
417 this.arrived.delete(this.current);
418 this.current++;
419 }
420 }
421 next() {
422 return this.queue.next();
423 }
424 clear() {
425 this.arrived.clear();
426 this.queue.clear();
427 this.current = 0;
428 }
429}
430exports.AsyncOrderedQueue = AsyncOrderedQueue;
431//# sourceMappingURL=data:application/json;base64,
\No newline at end of file