1 | import type { EnhancedStore } from '@reduxjs/toolkit'
|
2 | import { configureStore, createSlice, createAction } from '@reduxjs/toolkit'
|
3 |
|
4 | import type { PayloadAction } from '@reduxjs/toolkit'
|
5 | import type {
|
6 | AbortSignalWithReason,
|
7 | ForkedTaskExecutor,
|
8 | TaskResult,
|
9 | } from '../types'
|
10 | import { createListenerMiddleware, TaskAbortError } from '../index'
|
11 | import {
|
12 | listenerCancelled,
|
13 | listenerCompleted,
|
14 | taskCancelled,
|
15 | } from '../exceptions'
|
16 |
|
/**
 * Creates a promise that settles, without a value, once a `setTimeout`
 * timer of `ms` milliseconds has fired.
 *
 * @param ms Timer duration in milliseconds.
 * @returns A promise resolved by the timer callback.
 */
function delay(ms: number) {
  return new Promise((done) => {
    setTimeout(done, ms)
  })
}
|
20 |
|
21 |
|
/**
 * A promise that additionally exposes its own settle functions, so that
 * code outside of the promise executor can resolve or reject it.
 */
export interface Deferred<T> extends Promise<T> {
  /** Resolves the promise with `value` (or with another promise-like). */
  resolve(value?: T | PromiseLike<T>): void
  /** Rejects the promise with `reason`. */
  reject(reason?: any): void
}
|
26 |
|
27 |
|
28 |
|
29 |
|
30 |
|
31 |
|
32 |
|
33 |
|
/**
 * Builds a pending promise and attaches its `resolve` and `reject`
 * functions to it as own properties, so that it can be settled from the
 * outside.
 *
 * @returns The promise, augmented with `resolve` and `reject`.
 */
export function deferred<T>(): Deferred<T> {
  // Both are assigned synchronously by the Promise executor below.
  let resolvePromise!: (value: T | PromiseLike<T>) => void
  let rejectPromise!: (reason?: any) => void

  const promise = new Promise<T>((resolve, reject): void => {
    resolvePromise = resolve
    rejectPromise = reject
  })

  return Object.assign(promise, {
    resolve: resolvePromise,
    reject: rejectPromise,
  }) as Deferred<T>
}
|
41 |
|
/** State shape of the counter slice used by the tests in this file. */
interface CounterSlice {
  /** Current counter value. */
  value: number
}
|
45 |
|
46 | describe('fork', () => {
|
// Counter slice used as the store reducer: starts at 0 and supports
// +1, -1 and "add payload" updates.
const counterSlice = createSlice({
  name: 'counter',
  initialState: { value: 0 } as CounterSlice,
  reducers: {
    increment: (draft) => {
      draft.value++
    },
    decrement: (draft) => {
      draft.value--
    },
    incrementByAmount(draft, { payload }: PayloadAction<number>) {
      draft.value += payload
    },
  },
})
|
// Action creators generated by the counter slice.
const increment = counterSlice.actions.increment
const decrement = counterSlice.actions.decrement
const incrementByAmount = counterSlice.actions.incrementByAmount

// Mutable bindings: initialised once here so their types are inferred.
let listenerMiddleware = createListenerMiddleware()
let middleware = listenerMiddleware.middleware
let startListening = listenerMiddleware.startListening
let stopListening = listenerMiddleware.stopListening
let store = configureStore({
  reducer: counterSlice.reducer,
  // The listener middleware goes in front of the default middleware.
  middleware: (getDefaultMiddleware) =>
    getDefaultMiddleware().prepend(middleware),
})
|
70 |
|
// Rebuild the listener middleware and the store before every test so that
// no listener or counter state carries over from a previous test.
beforeEach(() => {
  listenerMiddleware = createListenerMiddleware()
  ;({ middleware, startListening, stopListening } = listenerMiddleware)
  store = configureStore({
    reducer: counterSlice.reducer,
    middleware: (getDefaultMiddleware) =>
      getDefaultMiddleware().prepend(middleware),
  })
})
|
81 |
|
// Forks one sync and one async executor from an effect and checks that
// neither has run right after dispatch, but both have run once the test
// has yielded for a single microtask.
it('runs executors in the next microtask', async () => {
  const executed = { sync: false, async: false }

  startListening({
    actionCreator: increment,
    effect: async (_, listenerApi) => {
      listenerApi.fork(() => {
        executed.sync = true
      })

      listenerApi.fork(async () => {
        executed.async = true
      })
    },
  })

  store.dispatch(increment())

  expect(executed.sync).toBe(false)
  expect(executed.async).toBe(false)

  await Promise.resolve()

  expect(executed.sync).toBe(true)
  expect(executed.async).toBe(true)
})
|
109 |
|
// Dispatches twice; every effect run first calls cancelActiveListeners()
// and then forks a task. Whatever the forked task's `result` settles with
// first (fulfilment or rejection) is captured, and it is expected to equal
// a TaskAbortError built from `listenerCancelled`.
test('forkedTask.result rejects TaskAbortError if listener is cancelled', async () => {
  const deferredForkedTaskError = deferred()
  const captureOutcome = deferredForkedTaskError.resolve

  startListening({
    actionCreator: increment,
    async effect(_, listenerApi) {
      listenerApi.cancelActiveListeners()

      const forkedTask = listenerApi.fork(async () => {
        await delay(10)

        throw new Error('unreachable code')
      })

      forkedTask.result.then(captureOutcome, captureOutcome)
    },
  })

  store.dispatch(increment())
  store.dispatch(increment())

  const outcome = await deferredForkedTaskError

  expect(outcome).toEqual(new TaskAbortError(listenerCancelled))
})
|
137 |
|
// Calls `fork` with several non-function values and checks that each call
// throws a TypeError synchronously.
//
// The errors are only *collected* inside the effect and asserted in the
// test body. An `expect` that fails inside the effect throws into the
// effect itself, and the listener middleware handles effect errors on its
// own (see its `onError` option) instead of surfacing them to the test
// runner, so such a failure could go unnoticed.
it('synchronously throws TypeError error if the provided executor is not a function', () => {
  const invalidExecutors = [null, {}, undefined, 1]
  const caughtErrors: unknown[] = []

  startListening({
    predicate: () => true,
    effect: async (_, listenerApi) => {
      for (const invalidExecutor of invalidExecutors) {
        let caughtError: unknown
        try {
          listenerApi.fork(invalidExecutor as any)
        } catch (err) {
          caughtError = err
        }

        // Record one entry per executor, `undefined` when nothing was thrown.
        caughtErrors.push(caughtError)
      }
    },
  })

  store.dispatch(increment())

  // This test is synchronous, so the effect must have run every `fork`
  // call during `dispatch` for all entries to be present here.
  expect(caughtErrors).toHaveLength(invalidExecutors.length)
  for (const caughtError of caughtErrors) {
    expect(caughtError).toBeInstanceOf(TypeError)
  }
})
|
161 |
|
// Forks a task whose executor would dispatch three decrements, cancels it
// in the same tick, and checks that the store only reflects the initial
// increment afterwards.
it('does not run an executor if the task is synchronously cancelled', async () => {
  const storeStateAfter = deferred()

  startListening({
    actionCreator: increment,
    effect: async (_, listenerApi) => {
      const forkedTask = listenerApi.fork(() => {
        listenerApi.dispatch(decrement())
        listenerApi.dispatch(decrement())
        listenerApi.dispatch(decrement())
      })
      forkedTask.cancel()

      // Wait for the task to settle before taking the state snapshot.
      await forkedTask.result
      storeStateAfter.resolve(listenerApi.getState())
    },
  })
  store.dispatch(increment())

  // `.resolves` is asynchronous: without `await` the test could finish
  // before the assertion has been evaluated.
  await expect(storeStateAfter).resolves.toEqual({ value: 1 })
})
|
183 |
|
// One row per scenario: the executor to fork, an optional cancellation
// time, and the task result the test expects to observe.
type ForkCase = {
  desc: string
  executor: ForkedTaskExecutor<any>
  // Negative: cancel right after dispatch; otherwise cancel after that
  // many milliseconds; omitted: never cancel.
  cancelAfterMs?: number
  expected: TaskResult<any>
}

const forkCases: ForkCase[] = [
  {
    desc: 'sync exec - success',
    executor: () => 42,
    expected: { status: 'ok', value: 42 },
  },
  {
    desc: 'sync exec - error',
    executor: () => {
      throw new Error('2020')
    },
    expected: { status: 'rejected', error: new Error('2020') },
  },
  {
    desc: 'sync exec - sync cancel',
    executor: () => 42,
    cancelAfterMs: -1,
    expected: {
      status: 'cancelled',
      error: new TaskAbortError(taskCancelled),
    },
  },
  {
    desc: 'sync exec - async cancel',
    executor: () => 42,
    cancelAfterMs: 0,
    expected: { status: 'ok', value: 42 },
  },
  {
    desc: 'async exec - async cancel',
    executor: async (forkApi) => {
      await forkApi.delay(100)
      throw new Error('2020')
    },
    cancelAfterMs: 10,
    expected: {
      status: 'cancelled',
      error: new TaskAbortError(taskCancelled),
    },
  },
  {
    desc: 'async exec - success',
    executor: async () => {
      await delay(20)
      return Promise.resolve(21)
    },
    expected: { status: 'ok', value: 21 },
  },
  {
    desc: 'async exec - error',
    executor: async () => {
      await Promise.resolve()
      throw new Error('2020')
    },
    expected: { status: 'rejected', error: new Error('2020') },
  },
  {
    desc: 'async exec - success with forkApi.pause',
    executor: async (forkApi) => {
      return forkApi.pause(Promise.resolve(2))
    },
    expected: { status: 'ok', value: 2 },
  },
  {
    desc: 'async exec - error with forkApi.pause',
    executor: async (forkApi) => {
      return forkApi.pause(Promise.reject(22))
    },
    expected: { status: 'rejected', error: 22 },
  },
  {
    desc: 'async exec - success with forkApi.delay',
    executor: async (forkApi) => {
      await forkApi.delay(10)
      return 5
    },
    expected: { status: 'ok', value: 5 },
  },
]

it.each<ForkCase>(forkCases)(
  '%# - %j',
  async ({ executor, expected, cancelAfterMs }) => {
    const deferredResult = deferred()
    // Placeholder until the effect assigns the real forked task.
    let forkedTask: any = {}

    startListening({
      predicate: () => true,
      effect: async (_, listenerApi) => {
        forkedTask = listenerApi.fork(executor)

        deferredResult.resolve(await forkedTask.result)
      },
    })

    store.dispatch({ type: '' })

    if (typeof cancelAfterMs === 'number') {
      // Only a negative value skips the wait and cancels immediately.
      if (!(cancelAfterMs < 0)) {
        await delay(cancelAfterMs)
      }
      forkedTask.cancel()
    }

    expect(await deferredResult).toEqual(expected)
  }
)
|
295 |
|
296 | describe('forkAPI', () => {
|
// Forks a task that waits on forkApi.delay(100), cancels it after 10ms and
// expects the task result to be reported as cancelled with a
// TaskAbortError built from `taskCancelled`.
test('forkApi.delay rejects as soon as the task is cancelled', async () => {
  const deferredResult = deferred()

  startListening({
    actionCreator: increment,
    effect: async (_, listenerApi) => {
      const task = listenerApi.fork(async (forkApi) => {
        await forkApi.delay(100)

        return 4
      })

      await listenerApi.delay(10)
      task.cancel()

      const outcome = await task.result
      deferredResult.resolve(outcome)
    },
  })

  store.dispatch(increment())

  const observed = await deferredResult

  expect(observed).toEqual({
    status: 'cancelled',
    error: new TaskAbortError(taskCancelled),
  })
})
|
322 |
|
// Dispatches twice, with a microtask in between; every effect run calls
// cancelActiveListeners() before forking a task that waits on
// forkApi.delay(100). The first settlement of that delay (fulfilment or
// rejection) is captured and expected to equal a TaskAbortError built
// from `listenerCancelled`.
test('forkApi.delay rejects as soon as the parent listener is cancelled', async () => {
  const deferredResult = deferred()
  const captureOutcome = deferredResult.resolve

  startListening({
    actionCreator: increment,
    effect: async (_, listenerApi) => {
      listenerApi.cancelActiveListeners()

      const task = listenerApi.fork(async (forkApi) => {
        await forkApi.delay(100).then(captureOutcome, captureOutcome)

        return 4
      })
      await task.result

      // Only has an effect if nothing was captured before this point.
      deferredResult.resolve(new Error('unreachable'))
    },
  })

  store.dispatch(increment())

  await Promise.resolve()

  store.dispatch(increment())

  const observed = await deferredResult

  expect(observed).toEqual(new TaskAbortError(listenerCancelled))
})
|
351 |
|
// Forks a task that registers an 'abort' listener on forkApi.signal and
// lets the effect return without awaiting the task. The reason carried by
// the signal when the listener fires is expected to be `listenerCompleted`.
test('forkApi.signal listener is invoked as soon as the parent listener is cancelled or completed', async () => {
  const deferredResult = deferred()

  startListening({
    actionCreator: increment,
    async effect(_, listenerApi) {
      // Intentionally left un-awaited, as the name says: the effect
      // finishes while the forked task is still around.
      const wronglyDoNotAwaitResultOfTask = listenerApi.fork(
        async (forkApi) => {
          forkApi.signal.addEventListener('abort', () => {
            deferredResult.resolve(
              (forkApi.signal as AbortSignalWithReason<unknown>).reason
            )
          })
        }
      )
    },
  })

  // Dispatch the action object: note the call. Passing the `increment`
  // action creator itself would dispatch a function, not an action.
  store.dispatch(increment())

  expect(await deferredResult).toBe(listenerCompleted)
})
|
374 |
|
// Starts a forkApi.delay without awaiting it in two tasks: one that runs
// to completion and one that is cancelled while paused. Each task has its
// own deferred, settled either by the task signal's 'abort' event or by
// the task result, and both deferreds must end up defined.
test('fork.delay does not trigger unhandledRejections for completed or cancelled tasks', async () => {
  const deferredCompletedEvt = deferred()
  const deferredCancelledEvt = deferred()

  startListening({
    actionCreator: increment,
    effect: async (_, listenerApi) => {
      const completedTask = listenerApi.fork(async (forkApi) => {
        forkApi.signal.addEventListener(
          'abort',
          deferredCompletedEvt.resolve,
          { once: true }
        )
        // Deliberately not awaited: the delay is still pending when the
        // task returns.
        forkApi.delay(100)

        return 4
      })

      deferredCompletedEvt.resolve(await completedTask.result)

      // Never settled by this test; it only keeps the second task paused.
      const godotPauseTrigger = deferred()

      const cancelledTask = listenerApi.fork(async (forkApi) => {
        // Bound to the *cancelled* deferred. Resolving
        // `deferredCompletedEvt` here would be a no-op, because it has
        // already been settled above.
        forkApi.signal.addEventListener(
          'abort',
          deferredCancelledEvt.resolve,
          { once: true }
        )
        // Deliberately not awaited, see above.
        forkApi.delay(1_000)
        await forkApi.pause(godotPauseTrigger)
        return 4
      })

      await Promise.resolve()
      cancelledTask.cancel()
      // Fallback in case the abort listener did not settle it first.
      deferredCancelledEvt.resolve(await cancelledTask.result)
    },
  })

  store.dispatch(increment())
  expect(await deferredCompletedEvt).toBeDefined()
  expect(await deferredCancelledEvt).toBeDefined()
})
|
420 | })
|
421 |
|
// Forks a task that pauses on a one-second delay, cancels it after a
// single microtask and expects the task result to be reported as cancelled
// with a TaskAbortError built from `taskCancelled`.
test('forkApi.pause rejects if task is cancelled', async () => {
  const deferredResult = deferred()

  startListening({
    actionCreator: increment,
    effect: async (_, listenerApi) => {
      const task = listenerApi.fork(async (forkApi) => {
        await forkApi.pause(delay(1_000))

        return 4
      })

      await Promise.resolve()
      task.cancel()

      const outcome = await task.result
      deferredResult.resolve(outcome)
    },
  })

  store.dispatch(increment())

  const observed = await deferredResult

  expect(observed).toEqual({
    status: 'cancelled',
    error: new TaskAbortError(taskCancelled),
  })
})
|
446 |
|
// Dispatches twice, with a microtask in between; every effect run calls
// cancelActiveListeners() before forking a task that pauses on a 100ms
// delay. The first settlement of that pause (fulfilment or rejection) is
// captured and expected to equal a TaskAbortError built from
// `listenerCancelled`.
test('forkApi.pause rejects as soon as the parent listener is cancelled', async () => {
  const deferredResult = deferred()
  const captureOutcome = deferredResult.resolve

  startListening({
    actionCreator: increment,
    effect: async (_, listenerApi) => {
      listenerApi.cancelActiveListeners()

      const { result } = listenerApi.fork(async (forkApi) => {
        await forkApi.pause(delay(100)).then(captureOutcome, captureOutcome)

        return 4
      })
      await result

      // Only has an effect if nothing was captured before this point.
      deferredResult.resolve(new Error('unreachable'))
    },
  })

  store.dispatch(increment())

  await Promise.resolve()

  store.dispatch(increment())

  const observed = await deferredResult

  expect(observed).toEqual(new TaskAbortError(listenerCancelled))
})
|
474 |
|
// Dispatches the custom action with payloads 10 and 100. Each effect run
// cancels the active listeners, forks a task that adds the payload after a
// 10ms pause, awaits it and then adds twice the payload. After 50ms the
// counter is expected to be 300.
test('forkApi.pause rejects if listener is cancelled', async () => {
  const incrementByInListener = createAction<number>('incrementByInListener')

  startListening({
    actionCreator: incrementByInListener,
    effect: async (action, listenerApi) => {
      const amountToIncrement = action.payload

      listenerApi.cancelActiveListeners()

      const task = listenerApi.fork(async (forkApi) => {
        await forkApi.pause(delay(10))
        listenerApi.dispatch(incrementByAmount(amountToIncrement))
      })
      await task.result

      listenerApi.dispatch(incrementByAmount(2 * amountToIncrement))
    },
  })

  store.dispatch(incrementByInListener(10))
  store.dispatch(incrementByInListener(100))

  await delay(50)

  const { value } = store.getState()

  expect(value).toEqual(300)
})
|
497 | })
|