UNPKG

11.2 kBJavaScriptView Raw
1/*
2Copyright 2015, 2016 OpenMarket Ltd
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16"use strict";
17/**
18 * This is an internal module which manages queuing, scheduling and retrying
19 * of requests.
20 * @module scheduler
21 */
22const utils = require("./utils");
23import Promise from 'bluebird';
24
25const DEBUG = false; // set true to enable console logging.
26
/**
 * Create a scheduler that queues Matrix events and retries failed sends.
 * Nothing is processed until a process function has been supplied through
 * {@link module:scheduler~MatrixScheduler#setProcessFunction}.
 * @constructor
 * @param {module:scheduler~retryAlgorithm} retryAlgorithm Optional. Decides how
 * long to wait before re-sending an event that failed. Any falsy value selects
 * {@link module:scheduler~MatrixScheduler.RETRY_BACKOFF_RATELIMIT}.
 * @param {module:scheduler~queueAlgorithm} queueAlgorithm Optional. Decides
 * which queue, if any, an event is placed in. Any falsy value selects
 * {@link module:scheduler~MatrixScheduler.QUEUE_MESSAGES}.
 */
function MatrixScheduler(retryAlgorithm, queueAlgorithm) {
    // Fall back to the built-in strategies when the caller supplies none.
    this.retryAlgorithm = retryAlgorithm
        ? retryAlgorithm
        : MatrixScheduler.RETRY_BACKOFF_RATELIMIT;
    this.queueAlgorithm = queueAlgorithm
        ? queueAlgorithm
        : MatrixScheduler.QUEUE_MESSAGES;

    // Queue name -> array of pending entries, each shaped like:
    //   {
    //     event: MatrixEvent,  // the event to send
    //     defer: Deferred,     // settled once the retries have finished
    //     attempts: Number     // how many times the process function failed
    //   }
    this._queues = {};
    // Names of the queues that currently have an event being processed.
    this._activeQueues = [];
    // The process function; stays null until setProcessFunction() is called.
    this._procFn = null;
}
52
/**
 * Look up the queue that the given event maps to. The event itself does not
 * have to be present in that queue.
 * @param {MatrixEvent} event The event whose queue should be returned.
 * @return {?Array<MatrixEvent>} The events waiting in the queue, or null when
 * the queue algorithm yields no name or no queue with that name exists. The
 * array is built fresh for the caller, so changing it leaves the queue alone;
 * the events inside it are the very objects held by the queue.
 * @see MatrixScheduler.removeEventFromQueue To remove an event from the queue.
 */
MatrixScheduler.prototype.getQueueForEvent = function(event) {
    const queueName = this.queueAlgorithm(event);
    const entries = queueName ? this._queues[queueName] : null;
    if (!entries) {
        return null;
    }
    // Hand back only the events, not the internal bookkeeping entries.
    return utils.map(entries, (entry) => entry.event);
};
71
/**
 * Take an event out of its queue. Events are matched by the value returned
 * from their getId() method rather than by object identity.
 * @param {MatrixEvent} event The event to remove.
 * @return {boolean} True if a matching event was found and removed.
 */
MatrixScheduler.prototype.removeEventFromQueue = function(event) {
    const queueName = this.queueAlgorithm(event);
    const entries = queueName ? this._queues[queueName] : null;
    if (!entries) {
        return false;
    }

    let removed = false;
    utils.removeElement(entries, function(candidate) {
        const isTarget = candidate.event.getId() === event.getId();
        if (isTarget) {
            // XXX the promise handed out for this entry is left unsettled;
            // it should probably be rejected. See
            // https://github.com/matrix-org/matrix-js-sdk/issues/496
            removed = true;
        }
        return isTarget;
    });
    return removed;
};
94
95
/**
 * Install the function used to process (send) queued events. Queues cannot be
 * processed without one. Installing it kicks off processing straight away, so
 * events that were queued beforehand start being handled immediately.
 * @param {module:scheduler~processFn} fn The function that can process events
 * in the queue.
 */
MatrixScheduler.prototype.setProcessFunction = function(fn) {
    const scheduler = this;
    scheduler._procFn = fn;
    // Wake up any queue that gathered events while no process function existed.
    _startProcessingQueues(scheduler);
};
107
/**
 * Queue an event if the queue algorithm asks for it, then start processing
 * any idle queues.
 * @param {MatrixEvent} event The event that may be queued.
 * @return {?Promise} A promise if the event was queued, which is resolved or
 * rejected once the event has been sent or given up on; null when the queue
 * algorithm returns no queue name for the event.
 */
MatrixScheduler.prototype.queueEvent = function(event) {
    const queueName = this.queueAlgorithm(event);
    if (!queueName) {
        return null;
    }
    // Create the queue lazily the first time a name is seen.
    if (!this._queues[queueName]) {
        this._queues[queueName] = [];
    }
    // Build the deferred by hand instead of calling the deprecated
    // Promise.defer(). The object keeps the same shape (promise, resolve,
    // reject) because the queue processor settles it through those members.
    const defer = {};
    defer.promise = new Promise(function(resolve, reject) {
        defer.resolve = resolve;
        defer.reject = reject;
    });
    this._queues[queueName].push({
        event: event,
        defer: defer,
        attempts: 0,
    });
    debuglog(
        "Queue algorithm dumped event %s into queue '%s'",
        event.getId(), queueName,
    );
    _startProcessingQueues(this);
    return defer.promise;
};
136
/**
 * Retries events up to 4 times using exponential backoff. This produces wait
 * times of 2, 4, 8, and 16 seconds (30s total) after which we give up. If the
 * failure was due to a rate limited request and the error carries a usable
 * (finite, positive) <code>retry_after_ms</code>, that time is waited instead,
 * regardless of how many attempts have already been made.
 * @param {MatrixEvent} event The event being retried (not inspected here).
 * @param {Number} attempts The number of failed attempts so far.
 * @param {MatrixError} err The most recent error. A missing error is treated
 * like an error without any recognisable details.
 * @return {Number} Milliseconds to wait before retrying, or -1 to give up.
 * @see module:scheduler~retryAlgorithm
 */
MatrixScheduler.RETRY_BACKOFF_RATELIMIT = function(event, attempts, err) {
    // Tolerate a rejection that carried no error object at all.
    const error = err || {};

    if (error.httpStatus === 400 || error.httpStatus === 403 ||
            error.httpStatus === 401) {
        // client error; no amount of retrying will save you now.
        return -1;
    }
    // we ship with browser-request which returns { cors: rejected } when trying
    // with no connection, so if we match that, give up since they have no conn.
    if (error.cors === "rejected") {
        return -1;
    }

    if (error.name === "M_LIMIT_EXCEEDED") {
        const waitTime = error.data ? error.data.retry_after_ms : undefined;
        // Only trust a finite, positive number. Anything else (missing,
        // negative, non-numeric) would otherwise leak out as a bogus delay or
        // be mistaken for the -1 "give up" marker, so fall back to backoff.
        if (Number.isFinite(waitTime) && waitTime > 0) {
            return waitTime;
        }
    }
    if (attempts > 4) {
        return -1; // give up
    }
    return (1000 * Math.pow(2, attempts));
};
170
/**
 * Default queue algorithm: <code>m.room.message</code> events share a single
 * queue named "message"; every other event type is left unqueued so that it
 * can be sent concurrently.
 * @param {MatrixEvent} event The event to classify.
 * @return {string} "message" for <code>m.room.message</code> events, otherwise
 * null.
 * @see module:scheduler~queueAlgorithm
 */
MatrixScheduler.QUEUE_MESSAGES = function(event) {
    const isRoomMessage = event.getType() === "m.room.message";
    // Returning null tells the scheduler not to queue the event at all.
    return isRoomMessage ? "message" : null;
};
186
/**
 * Start processing every queue that holds events but is not yet marked as
 * active. Does nothing while the scheduler has no process function.
 * @param {MatrixScheduler} scheduler The scheduler whose queues to start.
 */
function _startProcessingQueues(scheduler) {
    if (!scheduler._procFn) {
        return;
    }

    // A queue needs starting when it is not already active and is non-empty.
    const needsStarting = (queueName) => {
        const isActive = scheduler._activeQueues.indexOf(queueName) !== -1;
        return !isActive && scheduler._queues[queueName].length > 0;
    };

    const allQueueNames = utils.keys(scheduler._queues);
    const queuesToStart = utils.filter(allQueueNames, needsStarting);

    utils.forEach(queuesToStart, (queueName) => {
        // Mark the queue active before processing its head.
        scheduler._activeQueues.push(queueName);
        debuglog("Spinning up queue: '%s'", queueName);
        _processQueue(scheduler, queueName);
    });
}
203
/**
 * Process the event at the head of the named queue, then keep going until the
 * queue is empty. On success the entry's deferred is resolved; on failure the
 * scheduler's retry algorithm decides whether to wait and try again or to give
 * up and reject the deferred.
 * @param {MatrixScheduler} scheduler The scheduler that owns the queue.
 * @param {string} queueName The name of the queue to process.
 */
function _processQueue(scheduler, queueName) {
    // get head of queue
    const obj = _peekNextEvent(scheduler, queueName);
    if (!obj) {
        // queue is empty. Mark as inactive and stop recursing.
        const index = scheduler._activeQueues.indexOf(queueName);
        if (index >= 0) {
            scheduler._activeQueues.splice(index, 1);
        }
        debuglog("Stopping queue '%s' as it is now empty", queueName);
        return;
    }
    debuglog(
        "Queue '%s' has %s pending events",
        queueName, scheduler._queues[queueName].length,
    );

    // Only dequeue the entry if it is still at the head. It may have been
    // taken out (e.g. through removeEventFromQueue) while the attempt was in
    // flight, in which case shifting the queue would drop a different event.
    const removeIfStillHead = function() {
        if (_peekNextEvent(scheduler, queueName) === obj) {
            _removeNextEvent(scheduler, queueName);
        }
    };

    // Call the process function synchronously, but funnel a synchronous throw
    // into the same failure path as a rejection so the queue cannot be left
    // marked active with nothing driving it. Promise.resolve() also accepts
    // any thenable, so the process function need not return a promise that
    // offers .done().
    let attempt;
    try {
        attempt = Promise.resolve(scheduler._procFn(obj.event));
    } catch (err) {
        attempt = Promise.reject(err);
    }

    attempt.then(function(res) {
        removeIfStillHead();
        debuglog("Queue '%s' sent event %s", queueName, obj.event.getId());
        obj.defer.resolve(res);
        // keep processing
        _processQueue(scheduler, queueName);
    }, function(err) {
        obj.attempts += 1;
        // ask the retry algorithm when/if we should try again
        const waitTimeMs = scheduler.retryAlgorithm(obj.event, obj.attempts, err);
        debuglog(
            "retry(%s) err=%s event_id=%s waitTime=%s",
            obj.attempts, err, obj.event.getId(), waitTimeMs,
        );
        if (waitTimeMs === -1) { // give up
            debuglog(
                "Queue '%s' giving up on event %s", queueName, obj.event.getId(),
            );
            removeIfStillHead();
            obj.defer.reject(err);
            // process next event
            _processQueue(scheduler, queueName);
        } else {
            setTimeout(function() {
                _processQueue(scheduler, queueName);
            }, waitTimeMs);
        }
    });
}
253
/**
 * Look at the entry at the head of a queue without removing it.
 * @param {MatrixScheduler} scheduler The scheduler that owns the queue.
 * @param {string} queueName The name of the queue to inspect.
 * @return {?Object} The head entry; undefined if the queue is empty; null if
 * there is no array stored under that queue name.
 */
function _peekNextEvent(scheduler, queueName) {
    const pending = scheduler._queues[queueName];
    return utils.isArray(pending) ? pending[0] : null;
}
261
/**
 * Remove the entry at the head of a queue and return it.
 * @param {MatrixScheduler} scheduler The scheduler that owns the queue.
 * @param {string} queueName The name of the queue to shift.
 * @return {?Object} The removed entry; undefined if the queue was empty; null
 * if there is no array stored under that queue name.
 */
function _removeNextEvent(scheduler, queueName) {
    const pending = scheduler._queues[queueName];
    return utils.isArray(pending) ? pending.shift() : null;
}
269
/**
 * Forward all arguments to console.log, but only while the module-level DEBUG
 * flag is truthy; otherwise do nothing.
 * @param {...*} args The values to log, exactly as console.log accepts them.
 */
function debuglog(...args) {
    if (!DEBUG) {
        return;
    }
    console.log(...args);
}
275
276/**
277 * The retry algorithm to apply when retrying events. To stop retrying, return
278 * <code>-1</code>. If this event was part of a queue, it will be removed from
279 * the queue.
280 * @callback retryAlgorithm
281 * @param {MatrixEvent} event The event being retried.
282 * @param {Number} attempts The number of failed attempts. This will always be
283 * >= 1.
284 * @param {MatrixError} err The most recent error message received when trying
285 * to send this event.
286 * @return {Number} The number of milliseconds to wait before trying again. If
287 * this is 0, the request will be immediately retried. If this is
288 * <code>-1</code>, the event will be marked as
289 * {@link module:models/event.EventStatus.NOT_SENT} and will not be retried.
290 */
291
292/**
293 * The queuing algorithm to apply to events. This function must be idempotent as
294 * it may be called multiple times with the same event. All queues created are
295 * serviced in a FIFO manner. To send the event ASAP, return <code>null</code>
296 * which will not put this event in a queue. Events that fail to send that form
297 * part of a queue will be removed from the queue and the next event in the
298 * queue will be sent.
299 * @callback queueAlgorithm
300 * @param {MatrixEvent} event The event to be sent.
301 * @return {string} The name of the queue to put the event into. If a queue with
302 * this name does not exist, it will be created. If this is <code>null</code>,
303 * the event is not put into a queue and will be sent concurrently.
304 */
305
306 /**
307 * The function to invoke to process (send) events in the queue.
308 * @callback processFn
309 * @param {MatrixEvent} event The event to send.
310 * @return {Promise} Resolved/rejected depending on the outcome of the request.
311 */
312
313/**
314 * The MatrixScheduler class.
315 */
316module.exports = MatrixScheduler;