// UNPKG file view: 5.88 kB, JavaScript (View Raw)
"use strict";
// Interop helper emitted by the TypeScript compiler: reuses an existing
// `__importDefault` found on `this` when there is one, otherwise wraps a
// module that is not flagged `__esModule` as `{ default: mod }` so that
// `.default` can be read uniformly from every required module below.
var __importDefault =
    (this && this.__importDefault) ||
    function (mod) {
        return mod && mod.__esModule ? mod : { default: mod };
    };
Object.defineProperty(exports, "__esModule", { value: true });
// Declare the export names up front; the real values are assigned further down.
exports.Batcher = exports.BATCHER_RETRY_TOKEN = void 0;
const p_defer_1 = __importDefault(require("p-defer"));
const util_1 = __importDefault(require("util"));
// Logger created through util.debuglog under the "promise-batcher" section.
const debug = util_1.default.debuglog("promise-batcher");
/**
 * Reports whether a value is absent.
 *
 * @param {*} val - Value to inspect.
 * @returns {boolean} `true` only for `undefined` and `null`; other falsy
 *   values such as `0`, `""` and `false` yield `false`.
 */
function isNull(val) {
    return val === undefined || val === null;
}
// Unique sentinel value. When a batching function puts it in an output slot,
// Batcher puts the matching input back on the queue instead of settling it.
exports.BATCHER_RETRY_TOKEN = Symbol("PromiseBatcher.BATCHER_RETRY_TOKEN");
/**
 * Collects individual requests and hands them to a batching function in groups.
 *
 * Callers queue one input at a time with `getResult()` and receive a promise
 * for the matching output. Queued inputs are flushed to `batchingFunction`
 * once the applicable queuing threshold is met and either the queuing delay
 * has elapsed, the queue has reached `maxBatchSize`, or `send()` was called.
 */
class Batcher {
    /**
     * @param {object} options
     * @param {Function} options.batchingFunction - Called (with `this` bound
     *   to the batcher) with an array of inputs. Must return, or resolve to,
     *   an array of the same length. An entry equal to `BATCHER_RETRY_TOKEN`
     *   re-queues its input, an `Error` entry rejects the matching promise,
     *   and any other entry resolves it.
     * @param {Function} [options.delayFunction] - Called before each batch.
     *   A truthy return value is awaited before the batch runs; a falsy one
     *   lets the batch run straight away. A throw or rejection rejects every
     *   queued request.
     * @param {number} [options.maxBatchSize=Infinity] - Largest number of
     *   inputs passed to a single `batchingFunction` call; must be >= 1.
     * @param {number} [options.queuingDelay=1] - Milliseconds to wait before
     *   running a batch; must be >= 0.
     * @param {number[]} [options.queuingThresholds=[1]] - Minimum queue length
     *   required to start a batch, indexed by the number of batches currently
     *   running (the last entry applies to every higher count).
     * @throws {Error} If any numeric option is out of range or not a number.
     */
    constructor(options) {
        this._maxBatchSize = Infinity;
        this._queuingDelay = 1;
        this._inputQueue = [];
        this._outputQueue = [];
        this._waiting = false;
        this._activeBatchCount = 0;
        this._immediateCount = 0;
        this._batchingFunction = options.batchingFunction;
        this._delayFunction = options.delayFunction;
        if (Array.isArray(options.queuingThresholds)) {
            if (!options.queuingThresholds.length) {
                throw new Error("options.queuingThresholds must contain at least one number");
            }
            for (const n of options.queuingThresholds) {
                // `!(n >= 1)` instead of `n < 1` so that NaN is rejected too.
                if (!(n >= 1)) {
                    throw new Error("options.queuingThresholds must only contain numbers greater than 0");
                }
            }
            // Copy so later mutation of the caller's array cannot affect us.
            this._queuingThresholds = options.queuingThresholds.slice();
        } else {
            this._queuingThresholds = [1];
        }
        if (!isNull(options.maxBatchSize)) {
            // NaN must be refused: `splice(0, NaN)` removes nothing, so a NaN
            // batch size would run empty batches forever without draining.
            if (!(options.maxBatchSize >= 1)) {
                throw new Error("options.maxBatchSize must be greater than 0");
            }
            this._maxBatchSize = options.maxBatchSize;
        }
        if (!isNull(options.queuingDelay)) {
            if (!(options.queuingDelay >= 0)) {
                throw new Error("options.queuingDelay must be greater than or equal to 0");
            }
            this._queuingDelay = options.queuingDelay;
        }
    }

    /**
     * Queues one input for the next batch.
     *
     * @param {*} input - Value to pass to the batching function.
     * @returns {Promise<*>} Settles with the output produced for this input.
     */
    getResult(input) {
        const index = this._inputQueue.length;
        debug("Queuing request at index %O", index);
        // Input and deferred share the same index in their respective queues.
        this._inputQueue[index] = input;
        const deferred = (0, p_defer_1.default)();
        this._outputQueue[index] = deferred;
        this._trigger();
        return deferred.promise;
    }

    /**
     * Marks everything currently queued as due immediately, so the queuing
     * delay is skipped for those requests. Queuing thresholds still apply.
     */
    send() {
        debug("Send triggered.");
        this._immediateCount = this._inputQueue.length;
        this._trigger();
    }

    /**
     * Re-evaluates the queue: resolves the idle promise, runs a batch right
     * away, or schedules one after the queuing delay, as appropriate.
     */
    _trigger() {
        // Waiting without a timer means a delayFunction promise is pending;
        // the batch will start when it settles.
        if (this._waiting && !this._waitTimeout) {
            return;
        }
        const thresholdIndex = Math.min(this._activeBatchCount, this._queuingThresholds.length - 1);
        if (this._inputQueue.length < this._queuingThresholds[thresholdIndex]) {
            if (this._idlePromise && this.idling) {
                this._idlePromise.resolve();
                this._idlePromise = undefined;
            }
            return;
        }
        if (this._inputQueue.length >= this._maxBatchSize || this._immediateCount) {
            debug("Running immediately.");
            if (this._waitTimeout) {
                clearTimeout(this._waitTimeout);
                this._waitTimeout = undefined;
            }
            this._waiting = true;
            this._run();
            return;
        }
        if (this._waiting) {
            // A timer is already counting down for this queue.
            return;
        }
        this._waiting = true;
        debug("Running in %Oms (thresholdIndex %O).", this._queuingDelay, thresholdIndex);
        this._waitTimeout = setTimeout(() => {
            this._waitTimeout = undefined;
            this._run();
        }, this._queuingDelay);
    }

    /**
     * Consults the optional delayFunction, then starts the batch. If the
     * delayFunction throws or rejects, every queued request is rejected.
     */
    _run() {
        if (this._delayFunction) {
            let result;
            try {
                result = this._delayFunction();
            } catch (err) {
                // Treat a synchronous throw like an asynchronous rejection.
                result = Promise.reject(err);
            }
            if (result) {
                Promise.resolve(result)
                    .then(() => {
                        this._runImmediately();
                    })
                    .catch((err) => {
                        debug("Caught error in delayFunction. Rejecting promises.");
                        this._inputQueue.length = 0;
                        const promises = this._outputQueue.splice(0, this._outputQueue.length);
                        for (const promise of promises) {
                            promise.reject(err);
                        }
                        this._waiting = false;
                        // The queue is now empty, so no request is still
                        // "due immediately". Without this reset a prior
                        // send() would make later requests skip the delay.
                        this._immediateCount = 0;
                        // Re-evaluate so a pending idlePromise() is resolved
                        // instead of hanging until unrelated later traffic.
                        this._trigger();
                    });
                return;
            }
            debug("Bypassing batch delay.");
        }
        this._runImmediately();
    }

    /**
     * Removes up to `maxBatchSize` requests from the queue, runs the batching
     * function on them and settles (or re-queues) each request from its output.
     */
    _runImmediately() {
        const inputs = this._inputQueue.splice(0, this._maxBatchSize);
        const outputPromises = this._outputQueue.splice(0, this._maxBatchSize);
        if (this._immediateCount) {
            this._immediateCount = Math.max(0, this._immediateCount - inputs.length);
        }
        (async () => {
            try {
                debug("Running batch of %O", inputs.length);
                this._waiting = false;
                this._activeBatchCount++;
                let batchPromise;
                try {
                    batchPromise = this._batchingFunction.call(this, inputs);
                } finally {
                    // Whatever is left in the queue may already qualify for
                    // another batch, even if the call above threw.
                    this._trigger();
                }
                const outputs = await batchPromise;
                if (!Array.isArray(outputs)) {
                    throw new Error("batchingFunction must return an array");
                }
                debug("Promise resolved.");
                if (outputs.length !== outputPromises.length) {
                    throw new Error("batchingFunction output length does not equal the input length");
                }
                const retryInputs = [];
                const retryPromises = [];
                for (const [index, promise] of outputPromises.entries()) {
                    const output = outputs[index];
                    if (output === exports.BATCHER_RETRY_TOKEN) {
                        retryInputs.push(inputs[index]);
                        retryPromises.push(promise);
                    } else if (output instanceof Error) {
                        promise.reject(output);
                    } else {
                        promise.resolve(output);
                    }
                }
                if (retryPromises.length) {
                    debug("Adding %O requests to the queue to retry.", retryPromises.length);
                    if (this._immediateCount) {
                        this._immediateCount += retryPromises.length;
                    }
                    // Retries go to the front so they keep their position
                    // ahead of requests queued after them.
                    this._inputQueue.unshift(...retryInputs);
                    this._outputQueue.unshift(...retryPromises);
                }
            } catch (err) {
                // Any failure of the batch as a whole rejects all its requests.
                for (const promise of outputPromises) {
                    promise.reject(err);
                }
            } finally {
                this._activeBatchCount--;
                this._trigger();
            }
        })();
    }

    /**
     * `true` when no batch is running and nothing is queued.
     *
     * @returns {boolean}
     */
    get idling() {
        return this._activeBatchCount <= 0 && this._inputQueue.length <= 0;
    }

    /**
     * Waits until the batcher is idle.
     *
     * @returns {Promise<void>} Resolves at once if already idle, otherwise
     *   when the batcher next finds itself idle.
     */
    async idlePromise() {
        if (this.idling) {
            return;
        }
        if (!this._idlePromise) {
            this._idlePromise = (0, p_defer_1.default)();
        }
        return this._idlePromise.promise;
    }
}
// Publish the class on the CommonJS exports object.
exports.Batcher = Batcher;