1 | import {
|
2 | noop,
|
3 | kTrue,
|
4 | is,
|
5 | log as _log,
|
6 | check,
|
7 | deferred,
|
8 | uid as nextEffectId,
|
9 | array,
|
10 | remove,
|
11 | object,
|
12 | TASK,
|
13 | CANCEL,
|
14 | SELF_CANCELLATION,
|
15 | makeIterator,
|
16 | createSetContextWarning,
|
17 | deprecate,
|
18 | updateIncentive,
|
19 | } from './utils'
|
20 | import { asap, suspend, flush } from './scheduler'
|
21 | import { asEffect } from './io'
|
22 | import { stdChannel as _stdChannel, eventChannel, isEnd } from './channel'
|
23 | import { buffers } from './buffers'
|
24 |
|
/** Message passed to `check` when `proc` is handed something that is not an iterator. */
export const NOT_ITERATOR_ERROR = 'proc first argument (Saga function result) must be an iterator'

/**
 * Sentinel handed to the generator driver instead of a value when a `take`
 * observes the end of its channel (and the take was not a `maybe` take).
 */
export const CHANNEL_END = {
  toString: () => '@@redux-saga/CHANNEL_END',
}

/** Sentinel used as the "result" of a task or generator that has been cancelled. */
export const TASK_CANCEL = {
  toString: () => '@@redux-saga/TASK_CANCEL',
}
|
37 |
|
/**
 * Factories that turn a take pattern into a predicate over incoming inputs.
 * Each factory receives the pattern and returns `input => boolean-ish`.
 */
const matchers = {
  // Matches everything: the shared always-true predicate is returned as is.
  wildcard: () => kTrue,

  // Compares `input.type` to the pattern; symbols are compared by identity,
  // anything else through its string form.
  default: pattern => {
    if (typeof pattern === 'symbol') {
      return input => input.type === pattern
    }
    return input => input.type === String(pattern)
  },

  // Matches when at least one sub-pattern matches (resolved recursively).
  array: patterns => input => {
    return patterns.some(subPattern => matcher(subPattern)(input))
  },

  // Delegates the decision to the user supplied function.
  predicate: predicate => input => {
    return predicate(input)
  },
}
|
45 |
|
/**
 * Builds the predicate for a take pattern.
 *
 * Resolution order: the '*' wildcard, arrays of patterns, functions carrying
 * their own `toString` (treated like a type string), plain predicate
 * functions, and finally anything else as a type to compare against.
 *
 * @param {*} pattern - the pattern given to take/actionChannel
 * @returns {Function} whatever the selected `matchers` factory returns for `pattern`
 */
function matcher(pattern) {
  const factoryFor = candidate => {
    if (candidate === '*') {
      return matchers.wildcard
    }
    if (is.array(candidate)) {
      return matchers.array
    }
    if (is.stringableFunc(candidate)) {
      return matchers.default
    }
    if (is.func(candidate)) {
      return matchers.predicate
    }
    return matchers.default
  }

  return factoryFor(pattern)(pattern)
}
|
56 |
|
57 |
|
58 |
|
59 |
|
60 |
|
61 |
|
62 |
|
63 |
|
64 |
|
65 |
|
66 |
|
67 |
|
68 |
|
69 |
|
70 |
|
71 |
|
/**
 * Tracks a main task together with the tasks attached to it.
 *
 * Every tracked task gets a `cont` continuation installed. The queue calls
 * `cb(result)` once the last tracked task has finished (the result is the one
 * reported by `mainTask`), or `cb(error, true)` as soon as any tracked task
 * reports an error, after cancelling everything still pending.
 *
 * @param {string} name - not used by the queue itself
 * @param {Object} mainTask - task whose result becomes the queue result
 * @param {Function} cb - `(result, isErr)` completion callback
 * @returns {{addTask: Function, cancelAll: Function, abort: Function, getTasks: Function, taskNames: Function}}
 */
function forkQueue(name, mainTask, cb) {
  let pending = []
  let mainResult
  let settled = false

  addTask(mainTask)

  // Cancels every pending task; their continuations are silenced first so
  // the cancellations do not report back into this queue.
  function cancelAll() {
    if (settled) {
      return
    }
    settled = true
    pending.forEach(entry => {
      entry.cont = noop
      entry.cancel()
    })
    pending = []
  }

  // Cancels whatever is still pending, then reports the error.
  function abort(err) {
    cancelAll()
    cb(err, true)
  }

  function addTask(task) {
    pending.push(task)
    task.cont = (res, isErr) => {
      if (settled) {
        return
      }

      remove(pending, task)
      task.cont = noop

      if (isErr) {
        abort(res)
        return
      }

      if (task === mainTask) {
        mainResult = res
      }
      if (!pending.length) {
        settled = true
        cb(mainResult)
      }
    }
  }

  return {
    addTask,
    cancelAll,
    abort,
    getTasks: () => pending,
    taskNames: () => pending.map(entry => entry.name),
  }
}
|
127 |
|
/**
 * Produces the iterator a forked task will be driven with.
 *
 * - If `fn` already is an iterator it is returned untouched.
 * - Otherwise `fn` is invoked with `context`/`args`; if the call returns an
 *   iterator, that iterator is returned.
 * - If the call throws, the failure is not rethrown here: an iterator that
 *   throws the same value on its first step is returned instead.
 * - Any other return value is wrapped in a two-step iterator that first yields
 *   the value and then completes with whatever it is resumed with.
 *
 * @param {{context: *, fn: (Function|Object), args: Array}} descriptor
 * @returns {Object} an iterator
 */
function createTaskIterator({ context, fn, args }) {
  if (is.iterator(fn)) {
    return fn
  }

  // Record *that* the call threw separately from *what* it threw: a thrown
  // falsy value (`throw undefined`, `throw 0`, ...) is still a failure.
  let result
  let error
  let threw = false
  try {
    result = fn.apply(context, args)
  } catch (err) {
    threw = true
    error = err
  }

  if (is.iterator(result)) {
    return result
  }

  if (threw) {
    return makeIterator(() => {
      throw error
    })
  }

  // Plain return value: yield it once, then finish with the resume argument.
  let yielded = false
  return makeIterator(arg => {
    if (!yielded) {
      yielded = true
      return { done: false, value: result }
    }
    return { done: true, value: arg }
  })
}
|
168 |
|
/**
 * Wraps a helper so it can be handed to the fork runner, which expects a
 * descriptor with an `fn` field.
 */
function wrapHelper(helper) {
  return { fn: helper }
}
|
170 |
|
171 | export default function proc(
|
172 | iterator,
|
173 | subscribe = () => noop,
|
174 | dispatch = noop,
|
175 | getState = noop,
|
176 | parentContext = {},
|
177 | options = {},
|
178 | parentEffectId = 0,
|
179 | name = 'anonymous',
|
180 | cont,
|
181 | ) {
|
182 | check(iterator, is.iterator, NOT_ITERATOR_ERROR)
|
183 |
|
184 | const effectsString = '[...effects]'
|
185 | const runParallelEffect = deprecate(runAllEffect, updateIncentive(effectsString, `all(${effectsString})`))
|
186 |
|
187 | const { sagaMonitor, logger, onError } = options
|
188 | const log = logger || _log
|
189 | const stdChannel = _stdChannel(subscribe)
|
190 | const taskContext = Object.create(parentContext)
|
191 | |
192 |
|
193 |
|
194 |
|
195 |
|
196 | next.cancel = noop
|
197 |
|
198 | |
199 |
|
200 |
|
201 |
|
202 | const task = newTask(parentEffectId, name, iterator, cont)
|
203 | const mainTask = { name, cancel: cancelMain, isRunning: true }
|
204 | const taskQueue = forkQueue(name, mainTask, end)
|
205 |
|
206 | |
207 |
|
208 |
|
209 | function cancelMain() {
|
210 | if (mainTask.isRunning && !mainTask.isCancelled) {
|
211 | mainTask.isCancelled = true
|
212 | next(TASK_CANCEL)
|
213 | }
|
214 | }
|
215 |
|
216 | |
217 |
|
218 |
|
219 |
|
220 |
|
221 |
|
222 |
|
223 |
|
224 |
|
225 | function cancel() {
|
226 | |
227 |
|
228 |
|
229 |
|
230 | if (iterator._isRunning && !iterator._isCancelled) {
|
231 | iterator._isCancelled = true
|
232 | taskQueue.cancelAll()
|
233 | |
234 |
|
235 |
|
236 | end(TASK_CANCEL)
|
237 | }
|
238 | }
|
239 | |
240 |
|
241 |
|
242 |
|
243 | cont && (cont.cancel = cancel)
|
244 |
|
245 |
|
246 | iterator._isRunning = true
|
247 |
|
248 |
|
249 | next()
|
250 |
|
251 |
|
252 | return task
|
253 |
|
254 | |
255 |
|
256 |
|
257 |
|
258 |
|
259 | function next(arg, isErr) {
|
260 |
|
261 | if (!mainTask.isRunning) {
|
262 | throw new Error('Trying to resume an already finished generator')
|
263 | }
|
264 |
|
265 | try {
|
266 | let result
|
267 | if (isErr) {
|
268 | result = iterator.throw(arg)
|
269 | } else if (arg === TASK_CANCEL) {
|
270 | |
271 |
|
272 |
|
273 |
|
274 |
|
275 |
|
276 |
|
277 | mainTask.isCancelled = true
|
278 | |
279 |
|
280 |
|
281 | next.cancel()
|
282 | |
283 |
|
284 |
|
285 |
|
286 | result = is.func(iterator.return) ? iterator.return(TASK_CANCEL) : { done: true, value: TASK_CANCEL }
|
287 | } else if (arg === CHANNEL_END) {
|
288 |
|
289 | result = is.func(iterator.return) ? iterator.return() : { done: true }
|
290 | } else {
|
291 | result = iterator.next(arg)
|
292 | }
|
293 |
|
294 | if (!result.done) {
|
295 | runEffect(result.value, parentEffectId, '', next)
|
296 | } else {
|
297 | |
298 |
|
299 |
|
300 | mainTask.isMainRunning = false
|
301 | mainTask.cont && mainTask.cont(result.value)
|
302 | }
|
303 | } catch (error) {
|
304 | if (mainTask.isCancelled) {
|
305 | log('error', `uncaught at ${name}`, error.message)
|
306 | }
|
307 | mainTask.isMainRunning = false
|
308 | mainTask.cont(error, true)
|
309 | }
|
310 | }
|
311 |
|
312 | function end(result, isErr) {
|
313 | iterator._isRunning = false
|
314 | stdChannel.close()
|
315 | if (!isErr) {
|
316 | if (process.env.NODE_ENV === 'development' && result === TASK_CANCEL) {
|
317 | log('info', `${name} has been cancelled`, '')
|
318 | }
|
319 | iterator._result = result
|
320 | iterator._deferredEnd && iterator._deferredEnd.resolve(result)
|
321 | } else {
|
322 | if (result instanceof Error) {
|
323 | result.sagaStack = `at ${name} \n ${result.sagaStack || result.stack}`
|
324 | }
|
325 | if (!task.cont) {
|
326 | log('error', `uncaught`, result.sagaStack || result.stack)
|
327 | if (result instanceof Error && onError) {
|
328 | onError(result)
|
329 | }
|
330 | }
|
331 | iterator._error = result
|
332 | iterator._isAborted = true
|
333 | iterator._deferredEnd && iterator._deferredEnd.reject(result)
|
334 | }
|
335 | task.cont && task.cont(result, isErr)
|
336 | task.joiners.forEach(j => j.cb(result, isErr))
|
337 | task.joiners = null
|
338 | }
|
339 |
|
340 | function runEffect(effect, parentEffectId, label = '', cb) {
|
341 | const effectId = nextEffectId()
|
342 | sagaMonitor && sagaMonitor.effectTriggered({ effectId, parentEffectId, label, effect })
|
343 |
|
344 | |
345 |
|
346 |
|
347 |
|
348 |
|
349 | let effectSettled
|
350 |
|
351 |
|
352 | function currCb(res, isErr) {
|
353 | if (effectSettled) {
|
354 | return
|
355 | }
|
356 |
|
357 | effectSettled = true
|
358 | cb.cancel = noop
|
359 | if (sagaMonitor) {
|
360 | isErr ? sagaMonitor.effectRejected(effectId, res) : sagaMonitor.effectResolved(effectId, res)
|
361 | }
|
362 | cb(res, isErr)
|
363 | }
|
364 |
|
365 | currCb.cancel = noop
|
366 |
|
367 |
|
368 | cb.cancel = () => {
|
369 |
|
370 | if (effectSettled) {
|
371 | return
|
372 | }
|
373 |
|
374 | effectSettled = true
|
375 | |
376 |
|
377 |
|
378 |
|
379 |
|
380 | try {
|
381 | currCb.cancel()
|
382 | } catch (err) {
|
383 | log('error', `uncaught at ${name}`, err.message)
|
384 | }
|
385 | currCb.cancel = noop
|
386 |
|
387 | sagaMonitor && sagaMonitor.effectCancelled(effectId)
|
388 | }
|
389 |
|
390 | |
391 |
|
392 |
|
393 |
|
394 |
|
395 |
|
396 |
|
397 |
|
398 |
|
399 |
|
400 |
|
401 |
|
402 |
|
403 |
|
404 |
|
405 | let data
|
406 |
|
407 | return (
|
408 |
|
409 | is.promise(effect) ? resolvePromise(effect, currCb)
|
410 | : is.helper(effect) ? runForkEffect(wrapHelper(effect), effectId, currCb)
|
411 | : is.iterator(effect) ? resolveIterator(effect, effectId, name, currCb)
|
412 |
|
413 |
|
414 | : is.array(effect) ? runParallelEffect(effect, effectId, currCb)
|
415 | : (data = asEffect.take(effect)) ? runTakeEffect(data, currCb)
|
416 | : (data = asEffect.put(effect)) ? runPutEffect(data, currCb)
|
417 | : (data = asEffect.all(effect)) ? runAllEffect(data, effectId, currCb)
|
418 | : (data = asEffect.race(effect)) ? runRaceEffect(data, effectId, currCb)
|
419 | : (data = asEffect.call(effect)) ? runCallEffect(data, effectId, currCb)
|
420 | : (data = asEffect.cps(effect)) ? runCPSEffect(data, currCb)
|
421 | : (data = asEffect.fork(effect)) ? runForkEffect(data, effectId, currCb)
|
422 | : (data = asEffect.join(effect)) ? runJoinEffect(data, currCb)
|
423 | : (data = asEffect.cancel(effect)) ? runCancelEffect(data, currCb)
|
424 | : (data = asEffect.select(effect)) ? runSelectEffect(data, currCb)
|
425 | : (data = asEffect.actionChannel(effect)) ? runChannelEffect(data, currCb)
|
426 | : (data = asEffect.flush(effect)) ? runFlushEffect(data, currCb)
|
427 | : (data = asEffect.cancelled(effect)) ? runCancelledEffect(data, currCb)
|
428 | : (data = asEffect.getContext(effect)) ? runGetContextEffect(data, currCb)
|
429 | : (data = asEffect.setContext(effect)) ? runSetContextEffect(data, currCb)
|
430 | : currCb(effect)
|
431 | )
|
432 | }
|
433 |
|
434 | function resolvePromise(promise, cb) {
|
435 | const cancelPromise = promise[CANCEL]
|
436 | if (is.func(cancelPromise)) {
|
437 | cb.cancel = cancelPromise
|
438 | } else if (is.func(promise.abort)) {
|
439 | cb.cancel = () => promise.abort()
|
440 |
|
441 |
|
442 | }
|
443 | promise.then(cb, error => cb(error, true))
|
444 | }
|
445 |
|
446 | function resolveIterator(iterator, effectId, name, cb) {
|
447 | proc(iterator, subscribe, dispatch, getState, taskContext, options, effectId, name, cb)
|
448 | }
|
449 |
|
450 | function runTakeEffect({ channel, pattern, maybe }, cb) {
|
451 | channel = channel || stdChannel
|
452 | const takeCb = inp => (inp instanceof Error ? cb(inp, true) : isEnd(inp) && !maybe ? cb(CHANNEL_END) : cb(inp))
|
453 | try {
|
454 | channel.take(takeCb, matcher(pattern))
|
455 | } catch (err) {
|
456 | return cb(err, true)
|
457 | }
|
458 | cb.cancel = takeCb.cancel
|
459 | }
|
460 |
|
461 | function runPutEffect({ channel, action, resolve }, cb) {
|
462 | |
463 |
|
464 |
|
465 |
|
466 |
|
467 | asap(() => {
|
468 | let result
|
469 | try {
|
470 | result = (channel ? channel.put : dispatch)(action)
|
471 | } catch (error) {
|
472 |
|
473 | if (channel || resolve) return cb(error, true)
|
474 | log('error', `uncaught at ${name}`, error.stack || error.message || error)
|
475 | }
|
476 |
|
477 | if (resolve && is.promise(result)) {
|
478 | resolvePromise(result, cb)
|
479 | } else {
|
480 | return cb(result)
|
481 | }
|
482 | })
|
483 |
|
484 | }
|
485 |
|
486 | function runCallEffect({ context, fn, args }, effectId, cb) {
|
487 | let result
|
488 |
|
489 | try {
|
490 | result = fn.apply(context, args)
|
491 | } catch (error) {
|
492 | return cb(error, true)
|
493 | }
|
494 | return is.promise(result)
|
495 | ? resolvePromise(result, cb)
|
496 | : is.iterator(result) ? resolveIterator(result, effectId, fn.name, cb) : cb(result)
|
497 | }
|
498 |
|
499 | function runCPSEffect({ context, fn, args }, cb) {
|
500 |
|
501 |
|
502 |
|
503 |
|
504 | try {
|
505 | const cpsCb = (err, res) => (is.undef(err) ? cb(res) : cb(err, true))
|
506 | fn.apply(context, args.concat(cpsCb))
|
507 | if (cpsCb.cancel) {
|
508 | cb.cancel = () => cpsCb.cancel()
|
509 | }
|
510 | } catch (error) {
|
511 | return cb(error, true)
|
512 | }
|
513 | }
|
514 |
|
515 | function runForkEffect({ context, fn, args, detached }, effectId, cb) {
|
516 | const taskIterator = createTaskIterator({ context, fn, args })
|
517 |
|
518 | try {
|
519 | suspend()
|
520 | const task = proc(
|
521 | taskIterator,
|
522 | subscribe,
|
523 | dispatch,
|
524 | getState,
|
525 | taskContext,
|
526 | options,
|
527 | effectId,
|
528 | fn.name,
|
529 | detached ? null : noop,
|
530 | )
|
531 |
|
532 | if (detached) {
|
533 | cb(task)
|
534 | } else {
|
535 | if (taskIterator._isRunning) {
|
536 | taskQueue.addTask(task)
|
537 | cb(task)
|
538 | } else if (taskIterator._error) {
|
539 | taskQueue.abort(taskIterator._error)
|
540 | } else {
|
541 | cb(task)
|
542 | }
|
543 | }
|
544 | } finally {
|
545 | flush()
|
546 | }
|
547 |
|
548 | }
|
549 |
|
550 | function runJoinEffect(t, cb) {
|
551 | if (t.isRunning()) {
|
552 | const joiner = { task, cb }
|
553 | cb.cancel = () => remove(t.joiners, joiner)
|
554 | t.joiners.push(joiner)
|
555 | } else {
|
556 | t.isAborted() ? cb(t.error(), true) : cb(t.result())
|
557 | }
|
558 | }
|
559 |
|
560 | function runCancelEffect(taskToCancel, cb) {
|
561 | if (taskToCancel === SELF_CANCELLATION) {
|
562 | taskToCancel = task
|
563 | }
|
564 | if (taskToCancel.isRunning()) {
|
565 | taskToCancel.cancel()
|
566 | }
|
567 | cb()
|
568 |
|
569 | }
|
570 |
|
571 | function runAllEffect(effects, effectId, cb) {
|
572 | const keys = Object.keys(effects)
|
573 |
|
574 | if (!keys.length) {
|
575 | return cb(is.array(effects) ? [] : {})
|
576 | }
|
577 |
|
578 | let completedCount = 0
|
579 | let completed
|
580 | const results = {}
|
581 | const childCbs = {}
|
582 |
|
583 | function checkEffectEnd() {
|
584 | if (completedCount === keys.length) {
|
585 | completed = true
|
586 | cb(is.array(effects) ? array.from({ ...results, length: keys.length }) : results)
|
587 | }
|
588 | }
|
589 |
|
590 | keys.forEach(key => {
|
591 | const chCbAtKey = (res, isErr) => {
|
592 | if (completed) {
|
593 | return
|
594 | }
|
595 | if (isErr || isEnd(res) || res === CHANNEL_END || res === TASK_CANCEL) {
|
596 | cb.cancel()
|
597 | cb(res, isErr)
|
598 | } else {
|
599 | results[key] = res
|
600 | completedCount++
|
601 | checkEffectEnd()
|
602 | }
|
603 | }
|
604 | chCbAtKey.cancel = noop
|
605 | childCbs[key] = chCbAtKey
|
606 | })
|
607 |
|
608 | cb.cancel = () => {
|
609 | if (!completed) {
|
610 | completed = true
|
611 | keys.forEach(key => childCbs[key].cancel())
|
612 | }
|
613 | }
|
614 |
|
615 | keys.forEach(key => runEffect(effects[key], effectId, key, childCbs[key]))
|
616 | }
|
617 |
|
618 | function runRaceEffect(effects, effectId, cb) {
|
619 | let completed
|
620 | const keys = Object.keys(effects)
|
621 | const childCbs = {}
|
622 |
|
623 | keys.forEach(key => {
|
624 | const chCbAtKey = (res, isErr) => {
|
625 | if (completed) {
|
626 | return
|
627 | }
|
628 |
|
629 | if (isErr) {
|
630 |
|
631 | cb.cancel()
|
632 | cb(res, true)
|
633 | } else if (!isEnd(res) && res !== CHANNEL_END && res !== TASK_CANCEL) {
|
634 | cb.cancel()
|
635 | completed = true
|
636 | cb({ [key]: res })
|
637 | }
|
638 | }
|
639 | chCbAtKey.cancel = noop
|
640 | childCbs[key] = chCbAtKey
|
641 | })
|
642 |
|
643 | cb.cancel = () => {
|
644 |
|
645 | if (!completed) {
|
646 | completed = true
|
647 | keys.forEach(key => childCbs[key].cancel())
|
648 | }
|
649 | }
|
650 | keys.forEach(key => {
|
651 | if (completed) {
|
652 | return
|
653 | }
|
654 | runEffect(effects[key], effectId, key, childCbs[key])
|
655 | })
|
656 | }
|
657 |
|
658 | function runSelectEffect({ selector, args }, cb) {
|
659 | try {
|
660 | const state = selector(getState(), ...args)
|
661 | cb(state)
|
662 | } catch (error) {
|
663 | cb(error, true)
|
664 | }
|
665 | }
|
666 |
|
667 | function runChannelEffect({ pattern, buffer }, cb) {
|
668 | const match = matcher(pattern)
|
669 | match.pattern = pattern
|
670 | cb(eventChannel(subscribe, buffer || buffers.fixed(), match))
|
671 | }
|
672 |
|
673 | function runCancelledEffect(data, cb) {
|
674 | cb(!!mainTask.isCancelled)
|
675 | }
|
676 |
|
677 | function runFlushEffect(channel, cb) {
|
678 | channel.flush(cb)
|
679 | }
|
680 |
|
681 | function runGetContextEffect(prop, cb) {
|
682 | cb(taskContext[prop])
|
683 | }
|
684 |
|
685 | function runSetContextEffect(props, cb) {
|
686 | object.assign(taskContext, props)
|
687 | cb()
|
688 | }
|
689 |
|
690 | function newTask(id, name, iterator, cont) {
|
691 | iterator._deferredEnd = null
|
692 | return {
|
693 | [TASK]: true,
|
694 | id,
|
695 | name,
|
696 | get done() {
|
697 | if (iterator._deferredEnd) {
|
698 | return iterator._deferredEnd.promise
|
699 | } else {
|
700 | const def = deferred()
|
701 | iterator._deferredEnd = def
|
702 | if (!iterator._isRunning) {
|
703 | iterator._error ? def.reject(iterator._error) : def.resolve(iterator._result)
|
704 | }
|
705 | return def.promise
|
706 | }
|
707 | },
|
708 | cont,
|
709 | joiners: [],
|
710 | cancel,
|
711 | isRunning: () => iterator._isRunning,
|
712 | isCancelled: () => iterator._isCancelled,
|
713 | isAborted: () => iterator._isAborted,
|
714 | result: () => iterator._result,
|
715 | error: () => iterator._error,
|
716 | setContext(props) {
|
717 | check(props, is.object, createSetContextWarning('task', props))
|
718 | object.assign(taskContext, props)
|
719 | },
|
720 | }
|
721 | }
|
722 | }
|