UNPKG

4.27 kBJavaScriptView Raw
1import { is, check, remove, MATCH, internalErr, SAGA_ACTION } from './utils'
2import { buffers } from './buffers'
3import { asap } from './scheduler'
4
5const CHANNEL_END_TYPE = '@@redux-saga/CHANNEL_END'
6export const END = { type: CHANNEL_END_TYPE }
7export const isEnd = a => a && a.type === CHANNEL_END_TYPE
8
9export function emitter() {
10 const subscribers = []
11
12 function subscribe(sub) {
13 subscribers.push(sub)
14 return () => remove(subscribers, sub)
15 }
16
17 function emit(item) {
18 const arr = subscribers.slice()
19 for (var i = 0, len = arr.length; i < len; i++) {
20 arr[i](item)
21 }
22 }
23
24 return {
25 subscribe,
26 emit,
27 }
28}
29
30export const INVALID_BUFFER = 'invalid buffer passed to channel factory function'
31export var UNDEFINED_INPUT_ERROR = 'Saga was provided with an undefined action'
32
33if (process.env.NODE_ENV !== 'production') {
34 UNDEFINED_INPUT_ERROR += `\nHints:
35 - check that your Action Creator returns a non-undefined value
36 - if the Saga was started using runSaga, check that your subscribe source provides the action to its listeners
37 `
38}
39
40export function channel(buffer = buffers.fixed()) {
41 let closed = false
42 let takers = []
43
44 check(buffer, is.buffer, INVALID_BUFFER)
45
46 function checkForbiddenStates() {
47 if (closed && takers.length) {
48 throw internalErr('Cannot have a closed channel with pending takers')
49 }
50 if (takers.length && !buffer.isEmpty()) {
51 throw internalErr('Cannot have pending takers with non empty buffer')
52 }
53 }
54
55 function put(input) {
56 checkForbiddenStates()
57 check(input, is.notUndef, UNDEFINED_INPUT_ERROR)
58 if (closed) {
59 return
60 }
61 if (!takers.length) {
62 return buffer.put(input)
63 }
64 for (var i = 0; i < takers.length; i++) {
65 const cb = takers[i]
66 if (!cb[MATCH] || cb[MATCH](input)) {
67 takers.splice(i, 1)
68 return cb(input)
69 }
70 }
71 }
72
73 function take(cb) {
74 checkForbiddenStates()
75 check(cb, is.func, "channel.take's callback must be a function")
76
77 if (closed && buffer.isEmpty()) {
78 cb(END)
79 } else if (!buffer.isEmpty()) {
80 cb(buffer.take())
81 } else {
82 takers.push(cb)
83 cb.cancel = () => remove(takers, cb)
84 }
85 }
86
87 function flush(cb) {
88 checkForbiddenStates() // TODO: check if some new state should be forbidden now
89 check(cb, is.func, "channel.flush' callback must be a function")
90 if (closed && buffer.isEmpty()) {
91 cb(END)
92 return
93 }
94 cb(buffer.flush())
95 }
96
97 function close() {
98 checkForbiddenStates()
99 if (!closed) {
100 closed = true
101 if (takers.length) {
102 const arr = takers
103 takers = []
104 for (let i = 0, len = arr.length; i < len; i++) {
105 arr[i](END)
106 }
107 }
108 }
109 }
110
111 return {
112 take,
113 put,
114 flush,
115 close,
116 get __takers__() {
117 return takers
118 },
119 get __closed__() {
120 return closed
121 },
122 }
123}
124
125export function eventChannel(subscribe, buffer = buffers.none(), matcher) {
126 /**
127 should be if(typeof matcher !== undefined) instead?
128 see PR #273 for a background discussion
129 **/
130 if (arguments.length > 2) {
131 check(matcher, is.func, 'Invalid match function passed to eventChannel')
132 }
133
134 const chan = channel(buffer)
135 const close = () => {
136 if (!chan.__closed__) {
137 if (unsubscribe) {
138 unsubscribe()
139 }
140 chan.close()
141 }
142 }
143 const unsubscribe = subscribe(input => {
144 if (isEnd(input)) {
145 close()
146 return
147 }
148 if (matcher && !matcher(input)) {
149 return
150 }
151 chan.put(input)
152 })
153 if (chan.__closed__) {
154 unsubscribe()
155 }
156
157 if (!is.func(unsubscribe)) {
158 throw new Error('in eventChannel: subscribe should return a function to unsubscribe')
159 }
160
161 return {
162 take: chan.take,
163 flush: chan.flush,
164 close,
165 }
166}
167
168export function stdChannel(subscribe) {
169 const chan = eventChannel(cb =>
170 subscribe(input => {
171 if (input[SAGA_ACTION]) {
172 cb(input)
173 return
174 }
175 asap(() => cb(input))
176 }),
177 )
178
179 return {
180 ...chan,
181 take(cb, matcher) {
182 if (arguments.length > 1) {
183 check(matcher, is.func, "channel.take's matcher argument must be a function")
184 cb[MATCH] = matcher
185 }
186 chan.take(cb)
187 },
188 }
189}