UNPKG

8.8 kBPlain TextView Raw
1import { utils } from '@neo-one/utils';
2import { BehaviorSubject, combineLatest, Observable, ReplaySubject, Subscription } from 'rxjs';
3import { map } from 'rxjs/operators';
4import { getTaskError, getTasksError, isTaskDone } from './tasks';
5import { TaskStatus } from './types';
6
/**
 * Free-form, mutable bag of values shared by every task of a running task list.
 * Its shape is decided entirely by the tasks that read from and write to it.
 */
// tslint:disable-next-line no-any
export type TaskContext = any;

/**
 * Predicate deciding whether a task is skipped. Returning `false` lets the task run; any other
 * value (`true` or a string) marks the task as skipped and is stored on its status.
 */
type SkipFn = (ctx: TaskContext) => string | boolean;

/** Predicate deciding whether a task is part of the list at all for the given context. */
type EnabledFn = (ctx: TaskContext) => boolean;

/** Callback notified with the error and the context when a task fails. */
type OnErrorFn = (error: Error, ctx: TaskContext) => void;

/** Callback notified once a list has finished; `failed` tells whether any task reported an error. */
type OnDoneFn = (failed: boolean) => void;
13
/**
 * Declarative description of one unit of work inside a `TaskList`.
 */
export interface Task {
  /** Human readable name; also used as the `id` of the task's status. */
  readonly title: string;
  /**
   * The work itself. What happens next depends on the returned value:
   * - an `Observable<string>`: every emitted string is published as the task's status message and
   *   the task finishes when the observable completes;
   * - a `Promise`: the task finishes when it settles, and a resolved string becomes the final
   *   status message;
   * - a `TaskList`: the nested list is run with the same context and its statuses are published
   *   as this task's subtasks;
   * - nothing: the task is considered finished as soon as the function returns.
   */
  readonly task: (ctx: TaskContext) => Promise<void> | Promise<string> | Observable<string> | TaskList | void;
  /**
   * Optional. Return `false` to run the task; any other value (`true` or a string) marks the task
   * as skipped and is stored in the status' `skipped` field. Defaults to never skipping.
   */
  readonly skip?: SkipFn;
  /**
   * Optional. Checked when the owning list starts running; when it returns `false` the task's
   * status becomes `undefined` and the task is not executed. Defaults to always enabled.
   */
  readonly enabled?: EnabledFn;
}
20
/**
 * Construction options for `TaskList`.
 */
export interface TaskListOptions {
  /** Tasks to run, in declaration order. */
  readonly tasks: ReadonlyArray<Task>;

  /**
   * When `true` all tasks are started together; otherwise they run one at a time in declaration
   * order. Defaults to `false`.
   */
  readonly concurrent?: boolean;

  /** Called with the error and the context when a task throws or rejects. Defaults to a no-op. */
  readonly onError?: OnErrorFn;

  /** Called after the tasks have been processed, only when no task reported an error. */
  readonly onComplete?: () => void;

  /** Called after the tasks have been processed; `failed` is `true` when a task reported an error. */
  readonly onDone?: OnDoneFn;

  /** Entries copied onto the context object before any task runs. Defaults to `{}`. */
  readonly initialContext?: TaskContext;

  /**
   * When `true` the list starts from a new empty context instead of the one handed to `run`.
   * Defaults to `false`.
   */
  readonly freshContext?: boolean;

  /**
   * Copied verbatim into the initial status of every task; defaults to `true`. How the flag is
   * interpreted is up to whoever consumes the statuses.
   */
  readonly collapse?: boolean;
}
31
/**
 * Tracks the execution state of a single `Task` that belongs to a `TaskList`.
 *
 * The current state is published on `status$`; a value of `undefined` means the task was disabled
 * by its `enabled` predicate. `status$` completes once the task has finished, has been aborted, or
 * has been passed over by the owning list.
 */
class TaskWrapper {
  /** Latest status of the task, or `undefined` when the task is disabled. */
  public readonly status$: BehaviorSubject<TaskStatus | undefined>;
  private readonly task: Task;
  private readonly taskList: TaskList;
  private readonly skip: SkipFn;
  private readonly getEnabled: EnabledFn;
  private mutableAborted: boolean;

  /**
   * @param task the task definition to execute
   * @param taskList the list that owns this task; its error callbacks are invoked on failure
   * @param collapse flag copied into the task's initial status
   */
  public constructor({
    task,
    taskList,
    collapse,
  }: {
    readonly task: Task;
    readonly taskList: TaskList;
    readonly collapse: boolean;
  }) {
    this.task = task;
    this.taskList = taskList;
    // The title doubles as the status id.
    this.status$ = new BehaviorSubject<TaskStatus | undefined>({
      id: task.title,
      title: task.title,
      collapse,
    });

    // Missing predicates default to "never skip" and "always enabled".
    this.skip = task.skip === undefined ? (_ctx) => false : task.skip;
    this.getEnabled = task.enabled === undefined ? (_ctx) => true : task.enabled;
    this.mutableAborted = false;
  }

  /**
   * Disables the task (publishes `undefined`) when it has neither started nor finished and its
   * `enabled` predicate returns `false` for `ctx`.
   */
  public check(ctx: TaskContext): void {
    if (this.enabled && !this.pending && !this.done && !this.getEnabled(ctx)) {
      this.status$.next(undefined);
    }
  }

  /** `true` while the task still has a status, i.e. it has not been disabled. */
  public get enabled(): boolean {
    return this.status$.getValue() !== undefined;
  }

  /** `true` while the current status is flagged as pending. */
  public get pending(): boolean {
    const status = this.status$.getValue();

    return status !== undefined && status.pending === true;
  }

  /** `true` when `isTaskDone` reports the current status as done. */
  public get done(): boolean {
    const status = this.status$.getValue();

    return status !== undefined && isTaskDone(status);
  }

  /** Error extracted from the current status by `getTaskError`, or `undefined` when disabled. */
  public get error(): string | undefined {
    const status = this.status$.getValue();

    return status === undefined ? undefined : getTaskError(status);
  }

  /**
   * Marks the task as aborted, publishes a status with `skipped: 'Aborted'` (unless the task is
   * disabled) and completes `status$`. A later call to `run` returns without doing anything.
   */
  public abort(): void {
    this.mutableAborted = true;
    const status = this.status$.getValue();
    if (status !== undefined) {
      this.status$.next({ ...status, skipped: 'Aborted' });
    }
    this.status$.complete();
  }

  /** Completes `status$` without running the task. */
  public complete(): void {
    this.status$.complete();
  }

  /**
   * Executes the task and publishes its progress on `status$`.
   *
   * Failures never reject the returned promise: they are recorded on the status and forwarded to
   * the owning list's `onError` and `mutableSuperOnError` callbacks.
   *
   * @param ctx the context object handed to the task's `skip` and `task` functions
   */
  public async run(ctx: TaskContext): Promise<void> {
    const statusIn = this.status$.getValue();
    if (statusIn === undefined) {
      // Disabled task: nothing to run, just close the stream.
      this.status$.complete();

      return;
    }
    if (this.mutableAborted) {
      // abort() has already completed status$.
      return;
    }

    let status = { ...statusIn, pending: true };

    const onError = (error: Error) => {
      this.taskList.onError(error, ctx);
      this.taskList.mutableSuperOnError(error);
    };

    try {
      const skip = this.skip(ctx);
      if (skip !== false) {
        status = { ...status, pending: false, skipped: skip };
        this.status$.next(status);
      } else {
        this.status$.next(status);

        // tslint:disable-next-line rxjs-finnish
        const result = this.task.task(ctx);

        let error;
        let message: string | undefined | void;
        if (result instanceof Observable) {
          // Every emission becomes the task's current message.
          await result
            .pipe(
              map((msg) => {
                status = { ...status, message: msg };
                this.status$.next(status);
              }),
            )
            .toPromise();
        } else if (result instanceof Promise) {
          message = await result;
        } else if (result instanceof TaskList) {
          // Nested list: route its errors through this task's list and mirror its statuses as
          // subtasks until it completes.
          result.setSuperOnError(onError);
          // tslint:disable-next-line no-floating-promises
          result.run(ctx);
          const finalSubtasks = await result.status$
            .pipe(
              map((subtasks) => {
                status = { ...status, subtasks };
                this.status$.next(status);

                return subtasks;
              }),
            )
            .toPromise();
          error = getTasksError(finalSubtasks);
        }

        this.status$.next({
          ...status,
          pending: false,
          complete: error === undefined,
          message: message === undefined ? undefined : message,
          error,
        });
      }
    } catch (thrown) {
      // JavaScript allows throwing any value. Wrap `null`/`undefined` and primitives in a real
      // Error so that reading `stack`/`message` below cannot itself throw (which would skip
      // `status$.complete()` and reject `run`) and so that the error callbacks receive an Error.
      // Objects are passed through untouched.
      let error = thrown;
      if (thrown == undefined) {
        error = new Error('Something went wrong.');
      } else if (typeof thrown !== 'object' && typeof thrown !== 'function') {
        error = new Error(String(thrown));
      }

      const message = error.stack == undefined ? error.message : error.stack;
      this.status$.next({
        ...status,
        pending: false,
        error: message == undefined || message === '' ? 'Something went wrong.' : message,
      });

      onError(error);
    }

    this.status$.complete();
  }
}
184
/**
 * Runs a collection of tasks, either one after another or concurrently, and exposes their combined
 * statuses as an observable.
 */
export class TaskList {
  /**
   * Extra error callback, replaced through `setSuperOnError`. A task wrapper installs one on a
   * nested list so that the nested list's errors reach the enclosing list.
   */
  public mutableSuperOnError: (error: Error) => void;
  /** Error callback supplied through the options (no-op when omitted). */
  public readonly onError: OnErrorFn;
  private readonly tasks: ReadonlyArray<TaskWrapper>;
  private readonly concurrent: boolean;
  private readonly onComplete: () => void;
  private readonly onDone: OnDoneFn;
  private readonly initialContext: TaskContext;
  private readonly freshContext: boolean;
  // Replays the latest combined status to late subscribers.
  private readonly statusInternal$: ReplaySubject<ReadonlyArray<TaskStatus>>;
  // Defined once `run` has started; used to make `run` a one-shot operation.
  private mutableSubscription: Subscription | undefined;

  /**
   * @param options see `TaskListOptions`; omitted callbacks default to no-ops, `concurrent` and
   * `freshContext` to `false`, `initialContext` to `{}` and `collapse` to `true`
   */
  public constructor({
    tasks,
    concurrent = false,
    onError,
    onComplete,
    onDone,
    initialContext = {},
    freshContext = false,
    collapse = true,
  }: TaskListOptions) {
    this.tasks = tasks.map(
      (task) =>
        new TaskWrapper({
          task,
          taskList: this,
          collapse,
        }),
    );

    this.concurrent = concurrent;
    this.onError =
      onError === undefined
        ? (_error, _ctx) => {
            // do nothing
          }
        : onError;
    this.onComplete =
      onComplete === undefined
        ? () => {
            // do nothing
          }
        : onComplete;
    this.onDone =
      onDone === undefined
        ? (_failed) => {
            // do nothing
          }
        : onDone;
    this.initialContext = initialContext;
    this.freshContext = freshContext;
    this.mutableSuperOnError = (_error) => {
      // do nothing
    };

    this.statusInternal$ = new ReplaySubject(1);
  }

  /**
   * Combined statuses of all enabled tasks. Reading this property starts the list (with an empty
   * context) if it has not been started yet; a rejection of that run is reported to `onError`.
   */
  public get status$(): Observable<ReadonlyArray<TaskStatus>> {
    this.run().catch((error) => this.onError(error, {}));

    return this.statusInternal$;
  }

  /**
   * Runs the list (if needed) and resolves once the status stream completes.
   *
   * @throws Error carrying the message reported by `getTasksError` for the final statuses
   */
  public async toPromise(): Promise<void> {
    const result = await this.status$.toPromise();
    // The stream may complete without ever emitting (e.g. when every task was aborted before the
    // list started); in that case there are no statuses to inspect.
    const error = result === undefined ? undefined : getTasksError(result);
    if (error !== undefined) {
      throw new Error(error);
    }
  }

  /** Aborts every task and resolves once the status stream has completed. */
  public async abort(): Promise<void> {
    await this.abort$().toPromise();
  }

  /** Aborts every task and returns the status stream. */
  public abort$(): Observable<ReadonlyArray<TaskStatus>> {
    this.tasks.forEach((task) => task.abort());

    return this.status$;
  }

  /** Replaces `mutableSuperOnError`. */
  public setSuperOnError(onError: (error: Error) => void): void {
    this.mutableSuperOnError = onError;
  }

  /**
   * Starts the list. Only the first call has any effect; later calls return immediately.
   *
   * Unless `freshContext` is set, `ctxIn` itself is used as the context and is mutated: the
   * entries of `initialContext` are copied onto it before the tasks run.
   *
   * @param ctxIn context object shared by the tasks
   */
  public async run(ctxIn: TaskContext = {}): Promise<void> {
    if (this.mutableSubscription !== undefined) {
      return;
    }

    const ctx = this.freshContext ? {} : ctxIn;
    Object.entries(this.initialContext).forEach(([key, value]) => {
      // tslint:disable-next-line no-object-mutation
      ctx[key] = value;
    });
    this.checkAll(ctx);

    if (this.tasks.length === 0) {
      // combineLatest over zero sources completes immediately, which completes statusInternal$
      // as soon as it is subscribed below. Publish the empty status list first, otherwise
      // subscribers of an empty list would never receive a value.
      this.statusInternal$.next([]);
    }

    this.mutableSubscription = combineLatest(this.tasks.map((task) => task.status$))
      .pipe(map((statuses): ReadonlyArray<TaskStatus> => statuses.filter(utils.notNull)))
      .subscribe(this.statusInternal$);

    await this.runTasks(ctx);
    const err = getTasksError(this.tasks.map((task) => task.status$.getValue()).filter(utils.notNull));

    if (err === undefined) {
      this.onComplete();
    }
    this.onDone(err !== undefined);
  }

  /**
   * Executes the tasks: all at once when `concurrent` is set, otherwise in order. In sequential
   * mode every task after the first one that reports an error is completed without being run.
   */
  private async runTasks(ctx: TaskContext): Promise<void> {
    if (this.tasks.length === 0) {
      return;
    }

    if (this.concurrent) {
      await Promise.all(this.tasks.map(async (task) => task.run(ctx)));

      return;
    }

    // Latch the failure: a task that is merely completed has no error of its own, so its state
    // must not be allowed to clear the flag and let later tasks run.
    let failed = false;
    // tslint:disable-next-line no-loop-statement
    for (const task of this.tasks) {
      if (failed) {
        task.complete();
      } else {
        await task.run(ctx);
        failed = task.error !== undefined;
      }
    }
  }

  /** Lets every task evaluate its `enabled` predicate against `ctx`. */
  private checkAll(ctx: TaskContext): void {
    this.tasks.forEach((task) => task.check(ctx));
  }
}
323}