1"use strict";
2var __importDefault = (this && this.__importDefault) || function (mod) {
3 return (mod && mod.__esModule) ? mod : { "default": mod };
4};
5Object.defineProperty(exports, "__esModule", { value: true });
6exports.Batcher = exports.BATCHER_RETRY_TOKEN = void 0;
7const p_defer_1 = __importDefault(require("p-defer"));
8const util_1 = __importDefault(require("util"));
9const debug = util_1.default.debuglog("promise-batcher");
10function isNull(val) {
11 return val === undefined || val === null;
12}
/**
 * If this token is returned in the results from a batchingFunction, the corresponding requests will be placed back
 * into the head of the queue.
 */
exports.BATCHER_RETRY_TOKEN = Symbol("PromiseBatcher.BATCHER_RETRY_TOKEN");
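/*
 * Illustration only (not part of the library source): a minimal sketch of a
 * batchingFunction that uses the retry token. The fetchUserRecords helper and
 * its "throttled" status are hypothetical; BATCHER_RETRY_TOKEN is the export above.
 *
 *   const { BATCHER_RETRY_TOKEN } = require("promise-batcher");
 *
 *   const batchingFunction = async (ids) => {
 *       const records = await fetchUserRecords(ids); // hypothetical bulk lookup
 *       // Return one output per input, in order; a retry token re-queues that input
 *       // at the head of the queue for the next batch.
 *       return records.map((record) => (record.status === "throttled" ? BATCHER_RETRY_TOKEN : record.user));
 *   };
 */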
// tslint:disable-next-line:max-classes-per-file
class Batcher {
    constructor(options) {
        this._maxBatchSize = Infinity;
        this._queuingDelay = 1;
        this._inputQueue = [];
        this._outputQueue = [];
        this._waiting = false;
        this._activePromiseCount = 0;
        this._immediateCount = 0;
        this._batchingFunction = options.batchingFunction;
        this._delayFunction = options.delayFunction;
        if (Array.isArray(options.queuingThresholds)) {
            if (!options.queuingThresholds.length) {
                throw new Error("options.queuingThresholds must contain at least one number");
            }
            for (const n of options.queuingThresholds) {
                if (n < 1) {
                    throw new Error("options.queuingThresholds must only contain numbers greater than 0");
                }
            }
            this._queuingThresholds = options.queuingThresholds.slice();
        }
        else {
            this._queuingThresholds = [1];
        }
        if (!isNull(options.maxBatchSize)) {
            if (options.maxBatchSize < 1) {
                throw new Error("options.maxBatchSize must be greater than 0");
            }
            this._maxBatchSize = options.maxBatchSize;
        }
        if (!isNull(options.queuingDelay)) {
            if (options.queuingDelay < 0) {
                throw new Error("options.queuingDelay must be greater than or equal to 0");
            }
            this._queuingDelay = options.queuingDelay;
        }
    }
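    /*
     * Illustration only (not from the library source): an options object that
     * passes the validation above. The values are arbitrary examples; omitted
     * options fall back to the defaults set at the top of the constructor
     * (maxBatchSize Infinity, queuingDelay 1ms, queuingThresholds [1]).
     *
     *   const options = {
     *       batchingFunction: async (inputs) => inputs.map((n) => n * 2),
     *       maxBatchSize: 100,         // must be >= 1
     *       queuingDelay: 10,          // milliseconds, must be >= 0
     *       queuingThresholds: [1, 5], // each entry must be >= 1
     *   };
     *   const batcher = new Batcher(options);
     */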
    /**
     * Returns a promise which resolves or rejects with the individual result returned from the batching function.
     */
    getResult(input) {
        const index = this._inputQueue.length;
        debug("Queuing request at index %O", index);
        this._inputQueue[index] = input;
        const deferred = p_defer_1.default();
        this._outputQueue[index] = deferred;
        this._trigger();
        return deferred.promise;
    }
    /**
     * Triggers a batch to run, bypassing the queuingDelay while respecting other imposed delays.
     */
    send() {
        debug("Send triggered.");
        // no inputs?
        // delayed?
        this._immediateCount = this._inputQueue.length;
        this._trigger();
    }
    /**
     * Triggers a batch to run, adhering to the maxBatchSize, queuingThresholds, and queuingDelay.
     */
    _trigger() {
        // If the batch is set to run immediately, there is nothing more to be done
        if (this._waiting && !this._waitTimeout) {
            return;
        }
        // Always obey the queuing threshold
        const thresholdIndex = Math.min(this._activePromiseCount, this._queuingThresholds.length - 1);
        if (this._inputQueue.length < this._queuingThresholds[thresholdIndex]) {
            return;
        }
        // If the queue has reached the maximum batch size, start it immediately
        if (this._inputQueue.length >= this._maxBatchSize || this._immediateCount) {
            debug("Running immediately.");
            if (this._waitTimeout) {
                clearTimeout(this._waitTimeout);
                this._waitTimeout = undefined;
            }
            this._waiting = true;
            this._run();
            return;
        }
        if (this._waiting) {
            return;
        }
        // Run the batch, but with a delay
        this._waiting = true;
        debug("Running in %Oms (thresholdIndex %O).", this._queuingDelay, thresholdIndex);
        // Tests showed that nextTick would commonly run before promises could resolve.
        // SetImmediate would run later than setTimeout as well.
        this._waitTimeout = setTimeout(() => {
            this._waitTimeout = undefined;
            this._run();
        }, this._queuingDelay);
    }
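    /*
     * Worked example (illustration only): with queuingThresholds of [1, 9],
     * thresholdIndex is Math.min(activePromiseCount, 1).
     *
     *   Math.min(0, 1) === 0 -> thresholds[0] === 1
     *   Math.min(2, 1) === 1 -> thresholds[1] === 9
     *
     * While no batch is active, a single queued request can start a batch; while
     * one or more batches are active, a new batch will not start until nine
     * requests are queued. Reaching maxBatchSize or calling send() only skips the
     * queuingDelay once that threshold has been met.
     */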
    /**
     * Runs the batch, while respecting delays imposed by the supplied delayFunction
     */
    _run() {
        if (this._delayFunction) {
            let result;
            try {
                result = this._delayFunction();
            }
            catch (err) {
                result = Promise.reject(err);
            }
            if (result) {
                const resultPromise = result instanceof Promise ? result : Promise.resolve(result);
                resultPromise
                    .then(() => {
                        this._runImmediately();
                    })
                    .catch((err) => {
                        debug("Caught error in delayFunction. Rejecting promises.");
                        this._inputQueue.length = 0;
                        const promises = this._outputQueue.splice(0, this._outputQueue.length);
                        for (const promise of promises) {
                            promise.reject(err);
                        }
                        this._waiting = false;
                    });
                return;
            }
            debug("Bypassing batch delay.");
        }
        this._runImmediately();
    }
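    /*
     * Illustration only: a hypothetical delayFunction that spaces batches at least
     * 500ms apart. Returning a promise defers the batch until it resolves; a falsy
     * return value bypasses the delay, and a rejection (or thrown error, converted
     * above) rejects every queued request.
     *
     *   let nextAllowedTime = 0;
     *   const delayFunction = () => {
     *       const wait = nextAllowedTime - Date.now();
     *       nextAllowedTime = Math.max(Date.now(), nextAllowedTime) + 500;
     *       return wait > 0 ? new Promise((resolve) => setTimeout(resolve, wait)) : undefined;
     *   };
     */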
    /**
     * Runs the batch immediately without further delay
     */
    _runImmediately() {
        const inputs = this._inputQueue.splice(0, this._maxBatchSize);
        const outputPromises = this._outputQueue.splice(0, this._maxBatchSize);
        if (this._immediateCount) {
            this._immediateCount = Math.max(0, this._immediateCount - inputs.length);
        }
        // eslint-disable-next-line @typescript-eslint/no-floating-promises
        (async () => {
            try {
                debug("Running batch of %O", inputs.length);
                this._waiting = false;
                this._activePromiseCount++;
                let batchPromise;
                try {
                    batchPromise = this._batchingFunction.call(this, inputs);
                }
                finally {
                    // The batch has started. Trigger another batch if appropriate.
                    this._trigger();
                }
                const outputs = await batchPromise;
                if (!Array.isArray(outputs)) {
                    throw new Error("batchingFunction must return an array");
                }
                debug("Promise resolved.");
                if (outputs.length !== outputPromises.length) {
                    throw new Error("batchingFunction output length does not equal the input length");
                }
                const retryInputs = [];
                const retryPromises = [];
                for (const [index, promise] of outputPromises.entries()) {
                    const output = outputs[index];
                    if (output === exports.BATCHER_RETRY_TOKEN) {
                        retryInputs.push(inputs[index]);
                        retryPromises.push(promise);
                    }
                    else if (output instanceof Error) {
                        promise.reject(output);
                    }
                    else {
                        promise.resolve(output);
                    }
                }
                if (retryPromises.length) {
                    debug("Adding %O requests to the queue to retry.", retryPromises.length);
                    if (this._immediateCount) {
                        this._immediateCount += retryPromises.length;
                    }
                    this._inputQueue.unshift(...retryInputs);
                    this._outputQueue.unshift(...retryPromises);
                }
            }
            catch (err) {
                for (const promise of outputPromises) {
                    promise.reject(err);
                }
            }
            finally {
                this._activePromiseCount--;
                // Since we may be operating at a lower queuing threshold now, we should try running again
                this._trigger();
            }
        })();
    }
}
exports.Batcher = Batcher;
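/*
 * End-to-end usage sketch (illustration only, not part of the compiled module).
 * The doubling batchingFunction is a stand-in for any batched operation such as
 * a bulk database query or a batched API call.
 *
 *   const { Batcher } = require("promise-batcher");
 *
 *   const batcher = new Batcher({
 *       batchingFunction: async (inputs) => {
 *           // Receives every input queued for this batch; must return one output
 *           // (a value, an Error, or BATCHER_RETRY_TOKEN) per input, in the same order.
 *           return inputs.map((n) => n * 2);
 *       },
 *       maxBatchSize: 100,
 *   });
 *
 *   // Each call resolves individually once its batch completes.
 *   Promise.all([batcher.getResult(1), batcher.getResult(2), batcher.getResult(3)])
 *       .then((results) => console.log(results)); // [2, 4, 6]
 */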