1 | import { utils } from '@neo-one/utils';
|
2 | import { BehaviorSubject, combineLatest, Observable, ReplaySubject, Subscription } from 'rxjs';
|
3 | import { map } from 'rxjs/operators';
|
4 | import { getTaskError, getTasksError, isTaskDone } from './tasks';
|
5 | import { TaskStatus } from './types';
|
6 |
|
7 |
|
8 | export type TaskContext = any;
|
9 | type SkipFn = (ctx: TaskContext) => string | boolean;
|
10 | type EnabledFn = (ctx: TaskContext) => boolean;
|
11 | type OnErrorFn = (error: Error, ctx: TaskContext) => void;
|
12 | type OnDoneFn = (failed: boolean) => void;
|
13 |
|
14 | export interface Task {
|
15 | readonly skip?: SkipFn;
|
16 | readonly enabled?: EnabledFn;
|
17 | readonly task: (ctx: TaskContext) => Promise<void> | Promise<string> | Observable<string> | TaskList | void;
|
18 | readonly title: string;
|
19 | }
|
20 |
|
21 | export interface TaskListOptions {
|
22 | readonly tasks: ReadonlyArray<Task>;
|
23 | readonly concurrent?: boolean;
|
24 | readonly onError?: OnErrorFn;
|
25 | readonly onComplete?: () => void;
|
26 | readonly onDone?: OnDoneFn;
|
27 | readonly initialContext?: TaskContext;
|
28 | readonly freshContext?: boolean;
|
29 | readonly collapse?: boolean;
|
30 | }
|
31 |
|
32 | class TaskWrapper {
|
33 | public readonly status$: BehaviorSubject<TaskStatus | undefined>;
|
34 | private readonly task: Task;
|
35 | private readonly taskList: TaskList;
|
36 | private readonly skip: SkipFn;
|
37 | private readonly getEnabled: EnabledFn;
|
38 | private mutableAborted: boolean;
|
39 |
|
40 | public constructor({
|
41 | task,
|
42 | taskList,
|
43 | collapse,
|
44 | }: {
|
45 | readonly task: Task;
|
46 | readonly taskList: TaskList;
|
47 | readonly collapse: boolean;
|
48 | }) {
|
49 | this.task = task;
|
50 | this.taskList = taskList;
|
51 | this.status$ = new BehaviorSubject<TaskStatus | undefined>({
|
52 | id: task.title,
|
53 | title: task.title,
|
54 | collapse,
|
55 | });
|
56 |
|
57 | this.skip = task.skip === undefined ? (_ctx) => false : task.skip;
|
58 | this.getEnabled = task.enabled === undefined ? (_ctx) => true : task.enabled;
|
59 | this.mutableAborted = false;
|
60 | }
|
61 |
|
62 | public check(ctx: TaskContext): void {
|
63 | if (this.enabled && !this.pending && !this.done && !this.getEnabled(ctx)) {
|
64 | this.status$.next(undefined);
|
65 | }
|
66 | }
|
67 |
|
68 | public get enabled(): boolean {
|
69 | return this.status$.getValue() !== undefined;
|
70 | }
|
71 |
|
72 | public get pending(): boolean {
|
73 | const status = this.status$.getValue();
|
74 |
|
75 | return status !== undefined && status.pending === true;
|
76 | }
|
77 |
|
78 | public get done(): boolean {
|
79 | const status = this.status$.getValue();
|
80 |
|
81 | return status !== undefined && isTaskDone(status);
|
82 | }
|
83 |
|
84 | public get error(): string | undefined {
|
85 | const status = this.status$.getValue();
|
86 |
|
87 | return status === undefined ? undefined : getTaskError(status);
|
88 | }
|
89 |
|
90 | public abort(): void {
|
91 | this.mutableAborted = true;
|
92 | const status = this.status$.getValue();
|
93 | if (status !== undefined) {
|
94 | this.status$.next({ ...status, skipped: 'Aborted' });
|
95 | }
|
96 | this.status$.complete();
|
97 | }
|
98 |
|
99 | public complete(): void {
|
100 | this.status$.complete();
|
101 | }
|
102 |
|
103 | public async run(ctx: TaskContext): Promise<void> {
|
104 | const statusIn = this.status$.getValue();
|
105 | if (statusIn === undefined) {
|
106 | this.status$.complete();
|
107 |
|
108 | return;
|
109 | }
|
110 | if (this.mutableAborted) {
|
111 | return;
|
112 | }
|
113 |
|
114 | let status = { ...statusIn, pending: true };
|
115 |
|
116 | const onError = (error: Error) => {
|
117 | this.taskList.onError(error, ctx);
|
118 | this.taskList.mutableSuperOnError(error);
|
119 | };
|
120 |
|
121 | try {
|
122 | const skip = this.skip(ctx);
|
123 | if (skip !== false) {
|
124 | status = { ...status, pending: false, skipped: skip };
|
125 | this.status$.next(status);
|
126 | } else {
|
127 | this.status$.next(status);
|
128 |
|
129 |
|
130 | const result = this.task.task(ctx);
|
131 |
|
132 | let error;
|
133 | let message: string | undefined | void;
|
134 | if (result instanceof Observable) {
|
135 | await result
|
136 | .pipe(
|
137 | map((msg) => {
|
138 | status = { ...status, message: msg };
|
139 | this.status$.next(status);
|
140 | }),
|
141 | )
|
142 | .toPromise();
|
143 | } else if (result instanceof Promise) {
|
144 | message = await result;
|
145 | } else if (result instanceof TaskList) {
|
146 | result.setSuperOnError(onError);
|
147 |
|
148 | result.run(ctx);
|
149 | const finalSubtasks = await result.status$
|
150 | .pipe(
|
151 | map((subtasks) => {
|
152 | status = { ...status, subtasks };
|
153 | this.status$.next(status);
|
154 |
|
155 | return subtasks;
|
156 | }),
|
157 | )
|
158 | .toPromise();
|
159 | error = getTasksError(finalSubtasks);
|
160 | }
|
161 |
|
162 | this.status$.next({
|
163 | ...status,
|
164 | pending: false,
|
165 | complete: error === undefined,
|
166 | message: message === undefined ? undefined : message,
|
167 | error,
|
168 | });
|
169 | }
|
170 | } catch (error) {
|
171 | const message = error.stack == undefined ? error.message : error.stack;
|
172 | this.status$.next({
|
173 | ...status,
|
174 | pending: false,
|
175 | error: message == undefined || message === '' ? 'Something went wrong.' : message,
|
176 | });
|
177 |
|
178 | onError(error);
|
179 | }
|
180 |
|
181 | this.status$.complete();
|
182 | }
|
183 | }
|
184 |
|
185 | export class TaskList {
|
186 | public mutableSuperOnError: (error: Error) => void;
|
187 | public readonly onError: OnErrorFn;
|
188 | private readonly tasks: ReadonlyArray<TaskWrapper>;
|
189 | private readonly concurrent: boolean;
|
190 | private readonly onComplete: () => void;
|
191 | private readonly onDone: OnDoneFn;
|
192 | private readonly initialContext: TaskContext;
|
193 | private readonly freshContext: boolean;
|
194 | private readonly statusInternal$: ReplaySubject<ReadonlyArray<TaskStatus>>;
|
195 | private mutableSubscription: Subscription | undefined;
|
196 |
|
197 | public constructor({
|
198 | tasks,
|
199 | concurrent = false,
|
200 | onError,
|
201 | onComplete,
|
202 | onDone,
|
203 | initialContext = {},
|
204 | freshContext = false,
|
205 | collapse = true,
|
206 | }: TaskListOptions) {
|
207 | this.tasks = tasks.map(
|
208 | (task) =>
|
209 | new TaskWrapper({
|
210 | task,
|
211 | taskList: this,
|
212 | collapse,
|
213 | }),
|
214 | );
|
215 |
|
216 | this.concurrent = concurrent;
|
217 | this.onError =
|
218 | onError === undefined
|
219 | ? (_error, _ctx) => {
|
220 |
|
221 | }
|
222 | : onError;
|
223 | this.onComplete =
|
224 | onComplete === undefined
|
225 | ? () => {
|
226 |
|
227 | }
|
228 | : onComplete;
|
229 | this.onDone =
|
230 | onDone === undefined
|
231 | ? (_failed) => {
|
232 |
|
233 | }
|
234 | : onDone;
|
235 | this.initialContext = initialContext;
|
236 | this.freshContext = freshContext;
|
237 | this.mutableSuperOnError = (_error) => {
|
238 |
|
239 | };
|
240 |
|
241 | this.statusInternal$ = new ReplaySubject(1);
|
242 | }
|
243 |
|
244 | public get status$(): Observable<ReadonlyArray<TaskStatus>> {
|
245 | this.run().catch((error) => this.onError(error, {}));
|
246 |
|
247 | return this.statusInternal$;
|
248 | }
|
249 |
|
250 | public async toPromise(): Promise<void> {
|
251 | const result = await this.status$.toPromise();
|
252 | const error = getTasksError(result);
|
253 | if (error !== undefined) {
|
254 | throw new Error(error);
|
255 | }
|
256 | }
|
257 |
|
258 | public async abort(): Promise<void> {
|
259 | await this.abort$().toPromise();
|
260 | }
|
261 |
|
262 | public abort$(): Observable<ReadonlyArray<TaskStatus>> {
|
263 | this.tasks.forEach((task) => task.abort());
|
264 |
|
265 | return this.status$;
|
266 | }
|
267 |
|
268 | public setSuperOnError(onError: (error: Error) => void): void {
|
269 | this.mutableSuperOnError = onError;
|
270 | }
|
271 |
|
272 | public async run(ctxIn: TaskContext = {}): Promise<void> {
|
273 | if (this.mutableSubscription !== undefined) {
|
274 | return;
|
275 | }
|
276 |
|
277 | const ctx = this.freshContext ? {} : ctxIn;
|
278 | Object.entries(this.initialContext).forEach(([key, value]) => {
|
279 |
|
280 | ctx[key] = value;
|
281 | });
|
282 | this.checkAll(ctx);
|
283 |
|
284 | this.mutableSubscription = combineLatest(this.tasks.map((task) => task.status$))
|
285 | .pipe(map((statuses): ReadonlyArray<TaskStatus> => statuses.filter(utils.notNull)))
|
286 | .subscribe(this.statusInternal$);
|
287 |
|
288 | await this.runTasks(ctx);
|
289 | const err = getTasksError(this.tasks.map((task) => task.status$.getValue()).filter(utils.notNull));
|
290 |
|
291 | if (err === undefined) {
|
292 | this.onComplete();
|
293 | }
|
294 | this.onDone(err !== undefined);
|
295 | }
|
296 |
|
297 | private async runTasks(ctx: TaskContext): Promise<void> {
|
298 | if (this.tasks.length === 0) {
|
299 | this.statusInternal$.next([]);
|
300 |
|
301 | return;
|
302 | }
|
303 |
|
304 | if (this.concurrent) {
|
305 | await Promise.all(this.tasks.map(async (task) => task.run(ctx)));
|
306 | } else {
|
307 | let error: string | undefined;
|
308 |
|
309 | for (const task of this.tasks) {
|
310 | if (error === undefined) {
|
311 | await task.run(ctx);
|
312 | } else {
|
313 | task.complete();
|
314 | }
|
315 | error = task.error;
|
316 | }
|
317 | }
|
318 | }
|
319 |
|
320 | private checkAll(ctx: TaskContext): void {
|
321 | this.tasks.forEach((task) => task.check(ctx));
|
322 | }
|
323 | }
|