UNPKG

6.03 kBPlain TextView Raw
1import {
2 configureStore,
3 createAction,
4 createSlice,
5 isAnyOf,
6} from '@reduxjs/toolkit'
7
8import type { PayloadAction } from '@reduxjs/toolkit'
9
10import { createListenerMiddleware } from '../index'
11
12import type { TypedAddListener } from '../index'
13import { TaskAbortError } from '../exceptions'
14
/** State shape managed by the counter slice used throughout these tests. */
interface CounterState {
  /** Current counter value; the slice initialises it to 0. */
  value: number
}
18
// Minimal counter slice that supplies the reducer for the test store.
// Each reducer updates the `state` object it is handed in place.
const counterSlice = createSlice({
  name: 'counter',
  initialState: { value: 0 } as CounterState,
  reducers: {
    // Add one to the counter.
    increment: (state) => {
      state.value += 1
    },
    // Subtract one from the counter.
    decrement: (state) => {
      state.value -= 1
    },
    // Add an arbitrary amount; `PayloadAction<number>` declares the type of
    // `action.payload`.
    incrementByAmount(state, action: PayloadAction<number>) {
      state.value += action.payload
    },
  },
})
// Action creators generated by the slice, one per reducer above.
const { increment, decrement, incrementByAmount } = counterSlice.actions
36
37describe('Saga-style Effects Scenarios', () => {
// Suite-level fixtures. They are created eagerly so the `let` bindings get
// their types by inference; `beforeEach` swaps in a fresh middleware
// instance and store before every test.
let listenerMiddleware = createListenerMiddleware<CounterState>()
let { middleware, startListening, stopListening } = listenerMiddleware

// Store whose middleware chain has the listener middleware prepended to the
// defaults.
let store = configureStore({
  reducer: counterSlice.reducer,
  middleware: (getDefaultMiddleware) =>
    getDefaultMiddleware().prepend(middleware),
})
54
/**
 * Returns a promise that resolves (with no value) once `ms` milliseconds
 * have elapsed on a `setTimeout` timer. Used to yield to the event loop so
 * pending promise callbacks and timers can run between test steps.
 *
 * @param ms - Timer duration in milliseconds.
 * @returns A promise that settles after the timer fires.
 */
function delay(ms: number): Promise<void> {
  return new Promise<void>((resolve) => {
    setTimeout(resolve, ms)
  })
}
58
// Rebuild the listener middleware and the store before every test so that
// listeners and counter state from one test cannot leak into the next.
beforeEach(() => {
  listenerMiddleware = createListenerMiddleware<CounterState>()
  // Re-point every binding that was destructured from the middleware at the
  // new instance; a binding left behind would keep targeting the old one.
  middleware = listenerMiddleware.middleware
  startListening = listenerMiddleware.startListening
  stopListening = listenerMiddleware.stopListening
  store = configureStore({
    reducer: counterSlice.reducer,
    middleware: (gDM) => gDM().prepend(middleware),
  })
})
68
test('Long polling loop', async () => {
  // Reimplementation of a saga-based long-polling loop that is controlled
  // by "start/stop" actions. The infinite loop waits for a message from the
  // server, processes it somehow, and waits for the next message.
  // Ref: https://gist.github.com/markerikson/5203e71a69fa9dff203c9e27c3d84154
  const eventPollingStarted = createAction('serverPolling/started')
  const eventPollingStopped = createAction('serverPolling/stopped')

  // For this example, we're going to fake up a "server event poll" async
  // function by wrapping an event emitter so that every call returns a
  // promise that is resolved the next time an event is emitted.
  // This is the tiniest event emitter I could find to copy-paste in here.
  const createNanoEvents = () => ({
    // Maps an event name to the list of callbacks subscribed to it.
    events: {} as Record<string, any>,
    // Synchronously invokes every callback registered for `event`.
    emit(event: string, ...args: any[]) {
      ;(this.events[event] || []).forEach((i: any) => i(...args))
    },
    // Registers `cb` for `event` and returns a function that removes it.
    on(event: string, cb: (...args: any[]) => void) {
      ;(this.events[event] = this.events[event] || []).push(cb)
      return () =>
        (this.events[event] = (this.events[event] || []).filter(
          (l: any) => l !== cb
        ))
    },
  })
  const emitter = createNanoEvents()

  // Rig up a dummy "receive a message from the server" API we can trigger
  // manually. Each call resolves with the next 'serverEvent' emission and
  // then unsubscribes, so one emission settles at most one pending poll.
  function pollForEvent() {
    return new Promise<{ type: string }>((resolve) => {
      const unsubscribe = emitter.on('serverEvent', (arg1: string) => {
        unsubscribe()
        resolve({ type: arg1 })
      })
    })
  }

  // Track how many times each message was processed by the loop
  const receivedMessages = {
    a: 0,
    b: 0,
    c: 0,
  }

  // Flags flipped by the polling task so the test can observe its lifecycle.
  let pollingTaskStarted = false
  let pollingTaskCanceled = false

  startListening({
    actionCreator: eventPollingStarted,
    effect: async (_action, listenerApi) => {
      // Remove this listener so a repeated "started" action does not spawn
      // a second polling loop.
      listenerApi.unsubscribe()

      // Start a child job that will infinitely loop receiving messages
      const pollingTask = listenerApi.fork(async (forkApi) => {
        pollingTaskStarted = true
        try {
          while (true) {
            // Cancelation-aware pause for a new server message
            const serverEvent = await forkApi.pause(pollForEvent())
            // Process the message. In this case, just count the times
            // we've seen this message.
            if (serverEvent.type in receivedMessages) {
              receivedMessages[
                serverEvent.type as keyof typeof receivedMessages
              ]++
            }
          }
        } catch (err) {
          // Leaving the loop via a TaskAbortError is the expected way out;
          // record it so the test can assert the cancelation happened.
          if (err instanceof TaskAbortError) {
            pollingTaskCanceled = true
          }
        }
        return 0
      })

      // Wait for the "stop polling" action
      await listenerApi.condition(eventPollingStopped.match)
      pollingTask.cancel()
    },
  })

  store.dispatch(eventPollingStarted())
  await delay(5)
  expect(pollingTaskStarted).toBe(true)

  await delay(5)
  emitter.emit('serverEvent', 'a')
  // Promise resolution
  await delay(1)
  emitter.emit('serverEvent', 'b')
  // Promise resolution
  await delay(1)

  store.dispatch(eventPollingStopped())

  // Have to break out of the event loop to let the cancelation promise
  // kick in - emitting before this would still resolve pollForEvent()
  await delay(1)
  emitter.emit('serverEvent', 'c')

  // A and B were emitted while the polling task was running, so each was
  // counted once. C was emitted only after the stop action had canceled the
  // task, so the loop never processed it.
  expect(receivedMessages).toEqual({ a: 1, b: 1, c: 0 })
  expect(pollingTaskCanceled).toBe(true)
})
175})
176
\No newline at end of file