// Retrieved from UNPKG (plain-text view, 14.3 kB).
1import type { EnhancedStore } from '@reduxjs/toolkit'
2import { configureStore, createSlice, createAction } from '@reduxjs/toolkit'
3
4import type { PayloadAction } from '@reduxjs/toolkit'
5import type {
6 AbortSignalWithReason,
7 ForkedTaskExecutor,
8 TaskResult,
9} from '../types'
10import { createListenerMiddleware, TaskAbortError } from '../index'
11import {
12 listenerCancelled,
13 listenerCompleted,
14 taskCancelled,
15} from '../exceptions'
16
/**
 * Returns a promise that resolves (with no value) once a `setTimeout` of
 * `ms` milliseconds fires. Used by the tests to let timers elapse.
 *
 * @param ms - timeout, in milliseconds, passed straight to `setTimeout`.
 */
function delay(ms: number) {
  return new Promise((resolve) => setTimeout(resolve, ms))
}
20
// @see https://deno.land/std@0.95.0/async/deferred.ts (MIT)
/**
 * A promise that exposes its own `resolve` and `reject` functions as methods,
 * so it can be settled from outside the executor.
 */
export interface Deferred<T> extends Promise<T> {
  resolve(value?: T | PromiseLike<T>): void
  reject(reason?: any): void
}

/** Creates a Promise with the `reject` and `resolve` functions
 * placed as methods on the promise object itself. It allows you to do:
 *
 *     const p = deferred<number>();
 *     // ...
 *     p.resolve(42);
 */
export function deferred<T>(): Deferred<T> {
  // The Promise executor runs synchronously, so `methods` is populated
  // before `Object.assign` copies it onto the promise below.
  let methods
  const promise = new Promise<T>((resolve, reject): void => {
    methods = { resolve, reject }
  })
  return Object.assign(promise, methods) as Deferred<T>
}
41
/** State shape of the `counter` slice used by the tests in this file. */
interface CounterSlice {
  value: number
}
45
46describe('fork', () => {
// Counter slice whose reducer is the store's root reducer in the tests below.
const counterSlice = createSlice({
  name: 'counter',
  initialState: { value: 0 } as CounterSlice,
  reducers: {
    increment(state) {
      state.value += 1
    },
    decrement(state) {
      state.value -= 1
    },
    // Use the PayloadAction type to declare the contents of `action.payload`
    incrementByAmount: (state, action: PayloadAction<number>) => {
      state.value += action.payload
    },
  },
})
// Action creators dispatched by the tests and matched by their listeners.
const { increment, decrement, incrementByAmount } = counterSlice.actions
// Shared fixtures. They are initialised once here (which also gives the
// `let` bindings their inferred types) and rebuilt in `beforeEach`.
let listenerMiddleware = createListenerMiddleware()
let { middleware, startListening, stopListening } = listenerMiddleware
let store = configureStore({
  reducer: counterSlice.reducer,
  middleware: (gDM) => gDM().prepend(middleware),
})

// Recreate the listener middleware and the store before every test so that
// listeners and state from one test are not carried into the next.
beforeEach(() => {
  listenerMiddleware = createListenerMiddleware()
  middleware = listenerMiddleware.middleware
  startListening = listenerMiddleware.startListening
  stopListening = listenerMiddleware.stopListening
  store = configureStore({
    reducer: counterSlice.reducer,
    middleware: (gDM) => gDM().prepend(middleware),
  })
})
81
// Forked executors (sync or async) must not have run by the time `dispatch`
// returns; the test expects both to have run after one microtask tick.
it('runs executors in the next microtask', async () => {
  let hasRunSyncExecutor = false
  let hasRunAsyncExecutor = false

  startListening({
    actionCreator: increment,
    effect: async (_, listenerApi) => {
      listenerApi.fork(() => {
        hasRunSyncExecutor = true
      })

      listenerApi.fork(async () => {
        hasRunAsyncExecutor = true
      })
    },
  })

  store.dispatch(increment())

  // Checked synchronously after dispatch: neither executor has run yet.
  expect(hasRunSyncExecutor).toBe(false)
  expect(hasRunAsyncExecutor).toBe(false)

  // Yield one microtask tick.
  await Promise.resolve()

  expect(hasRunSyncExecutor).toBe(true)
  expect(hasRunAsyncExecutor).toBe(true)
})
109
// The second dispatch makes the new listener instance call
// `cancelActiveListeners()`; the test expects the first instance's
// `forkedTask.result` to settle with `TaskAbortError(listenerCancelled)`.
test('forkedTask.result rejects TaskAbortError if listener is cancelled', async () => {
  const deferredForkedTaskError = deferred()

  startListening({
    actionCreator: increment,
    async effect(_, listenerApi) {
      listenerApi.cancelActiveListeners()
      listenerApi
        .fork(async () => {
          await delay(10)

          throw new Error('unreachable code')
        })
        // Capture whichever way `result` settles; the assertion below
        // checks the captured value.
        .result.then(
          deferredForkedTaskError.resolve,
          deferredForkedTaskError.resolve
        )
    },
  })

  store.dispatch(increment())
  store.dispatch(increment())

  expect(await deferredForkedTaskError).toEqual(
    new TaskAbortError(listenerCancelled)
  )
})
137
// `fork` is called with several non-function values; each call is expected
// to throw a TypeError synchronously, which the try/catch captures.
it('synchronously throws TypeError error if the provided executor is not a function', () => {
  const invalidExecutors = [null, {}, undefined, 1]

  // Declared up front: exactly one assertion per invalid executor must run,
  // so the test fails if the listener effect is never invoked.
  expect.assertions(invalidExecutors.length)

  startListening({
    predicate: () => true,
    effect: async (_, listenerApi) => {
      invalidExecutors.forEach((invalidExecutor) => {
        let caughtError
        try {
          listenerApi.fork(invalidExecutor as any)
        } catch (err) {
          caughtError = err
        }

        expect(caughtError).toBeInstanceOf(TypeError)
      })
    },
  })

  store.dispatch(increment())
})
161
// The task is cancelled right after being forked, so its executor (three
// decrements) must never run: the state should only reflect the increment.
it('does not run an executor if the task is synchronously cancelled', async () => {
  const storeStateAfter = deferred()

  startListening({
    actionCreator: increment,
    effect: async (action, listenerApi) => {
      const forkedTask = listenerApi.fork(() => {
        listenerApi.dispatch(decrement())
        listenerApi.dispatch(decrement())
        listenerApi.dispatch(decrement())
      })
      forkedTask.cancel()

      // Wait for the task to settle before sampling the state.
      await forkedTask.result
      storeStateAfter.resolve(listenerApi.getState())
    },
  })
  store.dispatch(increment())

  // `.resolves` yields a promise: it must be awaited, otherwise the test
  // finishes before the assertion is evaluated.
  await expect(storeStateAfter).resolves.toEqual({ value: 1 })
})
183
// Table-driven check of `forkedTask.result` for sync/async executors.
// `cancelAfterMs` semantics, as implemented by the test body below:
//   - undefined: the task is never cancelled,
//   - negative:  `cancel()` is called synchronously right after dispatch,
//   - >= 0:      `cancel()` is called after `delay(cancelAfterMs)`.
it.each<{
  desc: string
  executor: ForkedTaskExecutor<any>
  cancelAfterMs?: number
  expected: TaskResult<any>
}>([
  {
    desc: 'sync exec - success',
    executor: () => 42,
    expected: { status: 'ok', value: 42 },
  },
  {
    desc: 'sync exec - error',
    executor: () => {
      throw new Error('2020')
    },
    expected: { status: 'rejected', error: new Error('2020') },
  },
  {
    desc: 'sync exec - sync cancel',
    executor: () => 42,
    cancelAfterMs: -1,
    expected: {
      status: 'cancelled',
      error: new TaskAbortError(taskCancelled),
    },
  },
  {
    desc: 'sync exec - async cancel',
    executor: () => 42,
    cancelAfterMs: 0,
    expected: { status: 'ok', value: 42 },
  },
  {
    desc: 'async exec - async cancel',
    executor: async (forkApi) => {
      await forkApi.delay(100)
      throw new Error('2020')
    },
    cancelAfterMs: 10,
    expected: {
      status: 'cancelled',
      error: new TaskAbortError(taskCancelled),
    },
  },
  {
    desc: 'async exec - success',
    executor: async () => {
      await delay(20)
      return Promise.resolve(21)
    },
    expected: { status: 'ok', value: 21 },
  },
  {
    desc: 'async exec - error',
    executor: async () => {
      await Promise.resolve()
      throw new Error('2020')
    },
    expected: { status: 'rejected', error: new Error('2020') },
  },
  {
    desc: 'async exec - success with forkApi.pause',
    executor: async (forkApi) => {
      return forkApi.pause(Promise.resolve(2))
    },
    expected: { status: 'ok', value: 2 },
  },
  {
    desc: 'async exec - error with forkApi.pause',
    executor: async (forkApi) => {
      return forkApi.pause(Promise.reject(22))
    },
    expected: { status: 'rejected', error: 22 },
  },
  {
    desc: 'async exec - success with forkApi.delay',
    executor: async (forkApi) => {
      await forkApi.delay(10)
      return 5
    },
    expected: { status: 'ok', value: 5 },
  },
])('%# - %j', async ({ executor, expected, cancelAfterMs }) => {
  const deferredResult = deferred()
  // Replaced by the real forked task as soon as the effect starts running.
  let forkedTask: any = {}

  startListening({
    predicate: () => true,
    effect: async (_, listenerApi) => {
      forkedTask = listenerApi.fork(executor)

      deferredResult.resolve(await forkedTask.result)
    },
  })

  store.dispatch({ type: '' })

  if (typeof cancelAfterMs === 'number') {
    if (cancelAfterMs < 0) {
      // Synchronous cancellation, immediately after dispatch.
      forkedTask.cancel()
    } else {
      await delay(cancelAfterMs)
      forkedTask.cancel()
    }
  }

  const result = await deferredResult

  expect(result).toEqual(expected)
})
295
296 describe('forkAPI', () => {
// The task waits on `forkApi.delay(100)` and is cancelled after 10 ms; the
// test expects its result to be `cancelled` with `TaskAbortError(taskCancelled)`.
test('forkApi.delay rejects as soon as the task is cancelled', async () => {
  const deferredResult = deferred()

  startListening({
    actionCreator: increment,
    effect: async (_, listenerApi) => {
      const forkedTask = listenerApi.fork(async (forkApi) => {
        await forkApi.delay(100)

        return 4
      })

      await listenerApi.delay(10)
      forkedTask.cancel()
      deferredResult.resolve(await forkedTask.result)
    },
  })

  store.dispatch(increment())

  expect(await deferredResult).toEqual({
    status: 'cancelled',
    error: new TaskAbortError(taskCancelled),
  })
})
322
// The second dispatch cancels the first listener instance while its forked
// task is waiting on `forkApi.delay(100)`; the test expects that delay to
// settle with `TaskAbortError(listenerCancelled)`.
test('forkApi.delay rejects as soon as the parent listener is cancelled', async () => {
  const deferredResult = deferred()

  startListening({
    actionCreator: increment,
    effect: async (_, listenerApi) => {
      listenerApi.cancelActiveListeners()
      await listenerApi.fork(async (forkApi) => {
        // Capture whichever way the delay settles.
        await forkApi
          .delay(100)
          .then(deferredResult.resolve, deferredResult.resolve)

        return 4
      }).result

      // Only the first value passed to `deferredResult.resolve` is kept, so
      // this sentinel is observed only if the delay never settled first.
      deferredResult.resolve(new Error('unreachable'))
    },
  })

  store.dispatch(increment())

  // Give the forked executor a microtask tick before cancelling.
  await Promise.resolve()

  store.dispatch(increment())
  expect(await deferredResult).toEqual(
    new TaskAbortError(listenerCancelled)
  )
})
351
// The effect forks a task and returns without awaiting it. The task's abort
// listener reports `signal.reason`, which the test expects to be
// `listenerCompleted`.
test('forkApi.signal listener is invoked as soon as the parent listener is cancelled or completed', async () => {
  const deferredResult = deferred()

  startListening({
    actionCreator: increment,
    async effect(_, listenerApi) {
      // Intentionally not awaited: the listener completes while the task
      // is still registered.
      const wronglyDoNotAwaitResultOfTask = listenerApi.fork(
        async (forkApi) => {
          forkApi.signal.addEventListener('abort', () => {
            deferredResult.resolve(
              (forkApi.signal as AbortSignalWithReason<unknown>).reason
            )
          })
        }
      )
    },
  })

  // Dispatch the action object, not the action creator function itself.
  store.dispatch(increment())

  expect(await deferredResult).toBe(listenerCompleted)
})
374
// Both tasks call `forkApi.delay(...)` without awaiting it, then complete or
// get cancelled. Each deferred is settled by the task's abort event or by
// its result, whichever comes first.
test('fork.delay does not trigger unhandledRejections for completed or cancelled tasks', async () => {
  const deferredCompletedEvt = deferred()
  const deferredCancelledEvt = deferred()

  // Unfortunately we cannot test declaratively unhandledRejections in jest: https://github.com/facebook/jest/issues/5620
  // This test just fails if an `unhandledRejection` occurs.
  startListening({
    actionCreator: increment,
    effect: async (_, listenerApi) => {
      const completedTask = listenerApi.fork(async (forkApi) => {
        forkApi.signal.addEventListener(
          'abort',
          deferredCompletedEvt.resolve,
          { once: true }
        )
        forkApi.delay(100) // missing await

        return 4
      })

      deferredCompletedEvt.resolve(await completedTask.result)

      // Never settled: keeps the second task paused until it is cancelled.
      const godotPauseTrigger = deferred()

      const cancelledTask = listenerApi.fork(async (forkApi) => {
        // Reports to the *cancelled* deferred, mirroring the completed
        // task above, which reports to the completed one.
        forkApi.signal.addEventListener(
          'abort',
          deferredCancelledEvt.resolve,
          { once: true }
        )
        forkApi.delay(1_000) // missing await
        await forkApi.pause(godotPauseTrigger)
        return 4
      })

      await Promise.resolve()
      cancelledTask.cancel()
      deferredCancelledEvt.resolve(await cancelledTask.result)
    },
  })

  store.dispatch(increment())
  expect(await deferredCompletedEvt).toBeDefined()
  expect(await deferredCancelledEvt).toBeDefined()
})
420 })
421
// The task is paused on a 1 s timer and cancelled one microtask tick after
// being forked; the test expects a `cancelled` result carrying
// `TaskAbortError(taskCancelled)`.
test('forkApi.pause rejects if task is cancelled', async () => {
  const deferredResult = deferred()
  startListening({
    actionCreator: increment,
    effect: async (_, listenerApi) => {
      const forkedTask = listenerApi.fork(async (forkApi) => {
        await forkApi.pause(delay(1_000))

        return 4
      })

      await Promise.resolve()
      forkedTask.cancel()
      deferredResult.resolve(await forkedTask.result)
    },
  })

  store.dispatch(increment())

  expect(await deferredResult).toEqual({
    status: 'cancelled',
    error: new TaskAbortError(taskCancelled),
  })
})
446
// The second dispatch cancels the first listener instance while its forked
// task is paused; the test expects `forkApi.pause` to settle with
// `TaskAbortError(listenerCancelled)`.
test('forkApi.pause rejects as soon as the parent listener is cancelled', async () => {
  const deferredResult = deferred()

  startListening({
    actionCreator: increment,
    effect: async (_, listenerApi) => {
      listenerApi.cancelActiveListeners()
      const forkedTask = listenerApi.fork(async (forkApi) => {
        // Capture whichever way the pause settles.
        await forkApi
          .pause(delay(100))
          .then(deferredResult.resolve, deferredResult.resolve)

        return 4
      })

      await forkedTask.result
      // Only the first value passed to `deferredResult.resolve` is kept, so
      // this sentinel is observed only if the pause never settled first.
      deferredResult.resolve(new Error('unreachable'))
    },
  })

  store.dispatch(increment())

  // Give the forked executor a microtask tick before cancelling.
  await Promise.resolve()

  store.dispatch(increment())
  expect(await deferredResult).toEqual(new TaskAbortError(listenerCancelled))
})
474
// Two dispatches in a row: the second listener instance cancels the first.
// The expected final value of 300 (100 + 2 * 100) corresponds to only the
// second instance's two dispatches reaching the store.
test('forkApi.pause rejects if listener is cancelled', async () => {
  const incrementByInListener = createAction<number>('incrementByInListener')

  startListening({
    actionCreator: incrementByInListener,
    async effect({ payload: amountToIncrement }, listenerApi) {
      listenerApi.cancelActiveListeners()
      await listenerApi.fork(async (forkApi) => {
        await forkApi.pause(delay(10))
        listenerApi.dispatch(incrementByAmount(amountToIncrement))
      }).result
      listenerApi.dispatch(incrementByAmount(2 * amountToIncrement))
    },
  })

  store.dispatch(incrementByInListener(10))
  store.dispatch(incrementByInListener(100))

  // Long enough for the 10 ms pause and the follow-up dispatches to finish.
  await delay(50)

  expect(store.getState().value).toEqual(300)
})
497})