1 | /*
|
2 | Copyright 2015, 2016 OpenMarket Ltd
|
3 |
|
4 | Licensed under the Apache License, Version 2.0 (the "License");
|
5 | you may not use this file except in compliance with the License.
|
6 | You may obtain a copy of the License at
|
7 |
|
8 | http://www.apache.org/licenses/LICENSE-2.0
|
9 |
|
10 | Unless required by applicable law or agreed to in writing, software
|
11 | distributed under the License is distributed on an "AS IS" BASIS,
|
12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
13 | See the License for the specific language governing permissions and
|
14 | limitations under the License.
|
15 | */
|
16 | ;
|
17 | /**
|
18 | * This is an internal module which manages queuing, scheduling and retrying
|
19 | * of requests.
|
20 | * @module scheduler
|
21 | */
|
22 | const utils = require("./utils");
|
23 | import Promise from 'bluebird';
|
24 |
|
// Debug switch read by debuglog(): set to true to enable console logging.
const DEBUG = false;
|
26 |
|
/**
 * Construct a scheduler for Matrix. A process function must be supplied via
 * {@link module:scheduler~MatrixScheduler#setProcessFunction} before any
 * queued event can be processed.
 * @constructor
 * @param {module:scheduler~retryAlgorithm} retryAlgorithm Optional. Decides
 * when (or whether) a failed event is tried again. Falls back to
 * {@link module:scheduler~MatrixScheduler.RETRY_BACKOFF_RATELIMIT} when falsy.
 * @param {module:scheduler~queueAlgorithm} queueAlgorithm Optional. Decides
 * which queue, if any, an event belongs to. Falls back to
 * {@link module:scheduler~MatrixScheduler.QUEUE_MESSAGES} when falsy.
 */
function MatrixScheduler(retryAlgorithm, queueAlgorithm) {
    // Use the built-in strategies whenever the caller did not supply one.
    this.retryAlgorithm = retryAlgorithm
        ? retryAlgorithm
        : MatrixScheduler.RETRY_BACKOFF_RATELIMIT;
    this.queueAlgorithm = queueAlgorithm
        ? queueAlgorithm
        : MatrixScheduler.QUEUE_MESSAGES;

    // Maps queue name -> array of pending entries. Each entry has the shape:
    //   {
    //     event: MatrixEvent,  // the event to send
    //     defer: Deferred,     // settled once the retries have finished
    //     attempts: Number     // how many times the process function failed
    //   }
    this._queues = {};

    // Names of the queues that currently have a processing loop running.
    this._activeQueues = [];

    // The function used to send events; stays null until one is set.
    this._procFn = null;
}
|
52 |
|
/**
 * Look up the queue that the given event maps to. The event itself does not
 * have to be present in that queue.
 * @param {MatrixEvent} event The event whose queue should be looked up.
 * @return {?Array<MatrixEvent>} A new array holding the events of the queue,
 * or null if the queue algorithm returns no name for the event or no queue
 * with that name exists. Changing the returned array does not change the
 * queue; changing an event inside it <i>does</i> change the queued event.
 * @see MatrixScheduler.removeEventFromQueue To remove an event from the queue.
 */
MatrixScheduler.prototype.getQueueForEvent = function(event) {
    const queueName = this.queueAlgorithm(event);
    // Only touch the queue table when the algorithm produced a name.
    const entries = queueName ? this._queues[queueName] : null;
    if (!entries) {
        return null;
    }
    // Expose the events only, not the bookkeeping entries wrapping them.
    const pluckEvent = function(entry) {
        return entry.event;
    };
    return utils.map(entries, pluckEvent);
};
|
71 |
|
/**
 * Remove an event from the queue it maps to. Two events are treated as the
 * same event when event.getId() returns the same ID for both.
 * @param {MatrixEvent} event The event to remove.
 * @return {boolean} True if a matching event was found and removed, false if
 * the event maps to no queue, the queue does not exist, or nothing matched.
 */
MatrixScheduler.prototype.removeEventFromQueue = function(event) {
    const queueName = this.queueAlgorithm(event);
    const entries = queueName ? this._queues[queueName] : null;
    if (!entries) {
        return false;
    }

    let wasRemoved = false;
    utils.removeElement(entries, (entry) => {
        if (entry.event.getId() !== event.getId()) {
            return;
        }
        // XXX the entry's deferred is left unsettled here; we should probably
        // reject the promise?
        // https://github.com/matrix-org/matrix-js-sdk/issues/496
        wasRemoved = true;
        return true;
    });
    return wasRemoved;
};
|
94 |
|
95 |
|
/**
 * Install the process function. Nothing in the queues is processed until one
 * has been set. Setting it also triggers a pass over the queues, so events
 * that were queued earlier start being processed straight away.
 * @param {module:scheduler~processFn} fn The function that can process events
 * in the queue.
 */
MatrixScheduler.prototype.setProcessFunction = function(fn) {
    const scheduler = this;
    scheduler._procFn = fn;
    // Kick the queues now that there is something to process them with.
    _startProcessingQueues(scheduler);
};
|
107 |
|
/**
 * Queue an event if the queue algorithm assigns it to a queue, then start
 * processing queues.
 * @param {MatrixEvent} event The event that may be queued.
 * @return {?Promise} A promise if the event was queued, which will be
 * resolved or rejected in due time, else null (the queue algorithm returned
 * no queue name for the event).
 */
MatrixScheduler.prototype.queueEvent = function(event) {
    const queueName = this.queueAlgorithm(event);
    if (!queueName) {
        return null;
    }
    // Create the queue on first use.
    if (!this._queues[queueName]) {
        this._queues[queueName] = [];
    }
    // Build the deferred by hand instead of calling Promise.defer(): that
    // helper is deprecated in Bluebird and does not exist on native promises.
    // The executor runs synchronously, so resolve/reject are set before use.
    const defer = {};
    defer.promise = new Promise(function(resolve, reject) {
        defer.resolve = resolve;
        defer.reject = reject;
    });
    this._queues[queueName].push({
        event: event,
        defer: defer,
        attempts: 0,
    });
    debuglog(
        "Queue algorithm dumped event %s into queue '%s'",
        event.getId(), queueName,
    );
    _startProcessingQueues(this);
    return defer.promise;
};
|
136 |
|
/**
 * Retries events up to 4 times using exponential backoff. This produces wait
 * times of 2, 4, 8, and 16 seconds (30s total) after which we give up. If the
 * failure was due to a rate limited request and the error carries a positive
 * <code>data.retry_after_ms</code>, that time is waited instead; this check
 * happens before the attempt limit is applied. Errors with HTTP status 400,
 * 401 or 403, and errors flagged <code>cors === "rejected"</code>, are never
 * retried.
 * @param {MatrixEvent} event The event being retried (not inspected here).
 * @param {Number} attempts The number of failed attempts so far.
 * @param {MatrixError} err The error from the most recent attempt. A missing
 * error, or a rate-limit error without usable data, falls through to the
 * exponential backoff instead of throwing.
 * @return {Number} Milliseconds to wait before retrying, or -1 to give up.
 * @see module:scheduler~retryAlgorithm
 */
MatrixScheduler.RETRY_BACKOFF_RATELIMIT = function(event, attempts, err) {
    // A rejection may carry no error object at all; inspect an empty stand-in
    // so that the property reads below cannot throw.
    const error = err || {};

    const status = error.httpStatus;
    if (status === 400 || status === 403 || status === 401) {
        // client error; no amount of retrying will save you now.
        return -1;
    }
    // we ship with browser-request which returns { cors: rejected } when trying
    // with no connection, so if we match that, give up since they have no conn.
    if (error.cors === "rejected") {
        return -1;
    }

    if (error.name === "M_LIMIT_EXCEEDED") {
        // `data` may be absent on the error, so guard the lookup. Only a
        // positive value is a usable delay: a negative one could be mistaken
        // for the -1 "give up" sentinel or cause an immediate retry.
        const waitTime = error.data ? error.data.retry_after_ms : undefined;
        if (waitTime > 0) {
            return waitTime;
        }
    }
    if (attempts > 4) {
        return -1; // give up
    }
    return (1000 * Math.pow(2, attempts));
};
|
170 |
|
/**
 * Default queue algorithm: puts <code>m.room.message</code> events into a
 * single queue named "message" and leaves every other event unqueued so it
 * can continue concurrently.
 * @param {MatrixEvent} event
 * @return {string} "message" for <code>m.room.message</code> events, else null.
 * @see module:scheduler~queueAlgorithm
 */
MatrixScheduler.QUEUE_MESSAGES = function(event) {
    const isRoomMessage = event.getType() === "m.room.message";
    // Messages share the 'message' queue; anything else gets no queue at all.
    return isRoomMessage ? "message" : null;
};
|
186 |
|
/**
 * Start a processing loop for every queue that has pending entries and is
 * not already marked active. Does nothing while no process function is set.
 * @param {MatrixScheduler} scheduler The scheduler whose queues to start.
 */
function _startProcessingQueues(scheduler) {
    if (!scheduler._procFn) {
        return;
    }

    // A queue needs starting when it is not active yet and is non-empty.
    const needsStarting = function(name) {
        if (scheduler._activeQueues.indexOf(name) !== -1) {
            return false;
        }
        return scheduler._queues[name].length > 0;
    };

    // Work out the full list first, then start the queues one by one.
    const allNames = utils.keys(scheduler._queues);
    const idleNames = utils.filter(allNames, needsStarting);
    utils.forEach(idleNames, function(name) {
        // mark the queue as active before processing its head
        scheduler._activeQueues.push(name);
        debuglog("Spinning up queue: '%s'", name);
        _processQueue(scheduler, name);
    });
}
|
203 |
|
/**
 * Process the entry at the head of the named queue and keep going until the
 * queue is empty. On success the entry's deferred is resolved and the next
 * entry is processed. On failure the scheduler's retry algorithm decides
 * whether to give up (reject the deferred and move on) or to try the queue
 * again after a delay. When the queue is empty it is marked inactive.
 *
 * The entry is removed from the queue only if it is still at the head when
 * its send settles. It may have been removed in the meantime (for example by
 * removeEventFromQueue); unconditionally shifting the queue would then drop
 * the following event unsent and leave its promise pending forever.
 *
 * @param {MatrixScheduler} scheduler The scheduler that owns the queue.
 * @param {string} queueName The name of the queue to process.
 */
function _processQueue(scheduler, queueName) {
    // get head of queue
    const obj = _peekNextEvent(scheduler, queueName);
    if (!obj) {
        // queue is empty. Mark as inactive and stop recursing.
        const index = scheduler._activeQueues.indexOf(queueName);
        if (index >= 0) {
            scheduler._activeQueues.splice(index, 1);
        }
        debuglog("Stopping queue '%s' as it is now empty", queueName);
        return;
    }
    debuglog(
        "Queue '%s' has %s pending events",
        queueName, scheduler._queues[queueName].length,
    );

    // Entries are only appended at the tail, so `obj` is either still the
    // head or no longer in the queue at all; a head check is sufficient.
    const removeIfStillHead = function() {
        if (_peekNextEvent(scheduler, queueName) === obj) {
            _removeNextEvent(scheduler, queueName);
        }
    };

    // fire the process function and if it resolves, resolve the deferred. Else
    // invoke the retry algorithm. `then` is used rather than the Bluebird-only
    // `done` so that any Promise returned by the process function works.
    scheduler._procFn(obj.event).then(function(res) {
        // remove this from the queue
        removeIfStillHead();
        debuglog("Queue '%s' sent event %s", queueName, obj.event.getId());
        obj.defer.resolve(res);
        // keep processing
        _processQueue(scheduler, queueName);
    }, function(err) {
        obj.attempts += 1;
        // ask the retry algorithm when/if we should try again
        const waitTimeMs = scheduler.retryAlgorithm(obj.event, obj.attempts, err);
        debuglog(
            "retry(%s) err=%s event_id=%s waitTime=%s",
            obj.attempts, err, obj.event.getId(), waitTimeMs,
        );
        if (waitTimeMs === -1) { // give up (you quitter!)
            debuglog(
                "Queue '%s' giving up on event %s", queueName, obj.event.getId(),
            );
            // remove this from the queue
            removeIfStillHead();
            obj.defer.reject(err);
            // process next event
            _processQueue(scheduler, queueName);
        } else {
            // the head is re-read when the timer fires
            setTimeout(function() {
                _processQueue(scheduler, queueName);
            }, waitTimeMs);
        }
    });
}
|
253 |
|
/**
 * Look at the entry at the head of the named queue without removing it.
 * @param {MatrixScheduler} scheduler The scheduler that owns the queue.
 * @param {string} queueName The name of the queue to inspect.
 * @return {?Object} The first entry of the queue (undefined when the queue is
 * empty), or null if there is no array stored under that name.
 */
function _peekNextEvent(scheduler, queueName) {
    const pending = scheduler._queues[queueName];
    return utils.isArray(pending) ? pending[0] : null;
}
|
261 |
|
/**
 * Remove the entry at the head of the named queue and hand it back.
 * @param {MatrixScheduler} scheduler The scheduler that owns the queue.
 * @param {string} queueName The name of the queue to take from.
 * @return {?Object} The removed entry (undefined when the queue was empty),
 * or null if there is no array stored under that name.
 */
function _removeNextEvent(scheduler, queueName) {
    const pending = scheduler._queues[queueName];
    return utils.isArray(pending) ? pending.shift() : null;
}
|
269 |
|
/**
 * Forward the given arguments to console.log, but only while the DEBUG
 * switch is on; otherwise do nothing.
 * @param {...*} args Values passed straight through to console.log.
 */
function debuglog(...args) {
    if (!DEBUG) {
        return;
    }
    console.log(...args);
}
|
275 |
|
276 | /**
|
277 | * The retry algorithm to apply when retrying events. To stop retrying, return
|
278 | * <code>-1</code>. If this event was part of a queue, it will be removed from
|
279 | * the queue.
|
280 | * @callback retryAlgorithm
|
281 | * @param {MatrixEvent} event The event being retried.
|
282 | * @param {Number} attempts The number of failed attempts. This will always be
|
283 | * >= 1.
|
284 | * @param {MatrixError} err The most recent error message received when trying
|
285 | * to send this event.
|
286 | * @return {Number} The number of milliseconds to wait before trying again. If
|
287 | * this is 0, the request will be immediately retried. If this is
|
288 | * <code>-1</code>, the event will be marked as
|
289 | * {@link module:models/event.EventStatus.NOT_SENT} and will not be retried.
|
290 | */
|
291 |
|
292 | /**
|
293 | * The queuing algorithm to apply to events. This function must be idempotent as
|
294 | * it may be called multiple times with the same event. All queues created are
|
295 | * serviced in a FIFO manner. To send the event ASAP, return <code>null</code>
|
296 | * which will not put this event in a queue. Events that fail to send that form
|
297 | * part of a queue will be removed from the queue and the next event in the
|
298 | * queue will be sent.
|
299 | * @callback queueAlgorithm
|
300 | * @param {MatrixEvent} event The event to be sent.
|
301 | * @return {string} The name of the queue to put the event into. If a queue with
|
302 | * this name does not exist, it will be created. If this is <code>null</code>,
|
303 | * the event is not put into a queue and will be sent concurrently.
|
304 | */
|
305 |
|
306 | /**
|
307 | * The function to invoke to process (send) events in the queue.
|
308 | * @callback processFn
|
309 | * @param {MatrixEvent} event The event to send.
|
310 | * @return {Promise} Resolved/rejected depending on the outcome of the request.
|
311 | */
|
312 |
|
/**
 * The MatrixScheduler class. The constructor is the value exported by this
 * module.
 */
module.exports = MatrixScheduler;
|