1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3const Debug = require("debug");
4const defer = require("p-defer");
5const debug = Debug("promise-batcher");
6function isNull(val) {
7 return val === undefined || val === null;
8}
9class BatcherToken {
10}
11exports.BatcherToken = BatcherToken;
/**
 * If this token is returned in the results from a batchingFunction, the corresponding requests will be placed back
 * into the head of the queue.
 */
exports.BATCHER_RETRY_TOKEN = new BatcherToken();
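// Illustrative sketch (not part of the original source). A consumer's batchingFunction
// can return BATCHER_RETRY_TOKEN for individual inputs that should be re-queued and
// retried in a later batch. The sendToApi helper below is hypothetical.
//
//   const { Batcher, BATCHER_RETRY_TOKEN } = require("promise-batcher");
//   const batcher = new Batcher({
//       batchingFunction: async (inputs) => {
//           const results = await sendToApi(inputs); // hypothetical bulk request
//           // Retry any input the (hypothetical) API reports as throttled.
//           return results.map((r) => (r.throttled ? BATCHER_RETRY_TOKEN : r.value));
//       },
//   });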
// tslint:disable-next-line:max-classes-per-file
class Batcher {
    constructor(options) {
        this._maxBatchSize = Infinity;
        this._queuingDelay = 1;
        this._inputQueue = [];
        this._outputQueue = [];
        this._waiting = false;
        this._activePromiseCount = 0;
        this._immediateCount = 0;
        this._batchingFunction = options.batchingFunction;
        this._delayFunction = options.delayFunction;
        if (Array.isArray(options.queuingThresholds)) {
            if (!options.queuingThresholds.length) {
31 throw new Error("options.batchThresholds must contain at least one number");
            }
            options.queuingThresholds.forEach((n) => {
                if (n < 1) {
                    throw new Error("options.queuingThresholds must only contain numbers greater than 0");
                }
            });
            this._queuingThresholds = options.queuingThresholds.slice();
        }
        else {
            this._queuingThresholds = [1];
        }
        if (!isNull(options.maxBatchSize)) {
            if (options.maxBatchSize < 1) {
                throw new Error("options.maxBatchSize must be greater than 0");
            }
            this._maxBatchSize = options.maxBatchSize;
        }
        if (!isNull(options.queuingDelay)) {
            if (options.queuingDelay < 0) {
                throw new Error("options.queuingDelay must be greater than or equal to 0");
            }
            this._queuingDelay = options.queuingDelay;
        }
    }
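    // Illustrative sketch (not part of the original source): the options handled above,
    // shown with example values. Only batchingFunction is required; the other fields
    // fall back to the defaults assigned at the top of the constructor.
    //
    //   const batcher = new Batcher({
    //       batchingFunction: async (inputs) => inputs.map((n) => n * 2),
    //       maxBatchSize: 100,         // cap on how many inputs go into one batch call
    //       queuingDelay: 10,          // ms to wait for more inputs before running a batch
    //       queuingThresholds: [1, 5], // minimum queue size, indexed by active batch count
    //   });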
    /**
     * Returns a promise which resolves or rejects with the individual result returned from the batching function.
     */
    getResult(input) {
        const index = this._inputQueue.length;
        debug("Queuing request at index %O", index);
        this._inputQueue[index] = input;
        const deferred = defer();
        this._outputQueue[index] = deferred;
        this._trigger();
        return deferred.promise;
    }
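    // Illustrative sketch (not part of the original source): each getResult call queues
    // one input and resolves with the matching element of the batch output.
    //
    //   const doubler = new Batcher({
    //       batchingFunction: async (inputs) => inputs.map((n) => n * 2),
    //   });
    //   // Both requests are queued synchronously, so they run in a single batch call.
    //   const results = await Promise.all([doubler.getResult(1), doubler.getResult(2)]);
    //   // results is [2, 4]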
    /**
     * Triggers a batch to run, bypassing the queuingDelay while respecting other imposed delays.
     */
    send() {
        debug("Send triggered.");
        // Mark everything currently queued as immediate; _trigger() handles the
        // empty-queue and already-delayed cases.
        this._immediateCount = this._inputQueue.length;
        this._trigger();
    }
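    // Illustrative sketch (not part of the original source): send() flushes whatever is
    // already queued without waiting out the queuingDelay.
    //
    //   const pending = batcher.getResult(someInput); // hypothetical queued request
    //   batcher.send(); // start the batch now rather than after the queuingDelay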
    /**
     * Triggers a batch to run, adhering to the maxBatchSize, queuingThresholds, and queuingDelay
     */
    _trigger() {
        // If the batch is set to run immediately, there is nothing more to be done
        if (this._waiting && !this._waitTimeout) {
            return;
        }
        // Always obey the queuing threshold
        const thresholdIndex = Math.min(this._activePromiseCount, this._queuingThresholds.length - 1);
        if (this._inputQueue.length < this._queuingThresholds[thresholdIndex]) {
            return;
        }
        // If the queue has reached the maximum batch size, start it immediately
        if (this._inputQueue.length >= this._maxBatchSize || this._immediateCount) {
            debug("Running immediately.");
            if (this._waitTimeout) {
                clearTimeout(this._waitTimeout);
                this._waitTimeout = undefined;
            }
            this._waiting = true;
            this._run();
            return;
        }
        if (this._waiting) {
            return;
        }
        // Run the batch, but with a delay
        this._waiting = true;
        debug("Running in %Oms (thresholdIndex %O).", this._queuingDelay, thresholdIndex);
        // Tests showed that process.nextTick would commonly run before promises could resolve,
        // and setImmediate would fire later than setTimeout, so setTimeout is used here.
        this._waitTimeout = setTimeout(() => {
            this._waitTimeout = undefined;
            this._run();
        }, this._queuingDelay);
    }
    /**
     * Runs the batch, while respecting delays imposed by the supplied delayFunction
     */
    _run() {
        if (this._delayFunction) {
            let result;
            try {
                result = this._delayFunction();
            }
            catch (err) {
                result = Promise.reject(err);
            }
            if (!isNull(result)) {
                const resultPromise = result instanceof Promise ? result : Promise.resolve(result);
                resultPromise.then(() => {
                    this._runImmediately();
                }).catch((err) => {
                    debug("Caught error in delayFunction. Rejecting promises.");
                    this._inputQueue.length = 0;
                    const promises = this._outputQueue.splice(0, this._outputQueue.length);
                    promises.forEach((promise) => {
                        promise.reject(err);
                    });
                    this._waiting = false;
                });
                return;
            }
            debug("Bypassing batch delay.");
        }
        this._runImmediately();
    }
    /**
     * Runs the batch immediately without further delay
     */
    _runImmediately() {
        const inputs = this._inputQueue.splice(0, this._maxBatchSize);
        const outputPromises = this._outputQueue.splice(0, this._maxBatchSize);
        if (this._immediateCount) {
            this._immediateCount = Math.max(0, this._immediateCount - inputs.length);
        }
        debug("Running batch of %O", inputs.length);
        let batchPromise;
        try {
            batchPromise = this._batchingFunction.call(this, inputs);
            if (!(batchPromise instanceof Promise)) {
                batchPromise = Promise.resolve(batchPromise);
            }
        }
        catch (err) {
            batchPromise = Promise.reject(err);
        }
        this._waiting = false;
        this._activePromiseCount++;
        batchPromise.then((outputs) => {
            if (!Array.isArray(outputs)) {
                throw new Error("Invalid type returned from batching function.");
            }
            debug("Promise resolved.");
            if (outputs.length !== outputPromises.length) {
                throw new Error("Batching function output length does not equal the input length.");
            }
            const retryInputs = [];
            const retryPromises = [];
            outputPromises.forEach((promise, index) => {
                const output = outputs[index];
                if (output === exports.BATCHER_RETRY_TOKEN) {
                    retryInputs.push(inputs[index]);
                    retryPromises.push(promise);
                }
                else if (output instanceof Error) {
                    promise.reject(output);
                }
                else {
                    promise.resolve(output);
                }
            });
            if (retryPromises.length) {
                debug("Adding %O requests to the queue to retry.", retryPromises.length);
                if (this._immediateCount) {
                    this._immediateCount += retryPromises.length;
                }
                this._inputQueue.unshift(...retryInputs);
                this._outputQueue.unshift(...retryPromises);
            }
        }).catch((err) => {
            outputPromises.forEach((promise) => {
                promise.reject(err);
            });
        }).then(() => {
            this._activePromiseCount--;
            // Since we may be operating at a lower queuing threshold now, we should try to run again
            this._trigger();
        });
        // The batch has started. Trigger another batch if appropriate.
        this._trigger();
    }
}
exports.Batcher = Batcher;
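// Illustrative sketch (not part of the original source): a delayFunction can be supplied
// to postpone each batch until some external condition clears, for example a hypothetical
// rate limiter that resolves when a request slot becomes available.
//
//   const { Batcher } = require("promise-batcher");
//   const limiter = { waitForSlot: () => new Promise((resolve) => setTimeout(resolve, 50)) };
//   const batcher = new Batcher({
//       batchingFunction: async (inputs) => inputs.map((s) => s.toUpperCase()),
//       delayFunction: () => limiter.waitForSlot(),
//   });
//   batcher.getResult("a").then(console.log); // logs "A" once the delay has passed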