1 | import { is, check, remove, MATCH, internalErr, SAGA_ACTION } from './utils'
|
2 | import { buffers } from './buffers'
|
3 | import { asap } from './scheduler'
|
4 |
|
// Action type that marks the termination of a channel.
const CHANNEL_END_TYPE = '@@redux-saga/CHANNEL_END'

/** Sentinel object delivered to takers when a channel has ended. */
export const END = { type: CHANNEL_END_TYPE }

/**
 * Tells whether `candidate` is an END marker, i.e. its `type` equals the
 * channel-end type.
 *
 * Falsy inputs are returned unchanged (not coerced to `false`), so callers
 * should rely on truthiness rather than on a strict boolean.
 *
 * @param {*} candidate - value to inspect
 * @returns {*} `true`/`false` for truthy inputs, the input itself when falsy
 */
export const isEnd = candidate => {
  if (!candidate) {
    return candidate
  }
  return candidate.type === CHANNEL_END_TYPE
}
|
8 |
|
/**
 * Creates a minimal publish/subscribe hub.
 *
 * @returns {{subscribe: Function, emit: Function}} an object exposing
 *   `subscribe(sub)`, which registers a listener and returns a function that
 *   removes it again, and `emit(item)`, which calls every listener with `item`.
 */
export function emitter() {
  const subscribers = []

  // Registers `sub`; the returned function removes it through the `remove` helper.
  const subscribe = sub => {
    subscribers.push(sub)
    return () => remove(subscribers, sub)
  }

  // Notifies the listeners that were registered at the moment `emit` was called.
  const emit = item => {
    // Work on a snapshot so that listeners added or removed while emitting
    // do not affect the current round of notifications.
    const snapshot = subscribers.slice()
    const total = snapshot.length
    let index = 0
    while (index < total) {
      snapshot[index](item)
      index += 1
    }
  }

  return { subscribe, emit }
}
|
29 |
|
/** Error message used when the channel factory is given something that is not a buffer. */
export const INVALID_BUFFER = 'invalid buffer passed to channel factory function'

// Core message shared by production and non-production builds.
const UNDEFINED_INPUT_BASE = 'Saga was provided with an undefined action'

/**
 * Error message used when an undefined input is put on a channel.
 *
 * Outside of production (`process.env.NODE_ENV !== 'production'`) the message
 * is extended with troubleshooting hints. The value is computed once, so the
 * exported binding is an immutable `const` rather than a `var` that is
 * mutated after its declaration.
 */
export const UNDEFINED_INPUT_ERROR =
  process.env.NODE_ENV !== 'production'
    ? `${UNDEFINED_INPUT_BASE}\nHints:
- check that your Action Creator returns a non-undefined value
- if the Saga was started using runSaga, check that your subscribe source provides the action to its listeners
`
    : UNDEFINED_INPUT_BASE
|
39 |
|
/**
 * Creates a channel that hands inputs from `put` over to callbacks registered
 * with `take`, using `buffer` to hold inputs while nobody is waiting.
 *
 * @param {object} [buffer=buffers.fixed()] - storage for pending inputs; it is
 *   validated with `is.buffer` and used through `isEmpty`, `put`, `take` and
 *   `flush`.
 * @returns {object} an object with `take`, `put`, `flush` and `close`, plus the
 *   read-only accessors `__takers__` (pending callbacks) and `__closed__`.
 */
export function channel(buffer = buffers.fixed()) {
  let closed = false
  let takers = []

  check(buffer, is.buffer, INVALID_BUFFER)

  // Guards the two internal invariants: a closed channel never keeps pending
  // takers, and takers are never pending while the buffer still holds inputs.
  function checkForbiddenStates() {
    if (closed && takers.length) {
      throw internalErr('Cannot have a closed channel with pending takers')
    }
    if (takers.length && !buffer.isEmpty()) {
      throw internalErr('Cannot have pending takers with non empty buffer')
    }
  }

  /**
   * Offers `input` to the channel.
   * - On a closed channel the input is ignored.
   * - Without pending takers the input goes to the buffer.
   * - Otherwise the first taker that has no MATCH predicate, or whose MATCH
   *   predicate accepts the input, is removed and invoked; when no taker
   *   accepts it, the input is dropped.
   */
  function put(input) {
    checkForbiddenStates()
    check(input, is.notUndef, UNDEFINED_INPUT_ERROR)
    if (closed) {
      return
    }
    if (!takers.length) {
      return buffer.put(input)
    }
    for (let i = 0; i < takers.length; i++) {
      const cb = takers[i]
      if (!cb[MATCH] || cb[MATCH](input)) {
        takers.splice(i, 1)
        return cb(input)
      }
    }
  }

  /**
   * Requests one input. `cb` is called right away with END (closed and empty)
   * or with the next buffered input; otherwise it is queued and receives a
   * `cancel` method that removes it from the queue.
   */
  function take(cb) {
    checkForbiddenStates()
    check(cb, is.func, "channel.take's callback must be a function")

    if (closed && buffer.isEmpty()) {
      cb(END)
    } else if (!buffer.isEmpty()) {
      cb(buffer.take())
    } else {
      takers.push(cb)
      cb.cancel = () => remove(takers, cb)
    }
  }

  /**
   * Calls `cb` with END when the channel is closed and empty, otherwise with
   * whatever `buffer.flush()` returns.
   */
  function flush(cb) {
    checkForbiddenStates()
    check(cb, is.func, "channel.flush's callback must be a function")
    if (closed && buffer.isEmpty()) {
      cb(END)
      return
    }
    cb(buffer.flush())
  }

  /**
   * Closes the channel (only the first call has an effect) and notifies every
   * pending taker with END.
   */
  function close() {
    checkForbiddenStates()
    if (!closed) {
      closed = true
      if (takers.length) {
        // Detach the queue before notifying so the channel never exposes
        // pending takers while it is closed.
        const arr = takers
        takers = []
        for (let i = 0, len = arr.length; i < len; i++) {
          arr[i](END)
        }
      }
    }
  }

  return {
    take,
    put,
    flush,
    close,
    get __takers__() {
      return takers
    },
    get __closed__() {
      return closed
    },
  }
}
|
124 |
|
/**
 * Wraps an external event source in a channel.
 *
 * `subscribe` is called once with an emit callback and must return a function
 * that unsubscribes from the source. Emitting an END marker closes the
 * channel; other inputs are put on the channel unless `matcher` rejects them.
 *
 * @param {Function} subscribe - `(emit) => unsubscribe`
 * @param {object} [buffer=buffers.none()] - buffer handed to `channel`
 * @param {Function} [matcher] - optional predicate; inputs for which it
 *   returns a falsy value are ignored
 * @returns {{take: Function, flush: Function, close: Function}}
 * @throws {Error} when `subscribe` does not return a function
 */
export function eventChannel(subscribe, buffer = buffers.none(), matcher) {
  // The matcher is validated whenever a third argument is supplied, even an
  // explicit `undefined`.
  if (arguments.length > 2) {
    check(matcher, is.func, 'Invalid match function passed to eventChannel')
  }

  const chan = channel(buffer)

  // Declared before `close` on purpose: a source may emit END synchronously
  // while `subscribe` is still running. With a later `const` declaration that
  // path would read the binding in its temporal dead zone and throw.
  let unsubscribe

  const close = () => {
    if (!chan.__closed__) {
      if (unsubscribe) {
        unsubscribe()
      }
      chan.close()
    }
  }

  unsubscribe = subscribe(input => {
    if (isEnd(input)) {
      close()
      return
    }
    if (matcher && !matcher(input)) {
      return
    }
    chan.put(input)
  })

  // Validate before the first possible call so that a bad return value is
  // reported with the intended message instead of a generic TypeError.
  if (!is.func(unsubscribe)) {
    throw new Error('in eventChannel: subscribe should return a function to unsubscribe')
  }

  // The source ended during `subscribe`: `close` could not unsubscribe yet,
  // so do it now.
  if (chan.__closed__) {
    unsubscribe()
  }

  return {
    take: chan.take,
    flush: chan.flush,
    close,
  }
}
|
167 |
|
/**
 * Builds an event channel on top of `subscribe` whose `take` accepts an
 * optional matcher.
 *
 * Inputs flagged with SAGA_ACTION are forwarded to the channel immediately;
 * every other input is forwarded from a task handed to `asap`.
 *
 * @param {Function} subscribe - `(listener) => unsubscribe`
 * @returns {object} the event channel's members, with `take(cb, matcher)`
 *   replacing the plain `take`
 */
export function stdChannel(subscribe) {
  // Adapts the raw source: decides per input whether to emit now or via `asap`.
  const connect = emit => {
    const onInput = input => {
      if (!input[SAGA_ACTION]) {
        asap(() => emit(input))
        return
      }
      emit(input)
    }
    return subscribe(onInput)
  }

  const chan = eventChannel(connect)

  return {
    ...chan,
    take(cb, matcher) {
      // A matcher is validated and attached whenever a second argument is
      // supplied, even an explicit `undefined`.
      const hasMatcher = arguments.length > 1
      if (hasMatcher) {
        check(matcher, is.func, "channel.take's matcher argument must be a function")
        cb[MATCH] = matcher
      }
      chan.take(cb)
    },
  }
}
|