UNPKG

11.8 kBJavaScriptView Raw
/*
Copyright 2015, 2016 OpenMarket Ltd
Copyright 2019 The Matrix.org Foundation C.I.C.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
17
/**
 * This is an internal module which manages queuing, scheduling and retrying
 * of requests.
 * @module scheduler
 */
23import * as utils from "./utils";
24import {logger} from './logger';
25
// Debug switch for this module: debuglog() forwards its arguments to
// logger.log only while this is true, and drops them otherwise.
const DEBUG = false;
27
/**
 * Creates a scheduler that queues Matrix events and retries failed sends.
 * A processing function must be supplied through
 * {@link module:scheduler~MatrixScheduler#setProcessFunction} before any
 * queued event can actually be sent.
 * @constructor
 * @param {module:scheduler~retryAlgorithm} retryAlgorithm Optional. Decides
 * whether, and after how long, a failed event is sent again. Falls back to
 * {@link module:scheduler~MatrixScheduler.RETRY_BACKOFF_RATELIMIT}.
 * @param {module:scheduler~queueAlgorithm} queueAlgorithm Optional. Decides
 * which queue (if any) an event is placed in. Falls back to
 * {@link module:scheduler~MatrixScheduler.QUEUE_MESSAGES}.
 */
export function MatrixScheduler(retryAlgorithm, queueAlgorithm) {
    // Any falsy argument selects the built-in default.
    this.retryAlgorithm = retryAlgorithm
        ? retryAlgorithm
        : MatrixScheduler.RETRY_BACKOFF_RATELIMIT;
    this.queueAlgorithm = queueAlgorithm
        ? queueAlgorithm
        : MatrixScheduler.QUEUE_MESSAGES;

    // Maps queue name -> FIFO array of pending entries, each shaped as:
    //   {
    //     event: MatrixEvent,  // the event to send
    //     defer: Deferred,     // settled at the END of the retries
    //     attempts: Number     // number of failed sends so far
    //   }
    this._queues = {};
    // Names of the queues that currently have a processing loop running.
    this._activeQueues = [];
    // Function used to send an event; stays null until setProcessFunction().
    this._procFn = null;
}
53
/**
 * Look up the queue that the queue algorithm assigns to the given event. The
 * event itself does not have to be present in that queue.
 * @param {MatrixEvent} event An event to get the queue for.
 * @return {?Array<MatrixEvent>} A new array holding the events of the queue,
 * or null when the algorithm returns no queue name or no queue with that name
 * exists. Changing the returned array leaves the queue untouched, but the
 * events inside it are the same objects as the queued ones.
 * @see MatrixScheduler.removeEventFromQueue To remove an event from the queue.
 */
MatrixScheduler.prototype.getQueueForEvent = function(event) {
    const queueName = this.queueAlgorithm(event);
    const entries = queueName ? this._queues[queueName] : null;
    if (!entries) {
        return null;
    }
    // hand back only the events, not the internal bookkeeping entries
    return utils.map(entries, (entry) => entry.event);
};
72
/**
 * Remove an event from the queue that the queue algorithm assigns to it.
 * Queued events are matched by comparing the IDs returned from getId().
 * @param {MatrixEvent} event The event to remove.
 * @return {boolean} True if a queued event with the same ID was found, false
 * otherwise (including when the event maps to no queue).
 */
MatrixScheduler.prototype.removeEventFromQueue = function(event) {
    const queueName = this.queueAlgorithm(event);
    const entries = queueName ? this._queues[queueName] : null;
    if (!entries) {
        return false;
    }
    let wasRemoved = false;
    // The actual removal is delegated to utils.removeElement; the predicate
    // just records whether it ever reported a match.
    utils.removeElement(entries, (entry) => {
        if (entry.event.getId() !== event.getId()) {
            return undefined;
        }
        // XXX we should probably reject the promise?
        // https://github.com/matrix-org/matrix-js-sdk/issues/496
        wasRemoved = true;
        return true;
    });
    return wasRemoved;
};
95
96
/**
 * Install the function used to process (send) queued events. Nothing in the
 * queues is processed until this has been set. Setting it also kicks the
 * queues straight away, so events that were queued earlier start being
 * processed.
 * @param {module:scheduler~processFn} fn The function that can process events
 * in the queue.
 */
MatrixScheduler.prototype.setProcessFunction = function(fn) {
    const scheduler = this;
    scheduler._procFn = fn;
    // wake up any queue that already has events waiting
    _startProcessingQueues(scheduler);
};
108
/**
 * Put an event into the queue chosen by the queue algorithm (creating that
 * queue on first use) and start processing the queues.
 * @param {MatrixEvent} event The event that may be queued.
 * @return {?Promise} The promise of the deferred stored with the queued
 * event, or null if the queue algorithm returned no queue name and the event
 * was therefore not queued.
 */
MatrixScheduler.prototype.queueEvent = function(event) {
    const queueName = this.queueAlgorithm(event);
    if (!queueName) {
        // the algorithm opted out of queuing for this event
        return null;
    }

    const queues = this._queues;
    if (!queues[queueName]) {
        queues[queueName] = [];
    }

    // every queued event carries its own deferred and a failure counter
    const defer = utils.defer();
    const entry = {event, defer, attempts: 0};
    queues[queueName].push(entry);

    debuglog(
        "Queue algorithm dumped event %s into queue '%s'",
        event.getId(), queueName,
    );
    _startProcessingQueues(this);
    return defer.promise;
};
137
/**
 * Retries events up to 4 times using exponential backoff. This produces wait
 * times of 2, 4, 8, and 16 seconds (30s total) after which we give up. If the
 * failure was due to a rate limited request and the error carries a positive
 * <code>retry_after_ms</code>, that time is waited instead (this path is not
 * subject to the attempt limit). Errors that retrying cannot fix (HTTP
 * 400/401/403, a rejected CORS request, M_TOO_LARGE) give up immediately.
 * A missing error object, or a rate-limit error without usable
 * <code>data.retry_after_ms</code>, is handled like any other failure and
 * falls through to the exponential backoff.
 * @param {MatrixEvent} event The event being retried (not inspected here).
 * @param {Number} attempts The number of failed attempts so far.
 * @param {MatrixError} err The error from the most recent attempt.
 * @return {Number} Milliseconds to wait before the next attempt, or -1 to
 * give up.
 * @see module:scheduler~retryAlgorithm
 */
MatrixScheduler.RETRY_BACKOFF_RATELIMIT = function(event, attempts, err) {
    // A rejection may carry no error object at all; inspect an empty stand-in
    // rather than throwing while reading its properties.
    const failure = err || {};

    if (
        failure.httpStatus === 400 ||
        failure.httpStatus === 403 ||
        failure.httpStatus === 401
    ) {
        // client error; no amount of retrying will save you now.
        return -1;
    }
    // we ship with browser-request which returns { cors: rejected } when trying
    // with no connection, so if we match that, give up since they have no conn.
    if (failure.cors === "rejected") {
        return -1;
    }

    // if event that we are trying to send is too large in any way then retrying won't help
    if (failure.name === "M_TOO_LARGE") {
        return -1;
    }

    if (failure.name === "M_LIMIT_EXCEEDED") {
        // `data` may be absent, so guard the lookup. Only a positive value is
        // a usable wait time; anything else uses the normal backoff below.
        const waitTime = failure.data && failure.data.retry_after_ms;
        if (waitTime > 0) {
            return waitTime;
        }
    }
    if (attempts > 4) {
        return -1; // give up
    }
    return (1000 * Math.pow(2, attempts));
};
176
/**
 * Default queue algorithm: <code>m.room.message</code> events, and events
 * that associate with another event (redactions and relations), all go into
 * the single "message" queue. Every other event is left unqueued so it can be
 * sent concurrently.
 * @param {MatrixEvent} event
 * @return {string} "message" for events that must be queued, otherwise null.
 * @see module:scheduler~queueAlgorithm
 */
MatrixScheduler.QUEUE_MESSAGES = function(event) {
    const isMessage = event.getType() === "m.room.message";
    // hasAssocation() is only consulted for non-message events
    const needsQueue = isMessage || event.hasAssocation();
    return needsQueue ? "message" : null;
};
193
/**
 * Start a processing loop for every queue that holds events but is not yet
 * marked active. Does nothing while the scheduler has no process function.
 * @param {MatrixScheduler} scheduler The scheduler whose queues to start.
 */
function _startProcessingQueues(scheduler) {
    if (!scheduler._procFn) {
        return;
    }

    // pick out the queues that are inactive and have events in them
    const allNames = utils.keys(scheduler._queues);
    const idleNames = utils.filter(allNames, (name) => {
        const isActive = scheduler._activeQueues.indexOf(name) !== -1;
        return !isActive && scheduler._queues[name].length > 0;
    });

    utils.forEach(idleNames, (name) => {
        // flag the queue as active before processing its head
        scheduler._activeQueues.push(name);
        debuglog("Spinning up queue: '%s'", name);
        _processQueue(scheduler, name);
    });
}
210
/**
 * Process the entry at the head of a queue and keep going until the queue is
 * empty, at which point the queue is marked inactive again.
 *
 * The head entry is handed to the scheduler's process function. On success
 * the entry is taken out of the queue and its deferred is resolved; on
 * failure the retry algorithm decides whether to give up (the entry is taken
 * out and its deferred rejected) or to try again after a delay.
 *
 * The entry that was processed is removed by identity rather than by blindly
 * shifting the queue head. If it was taken out of the queue while the send
 * was in flight (e.g. via removeEventFromQueue), the entry that has moved to
 * the head is left alone and is processed next instead of being dropped.
 *
 * @param {MatrixScheduler} scheduler The scheduler that owns the queue.
 * @param {string} queueName The name of the queue to process.
 */
function _processQueue(scheduler, queueName) {
    // get head of queue
    const obj = _peekNextEvent(scheduler, queueName);
    if (!obj) {
        // queue is empty. Mark as inactive and stop recursing.
        const index = scheduler._activeQueues.indexOf(queueName);
        if (index >= 0) {
            scheduler._activeQueues.splice(index, 1);
        }
        debuglog("Stopping queue '%s' as it is now empty", queueName);
        return;
    }
    debuglog(
        "Queue '%s' has %s pending events",
        queueName, scheduler._queues[queueName].length,
    );
    // fire the process function and if it resolves, resolve the deferred. Else
    // invoke the retry algorithm.

    // First wait for a resolved promise, so the resolve handlers for
    // the deferred of the previously sent event can run.
    // This way enqueued relations/redactions to enqueued events can receive
    // the id of their target before being sent.
    Promise.resolve().then(() => {
        return scheduler._procFn(obj.event);
    }).then(function(res) {
        // remove this entry (and only this entry) from the queue
        _removeQueuedItem(scheduler, queueName, obj);
        debuglog("Queue '%s' sent event %s", queueName, obj.event.getId());
        obj.defer.resolve(res);
        // keep processing
        _processQueue(scheduler, queueName);
    }, function(err) {
        obj.attempts += 1;
        // ask the retry algorithm when/if we should try again
        const waitTimeMs = scheduler.retryAlgorithm(obj.event, obj.attempts, err);
        debuglog(
            "retry(%s) err=%s event_id=%s waitTime=%s",
            obj.attempts, err, obj.event.getId(), waitTimeMs,
        );
        if (waitTimeMs === -1) { // give up (you quitter!)
            debuglog(
                "Queue '%s' giving up on event %s", queueName, obj.event.getId(),
            );
            // remove this entry (and only this entry) from the queue
            _removeQueuedItem(scheduler, queueName, obj);
            obj.defer.reject(err);
            // process next event
            _processQueue(scheduler, queueName);
        } else {
            setTimeout(function() {
                _processQueue(scheduler, queueName);
            }, waitTimeMs);
        }
    });
}

/**
 * Remove one specific entry from a queue, wherever it currently sits.
 * @param {MatrixScheduler} scheduler The scheduler that owns the queue.
 * @param {string} queueName The name of the queue to remove from.
 * @param {Object} item The queue entry to remove (matched by identity).
 * @return {boolean} True if the entry was found and removed.
 */
function _removeQueuedItem(scheduler, queueName, item) {
    const queue = scheduler._queues[queueName];
    if (!Array.isArray(queue)) {
        return false;
    }
    const position = queue.indexOf(item);
    if (position === -1) {
        // already gone, e.g. removed while the send was in flight
        return false;
    }
    queue.splice(position, 1);
    return true;
}
267
/**
 * Look at the entry at the head of a queue without removing it.
 * @param {MatrixScheduler} scheduler The scheduler that owns the queue.
 * @param {string} queueName The name of the queue to inspect.
 * @return {?Object} The first entry of the queue (undefined if the queue is
 * empty), or null if no array is stored under that name.
 */
function _peekNextEvent(scheduler, queueName) {
    const pending = scheduler._queues[queueName];
    return utils.isArray(pending) ? pending[0] : null;
}
275
/**
 * Take the entry at the head of a queue out of the queue.
 * @param {MatrixScheduler} scheduler The scheduler that owns the queue.
 * @param {string} queueName The name of the queue to shift.
 * @return {?Object} The removed entry (undefined if the queue was empty), or
 * null if no array is stored under that name.
 */
function _removeNextEvent(scheduler, queueName) {
    const pending = scheduler._queues[queueName];
    return utils.isArray(pending) ? pending.shift() : null;
}
283
/**
 * Forward the given arguments to logger.log, but only while the module-level
 * DEBUG flag is set; otherwise do nothing.
 * @param {...*} args Values passed through to logger.log unchanged.
 */
function debuglog(...args) {
    if (!DEBUG) {
        return;
    }
    logger.log(...args);
}
289
/**
 * The retry algorithm to apply when retrying events. To stop retrying, return
 * <code>-1</code>. If this event was part of a queue, it will be removed from
 * the queue.
 * @callback retryAlgorithm
 * @param {MatrixEvent} event The event being retried.
 * @param {Number} attempts The number of failed attempts. This will always be
 * >= 1.
 * @param {MatrixError} err The most recent error message received when trying
 * to send this event.
 * @return {Number} The number of milliseconds to wait before trying again. If
 * this is 0, the request will be immediately retried. If this is
 * <code>-1</code>, the event will be marked as
 * {@link module:models/event.EventStatus.NOT_SENT} and will not be retried.
 */
305
/**
 * The queuing algorithm to apply to events. This function must be idempotent as
 * it may be called multiple times with the same event. All queues created are
 * serviced in a FIFO manner. To send the event ASAP, return <code>null</code>
 * which will not put this event in a queue. Events that fail to send that form
 * part of a queue will be removed from the queue and the next event in the
 * queue will be sent.
 * @callback queueAlgorithm
 * @param {MatrixEvent} event The event to be sent.
 * @return {string} The name of the queue to put the event into. If a queue with
 * this name does not exist, it will be created. If this is <code>null</code>,
 * the event is not put into a queue and will be sent concurrently.
 */
319
/**
 * The function to invoke to process (send) events in the queue.
 * @callback processFn
 * @param {MatrixEvent} event The event to send.
 * @return {Promise} Resolved/rejected depending on the outcome of the request.
 */
326