1 | /*
|
2 | Copyright 2015, 2016 OpenMarket Ltd
|
3 | Copyright 2019 The Matrix.org Foundation C.I.C.
|
4 |
|
5 | Licensed under the Apache License, Version 2.0 (the "License");
|
6 | you may not use this file except in compliance with the License.
|
7 | You may obtain a copy of the License at
|
8 |
|
9 | http://www.apache.org/licenses/LICENSE-2.0
|
10 |
|
11 | Unless required by applicable law or agreed to in writing, software
|
12 | distributed under the License is distributed on an "AS IS" BASIS,
|
13 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
14 | See the License for the specific language governing permissions and
|
15 | limitations under the License.
|
16 | */
|
17 |
|
18 | /**
|
19 | * This is an internal module which manages queuing, scheduling and retrying
|
20 | * of requests.
|
21 | * @module scheduler
|
22 | */
|
23 | import * as utils from "./utils";
|
24 | import {logger} from './logger';
|
25 |
|
26 | const DEBUG = false; // set true to enable console logging.
|
27 |
|
28 | /**
|
29 | * Construct a scheduler for Matrix. Requires
|
30 | * {@link module:scheduler~MatrixScheduler#setProcessFunction} to be provided
|
31 | * with a way of processing events.
|
32 | * @constructor
|
33 | * @param {module:scheduler~retryAlgorithm} retryAlgorithm Optional. The retry
|
34 | * algorithm to apply when determining when to try to send an event again.
|
35 | * Defaults to {@link module:scheduler~MatrixScheduler.RETRY_BACKOFF_RATELIMIT}.
|
36 | * @param {module:scheduler~queueAlgorithm} queueAlgorithm Optional. The queuing
|
37 | * algorithm to apply when determining which events should be sent before the
|
38 | * given event. Defaults to {@link module:scheduler~MatrixScheduler.QUEUE_MESSAGES}.
|
39 | */
|
40 | export function MatrixScheduler(retryAlgorithm, queueAlgorithm) {
|
41 | this.retryAlgorithm = retryAlgorithm || MatrixScheduler.RETRY_BACKOFF_RATELIMIT;
|
42 | this.queueAlgorithm = queueAlgorithm || MatrixScheduler.QUEUE_MESSAGES;
|
43 | this._queues = {
|
44 | // queueName: [{
|
45 | // event: MatrixEvent, // event to send
|
46 | // defer: Deferred, // defer to resolve/reject at the END of the retries
|
47 | // attempts: Number // number of times we've called processFn
|
48 | // }, ...]
|
49 | };
|
50 | this._activeQueues = [];
|
51 | this._procFn = null;
|
52 | }
|
53 |
|
54 | /**
|
55 | * Retrieve a queue based on an event. The event provided does not need to be in
|
56 | * the queue.
|
57 | * @param {MatrixEvent} event An event to get the queue for.
|
58 | * @return {?Array<MatrixEvent>} A shallow copy of events in the queue or null.
|
59 | * Modifying this array will not modify the list itself. Modifying events in
|
60 | * this array <i>will</i> modify the underlying event in the queue.
|
61 | * @see MatrixScheduler.removeEventFromQueue To remove an event from the queue.
|
62 | */
|
63 | MatrixScheduler.prototype.getQueueForEvent = function(event) {
|
64 | const name = this.queueAlgorithm(event);
|
65 | if (!name || !this._queues[name]) {
|
66 | return null;
|
67 | }
|
68 | return utils.map(this._queues[name], function(obj) {
|
69 | return obj.event;
|
70 | });
|
71 | };
|
72 |
|
73 | /**
|
74 | * Remove this event from the queue. The event is equal to another event if they
|
75 | * have the same ID returned from event.getId().
|
76 | * @param {MatrixEvent} event The event to remove.
|
77 | * @return {boolean} True if this event was removed.
|
78 | */
|
79 | MatrixScheduler.prototype.removeEventFromQueue = function(event) {
|
80 | const name = this.queueAlgorithm(event);
|
81 | if (!name || !this._queues[name]) {
|
82 | return false;
|
83 | }
|
84 | let removed = false;
|
85 | utils.removeElement(this._queues[name], function(element) {
|
86 | if (element.event.getId() === event.getId()) {
|
87 | // XXX we should probably reject the promise?
|
88 | // https://github.com/matrix-org/matrix-js-sdk/issues/496
|
89 | removed = true;
|
90 | return true;
|
91 | }
|
92 | });
|
93 | return removed;
|
94 | };
|
95 |
|
96 |
|
97 | /**
|
98 | * Set the process function. Required for events in the queue to be processed.
|
99 | * If set after events have been added to the queue, this will immediately start
|
100 | * processing them.
|
101 | * @param {module:scheduler~processFn} fn The function that can process events
|
102 | * in the queue.
|
103 | */
|
104 | MatrixScheduler.prototype.setProcessFunction = function(fn) {
|
105 | this._procFn = fn;
|
106 | _startProcessingQueues(this);
|
107 | };
|
108 |
|
109 | /**
|
110 | * Queue an event if it is required and start processing queues.
|
111 | * @param {MatrixEvent} event The event that may be queued.
|
112 | * @return {?Promise} A promise if the event was queued, which will be
|
113 | * resolved or rejected in due time, else null.
|
114 | */
|
115 | MatrixScheduler.prototype.queueEvent = function(event) {
|
116 | const queueName = this.queueAlgorithm(event);
|
117 | if (!queueName) {
|
118 | return null;
|
119 | }
|
120 | // add the event to the queue and make a deferred for it.
|
121 | if (!this._queues[queueName]) {
|
122 | this._queues[queueName] = [];
|
123 | }
|
124 | const defer = utils.defer();
|
125 | this._queues[queueName].push({
|
126 | event: event,
|
127 | defer: defer,
|
128 | attempts: 0,
|
129 | });
|
130 | debuglog(
|
131 | "Queue algorithm dumped event %s into queue '%s'",
|
132 | event.getId(), queueName,
|
133 | );
|
134 | _startProcessingQueues(this);
|
135 | return defer.promise;
|
136 | };
|
137 |
|
138 | /**
|
139 | * Retries events up to 4 times using exponential backoff. This produces wait
|
140 | * times of 2, 4, 8, and 16 seconds (30s total) after which we give up. If the
|
141 | * failure was due to a rate limited request, the time specified in the error is
|
142 | * waited before being retried.
|
143 | * @param {MatrixEvent} event
|
144 | * @param {Number} attempts
|
145 | * @param {MatrixError} err
|
146 | * @return {Number}
|
147 | * @see module:scheduler~retryAlgorithm
|
148 | */
|
149 | MatrixScheduler.RETRY_BACKOFF_RATELIMIT = function(event, attempts, err) {
|
150 | if (err.httpStatus === 400 || err.httpStatus === 403 || err.httpStatus === 401) {
|
151 | // client error; no amount of retrying with save you now.
|
152 | return -1;
|
153 | }
|
154 | // we ship with browser-request which returns { cors: rejected } when trying
|
155 | // with no connection, so if we match that, give up since they have no conn.
|
156 | if (err.cors === "rejected") {
|
157 | return -1;
|
158 | }
|
159 |
|
160 | // if event that we are trying to send is too large in any way then retrying won't help
|
161 | if (err.name === "M_TOO_LARGE") {
|
162 | return -1;
|
163 | }
|
164 |
|
165 | if (err.name === "M_LIMIT_EXCEEDED") {
|
166 | const waitTime = err.data.retry_after_ms;
|
167 | if (waitTime) {
|
168 | return waitTime;
|
169 | }
|
170 | }
|
171 | if (attempts > 4) {
|
172 | return -1; // give up
|
173 | }
|
174 | return (1000 * Math.pow(2, attempts));
|
175 | };
|
176 |
|
177 | /**
|
178 | * Queues <code>m.room.message</code> events and lets other events continue
|
179 | * concurrently.
|
180 | * @param {MatrixEvent} event
|
181 | * @return {string}
|
182 | * @see module:scheduler~queueAlgorithm
|
183 | */
|
184 | MatrixScheduler.QUEUE_MESSAGES = function(event) {
|
185 | // enqueue messages or events that associate with another event (redactions and relations)
|
186 | if (event.getType() === "m.room.message" || event.hasAssocation()) {
|
187 | // put these events in the 'message' queue.
|
188 | return "message";
|
189 | }
|
190 | // allow all other events continue concurrently.
|
191 | return null;
|
192 | };
|
193 |
|
194 | function _startProcessingQueues(scheduler) {
|
195 | if (!scheduler._procFn) {
|
196 | return;
|
197 | }
|
198 | // for each inactive queue with events in them
|
199 | utils.forEach(utils.filter(utils.keys(scheduler._queues), function(queueName) {
|
200 | return scheduler._activeQueues.indexOf(queueName) === -1 &&
|
201 | scheduler._queues[queueName].length > 0;
|
202 | }), function(queueName) {
|
203 | // mark the queue as active
|
204 | scheduler._activeQueues.push(queueName);
|
205 | // begin processing the head of the queue
|
206 | debuglog("Spinning up queue: '%s'", queueName);
|
207 | _processQueue(scheduler, queueName);
|
208 | });
|
209 | }
|
210 |
|
211 | function _processQueue(scheduler, queueName) {
|
212 | // get head of queue
|
213 | const obj = _peekNextEvent(scheduler, queueName);
|
214 | if (!obj) {
|
215 | // queue is empty. Mark as inactive and stop recursing.
|
216 | const index = scheduler._activeQueues.indexOf(queueName);
|
217 | if (index >= 0) {
|
218 | scheduler._activeQueues.splice(index, 1);
|
219 | }
|
220 | debuglog("Stopping queue '%s' as it is now empty", queueName);
|
221 | return;
|
222 | }
|
223 | debuglog(
|
224 | "Queue '%s' has %s pending events",
|
225 | queueName, scheduler._queues[queueName].length,
|
226 | );
|
227 | // fire the process function and if it resolves, resolve the deferred. Else
|
228 | // invoke the retry algorithm.
|
229 |
|
230 | // First wait for a resolved promise, so the resolve handlers for
|
231 | // the deferred of the previously sent event can run.
|
232 | // This way enqueued relations/redactions to enqueued events can receive
|
233 | // the remove id of their target before being sent.
|
234 | Promise.resolve().then(() => {
|
235 | return scheduler._procFn(obj.event);
|
236 | }).then(function(res) {
|
237 | // remove this from the queue
|
238 | _removeNextEvent(scheduler, queueName);
|
239 | debuglog("Queue '%s' sent event %s", queueName, obj.event.getId());
|
240 | obj.defer.resolve(res);
|
241 | // keep processing
|
242 | _processQueue(scheduler, queueName);
|
243 | }, function(err) {
|
244 | obj.attempts += 1;
|
245 | // ask the retry algorithm when/if we should try again
|
246 | const waitTimeMs = scheduler.retryAlgorithm(obj.event, obj.attempts, err);
|
247 | debuglog(
|
248 | "retry(%s) err=%s event_id=%s waitTime=%s",
|
249 | obj.attempts, err, obj.event.getId(), waitTimeMs,
|
250 | );
|
251 | if (waitTimeMs === -1) { // give up (you quitter!)
|
252 | debuglog(
|
253 | "Queue '%s' giving up on event %s", queueName, obj.event.getId(),
|
254 | );
|
255 | // remove this from the queue
|
256 | _removeNextEvent(scheduler, queueName);
|
257 | obj.defer.reject(err);
|
258 | // process next event
|
259 | _processQueue(scheduler, queueName);
|
260 | } else {
|
261 | setTimeout(function() {
|
262 | _processQueue(scheduler, queueName);
|
263 | }, waitTimeMs);
|
264 | }
|
265 | });
|
266 | }
|
267 |
|
268 | function _peekNextEvent(scheduler, queueName) {
|
269 | const queue = scheduler._queues[queueName];
|
270 | if (!utils.isArray(queue)) {
|
271 | return null;
|
272 | }
|
273 | return queue[0];
|
274 | }
|
275 |
|
276 | function _removeNextEvent(scheduler, queueName) {
|
277 | const queue = scheduler._queues[queueName];
|
278 | if (!utils.isArray(queue)) {
|
279 | return null;
|
280 | }
|
281 | return queue.shift();
|
282 | }
|
283 |
|
284 | function debuglog() {
|
285 | if (DEBUG) {
|
286 | logger.log(...arguments);
|
287 | }
|
288 | }
|
289 |
|
290 | /**
|
291 | * The retry algorithm to apply when retrying events. To stop retrying, return
|
292 | * <code>-1</code>. If this event was part of a queue, it will be removed from
|
293 | * the queue.
|
294 | * @callback retryAlgorithm
|
295 | * @param {MatrixEvent} event The event being retried.
|
296 | * @param {Number} attempts The number of failed attempts. This will always be
|
297 | * >= 1.
|
298 | * @param {MatrixError} err The most recent error message received when trying
|
299 | * to send this event.
|
300 | * @return {Number} The number of milliseconds to wait before trying again. If
|
301 | * this is 0, the request will be immediately retried. If this is
|
302 | * <code>-1</code>, the event will be marked as
|
303 | * {@link module:models/event.EventStatus.NOT_SENT} and will not be retried.
|
304 | */
|
305 |
|
306 | /**
|
307 | * The queuing algorithm to apply to events. This function must be idempotent as
|
308 | * it may be called multiple times with the same event. All queues created are
|
309 | * serviced in a FIFO manner. To send the event ASAP, return <code>null</code>
|
310 | * which will not put this event in a queue. Events that fail to send that form
|
311 | * part of a queue will be removed from the queue and the next event in the
|
312 | * queue will be sent.
|
313 | * @callback queueAlgorithm
|
314 | * @param {MatrixEvent} event The event to be sent.
|
315 | * @return {string} The name of the queue to put the event into. If a queue with
|
316 | * this name does not exist, it will be created. If this is <code>null</code>,
|
317 | * the event is not put into a queue and will be sent concurrently.
|
318 | */
|
319 |
|
320 | /**
|
321 | * The function to invoke to process (send) events in the queue.
|
322 | * @callback processFn
|
323 | * @param {MatrixEvent} event The event to send.
|
324 | * @return {Promise} Resolved/rejected depending on the outcome of the request.
|
325 | */
|
326 |
|