UNPKG

21.1 kBJavaScriptView Raw
1import {
2 noop,
3 kTrue,
4 is,
5 log as _log,
6 check,
7 deferred,
8 uid as nextEffectId,
9 array,
10 remove,
11 object,
12 TASK,
13 CANCEL,
14 SELF_CANCELLATION,
15 makeIterator,
16 createSetContextWarning,
17 deprecate,
18 updateIncentive,
19} from './utils'
20import { asap, suspend, flush } from './scheduler'
21import { asEffect } from './io'
22import { stdChannel as _stdChannel, eventChannel, isEnd } from './channel'
23import { buffers } from './buffers'
24
// Message passed to `check` when `proc` is handed something that is not an iterator.
export const NOT_ITERATOR_ERROR = 'proc first argument (Saga function result) must be an iterator'

// Sentinel passed to the generator driver when a `take` hits an ended channel.
export const CHANNEL_END = {
  toString: () => '@@redux-saga/CHANNEL_END',
}
// Sentinel used as the argument/result that marks a cancelled task.
export const TASK_CANCEL = {
  toString: () => '@@redux-saga/TASK_CANCEL',
}
37
// Factories that turn a take pattern into a predicate over incoming inputs.
const matchers = {
  // '*' : the shared always-true predicate
  wildcard() {
    return kTrue
  },
  // compares `input.type` with the pattern: symbols by identity, anything else by its string form
  default(pattern) {
    if (typeof pattern === 'symbol') {
      return input => input.type === pattern
    }
    return input => input.type === String(pattern)
  },
  // an input matches when at least one of the listed patterns matches it
  array(patterns) {
    return input => patterns.some(candidate => matcher(candidate)(input))
  },
  // user supplied predicate, invoked with the input only
  predicate(predicate) {
    return input => predicate(input)
  },
}
45
// Picks the matcher factory that fits `pattern` and returns the predicate it builds.
function matcher(pattern) {
  let build
  if (pattern === '*') {
    build = matchers.wildcard
  } else if (is.array(pattern)) {
    build = matchers.array
  } else if (is.stringableFunc(pattern)) {
    // stringable functions are compared through `type`, like plain values
    build = matchers.default
  } else if (is.func(pattern)) {
    build = matchers.predicate
  } else {
    build = matchers.default
  }
  return build(pattern)
}
56
/**
  Tracks a parent task together with its attached forks.

  The queue starts with the main task (the main flow of the current Generator);
  forked tasks are added through `addTask`. The queue installs a `cont`
  continuation on every tracked task and reports through `cb`:

  - when the last tracked task completes, `cb` receives the result recorded for the main task
  - when a tracked task fails, every remaining task is cancelled and `cb(error, true)` is called
  - `cancelAll` cancels every remaining task without calling `cb`
**/
function forkQueue(name, mainTask, cb) {
  let pending = []
  let mainResult
  let settled = false

  addTask(mainTask)

  function cancelAll() {
    if (settled) {
      return
    }
    settled = true
    pending.forEach(pendingTask => {
      // silence the continuation first so the cancellation is not reported back to this queue
      pendingTask.cont = noop
      pendingTask.cancel()
    })
    pending = []
  }

  function abort(err) {
    cancelAll()
    cb(err, true)
  }

  function addTask(task) {
    pending.push(task)
    task.cont = (res, isErr) => {
      if (settled) {
        return
      }

      remove(pending, task)
      task.cont = noop

      if (isErr) {
        abort(res)
        return
      }
      if (task === mainTask) {
        mainResult = res
      }
      if (pending.length === 0) {
        settled = true
        cb(mainResult)
      }
    }
  }

  return {
    addTask,
    cancelAll,
    abort,
    getTasks: () => pending,
    taskNames: () => pending.map(t => t.name),
  }
}
127
/**
  Produces the iterator a forked task will run.

  - if `fn` already is an iterator it is returned untouched
  - otherwise `fn` is applied to `args` with `context` as `this`; an iterator result
    (i.e. `fn` was a generator function) is returned as is
  - if `fn` threw synchronously, the returned iterator rethrows that value on its first step,
    so the failure surfaces as a failed task instead of bubbling up to the caller (see #152 and #441)
  - any other return value is wrapped in an iterator that yields it once and then
    completes with whatever it is resumed with
**/
function createTaskIterator({ context, fn, args }) {
  if (is.iterator(fn)) {
    return fn
  }

  // catch synchronous failures; see #152 and #441
  let result
  let error
  // Tracked separately from `error`: any value can be thrown, including falsy ones
  // (`throw 0`, `throw undefined`), so the error's truthiness cannot tell us whether fn failed.
  let failed = false
  try {
    result = fn.apply(context, args)
  } catch (err) {
    failed = true
    error = err
  }

  // i.e. a generator function returns an iterator
  if (is.iterator(result)) {
    return result
  }

  // do not bubble up synchronous failures for detached forks
  // instead create a failed task. See #152 and #441
  if (failed) {
    return makeIterator(() => {
      throw error
    })
  }

  let yielded = false
  return makeIterator(arg => {
    if (yielded) {
      return { done: true, value: arg }
    }
    yielded = true
    return { done: false, value: result }
  })
}
168
// Wraps a helper into the `{ fn }` descriptor shape consumed by runForkEffect.
const wrapHelper = fn => ({ fn })
170
171export default function proc(
172 iterator,
173 subscribe = () => noop,
174 dispatch = noop,
175 getState = noop,
176 parentContext = {},
177 options = {},
178 parentEffectId = 0,
179 name = 'anonymous',
180 cont,
181) {
182 check(iterator, is.iterator, NOT_ITERATOR_ERROR)
183
184 const effectsString = '[...effects]'
185 const runParallelEffect = deprecate(runAllEffect, updateIncentive(effectsString, `all(${effectsString})`))
186
187 const { sagaMonitor, logger, onError } = options
188 const log = logger || _log
189 const stdChannel = _stdChannel(subscribe)
190 const taskContext = Object.create(parentContext)
191 /**
192 Tracks the current effect cancellation
193 Each time the generator progresses, calling runEffect will set a new value
194 on it. It allows propagating cancellation to child effects
195 **/
196 next.cancel = noop
197
198 /**
199 Creates a new task descriptor for this generator, We'll also create a main task
200 to track the main flow (besides other forked tasks)
201 **/
202 const task = newTask(parentEffectId, name, iterator, cont)
203 const mainTask = { name, cancel: cancelMain, isRunning: true }
204 const taskQueue = forkQueue(name, mainTask, end)
205
/**
  Cancellation of the main task: resumes the Generator driver with TASK_CANCEL.
  Does nothing when the main task is no longer flagged as running or is already cancelled.
**/
function cancelMain() {
  if (!mainTask.isRunning || mainTask.isCancelled) {
    return
  }
  mainTask.isCancelled = true
  next(TASK_CANCEL)
}
215
/**
  Cancels this task. May be called by a parent generator to trigger/propagate cancellation.

  All pending tasks of the fork queue (including the main task) are cancelled, then the
  task is ended with TASK_CANCEL; `end` forwards that result to the task continuation
  and to the joiners of this task.

  Cancellation is a noop for terminated or already cancelled tasks.
**/
function cancel() {
  // Both flags are checked: a task can be cancelled while still flagged as running
  const canCancel = iterator._isRunning && !iterator._isCancelled
  if (!canCancel) {
    return
  }
  iterator._isCancelled = true
  taskQueue.cancelAll()
  end(TASK_CANCEL)
}
239 /**
240 attaches cancellation logic to this task's continuation
241 this will permit cancellation to propagate down the call chain
242 **/
243 cont && (cont.cancel = cancel)
244
245 // tracks the running status
246 iterator._isRunning = true
247
248 // kicks up the generator
249 next()
250
251 // then return the task descriptor to the caller
252 return task
253
/**
  This is the generator driver.
  It's a recursive async/continuation function which calls itself (through the
  callback handed to runEffect) until the generator terminates or throws.

  arg   - value used to resume the generator; TASK_CANCEL and CHANNEL_END are
          special-cased and terminate the generator through `iterator.return`
  isErr - when truthy, `arg` is thrown into the generator instead
**/
function next(arg, isErr) {
  // Preventive measure. If we end up here, then there is really something wrong
  if (!mainTask.isRunning) {
    throw new Error('Trying to resume an already finished generator')
  }

  try {
    let result
    if (isErr) {
      result = iterator.throw(arg)
    } else if (arg === TASK_CANCEL) {
      /**
        getting TASK_CANCEL automatically cancels the main task
        We can get this value here

        - By cancelling the parent task manually
        - By joining a Cancelled task
      **/
      mainTask.isCancelled = true
      /**
        Cancels the current effect; this will propagate the cancellation down to any called tasks
      **/
      next.cancel()
      /**
        If this Generator has a `return` method then invokes it
        This will jump to the finally block
      **/
      result = is.func(iterator.return) ? iterator.return(TASK_CANCEL) : { done: true, value: TASK_CANCEL }
    } else if (arg === CHANNEL_END) {
      // We get CHANNEL_END by taking from a channel that ended using `take` (and not `takem` used to trap End of channels)
      result = is.func(iterator.return) ? iterator.return() : { done: true }
    } else {
      result = iterator.next(arg)
    }

    if (!result.done) {
      runEffect(result.value, parentEffectId, '', next)
    } else {
      /**
        This Generator has ended, terminate the main task and notify the fork queue.
        The flag cleared here must be `isRunning`: it is the one the guard above and
        cancelMain read.
      **/
      mainTask.isRunning = false
      mainTask.cont && mainTask.cont(result.value)
    }
  } catch (error) {
    if (mainTask.isCancelled) {
      // any value can be thrown; do not dereference a nullish one while logging,
      // otherwise the continuation below would never be notified
      log('error', `uncaught at ${name}`, error && error.message ? error.message : error)
    }
    mainTask.isRunning = false
    mainTask.cont(error, true)
  }
}
311
/**
  Terminates the task: marks the iterator as no longer running, closes the task's
  standard channel, records the outcome on the iterator, settles the deferred behind
  `task.done` (if one was created) and finally notifies the task continuation and joiners.

  result - the task result, or the thrown value when isErr is truthy
  isErr  - truthy when the task aborted with an error
**/
function end(result, isErr) {
  iterator._isRunning = false
  stdChannel.close()
  if (!isErr) {
    if (process.env.NODE_ENV === 'development' && result === TASK_CANCEL) {
      log('info', `${name} has been cancelled`, '')
    }
    iterator._result = result
    iterator._deferredEnd && iterator._deferredEnd.resolve(result)
  } else {
    if (result instanceof Error) {
      // prefix the trace with this saga's name so the chain of sagas can be followed
      result.sagaStack = `at ${name} \n ${result.sagaStack || result.stack}`
    }
    if (!task.cont) {
      // Nobody is waiting on this task, so report the failure here.
      // A saga can throw any value, including null/undefined: reading `.sagaStack` on those
      // would raise a TypeError and skip the notifications below.
      const trace = result != null ? result.sagaStack || result.stack : undefined
      log('error', `uncaught`, trace || result)
      if (result instanceof Error && onError) {
        onError(result)
      }
    }
    iterator._error = result
    iterator._isAborted = true
    iterator._deferredEnd && iterator._deferredEnd.reject(result)
  }
  task.cont && task.cont(result, isErr)
  task.joiners.forEach(j => j.cb(result, isErr))
  task.joiners = null
}
339
/**
  Runs a single yielded effect and reports its outcome through `cb`.

  effect         - the yielded value
  parentEffectId - id reported to the saga monitor as the parent of this effect
  label          - label reported to the saga monitor (defaults to '')
  cb             - continuation called as cb(result) or cb(error, true)

  Completion and cancellation are mutually exclusive: once the effect has completed
  cancelling it is a noop, and once it has been cancelled its completion is ignored.
**/
function runEffect(effect, parentEffectId, label = '', cb) {
  const effectId = nextEffectId()
  if (sagaMonitor) {
    sagaMonitor.effectTriggered({ effectId, parentEffectId, label, effect })
  }

  // flips to true on the first completion or cancellation, whichever comes first
  let settled = false

  // Completion callback passed to the appropriate effect runner
  function currCb(res, isErr) {
    if (settled) {
      return
    }

    settled = true
    cb.cancel = noop // defensive measure
    if (sagaMonitor) {
      if (isErr) {
        sagaMonitor.effectRejected(effectId, res)
      } else {
        sagaMonitor.effectResolved(effectId, res)
      }
    }
    cb(res, isErr)
  }
  // placeholder; effect runners replace it with their own cancellation logic
  currCb.cancel = noop

  // setup cancellation logic on the parent cb
  cb.cancel = () => {
    // prevents cancelling an already completed effect
    if (settled) {
      return
    }

    settled = true
    /**
      propagates cancel downward
      the completion callback can no longer be called, so errors raised while
      cancelling are logged instead
    **/
    try {
      currCb.cancel()
    } catch (err) {
      log('error', `uncaught at ${name}`, err.message)
    }
    currCb.cancel = noop // defensive measure

    if (sagaMonitor) {
      sagaMonitor.effectCancelled(effectId)
    }
  }

  /**
    Each effect runner must attach its own cancellation logic to the provided callback
    (cb.cancel = [cancelMethod]) before calling it; this is what allows cancellation
    to propagate downward.

    Called async functions are responsible for completing the flow by calling the
    provided continuation, while callers are responsible for aborting the current flow
    by calling the attached cancel function.

    Promises can carry their own cancellation logic through a promise[CANCEL] method.
    Calling cancel must have no effect on an already completed or cancelled effect.
  **/

  // Non declarative effects
  if (is.promise(effect)) {
    return resolvePromise(effect, currCb)
  }
  if (is.helper(effect)) {
    return runForkEffect(wrapHelper(effect), effectId, currCb)
  }
  if (is.iterator(effect)) {
    return resolveIterator(effect, effectId, name, currCb)
  }

  // declarative effects; the order of the checks is significant and kept as is
  if (is.array(effect)) {
    return runParallelEffect(effect, effectId, currCb)
  }

  let data
  if ((data = asEffect.take(effect))) {
    return runTakeEffect(data, currCb)
  }
  if ((data = asEffect.put(effect))) {
    return runPutEffect(data, currCb)
  }
  if ((data = asEffect.all(effect))) {
    return runAllEffect(data, effectId, currCb)
  }
  if ((data = asEffect.race(effect))) {
    return runRaceEffect(data, effectId, currCb)
  }
  if ((data = asEffect.call(effect))) {
    return runCallEffect(data, effectId, currCb)
  }
  if ((data = asEffect.cps(effect))) {
    return runCPSEffect(data, currCb)
  }
  if ((data = asEffect.fork(effect))) {
    return runForkEffect(data, effectId, currCb)
  }
  if ((data = asEffect.join(effect))) {
    return runJoinEffect(data, currCb)
  }
  if ((data = asEffect.cancel(effect))) {
    return runCancelEffect(data, currCb)
  }
  if ((data = asEffect.select(effect))) {
    return runSelectEffect(data, currCb)
  }
  if ((data = asEffect.actionChannel(effect))) {
    return runChannelEffect(data, currCb)
  }
  if ((data = asEffect.flush(effect))) {
    return runFlushEffect(data, currCb)
  }
  if ((data = asEffect.cancelled(effect))) {
    return runCancelledEffect(data, currCb)
  }
  if ((data = asEffect.getContext(effect))) {
    return runGetContextEffect(data, currCb)
  }
  if ((data = asEffect.setContext(effect))) {
    return runSetContextEffect(data, currCb)
  }

  // anything else returned as is
  return currCb(effect)
}
433
/**
  Settles `cb` from a promise: fulfilment calls cb(value), rejection calls cb(error, true).
  Cancellation is wired to promise[CANCEL] when it is a function, otherwise to
  promise.abort() when the promise exposes such a method.
**/
function resolvePromise(promise, cb) {
  const customCancel = promise[CANCEL]
  if (is.func(customCancel)) {
    cb.cancel = customCancel
  } else if (is.func(promise.abort)) {
    cb.cancel = () => promise.abort()
    // TODO: add support for the fetch API, whenever they get around to
    // adding cancel support
  }

  const onRejected = reason => cb(reason, true)
  promise.then(cb, onRejected)
}
445
// Runs a yielded iterator as a child proc that shares this task's subscribe/dispatch/getState,
// context and options; `cb` becomes the child's continuation.
function resolveIterator(childIterator, effectId, childName, cb) {
  proc(childIterator, subscribe, dispatch, getState, taskContext, options, effectId, childName, cb)
}
449
/**
  Takes the next input matching `pattern` from `channel` (or from the task's standard
  channel when none is given).
  Error inputs are reported as failures; an END input is turned into CHANNEL_END
  unless `maybe` is set, in which case it is delivered like any other input.
**/
function runTakeEffect({ channel, pattern, maybe }, cb) {
  const source = channel || stdChannel
  const takeCb = inp => {
    if (inp instanceof Error) {
      return cb(inp, true)
    }
    if (isEnd(inp) && !maybe) {
      return cb(CHANNEL_END)
    }
    return cb(inp)
  }

  try {
    source.take(takeCb, matcher(pattern))
  } catch (err) {
    return cb(err, true)
  }
  // whatever cancel handler ended up on the taker after `take` becomes the effect's cancellation
  cb.cancel = takeCb.cancel
}
460
/**
  Puts `action` on `channel`, or dispatches it when no channel is given.

  The put is scheduled through `asap` in case another saga is holding a lock, so that
  it executes atomically: nested puts run after this put has terminated.

  A synchronous failure is reported to `cb` when a channel or `put.resolve` was used;
  otherwise it is only logged and the effect completes with `undefined`.
  Put effects are not cancellable.
**/
function runPutEffect({ channel, action, resolve }, cb) {
  asap(() => {
    const send = channel ? channel.put : dispatch
    let result
    try {
      result = send(action)
    } catch (error) {
      // If we have a channel or `put.resolve` was used then bubble up the error.
      if (channel || resolve) {
        return cb(error, true)
      }
      log('error', `uncaught at ${name}`, error.stack || error.message || error)
    }

    if (!(resolve && is.promise(result))) {
      return cb(result)
    }
    resolvePromise(result, cb)
  })
}
485
/**
  Calls fn with `args` (and `context` as `this`) and resolves `cb` from what it returns:
  promises are awaited, iterators are run as a child proc named after `fn`,
  any other value completes the effect immediately.
  Synchronous failures are reported as cb(error, true); see #152.
**/
function runCallEffect({ context, fn, args }, effectId, cb) {
  let result
  try {
    result = fn.apply(context, args)
  } catch (error) {
    return cb(error, true)
  }

  if (is.promise(result)) {
    return resolvePromise(result, cb)
  }
  if (is.iterator(result)) {
    return resolveIterator(result, effectId, fn.name, cb)
  }
  return cb(result)
}
498
/**
  Calls a continuation-passing (node style) function: fn receives `args` followed by a
  callback (err, res). The effect fails with `err` unless `is.undef(err)` holds, in which
  case it completes with `res`.
  fn can define its own cancellation logic by setting a `cancel` field on the callback it
  receives. Synchronous failures are reported as cb(error, true); see #152.
**/
function runCPSEffect({ context, fn, args }, cb) {
  try {
    const cpsCb = (err, res) => {
      if (is.undef(err)) {
        return cb(res)
      }
      return cb(err, true)
    }
    fn.apply(context, args.concat(cpsCb))
    if (cpsCb.cancel) {
      cb.cancel = () => cpsCb.cancel()
    }
  } catch (error) {
    return cb(error, true)
  }
}
514
/**
  Starts fn as a child task and completes the effect with the task descriptor.

  The scheduler is suspended while the child is started and flushed afterwards,
  whatever happens in between.

  - detached forks: the task descriptor is handed to `cb`, nothing else is tracked
  - attached forks still running: the child is added to this task's fork queue
  - attached forks that aborted synchronously: this task's fork queue is aborted with
    the child's error and `cb` is not called
  - attached forks that already finished normally: the task descriptor is handed to `cb`

  Fork effects are not cancellable.
**/
function runForkEffect({ context, fn, args, detached }, effectId, cb) {
  const taskIterator = createTaskIterator({ context, fn, args })

  try {
    suspend()
    const task = proc(
      taskIterator,
      subscribe,
      dispatch,
      getState,
      taskContext,
      options,
      effectId,
      fn.name,
      detached ? null : noop,
    )

    if (detached) {
      cb(task)
    } else if (taskIterator._isRunning) {
      taskQueue.addTask(task)
      cb(task)
    } else if (taskIterator._isAborted) {
      // `_isAborted` (set by `end` on failure) is the reliable signal here: the recorded
      // error may be a falsy value (`throw 0`, `throw undefined`), so its truthiness is not.
      taskQueue.abort(taskIterator._error)
    } else {
      cb(task)
    }
  } finally {
    flush()
  }
}
549
/**
  Waits for task `t`. A task that already terminated settles `cb` right away with its
  error or result; otherwise `cb` is registered as a joiner of `t`, and cancelling the
  effect removes that joiner again.
**/
function runJoinEffect(t, cb) {
  if (!t.isRunning()) {
    if (t.isAborted()) {
      cb(t.error(), true)
    } else {
      cb(t.result())
    }
    return
  }

  const joiner = { task, cb }
  cb.cancel = () => remove(t.joiners, joiner)
  t.joiners.push(joiner)
}
559
/**
  Cancels the given task if it is still running, then completes the effect.
  SELF_CANCELLATION designates the current task. Cancel effects are not cancellable.
**/
function runCancelEffect(taskToCancel, cb) {
  const target = taskToCancel === SELF_CANCELLATION ? task : taskToCancel
  if (target.isRunning()) {
    target.cancel()
  }
  cb()
}
570
/**
  Runs every effect of `effects` (an array or a keyed object) in parallel.

  Completes with the collected results (an array when `effects` is an array, an object
  with the same keys otherwise) once every child has completed.
  As soon as one child fails, or delivers END / CHANNEL_END / TASK_CANCEL, the remaining
  children are cancelled and that outcome is forwarded to `cb`.
**/
function runAllEffect(effects, effectId, cb) {
  const keys = Object.keys(effects)

  if (keys.length === 0) {
    return cb(is.array(effects) ? [] : {})
  }

  let doneCount = 0
  let finished
  const results = {}
  const childCbs = {}

  const makeChildCb = key => {
    const childCb = (res, isErr) => {
      if (finished) {
        return
      }

      const shouldStop = isErr || isEnd(res) || res === CHANNEL_END || res === TASK_CANCEL
      if (shouldStop) {
        cb.cancel()
        cb(res, isErr)
        return
      }

      results[key] = res
      doneCount += 1
      if (doneCount === keys.length) {
        finished = true
        cb(is.array(effects) ? array.from({ ...results, length: keys.length }) : results)
      }
    }
    childCb.cancel = noop
    return childCb
  }

  // every child callback exists before any effect starts running
  for (const key of keys) {
    childCbs[key] = makeChildCb(key)
  }

  cb.cancel = () => {
    if (finished) {
      return
    }
    finished = true
    keys.forEach(key => childCbs[key].cancel())
  }

  for (const key of keys) {
    runEffect(effects[key], effectId, key, childCbs[key])
  }
}
617
/**
  Runs the keyed `effects` against each other.

  The first child to complete with a regular value wins: the others are cancelled and
  `cb` receives `{ [key]: value }`. A child failure cancels the others and is forwarded
  as cb(error, true). END, CHANNEL_END and TASK_CANCEL results are ignored.
  Once the race is decided, effects that were not started yet are skipped.
**/
function runRaceEffect(effects, effectId, cb) {
  let decided
  const keys = Object.keys(effects)
  const childCbs = {}

  for (const key of keys) {
    const childCb = (res, isErr) => {
      if (decided) {
        return
      }

      if (isErr) {
        // Race Auto cancellation
        cb.cancel()
        cb(res, true)
        return
      }

      const isTermination = isEnd(res) || res === CHANNEL_END || res === TASK_CANCEL
      if (!isTermination) {
        cb.cancel()
        decided = true
        cb({ [key]: res })
      }
    }
    childCb.cancel = noop
    childCbs[key] = childCb
  }

  cb.cancel = () => {
    // prevents unnecessary cancellation
    if (decided) {
      return
    }
    decided = true
    keys.forEach(key => childCbs[key].cancel())
  }

  for (const key of keys) {
    if (decided) {
      continue
    }
    runEffect(effects[key], effectId, key, childCbs[key])
  }
}
657
// Completes with selector(state, ...args); anything thrown along the way is reported as a failure.
function runSelectEffect({ selector, args }, cb) {
  try {
    cb(selector(getState(), ...args))
  } catch (error) {
    cb(error, true)
  }
}
666
// Completes with an event channel built from `subscribe`, using the given buffer
// (buffers.fixed() when none is given) and a matcher for `pattern` that also carries
// the pattern itself as `match.pattern`.
function runChannelEffect({ pattern, buffer }, cb) {
  const match = Object.assign(matcher(pattern), { pattern })
  const channel = eventChannel(subscribe, buffer || buffers.fixed(), match)
  cb(channel)
}
672
// Completes with a boolean telling whether the main task has been flagged as cancelled.
function runCancelledEffect(data, cb) {
  cb(Boolean(mainTask.isCancelled))
}
676
// Hands the effect callback to the channel's own `flush`.
function runFlushEffect(chan, done) {
  chan.flush(done)
}
680
// Completes with the value stored under `prop` in this task's context.
function runGetContextEffect(prop, cb) {
  const value = taskContext[prop]
  cb(value)
}
684
// Merges `props` into this task's context, then completes the effect without a value.
function runSetContextEffect(props, done) {
  object.assign(taskContext, props)
  done()
}
689
/**
  Creates the task descriptor exposed for this generator.

  id       - stored as the task's `id`
  name     - stored as the task's `name`
  iterator - the running iterator; the task state (`_isRunning`, `_isCancelled`,
             `_isAborted`, `_result`, `_error`, `_deferredEnd`) is kept on it
  cont     - continuation stored as the task's `cont`

  `done` lazily creates a deferred on first access and returns its promise. If the task
  has already terminated by then, the deferred is settled immediately.
**/
function newTask(id, name, iterator, cont) {
  iterator._deferredEnd = null
  return {
    [TASK]: true,
    id,
    name,
    get done() {
      if (iterator._deferredEnd) {
        return iterator._deferredEnd.promise
      }

      const def = deferred()
      iterator._deferredEnd = def
      if (!iterator._isRunning) {
        // Decide on the aborted flag, not on the error's truthiness: a task may have
        // failed with a falsy value (`throw 0`, `throw undefined`) and must still reject.
        if (iterator._isAborted) {
          def.reject(iterator._error)
        } else {
          def.resolve(iterator._result)
        }
      }
      return def.promise
    },
    cont,
    joiners: [],
    cancel,
    isRunning: () => iterator._isRunning,
    isCancelled: () => iterator._isCancelled,
    isAborted: () => iterator._isAborted,
    result: () => iterator._result,
    error: () => iterator._error,
    setContext(props) {
      check(props, is.object, createSetContextWarning('task', props))
      object.assign(taskContext, props)
    },
  }
}
722}