UNPKG

12.7 kBJavaScriptView Raw
// This file calls functions whose names are all caps, which displeases eslint
2/* eslint new-cap: "off" */
3
4import 'source-map-support/register';
5import Promise from 'bluebird';
6const sdk = require("../..");
7const MatrixScheduler = sdk.MatrixScheduler;
8const MatrixError = sdk.MatrixError;
9const utils = require("../test-utils");
10
11import expect from 'expect';
12import lolex from 'lolex';
13
14describe("MatrixScheduler", function() {
// Suite-wide state. The beforeEach hook (re)assigns clock, scheduler and
// defer, and resets retryFn/queueFn to null; individual tests then install
// their own retryFn/queueFn as needed.
let clock;
let scheduler;
let retryFn;
let queueFn;
let defer;
const roomId = "!foo:bar";

// Builds a fresh message event from @alice:bar in the shared room.
const mkAliceMessage = () => utils.mkMessage({
    user: "@alice:bar", room: roomId, event: true,
});
const eventA = mkAliceMessage();
const eventB = mkAliceMessage();
27
// Installs fake timers and builds a scheduler whose retry and queue
// algorithms delegate to the per-test retryFn / queueFn hooks.
beforeEach(function() {
    utils.beforeEach(this); // eslint-disable-line babel/no-invalid-this
    clock = lolex.install();

    // With no hook installed: never retry (-1) and never queue (null).
    const retryAlgorithm = (ev, attempts, err) => (
        retryFn ? retryFn(ev, attempts, err) : -1
    );
    const queueAlgorithm = (event) => (queueFn ? queueFn(event) : null);
    scheduler = new MatrixScheduler(retryAlgorithm, queueAlgorithm);

    retryFn = null;
    queueFn = null;
    defer = Promise.defer();
});
46
// Restore the real timers that beforeEach replaced with the lolex clock.
afterEach(() => {
    clock.uninstall();
});
50
it("should process events in a queue in a FIFO manner", async function() {
    retryFn = () => 0;
    queueFn = () => "one_big_queue";

    const deferA = Promise.defer();
    const deferB = Promise.defer();
    let sawEventA = false;

    // The first invocation must be for A, every later one for B.
    scheduler.setProcessFunction(function(event) {
        if (!sawEventA) {
            sawEventA = true;
            expect(event).toEqual(eventA);
            return deferA.promise;
        }
        expect(event).toEqual(eventB);
        return deferB.promise;
    });

    const queuedA = scheduler.queueEvent(eventA);
    const queuedB = scheduler.queueEvent(eventB);
    const bothQueued = Promise.all([queuedA, queuedB]);

    // B's result is made available before A's.
    deferB.resolve({b: true});
    deferA.resolve({a: true});

    const [resultA, resultB] = await bothQueued;
    expect(resultA.a).toEqual(true);
    expect(resultB.b).toEqual(true);
});
81
// Rejects the first processing attempt for A, then checks that the process
// function is only invoked again once the fake clock has advanced by the
// delay returned from retryFn.
it("should invoke the retryFn on failure and wait the amount of time specified",
    async function() {
        const waitTimeMs = 1500;
        const retryDefer = Promise.defer();
        retryFn = function() {
            // signal to the test body that a retry has been requested
            retryDefer.resolve();
            return waitTimeMs;
        };
        queueFn = function() {
            return "yep";
        };

        let procCount = 0;
        scheduler.setProcessFunction(function(ev) {
            procCount += 1;
            if (procCount === 1) {
                expect(ev).toEqual(eventA);
                return defer.promise;
            } else if (procCount === 2) {
                // don't care about this attempt: hand back a promise that never
                // settles. (`new Promise()` with no executor throws a TypeError
                // rather than producing a pending promise.)
                return new Promise(() => {});
            }
            expect(procCount).toBeLessThan(3);
        });

        scheduler.queueEvent(eventA);
        // as queueing doesn't start processing synchronously anymore (see commit bbdb5ac)
        // wait just long enough before it does
        await Promise.resolve();
        expect(procCount).toEqual(1);
        defer.reject({});
        await retryDefer.promise;
        // the retry must not happen before the requested delay has elapsed
        expect(procCount).toEqual(1);
        clock.tick(waitTimeMs);
        await Promise.resolve();
        expect(procCount).toEqual(2);
    });
119
it("should give up if the retryFn on failure returns -1 and try the next event",
    async function() {
        // Queue A & B.
        // Reject A and return -1 on retry.
        // Expect B to be tried next and the promise for A to be rejected.
        retryFn = function() {
            return -1;
        };
        queueFn = function() {
            return "yep";
        };

        const deferA = Promise.defer();
        const deferB = Promise.defer();
        let procCount = 0;
        scheduler.setProcessFunction(function(ev) {
            procCount += 1;
            if (procCount === 1) {
                expect(ev).toEqual(eventA);
                return deferA.promise;
            } else if (procCount === 2) {
                expect(ev).toEqual(eventB);
                return deferB.promise;
            }
            expect(procCount).toBeLessThan(3);
        });

        const globalA = scheduler.queueEvent(eventA);
        scheduler.queueEvent(eventB);
        // as queueing doesn't start processing synchronously anymore (see commit bbdb5ac)
        // wait just long enough before it does
        await Promise.resolve();
        expect(procCount).toEqual(1);
        deferA.reject({});

        // Record the outcome instead of asserting inside `catch`, so the test
        // cannot pass vacuously if the promise for A unexpectedly resolves.
        let rejectedA = false;
        try {
            await globalA;
        } catch (err) {
            rejectedA = true;
        }
        expect(rejectedA).toBe(true, "Promise for A was not rejected.");

        // give the scheduler a tick to move on to B
        await Promise.resolve();
        expect(procCount).toEqual(2);
    });
161
it("should treat each queue separately", function(done) {
    // Scenario:
    //  - queue messages A, B, C and D;
    //  - A and D are bucketed into queue_A, B and C into queue_B;
    //  - processFn should be invoked for A and B first;
    //  - once A resolves, processFn should be invoked for D.
    const eventC = utils.mkMessage({user: "@a:bar", room: roomId, event: true});
    const eventD = utils.mkMessage({user: "@b:bar", room: roomId, event: true});

    const buckets = {
        [eventA.getId()]: "queue_A",
        [eventD.getId()]: "queue_A",
        [eventB.getId()]: "queue_B",
        [eventC.getId()]: "queue_B",
    };

    retryFn = () => 0;
    queueFn = (event) => buckets[event.getId()];

    const expectOrder = [eventA.getId(), eventB.getId(), eventD.getId()];
    const deferA = Promise.defer();
    scheduler.setProcessFunction(function(event) {
        const expectedId = expectOrder.shift();
        expect(expectedId).toEqual(event.getId());
        if (expectOrder.length === 0) {
            done();
        }
        // Only A gets the promise this test resolves below; every other
        // event gets the shared defer created in beforeEach.
        if (expectedId === eventA.getId()) {
            return deferA.promise;
        }
        return defer.promise;
    });

    for (const event of [eventA, eventB, eventC, eventD]) {
        scheduler.queueEvent(event);
    }

    // wait a bit then resolve A and we should get D (not C) next.
    setTimeout(() => deferA.resolve({}), 1000);
    clock.tick(1000);
});
208
// Return value of queueEvent: null when the queue algorithm declines the
// event, otherwise a thenable.
describe("queueEvent", () => {
    it("should return null if the event shouldn't be queued", () => {
        queueFn = () => null;
        const result = scheduler.queueEvent(eventA);
        expect(result).toEqual(null);
    });

    it("should return a Promise if the event is queued", () => {
        queueFn = () => "yep";
        const queued = scheduler.queueEvent(eventA);
        expect(queued).toBeTruthy();
        expect(queued.then).toBeTruthy();
    });
});
226
// getQueueForEvent: null when there is no queue, otherwise a list of the
// queued events that can be modified without changing the queue's length.
describe("getQueueForEvent", () => {
    it("should return null if the event doesn't map to a queue name", () => {
        queueFn = () => null;
        expect(scheduler.getQueueForEvent(eventA)).toBe(null);
    });

    it("should return null if the mapped queue doesn't exist", () => {
        queueFn = () => "yep";
        expect(scheduler.getQueueForEvent(eventA)).toBe(null);
    });

    it("should return a list of events in the queue and modifications to" +
       " the list should not affect the underlying queue.", () => {
        queueFn = () => "yep";
        for (const event of [eventA, eventB]) {
            scheduler.queueEvent(event);
        }

        const snapshot = scheduler.getQueueForEvent(eventA);
        expect(snapshot.length).toEqual(2);
        expect(snapshot).toEqual([eventA, eventB]);

        // modify the returned list; the scheduler's queue must be unaffected
        const extraEvent = utils.mkMessage(
            {user: "@a:bar", room: roomId, event: true},
        );
        snapshot.push(extraEvent);

        const freshSnapshot = scheduler.getQueueForEvent(eventA);
        expect(freshSnapshot.length).toEqual(2);
    });

    it("should return a list of events in the queue and modifications to" +
       " an event in the queue should affect the underlying queue.", () => {
        queueFn = () => "yep";
        for (const event of [eventA, eventB]) {
            scheduler.queueEvent(event);
        }

        // mutate an event through the returned list...
        const snapshot = scheduler.getQueueForEvent(eventA);
        snapshot[1].event.content.body = "foo";

        // ...and the change is visible when the queue is fetched again
        const freshSnapshot = scheduler.getQueueForEvent(eventA);
        expect(freshSnapshot[1].event.content.body).toEqual("foo");
    });
});
274
// removeEventFromQueue reports whether an event was actually removed.
describe("removeEventFromQueue", () => {
    it("should return false if the event doesn't map to a queue name", () => {
        queueFn = () => null;
        const removed = scheduler.removeEventFromQueue(eventA);
        expect(removed).toBe(false);
    });

    it("should return false if the event isn't in the queue", () => {
        queueFn = () => "yep";
        const removed = scheduler.removeEventFromQueue(eventA);
        expect(removed).toBe(false);
    });

    it("should return true if the event was removed", () => {
        queueFn = () => "yep";
        scheduler.queueEvent(eventA);
        const removed = scheduler.removeEventFromQueue(eventA);
        expect(removed).toBe(true);
    });
});
298
// setProcessFunction should start processing when events are already queued,
// and should not invoke the process function when nothing is queued.
describe("setProcessFunction", function() {
    it("should call the processFn if there are queued events", async function() {
        queueFn = function() {
            return "yep";
        };
        let procCount = 0;
        scheduler.queueEvent(eventA);
        scheduler.setProcessFunction(function(ev) {
            procCount += 1;
            expect(ev).toEqual(eventA);
            return defer.promise;
        });
        // as queueing doesn't start processing synchronously anymore (see commit bbdb5ac)
        // wait just long enough before it does.
        // Awaited (rather than a floating `.then`) so that a failing
        // assertion actually fails this test.
        await Promise.resolve();
        expect(procCount).toEqual(1);
    });

    it("should not call the processFn if there are no queued events", async function() {
        queueFn = function() {
            return "yep";
        };
        let procCount = 0;
        scheduler.setProcessFunction(function(ev) {
            procCount += 1;
            return defer.promise;
        });
        // processing is not started synchronously, so give it the same
        // chance to run as in the test above before checking it did not.
        await Promise.resolve();
        expect(procCount).toEqual(0);
    });
});
330
// The stock QUEUE_MESSAGES queue algorithm: message events map to the
// "message" queue, a membership event maps to no queue.
describe("QUEUE_MESSAGES", () => {
    it("should queue m.room.message events only", () => {
        const messageQueue = MatrixScheduler.QUEUE_MESSAGES(eventA);
        expect(messageQueue).toEqual("message");

        const joinEvent = utils.mkMembership({
            user: "@alice:bar", room: roomId, mship: "join", event: true,
        });
        const membershipQueue = MatrixScheduler.QUEUE_MESSAGES(joinEvent);
        expect(membershipQueue).toEqual(null);
    });
});
341
// The stock RETRY_BACKOFF_RATELIMIT retry algorithm: honours the server's
// rate-limit hint, backs off exponentially, and eventually gives up.
describe("RETRY_BACKOFF_RATELIMIT", function() {
    it("should wait at least the time given on M_LIMIT_EXCEEDED", function() {
        const retryAfterMs = 5000;
        const res = MatrixScheduler.RETRY_BACKOFF_RATELIMIT(
            eventA, 1, new MatrixError({
                errcode: "M_LIMIT_EXCEEDED", retry_after_ms: retryAfterMs,
            }),
        );
        // Compare against the value actually sent, not a smaller constant,
        // so the check matches what the test title promises.
        expect(res >= retryAfterMs).toBe(true, "Didn't wait long enough.");
    });

    it("should give up after 5 attempts", function() {
        const res = MatrixScheduler.RETRY_BACKOFF_RATELIMIT(
            eventA, 5, {},
        );
        expect(res).toBe(-1, "Didn't give up.");
    });

    it("should do exponential backoff", function() {
        // [attempt number, expected wait in ms]
        const expectedWaits = [
            [1, 2000],
            [2, 4000],
            [3, 8000],
            [4, 16000],
        ];
        for (const [attempts, waitMs] of expectedWaits) {
            expect(MatrixScheduler.RETRY_BACKOFF_RATELIMIT(
                eventA, attempts, {},
            )).toEqual(waitMs);
        }
    });
});
374});