1 | import {
|
2 | configureStore,
|
3 | createAction,
|
4 | createSlice,
|
5 | isAnyOf,
|
6 | } from '@reduxjs/toolkit'
|
7 |
|
8 | import type { PayloadAction } from '@reduxjs/toolkit'
|
9 |
|
10 | import { createListenerMiddleware } from '../index'
|
11 |
|
12 | import type { TypedAddListener } from '../index'
|
13 | import { TaskAbortError } from '../exceptions'
|
14 |
|
15 | interface CounterState {
|
16 | value: number
|
17 | }
|
18 |
|
19 | const counterSlice = createSlice({
|
20 | name: 'counter',
|
21 | initialState: { value: 0 } as CounterState,
|
22 | reducers: {
|
23 | increment(state) {
|
24 | state.value += 1
|
25 | },
|
26 | decrement(state) {
|
27 | state.value -= 1
|
28 | },
|
29 |
|
30 | incrementByAmount: (state, action: PayloadAction<number>) => {
|
31 | state.value += action.payload
|
32 | },
|
33 | },
|
34 | })
|
35 | const { increment, decrement, incrementByAmount } = counterSlice.actions
|
36 |
|
37 | describe('Saga-style Effects Scenarios', () => {
|
38 | let listenerMiddleware = createListenerMiddleware<CounterState>()
|
39 | let { middleware, startListening, stopListening } = listenerMiddleware
|
40 |
|
41 | let store = configureStore({
|
42 | reducer: counterSlice.reducer,
|
43 | middleware: (gDM) => gDM().prepend(middleware),
|
44 | })
|
45 |
|
46 | const testAction1 = createAction<string>('testAction1')
|
47 | type TestAction1 = ReturnType<typeof testAction1>
|
48 | const testAction2 = createAction<string>('testAction2')
|
49 | type TestAction2 = ReturnType<typeof testAction2>
|
50 | const testAction3 = createAction<string>('testAction3')
|
51 | type TestAction3 = ReturnType<typeof testAction3>
|
52 |
|
53 | type RootState = ReturnType<typeof store.getState>
|
54 |
|
55 | function delay(ms: number) {
|
56 | return new Promise((resolve) => setTimeout(resolve, ms))
|
57 | }
|
58 |
|
59 | beforeEach(() => {
|
60 | listenerMiddleware = createListenerMiddleware<CounterState>()
|
61 | middleware = listenerMiddleware.middleware
|
62 | startListening = listenerMiddleware.startListening
|
63 | store = configureStore({
|
64 | reducer: counterSlice.reducer,
|
65 | middleware: (gDM) => gDM().prepend(middleware),
|
66 | })
|
67 | })
|
68 |
|
69 | test('Long polling loop', async () => {
|
70 |
|
71 |
|
72 |
|
73 |
|
74 | const eventPollingStarted = createAction('serverPolling/started')
|
75 | const eventPollingStopped = createAction('serverPolling/stopped')
|
76 |
|
77 |
|
78 |
|
79 |
|
80 |
|
81 | let createNanoEvents = () => ({
|
82 | events: {} as Record<string, any>,
|
83 | emit(event: string, ...args: any[]) {
|
84 | ;(this.events[event] || []).forEach((i: any) => i(...args))
|
85 | },
|
86 | on(event: string, cb: (...args: any[]) => void) {
|
87 | ;(this.events[event] = this.events[event] || []).push(cb)
|
88 | return () =>
|
89 | (this.events[event] = (this.events[event] || []).filter(
|
90 | (l: any) => l !== cb
|
91 | ))
|
92 | },
|
93 | })
|
94 | const emitter = createNanoEvents()
|
95 |
|
96 |
|
97 | function pollForEvent() {
|
98 | return new Promise<{ type: string }>((resolve, reject) => {
|
99 | const unsubscribe = emitter.on('serverEvent', (arg1: string) => {
|
100 | unsubscribe()
|
101 | resolve({ type: arg1 })
|
102 | })
|
103 | })
|
104 | }
|
105 |
|
106 | // Track how many times each message was processed by the loop
|
107 | const receivedMessages = {
|
108 | a: 0,
|
109 | b: 0,
|
110 | c: 0,
|
111 | }
|
112 |
|
113 | let pollingTaskStarted = false
|
114 | let pollingTaskCanceled = false
|
115 |
|
116 | startListening({
|
117 | actionCreator: eventPollingStarted,
|
118 | effect: async (action, listenerApi) => {
|
119 | listenerApi.unsubscribe()
|
120 |
|
121 |
|
122 | const pollingTask = listenerApi.fork(async (forkApi) => {
|
123 | pollingTaskStarted = true
|
124 | try {
|
125 | while (true) {
|
126 |
|
127 | const serverEvent = await forkApi.pause(pollForEvent())
|
128 |
|
129 | if (serverEvent.type in receivedMessages) {
|
130 | receivedMessages[
|
131 | serverEvent.type as keyof typeof receivedMessages
|
132 | ]++
|
133 | }
|
134 | }
|
135 | } catch (err) {
|
136 | if (err instanceof TaskAbortError) {
|
137 | pollingTaskCanceled = true
|
138 | }
|
139 | }
|
140 | return 0
|
141 | })
|
142 |
|
143 |
|
144 | await listenerApi.condition(eventPollingStopped.match)
|
145 | pollingTask.cancel()
|
146 | },
|
147 | })
|
148 |
|
149 | store.dispatch(eventPollingStarted())
|
150 | await delay(5)
|
151 | expect(pollingTaskStarted).toBe(true)
|
152 |
|
153 | await delay(5)
|
154 | emitter.emit('serverEvent', 'a')
|
155 | // Promise resolution
|
156 | await delay(1)
|
157 | emitter.emit('serverEvent', 'b')
|
158 | // Promise resolution
|
159 | await delay(1)
|
160 |
|
161 | store.dispatch(eventPollingStopped())
|
162 |
|
163 | // Have to break out of the event loop to let the cancelation promise
|
164 | // kick in - emitting before this would still resolve pollForEvent()
|
165 | await delay(1)
|
166 | emitter.emit('serverEvent', 'c')
|
167 |
|
168 | // A and B were processed earlier. The first C was processed because the
|
169 | // emitter synchronously resolved the `pollForEvents` promise before
|
170 | // the cancelation took effect, but after another pause, the
|
171 | // cancelation kicked in and the second C is ignored.
|
172 | expect(receivedMessages).toEqual({ a: 1, b: 1, c: 0 })
|
173 | expect(pollingTaskCanceled).toBe(true)
|
174 | })
|
175 | })
|
176 |
|
\ | No newline at end of file |