1 | /*
|
2 | Copyright 2015, 2016 OpenMarket Ltd
|
3 |
|
4 | Licensed under the Apache License, Version 2.0 (the "License");
|
5 | you may not use this file except in compliance with the License.
|
6 | You may obtain a copy of the License at
|
7 |
|
8 | http://www.apache.org/licenses/LICENSE-2.0
|
9 |
|
10 | Unless required by applicable law or agreed to in writing, software
|
11 | distributed under the License is distributed on an "AS IS" BASIS,
|
12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
13 | See the License for the specific language governing permissions and
|
14 | limitations under the License.
|
15 | */
|
16 | ;
|
17 | /**
|
18 | * This is an internal module which manages queuing, scheduling and retrying
|
19 | * of requests.
|
20 | * @module scheduler
|
21 | */
|
22 |
|
23 | var _bluebird = require("bluebird");
|
24 |
|
25 | var _bluebird2 = _interopRequireDefault(_bluebird);
|
26 |
|
27 | function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
|
28 |
|
29 | var utils = require("./utils");
|
30 |
|
31 |
|
32 | var DEBUG = false; // set true to enable console logging.
|
33 |
|
34 | /**
|
35 | * Construct a scheduler for Matrix. Requires
|
36 | * {@link module:scheduler~MatrixScheduler#setProcessFunction} to be provided
|
37 | * with a way of processing events.
|
38 | * @constructor
|
39 | * @param {module:scheduler~retryAlgorithm} retryAlgorithm Optional. The retry
|
40 | * algorithm to apply when determining when to try to send an event again.
|
41 | * Defaults to {@link module:scheduler~MatrixScheduler.RETRY_BACKOFF_RATELIMIT}.
|
42 | * @param {module:scheduler~queueAlgorithm} queueAlgorithm Optional. The queuing
|
43 | * algorithm to apply when determining which events should be sent before the
|
44 | * given event. Defaults to {@link module:scheduler~MatrixScheduler.QUEUE_MESSAGES}.
|
45 | */
|
46 | function MatrixScheduler(retryAlgorithm, queueAlgorithm) {
|
47 | this.retryAlgorithm = retryAlgorithm || MatrixScheduler.RETRY_BACKOFF_RATELIMIT;
|
48 | this.queueAlgorithm = queueAlgorithm || MatrixScheduler.QUEUE_MESSAGES;
|
49 | this._queues = {
|
50 | // queueName: [{
|
51 | // event: MatrixEvent, // event to send
|
52 | // defer: Deferred, // defer to resolve/reject at the END of the retries
|
53 | // attempts: Number // number of times we've called processFn
|
54 | // }, ...]
|
55 | };
|
56 | this._activeQueues = [];
|
57 | this._procFn = null;
|
58 | }
|
59 |
|
60 | /**
|
61 | * Retrieve a queue based on an event. The event provided does not need to be in
|
62 | * the queue.
|
63 | * @param {MatrixEvent} event An event to get the queue for.
|
64 | * @return {?Array<MatrixEvent>} A shallow copy of events in the queue or null.
|
65 | * Modifying this array will not modify the list itself. Modifying events in
|
66 | * this array <i>will</i> modify the underlying event in the queue.
|
67 | * @see MatrixScheduler.removeEventFromQueue To remove an event from the queue.
|
68 | */
|
69 | MatrixScheduler.prototype.getQueueForEvent = function (event) {
|
70 | var name = this.queueAlgorithm(event);
|
71 | if (!name || !this._queues[name]) {
|
72 | return null;
|
73 | }
|
74 | return utils.map(this._queues[name], function (obj) {
|
75 | return obj.event;
|
76 | });
|
77 | };
|
78 |
|
79 | /**
|
80 | * Remove this event from the queue. The event is equal to another event if they
|
81 | * have the same ID returned from event.getId().
|
82 | * @param {MatrixEvent} event The event to remove.
|
83 | * @return {boolean} True if this event was removed.
|
84 | */
|
85 | MatrixScheduler.prototype.removeEventFromQueue = function (event) {
|
86 | var name = this.queueAlgorithm(event);
|
87 | if (!name || !this._queues[name]) {
|
88 | return false;
|
89 | }
|
90 | var removed = false;
|
91 | utils.removeElement(this._queues[name], function (element) {
|
92 | if (element.event.getId() === event.getId()) {
|
93 | // XXX we should probably reject the promise?
|
94 | // https://github.com/matrix-org/matrix-js-sdk/issues/496
|
95 | removed = true;
|
96 | return true;
|
97 | }
|
98 | });
|
99 | return removed;
|
100 | };
|
101 |
|
102 | /**
|
103 | * Set the process function. Required for events in the queue to be processed.
|
104 | * If set after events have been added to the queue, this will immediately start
|
105 | * processing them.
|
106 | * @param {module:scheduler~processFn} fn The function that can process events
|
107 | * in the queue.
|
108 | */
|
109 | MatrixScheduler.prototype.setProcessFunction = function (fn) {
|
110 | this._procFn = fn;
|
111 | _startProcessingQueues(this);
|
112 | };
|
113 |
|
114 | /**
|
115 | * Queue an event if it is required and start processing queues.
|
116 | * @param {MatrixEvent} event The event that may be queued.
|
117 | * @return {?Promise} A promise if the event was queued, which will be
|
118 | * resolved or rejected in due time, else null.
|
119 | */
|
120 | MatrixScheduler.prototype.queueEvent = function (event) {
|
121 | var queueName = this.queueAlgorithm(event);
|
122 | if (!queueName) {
|
123 | return null;
|
124 | }
|
125 | // add the event to the queue and make a deferred for it.
|
126 | if (!this._queues[queueName]) {
|
127 | this._queues[queueName] = [];
|
128 | }
|
129 | var defer = _bluebird2.default.defer();
|
130 | this._queues[queueName].push({
|
131 | event: event,
|
132 | defer: defer,
|
133 | attempts: 0
|
134 | });
|
135 | debuglog("Queue algorithm dumped event %s into queue '%s'", event.getId(), queueName);
|
136 | _startProcessingQueues(this);
|
137 | return defer.promise;
|
138 | };
|
139 |
|
140 | /**
|
141 | * Retries events up to 4 times using exponential backoff. This produces wait
|
142 | * times of 2, 4, 8, and 16 seconds (30s total) after which we give up. If the
|
143 | * failure was due to a rate limited request, the time specified in the error is
|
144 | * waited before being retried.
|
145 | * @param {MatrixEvent} event
|
146 | * @param {Number} attempts
|
147 | * @param {MatrixError} err
|
148 | * @return {Number}
|
149 | * @see module:scheduler~retryAlgorithm
|
150 | */
|
151 | MatrixScheduler.RETRY_BACKOFF_RATELIMIT = function (event, attempts, err) {
|
152 | if (err.httpStatus === 400 || err.httpStatus === 403 || err.httpStatus === 401) {
|
153 | // client error; no amount of retrying with save you now.
|
154 | return -1;
|
155 | }
|
156 | // we ship with browser-request which returns { cors: rejected } when trying
|
157 | // with no connection, so if we match that, give up since they have no conn.
|
158 | if (err.cors === "rejected") {
|
159 | return -1;
|
160 | }
|
161 |
|
162 | if (err.name === "M_LIMIT_EXCEEDED") {
|
163 | var waitTime = err.data.retry_after_ms;
|
164 | if (waitTime) {
|
165 | return waitTime;
|
166 | }
|
167 | }
|
168 | if (attempts > 4) {
|
169 | return -1; // give up
|
170 | }
|
171 | return 1000 * Math.pow(2, attempts);
|
172 | };
|
173 |
|
174 | /**
|
175 | * Queues <code>m.room.message</code> events and lets other events continue
|
176 | * concurrently.
|
177 | * @param {MatrixEvent} event
|
178 | * @return {string}
|
179 | * @see module:scheduler~queueAlgorithm
|
180 | */
|
181 | MatrixScheduler.QUEUE_MESSAGES = function (event) {
|
182 | if (event.getType() === "m.room.message") {
|
183 | // put these events in the 'message' queue.
|
184 | return "message";
|
185 | }
|
186 | // allow all other events continue concurrently.
|
187 | return null;
|
188 | };
|
189 |
|
190 | function _startProcessingQueues(scheduler) {
|
191 | if (!scheduler._procFn) {
|
192 | return;
|
193 | }
|
194 | // for each inactive queue with events in them
|
195 | utils.forEach(utils.filter(utils.keys(scheduler._queues), function (queueName) {
|
196 | return scheduler._activeQueues.indexOf(queueName) === -1 && scheduler._queues[queueName].length > 0;
|
197 | }), function (queueName) {
|
198 | // mark the queue as active
|
199 | scheduler._activeQueues.push(queueName);
|
200 | // begin processing the head of the queue
|
201 | debuglog("Spinning up queue: '%s'", queueName);
|
202 | _processQueue(scheduler, queueName);
|
203 | });
|
204 | }
|
205 |
|
206 | function _processQueue(scheduler, queueName) {
|
207 | // get head of queue
|
208 | var obj = _peekNextEvent(scheduler, queueName);
|
209 | if (!obj) {
|
210 | // queue is empty. Mark as inactive and stop recursing.
|
211 | var index = scheduler._activeQueues.indexOf(queueName);
|
212 | if (index >= 0) {
|
213 | scheduler._activeQueues.splice(index, 1);
|
214 | }
|
215 | debuglog("Stopping queue '%s' as it is now empty", queueName);
|
216 | return;
|
217 | }
|
218 | debuglog("Queue '%s' has %s pending events", queueName, scheduler._queues[queueName].length);
|
219 | // fire the process function and if it resolves, resolve the deferred. Else
|
220 | // invoke the retry algorithm.
|
221 | scheduler._procFn(obj.event).done(function (res) {
|
222 | // remove this from the queue
|
223 | _removeNextEvent(scheduler, queueName);
|
224 | debuglog("Queue '%s' sent event %s", queueName, obj.event.getId());
|
225 | obj.defer.resolve(res);
|
226 | // keep processing
|
227 | _processQueue(scheduler, queueName);
|
228 | }, function (err) {
|
229 | obj.attempts += 1;
|
230 | // ask the retry algorithm when/if we should try again
|
231 | var waitTimeMs = scheduler.retryAlgorithm(obj.event, obj.attempts, err);
|
232 | debuglog("retry(%s) err=%s event_id=%s waitTime=%s", obj.attempts, err, obj.event.getId(), waitTimeMs);
|
233 | if (waitTimeMs === -1) {
|
234 | // give up (you quitter!)
|
235 | debuglog("Queue '%s' giving up on event %s", queueName, obj.event.getId());
|
236 | // remove this from the queue
|
237 | _removeNextEvent(scheduler, queueName);
|
238 | obj.defer.reject(err);
|
239 | // process next event
|
240 | _processQueue(scheduler, queueName);
|
241 | } else {
|
242 | setTimeout(function () {
|
243 | _processQueue(scheduler, queueName);
|
244 | }, waitTimeMs);
|
245 | }
|
246 | });
|
247 | }
|
248 |
|
249 | function _peekNextEvent(scheduler, queueName) {
|
250 | var queue = scheduler._queues[queueName];
|
251 | if (!utils.isArray(queue)) {
|
252 | return null;
|
253 | }
|
254 | return queue[0];
|
255 | }
|
256 |
|
257 | function _removeNextEvent(scheduler, queueName) {
|
258 | var queue = scheduler._queues[queueName];
|
259 | if (!utils.isArray(queue)) {
|
260 | return null;
|
261 | }
|
262 | return queue.shift();
|
263 | }
|
264 |
|
265 | function debuglog() {
|
266 | if (DEBUG) {
|
267 | var _console;
|
268 |
|
269 | (_console = console).log.apply(_console, arguments);
|
270 | }
|
271 | }
|
272 |
|
273 | /**
|
274 | * The retry algorithm to apply when retrying events. To stop retrying, return
|
275 | * <code>-1</code>. If this event was part of a queue, it will be removed from
|
276 | * the queue.
|
277 | * @callback retryAlgorithm
|
278 | * @param {MatrixEvent} event The event being retried.
|
279 | * @param {Number} attempts The number of failed attempts. This will always be
|
280 | * >= 1.
|
281 | * @param {MatrixError} err The most recent error message received when trying
|
282 | * to send this event.
|
283 | * @return {Number} The number of milliseconds to wait before trying again. If
|
284 | * this is 0, the request will be immediately retried. If this is
|
285 | * <code>-1</code>, the event will be marked as
|
286 | * {@link module:models/event.EventStatus.NOT_SENT} and will not be retried.
|
287 | */
|
288 |
|
289 | /**
|
290 | * The queuing algorithm to apply to events. This function must be idempotent as
|
291 | * it may be called multiple times with the same event. All queues created are
|
292 | * serviced in a FIFO manner. To send the event ASAP, return <code>null</code>
|
293 | * which will not put this event in a queue. Events that fail to send that form
|
294 | * part of a queue will be removed from the queue and the next event in the
|
295 | * queue will be sent.
|
296 | * @callback queueAlgorithm
|
297 | * @param {MatrixEvent} event The event to be sent.
|
298 | * @return {string} The name of the queue to put the event into. If a queue with
|
299 | * this name does not exist, it will be created. If this is <code>null</code>,
|
300 | * the event is not put into a queue and will be sent concurrently.
|
301 | */
|
302 |
|
303 | /**
|
304 | * The function to invoke to process (send) events in the queue.
|
305 | * @callback processFn
|
306 | * @param {MatrixEvent} event The event to send.
|
307 | * @return {Promise} Resolved/rejected depending on the outcome of the request.
|
308 | */
|
309 |
|
310 | /**
|
311 | * The MatrixScheduler class.
|
312 | */
|
313 | module.exports = MatrixScheduler;
|
314 | //# sourceMappingURL=scheduler.js.map |
\ | No newline at end of file |