1 |
|
2 |
|
3 |
|
4 | import 'source-map-support/register';
|
5 | import Promise from 'bluebird';
|
6 | const sdk = require("../..");
|
7 | const MatrixScheduler = sdk.MatrixScheduler;
|
8 | const MatrixError = sdk.MatrixError;
|
9 | const utils = require("../test-utils");
|
10 |
|
11 | import expect from 'expect';
|
12 | import lolex from 'lolex';
|
13 |
|
14 | describe("MatrixScheduler", function() {
|
15 | let clock;
|
16 | let scheduler;
|
17 | let retryFn;
|
18 | let queueFn;
|
19 | let defer;
|
20 | const roomId = "!foo:bar";
|
21 | const eventA = utils.mkMessage({
|
22 | user: "@alice:bar", room: roomId, event: true,
|
23 | });
|
24 | const eventB = utils.mkMessage({
|
25 | user: "@alice:bar", room: roomId, event: true,
|
26 | });
|
27 |
|
28 | beforeEach(function() {
|
29 | utils.beforeEach(this);
|
30 | clock = lolex.install();
|
31 | scheduler = new MatrixScheduler(function(ev, attempts, err) {
|
32 | if (retryFn) {
|
33 | return retryFn(ev, attempts, err);
|
34 | }
|
35 | return -1;
|
36 | }, function(event) {
|
37 | if (queueFn) {
|
38 | return queueFn(event);
|
39 | }
|
40 | return null;
|
41 | });
|
42 | retryFn = null;
|
43 | queueFn = null;
|
44 | defer = Promise.defer();
|
45 | });
|
46 |
|
47 | afterEach(function() {
|
48 | clock.uninstall();
|
49 | });
|
50 |
|
51 | it("should process events in a queue in a FIFO manner", async function() {
|
52 | retryFn = function() {
|
53 | return 0;
|
54 | };
|
55 | queueFn = function() {
|
56 | return "one_big_queue";
|
57 | };
|
58 | const deferA = Promise.defer();
|
59 | const deferB = Promise.defer();
|
60 | let yieldedA = false;
|
61 | scheduler.setProcessFunction(function(event) {
|
62 | if (yieldedA) {
|
63 | expect(event).toEqual(eventB);
|
64 | return deferB.promise;
|
65 | } else {
|
66 | yieldedA = true;
|
67 | expect(event).toEqual(eventA);
|
68 | return deferA.promise;
|
69 | }
|
70 | });
|
71 | const abPromise = Promise.all([
|
72 | scheduler.queueEvent(eventA),
|
73 | scheduler.queueEvent(eventB),
|
74 | ]);
|
75 | deferB.resolve({b: true});
|
76 | deferA.resolve({a: true});
|
77 | const [a, b] = await abPromise;
|
78 | expect(a.a).toEqual(true);
|
79 | expect(b.b).toEqual(true);
|
80 | });
|
81 |
|
82 | it("should invoke the retryFn on failure and wait the amount of time specified",
|
83 | async function() {
|
84 | const waitTimeMs = 1500;
|
85 | const retryDefer = Promise.defer();
|
86 | retryFn = function() {
|
87 | retryDefer.resolve();
|
88 | return waitTimeMs;
|
89 | };
|
90 | queueFn = function() {
|
91 | return "yep";
|
92 | };
|
93 |
|
94 | let procCount = 0;
|
95 | scheduler.setProcessFunction(function(ev) {
|
96 | procCount += 1;
|
97 | if (procCount === 1) {
|
98 | expect(ev).toEqual(eventA);
|
99 | return defer.promise;
|
100 | } else if (procCount === 2) {
|
101 |
|
102 | return new Promise();
|
103 | }
|
104 | expect(procCount).toBeLessThan(3);
|
105 | });
|
106 |
|
107 | scheduler.queueEvent(eventA);
|
108 |
|
109 |
|
110 | await Promise.resolve();
|
111 | expect(procCount).toEqual(1);
|
112 | defer.reject({});
|
113 | await retryDefer.promise;
|
114 | expect(procCount).toEqual(1);
|
115 | clock.tick(waitTimeMs);
|
116 | await Promise.resolve();
|
117 | expect(procCount).toEqual(2);
|
118 | });
|
119 |
|
120 | it("should give up if the retryFn on failure returns -1 and try the next event",
|
121 | async function() {
|
122 |
|
123 |
|
124 |
|
125 | retryFn = function() {
|
126 | return -1;
|
127 | };
|
128 | queueFn = function() {
|
129 | return "yep";
|
130 | };
|
131 |
|
132 | const deferA = Promise.defer();
|
133 | const deferB = Promise.defer();
|
134 | let procCount = 0;
|
135 | scheduler.setProcessFunction(function(ev) {
|
136 | procCount += 1;
|
137 | if (procCount === 1) {
|
138 | expect(ev).toEqual(eventA);
|
139 | return deferA.promise;
|
140 | } else if (procCount === 2) {
|
141 | expect(ev).toEqual(eventB);
|
142 | return deferB.promise;
|
143 | }
|
144 | expect(procCount).toBeLessThan(3);
|
145 | });
|
146 |
|
147 | const globalA = scheduler.queueEvent(eventA);
|
148 | scheduler.queueEvent(eventB);
|
149 |
|
150 |
|
151 | await Promise.resolve();
|
152 | expect(procCount).toEqual(1);
|
153 | deferA.reject({});
|
154 | try {
|
155 | await globalA;
|
156 | } catch(err) {
|
157 | await Promise.resolve();
|
158 | expect(procCount).toEqual(2);
|
159 | }
|
160 | });
|
161 |
|
162 | it("should treat each queue separately", function(done) {
|
163 |
|
164 |
|
165 |
|
166 |
|
167 |
|
168 |
|
169 | const eventC = utils.mkMessage({user: "@a:bar", room: roomId, event: true});
|
170 | const eventD = utils.mkMessage({user: "@b:bar", room: roomId, event: true});
|
171 |
|
172 | const buckets = {};
|
173 | buckets[eventA.getId()] = "queue_A";
|
174 | buckets[eventD.getId()] = "queue_A";
|
175 | buckets[eventB.getId()] = "queue_B";
|
176 | buckets[eventC.getId()] = "queue_B";
|
177 |
|
178 | retryFn = function() {
|
179 | return 0;
|
180 | };
|
181 | queueFn = function(event) {
|
182 | return buckets[event.getId()];
|
183 | };
|
184 |
|
185 | const expectOrder = [
|
186 | eventA.getId(), eventB.getId(), eventD.getId(),
|
187 | ];
|
188 | const deferA = Promise.defer();
|
189 | scheduler.setProcessFunction(function(event) {
|
190 | const id = expectOrder.shift();
|
191 | expect(id).toEqual(event.getId());
|
192 | if (expectOrder.length === 0) {
|
193 | done();
|
194 | }
|
195 | return id === eventA.getId() ? deferA.promise : defer.promise;
|
196 | });
|
197 | scheduler.queueEvent(eventA);
|
198 | scheduler.queueEvent(eventB);
|
199 | scheduler.queueEvent(eventC);
|
200 | scheduler.queueEvent(eventD);
|
201 |
|
202 |
|
203 | setTimeout(function() {
|
204 | deferA.resolve({});
|
205 | }, 1000);
|
206 | clock.tick(1000);
|
207 | });
|
208 |
|
209 | describe("queueEvent", function() {
|
210 | it("should return null if the event shouldn't be queued", function() {
|
211 | queueFn = function() {
|
212 | return null;
|
213 | };
|
214 | expect(scheduler.queueEvent(eventA)).toEqual(null);
|
215 | });
|
216 |
|
217 | it("should return a Promise if the event is queued", function() {
|
218 | queueFn = function() {
|
219 | return "yep";
|
220 | };
|
221 | const prom = scheduler.queueEvent(eventA);
|
222 | expect(prom).toBeTruthy();
|
223 | expect(prom.then).toBeTruthy();
|
224 | });
|
225 | });
|
226 |
|
227 | describe("getQueueForEvent", function() {
|
228 | it("should return null if the event doesn't map to a queue name", function() {
|
229 | queueFn = function() {
|
230 | return null;
|
231 | };
|
232 | expect(scheduler.getQueueForEvent(eventA)).toBe(null);
|
233 | });
|
234 |
|
235 | it("should return null if the mapped queue doesn't exist", function() {
|
236 | queueFn = function() {
|
237 | return "yep";
|
238 | };
|
239 | expect(scheduler.getQueueForEvent(eventA)).toBe(null);
|
240 | });
|
241 |
|
242 | it("should return a list of events in the queue and modifications to" +
|
243 | " the list should not affect the underlying queue.", function() {
|
244 | queueFn = function() {
|
245 | return "yep";
|
246 | };
|
247 | scheduler.queueEvent(eventA);
|
248 | scheduler.queueEvent(eventB);
|
249 | const queue = scheduler.getQueueForEvent(eventA);
|
250 | expect(queue.length).toEqual(2);
|
251 | expect(queue).toEqual([eventA, eventB]);
|
252 |
|
253 | const eventC = utils.mkMessage(
|
254 | {user: "@a:bar", room: roomId, event: true},
|
255 | );
|
256 | queue.push(eventC);
|
257 | const queueAgain = scheduler.getQueueForEvent(eventA);
|
258 | expect(queueAgain.length).toEqual(2);
|
259 | });
|
260 |
|
261 | it("should return a list of events in the queue and modifications to" +
|
262 | " an event in the queue should affect the underlying queue.", function() {
|
263 | queueFn = function() {
|
264 | return "yep";
|
265 | };
|
266 | scheduler.queueEvent(eventA);
|
267 | scheduler.queueEvent(eventB);
|
268 | const queue = scheduler.getQueueForEvent(eventA);
|
269 | queue[1].event.content.body = "foo";
|
270 | const queueAgain = scheduler.getQueueForEvent(eventA);
|
271 | expect(queueAgain[1].event.content.body).toEqual("foo");
|
272 | });
|
273 | });
|
274 |
|
275 | describe("removeEventFromQueue", function() {
|
276 | it("should return false if the event doesn't map to a queue name", function() {
|
277 | queueFn = function() {
|
278 | return null;
|
279 | };
|
280 | expect(scheduler.removeEventFromQueue(eventA)).toBe(false);
|
281 | });
|
282 |
|
283 | it("should return false if the event isn't in the queue", function() {
|
284 | queueFn = function() {
|
285 | return "yep";
|
286 | };
|
287 | expect(scheduler.removeEventFromQueue(eventA)).toBe(false);
|
288 | });
|
289 |
|
290 | it("should return true if the event was removed", function() {
|
291 | queueFn = function() {
|
292 | return "yep";
|
293 | };
|
294 | scheduler.queueEvent(eventA);
|
295 | expect(scheduler.removeEventFromQueue(eventA)).toBe(true);
|
296 | });
|
297 | });
|
298 |
|
299 | describe("setProcessFunction", function() {
|
300 | it("should call the processFn if there are queued events", function() {
|
301 | queueFn = function() {
|
302 | return "yep";
|
303 | };
|
304 | let procCount = 0;
|
305 | scheduler.queueEvent(eventA);
|
306 | scheduler.setProcessFunction(function(ev) {
|
307 | procCount += 1;
|
308 | expect(ev).toEqual(eventA);
|
309 | return defer.promise;
|
310 | });
|
311 |
|
312 |
|
313 | Promise.resolve().then(() => {
|
314 | expect(procCount).toEqual(1);
|
315 | });
|
316 | });
|
317 |
|
318 | it("should not call the processFn if there are no queued events", function() {
|
319 | queueFn = function() {
|
320 | return "yep";
|
321 | };
|
322 | let procCount = 0;
|
323 | scheduler.setProcessFunction(function(ev) {
|
324 | procCount += 1;
|
325 | return defer.promise;
|
326 | });
|
327 | expect(procCount).toEqual(0);
|
328 | });
|
329 | });
|
330 |
|
331 | describe("QUEUE_MESSAGES", function() {
|
332 | it("should queue m.room.message events only", function() {
|
333 | expect(MatrixScheduler.QUEUE_MESSAGES(eventA)).toEqual("message");
|
334 | expect(MatrixScheduler.QUEUE_MESSAGES(
|
335 | utils.mkMembership({
|
336 | user: "@alice:bar", room: roomId, mship: "join", event: true,
|
337 | }),
|
338 | )).toEqual(null);
|
339 | });
|
340 | });
|
341 |
|
342 | describe("RETRY_BACKOFF_RATELIMIT", function() {
|
343 | it("should wait at least the time given on M_LIMIT_EXCEEDED", function() {
|
344 | const res = MatrixScheduler.RETRY_BACKOFF_RATELIMIT(
|
345 | eventA, 1, new MatrixError({
|
346 | errcode: "M_LIMIT_EXCEEDED", retry_after_ms: 5000,
|
347 | }),
|
348 | );
|
349 | expect(res >= 500).toBe(true, "Didn't wait long enough.");
|
350 | });
|
351 |
|
352 | it("should give up after 5 attempts", function() {
|
353 | const res = MatrixScheduler.RETRY_BACKOFF_RATELIMIT(
|
354 | eventA, 5, {},
|
355 | );
|
356 | expect(res).toBe(-1, "Didn't give up.");
|
357 | });
|
358 |
|
359 | it("should do exponential backoff", function() {
|
360 | expect(MatrixScheduler.RETRY_BACKOFF_RATELIMIT(
|
361 | eventA, 1, {},
|
362 | )).toEqual(2000);
|
363 | expect(MatrixScheduler.RETRY_BACKOFF_RATELIMIT(
|
364 | eventA, 2, {},
|
365 | )).toEqual(4000);
|
366 | expect(MatrixScheduler.RETRY_BACKOFF_RATELIMIT(
|
367 | eventA, 3, {},
|
368 | )).toEqual(8000);
|
369 | expect(MatrixScheduler.RETRY_BACKOFF_RATELIMIT(
|
370 | eventA, 4, {},
|
371 | )).toEqual(16000);
|
372 | });
|
373 | });
|
374 | });
|