UNPKG

4.91 kBPlain TextView Raw
1import {
2 action,
3 noop,
4 die,
5 isFunction,
6 Annotation,
7 isStringish,
8 storeAnnotation,
9 createFlowAnnotation,
10 createDecoratorAnnotation
11} from "../internal"
12
13export const FLOW = "flow"
14
15let generatorId = 0
16
17export function FlowCancellationError() {
18 this.message = "FLOW_CANCELLED"
19}
20FlowCancellationError.prototype = Object.create(Error.prototype)
21
22export function isFlowCancellationError(error: Error) {
23 return error instanceof FlowCancellationError
24}
25
26export type CancellablePromise<T> = Promise<T> & { cancel(): void }
27
28interface Flow extends Annotation, PropertyDecorator {
29 <R, Args extends any[]>(
30 generator: (...args: Args) => Generator<any, R, any> | AsyncGenerator<any, R, any>
31 ): (...args: Args) => CancellablePromise<R>
32 bound: Annotation & PropertyDecorator
33}
34
35const flowAnnotation = createFlowAnnotation("flow")
36const flowBoundAnnotation = createFlowAnnotation("flow.bound", { bound: true })
37
38export const flow: Flow = Object.assign(
39 function flow(arg1, arg2?) {
40 // @flow
41 if (isStringish(arg2)) {
42 return storeAnnotation(arg1, arg2, flowAnnotation)
43 }
44 // flow(fn)
45 if (__DEV__ && arguments.length !== 1)
46 die(`Flow expects single argument with generator function`)
47 const generator = arg1
48 const name = generator.name || "<unnamed flow>"
49
50 // Implementation based on https://github.com/tj/co/blob/master/index.js
51 const res = function () {
52 const ctx = this
53 const args = arguments
54 const runId = ++generatorId
55 const gen = action(`${name} - runid: ${runId} - init`, generator).apply(ctx, args)
56 let rejector: (error: any) => void
57 let pendingPromise: CancellablePromise<any> | undefined = undefined
58
59 const promise = new Promise(function (resolve, reject) {
60 let stepId = 0
61 rejector = reject
62
63 function onFulfilled(res: any) {
64 pendingPromise = undefined
65 let ret
66 try {
67 ret = action(
68 `${name} - runid: ${runId} - yield ${stepId++}`,
69 gen.next
70 ).call(gen, res)
71 } catch (e) {
72 return reject(e)
73 }
74
75 next(ret)
76 }
77
78 function onRejected(err: any) {
79 pendingPromise = undefined
80 let ret
81 try {
82 ret = action(
83 `${name} - runid: ${runId} - yield ${stepId++}`,
84 gen.throw!
85 ).call(gen, err)
86 } catch (e) {
87 return reject(e)
88 }
89 next(ret)
90 }
91
92 function next(ret: any) {
93 if (isFunction(ret?.then)) {
94 // an async iterator
95 ret.then(next, reject)
96 return
97 }
98 if (ret.done) return resolve(ret.value)
99 pendingPromise = Promise.resolve(ret.value) as any
100 return pendingPromise!.then(onFulfilled, onRejected)
101 }
102
103 onFulfilled(undefined) // kick off the process
104 }) as any
105
106 promise.cancel = action(`${name} - runid: ${runId} - cancel`, function () {
107 try {
108 if (pendingPromise) cancelPromise(pendingPromise)
109 // Finally block can return (or yield) stuff..
110 const res = gen.return!(undefined as any)
111 // eat anything that promise would do, it's cancelled!
112 const yieldedPromise = Promise.resolve(res.value)
113 yieldedPromise.then(noop, noop)
114 cancelPromise(yieldedPromise) // maybe it can be cancelled :)
115 // reject our original promise
116 rejector(new FlowCancellationError())
117 } catch (e) {
118 rejector(e) // there could be a throwing finally block
119 }
120 })
121 return promise
122 }
123 res.isMobXFlow = true
124 return res
125 } as any,
126 flowAnnotation
127)
128
129flow.bound = createDecoratorAnnotation(flowBoundAnnotation)
130
131function cancelPromise(promise) {
132 if (isFunction(promise.cancel)) promise.cancel()
133}
134
135export function flowResult<T>(
136 result: T
137): T extends Generator<any, infer R, any>
138 ? CancellablePromise<R>
139 : T extends CancellablePromise<any>
140 ? T
141 : never {
142 return result as any // just tricking TypeScript :)
143}
144
145export function isFlow(fn: any): boolean {
146 return fn?.isMobXFlow === true
147}