UNPKG

7.01 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3const Debug = require("debug");
4const debug = Debug("promise-batcher");
5function defer() {
6 const o = {};
7 o.promise = new Promise((resolve, reject) => {
8 o.resolve = resolve;
9 o.reject = reject;
10 });
11 return o;
12}
13function isNull(val) {
14 return val === undefined || val === null;
15}
16class Batcher {
17 constructor(options) {
18 this._maxBatchSize = Infinity;
19 this._queuingDelay = 1;
20 this._inputQueue = [];
21 this._outputQueue = [];
22 this._waiting = false;
23 this._activePromiseCount = 0;
24 this._batchingFunction = options.batchingFunction;
25 this._delayFunction = options.delayFunction;
26 if (Array.isArray(options.queuingThresholds)) {
27 if (!options.queuingThresholds.length) {
28 throw new Error("options.batchThresholds must contain at least one number");
29 }
30 options.queuingThresholds.forEach((n) => {
31 if (n < 1) {
32 throw new Error("options.batchThresholds must only contain numbers greater than 0");
33 }
34 });
35 this._queuingThresholds = options.queuingThresholds.slice();
36 }
37 else {
38 this._queuingThresholds = [1];
39 }
40 if (!isNull(options.maxBatchSize)) {
41 if (options.maxBatchSize < 1) {
42 throw new Error("options.batchSize must be greater than 0");
43 }
44 this._maxBatchSize = options.maxBatchSize;
45 }
46 if (!isNull(options.queuingDelay)) {
47 if (options.queuingDelay < 0) {
48 throw new Error("options.queuingDelay must be greater than or equal to 0");
49 }
50 this._queuingDelay = options.queuingDelay;
51 }
52 }
53 /**
54 * Returns a promise which resolves or rejects with the individual result returned from the batching function.
55 */
56 getResult(input) {
57 const index = this._inputQueue.length;
58 debug("Queuing request at index ", index);
59 this._inputQueue[index] = input;
60 const deferred = defer();
61 this._outputQueue[index] = deferred;
62 this._trigger();
63 return deferred.promise;
64 }
65 /**
66 * Triggers a batch to run, adhering to the maxBatchSize, queueingThresholds, and queuingDelay
67 */
68 _trigger() {
69 // If the batch is set to run immediately, there is nothing more to be done
70 if (this._waiting && !this._waitTimeout) {
71 return;
72 }
73 // Always obey the queuing threshold
74 const thresholdIndex = Math.min(this._activePromiseCount, this._queuingThresholds.length - 1);
75 if (this._inputQueue.length < this._queuingThresholds[thresholdIndex]) {
76 return;
77 }
78 // If the queue has reached the maximum batch size, start it immediately
79 if (this._inputQueue.length >= this._maxBatchSize) {
80 debug("Queue reached maxBatchSize, launching immediately.");
81 if (this._waitTimeout) {
82 clearTimeout(this._waitTimeout);
83 this._waitTimeout = undefined;
84 }
85 this._waiting = true;
86 this._run();
87 return;
88 }
89 if (this._waiting) {
90 return;
91 }
92 // Run the batch, but with a delay
93 this._waiting = true;
94 debug("Running in %Oms (thresholdIndex %O).", this._queuingDelay, thresholdIndex);
95 // Tests showed that nextTick would commonly run before promises could resolve.
96 // SetImmediate would run later than setTimeout as well.
97 this._waitTimeout = setTimeout(() => {
98 this._waitTimeout = undefined;
99 this._run();
100 }, this._queuingDelay);
101 }
102 /**
103 * Runs the batch, while respecting delays imposed by the supplied delayFunction
104 */
105 _run() {
106 if (this._delayFunction) {
107 let result;
108 try {
109 result = this._delayFunction();
110 }
111 catch (err) {
112 result = Promise.reject(err);
113 }
114 if (!isNull(result)) {
115 const resultPromise = result instanceof Promise ? result : Promise.resolve(result);
116 resultPromise.then(() => {
117 this._runImmediately();
118 }).catch((err) => {
119 debug("Caught error in delayFunction. Rejecting promises.");
120 this._inputQueue.length = 0;
121 const promises = this._outputQueue.splice(0, this._outputQueue.length);
122 promises.forEach((promise) => {
123 promise.reject(err);
124 });
125 this._waiting = false;
126 });
127 return;
128 }
129 debug("Bypassing batch delay.");
130 }
131 this._runImmediately();
132 }
133 /**
134 * Runs the batch immediately without further delay
135 */
136 _runImmediately() {
137 const inputs = this._inputQueue.splice(0, this._maxBatchSize);
138 const outputPromises = this._outputQueue.splice(0, this._maxBatchSize);
139 debug("Running batch of %O", inputs.length);
140 let batchPromise;
141 try {
142 batchPromise = this._batchingFunction.call(this, inputs);
143 if (!(batchPromise instanceof Promise)) {
144 batchPromise = Promise.resolve(batchPromise);
145 }
146 }
147 catch (err) {
148 batchPromise = Promise.reject(err);
149 }
150 this._waiting = false;
151 this._activePromiseCount++;
152 batchPromise.then((outputs) => {
153 if (!Array.isArray(outputs)) {
154 throw new Error("Invalid type returned from batching function.");
155 }
156 debug("Promise resolved.");
157 if (outputs.length !== outputPromises.length) {
158 throw new Error("Batching function output length does not equal the input length.");
159 }
160 outputPromises.forEach((promise, index) => {
161 const output = outputs[index];
162 if (output instanceof Error) {
163 promise.reject(output);
164 }
165 else {
166 promise.resolve(output);
167 }
168 });
169 }).catch((err) => {
170 outputPromises.forEach((promise) => {
171 promise.reject(err);
172 });
173 }).then(() => {
174 this._activePromiseCount--;
175 // Since we may be operating at a lower queuing threshold now, we should try run again
176 this._trigger();
177 });
178 // The batch has started. Trigger another batch if appropriate.
179 this._trigger();
180 }
181}
182exports.Batcher = Batcher;