UNPKG

21.3 kBMarkdownView Raw
1# Description
2
3Jobs is the Angular DevKit subsystem for scheduling and running generic functions with clearly
4typed inputs and outputs. A `Job` instance is a function associated with metadata. You can
5schedule a job, synchronize it with other jobs, and use it to schedule other jobs.
6
7The whole API is serializable, allowing you to use a Node Stream or message channel to
8communicate between the job and the job scheduler.
9
10Jobs are lazy, cold, and guaranteed to execute exactly once when scheduled. Subscribing to a job
11returns messages from the point where the job is at.
12
13## Argument, Input, Output and Channels
14
15A job receives a single argument when scheduled and can also listen to an input channel. It can
16emit multiple outputs, and can also provide multiple output channels that emit asynchronous JSON
17messages, which can be typed.
18
19The I/O model is like that of an executable, where the argument corresponds to arguments on the
20command line, the input channel to STDIN, the output channel to STDOUT, and the channels
21would be additional output streams.
22
23## LifeCycle
24
25A `Job` goes through multiple LifeCycle messages before its completion;
26
271. `JobState.Queued`. The job was queued and is waiting. This is the default state from the
28 scheduler.
291. `JobState.Ready`. The job's dependencies (see
30 ["Synchronizing and Dependencies"](#Dependencies)) are done running, the argument is
31 validated, and the job is ready to execute.
321. `JobState.Started`. The argument has been validated, the job has been called and is running.
33 This is handled by the job itself (or `createJobHandler()`).
341. `JobState.Ended`. The job has ended and is done running. This is handled by the job itself (or
35 `createJobHandler()`).
361. `JobState.Errored`. A unrecoverable error happened.
37
38Each state (except `Queued`) corresponds to a `JobOutboundMessage` on the `outboundBus` observable
39that triggers the state change. The `Scheduler` emits the `Ready` and `Errored` messages; the job
40implementation should not emit them, and if it does they are filtered out. You can listen for
41these messages or use the corresponding state member.
42
43The job implementation should emit the `Start` and `End` messages when it is starting the job logic
44itself. Only the first `Start` and `End` messages will be forwarded. Any more will be filtered out.
45
46The `Queued` state is set as the job is scheduled, so there is no need to listen for the message.
47
48## `Job<OutputType>` Object
49
50The `Job` object that is returned when you schedule a job provides access to the job's status and
51utilities for tracking and modifying the job.
52
531. `id`. A unique symbol that can be used as a Map key.
541. `description`. The description of the job from the scheduler. See `JobDescription` object.
551. `argument`. The argument value that was used to start the job.
561. `input`. An `Observer` that can be used to send validated inputs to the job itself.
571. `output`. An `Observable<OutputType>` that filters out messages to get only the returned output
58 of a job.
591. `promise`. A promise that waits for the last output of a job. Returns the last value outputted
60 (or no value if there's no last value).
611. `state`. The current state of the job (see `LifeCycle`).
621. `channels`. A map of side channels the user can listen to as `Observable`.
631. `ping()`. A function that can be used to ping the job, receiving a `Promise` for when the ping
64 is answered.
651. `stop()`. Sends a `stop` input to the job, which suggests to stop the job. The job itself can
66 choose to ignore this message.
671. `inboundBus`. The raw input `Observer<JobInboundMessage>`. This can be used to send messages to
68 the `context.inboundBus` observable in the job. These are `JobInboundMessage` messages. See
69 ["Communicating With Jobs"](#Communicating).
701. `outboundBus`. The raw output `Observable<JobOutput>`. This can be used to listen to messages
71 from the job. See ["Communicating With Jobs"](#Communicating).
72
73## `JobHandlerContext<I, O>` Object
74
75The `JobHandlerContext<>` is passed to the job handler code in addition to its argument. The
76context contains the following members:
77
781. `description`. The description of the job. Its name and schemas.
791. `scheduler`. A `Scheduler<>` instance that can be used to create additional jobs.
801. `dependencies`. A generic list of other job instances that were run as dependencies when
81 scheduling this job. Their `id` is not guaranteed to match the `id` of the `Job<>` instance
82 itself (those `Job<>`s might just be proxies). The state of those `Job<>` is guaranteed to be
83 `JobState.Ended`, as `JobState.Errored` would have prevented this handler from running.
841. `inboundBus`. The raw input observable, complement of the `inboundBus` observer from the `Job<>`.
85
86# Examples
87
88An example of a job that adds all input together and return the output value. We use a
89simple synchronous job registry and a simple job scheduler.
90
91```typescript
92import { jobs } from '@angular-devkit/core';
93
94const add = jobs.createJobHandle<number[], number>((input) =>
95 input.reduce((total, curr) => total + curr, 0),
96);
97
98// Register the job in a SimpleJobRegistry. Different registries have different API.
99const registry = new jobs.SimpleJobRegistry();
100const scheduler = new jobs.SimpleScheduler(registry);
101registry.register(add, {
102 name: 'add',
103 input: { type: 'array', items: { type: 'number' } },
104 output: { type: 'number' },
105});
106
107scheduler
108 .schedule('add', [1, 2, 3, 4])
109 .promise.then((output) => console.log('1 + 2 + 3 + 4 is ' + output));
110```
111
112# Creating Jobs
113
114A job is at its core a function with a description object attached to it. The description object
115stores the JSON schemas used to validate the types of the argument passed in, the input and
116output values. By default, a job accepts and can output any JSON object.
117
118```typescript
119import { Observable } from 'rxjs';
120import { jobs } from '@angular-devkit/core';
121
122const argument = {
123 type: 'array',
124 items: { type: 'number' },
125};
126const output = {
127 type: 'number',
128};
129
130export function add(argument: number[]): Observable<jobs.JobOutboundMessage<number>> {
131 return new Observable((o) => {
132 o.next({ kind: jobs.JobOutboundMessageKind.Start });
133 o.next({
134 kind: jobs.JobOutboundMessageKind.Output,
135 output: argument.reduce((total, curr) => total + curr, 0),
136 });
137 o.next({ kind: jobs.JobOutboundMessageKind.End });
138 o.complete();
139 });
140}
141
142// Add a property to `add` to make it officially a JobHandler. The Job system does not recognize
143// any function as a JobHandler.
144add.jobDescription = {
145 argument: argument,
146 output: output,
147};
148
149// Call the job with an array as argument, and log its output.
150declare const scheduler: jobs.Scheduler;
151scheduler.schedule('add', [1, 2, 3, 4]).output.subscribe((x) => console.log(x)); // Will output 10.
152```
153
154This is a lot of boilerplate, so we made some helpers to improve readability and manage argument,
155input and output automatically:
156
157```typescript
158// Add is a JobHandler function, like the above.
159export const add = jobs.createJobHandler<number[], number>((argument) =>
160 argument.reduce((total, curr) => total + curr, 0),
161);
162
163// Schedule like above.
164```
165
166You can also return a Promise or an Observable, as jobs are asynchronous. This helper will set
167start and end messages appropriately. It will also manage channels automatically (see below).
168
169A more complex job can be declared like this:
170
171```typescript
172import { Observable } from 'rxjs';
173import { jobs } from '@angular-devkit/core';
174
175// Show progress with each count in a separate output channel. Output "more" in a channel.
176export const count = jobs.createJobHandler<number, number>(
177 // Receive a context that contains additional methods to create channels.
178 (argument: number, { createChannel }) =>
179 new Observable<number>((o) => {
180 const side = createChannel('side', { type: 'string', const: 'more' });
181 const progress = createChannel('progress', { type: 'number' });
182 let i = 0;
183 function doCount() {
184 o.next(i++);
185 progress.next(i / argument);
186 side.next('more');
187
188 if (i < argument) {
189 setTimeout(doCount, 100);
190 } else {
191 o.complete();
192 }
193 }
194 setTimeout(doCount, 100);
195 }),
196 {
197 argument: { type: 'number' },
198 output: { type: 'number' },
199 },
200);
201
202// Get a hold of a scheduler that refers to the job above.
203declare const scheduler: jobs.Scheduler;
204
205const job = scheduler.schedule('count', 0);
206job.getChannel('side').subscribe((x) => console.log(x));
207// You can type a channel too. Messages will be filtered out.
208job.getChannel<number>('progress', { type: 'number' }).subscribe((x) => console.log(x));
209```
210
211## <a name="Communicating"></a>Communicating With Jobs
212
213Jobs can be started and updated in a separate process or thread, and as such communication with a
214job should avoid using global objects (which might not be shared). The jobs API and schedulers
215provide 2 communication streams (one for input and the other for output), named `inboundBus` and
216`outboundBus`.
217
218### Raw Input Stream
219
220The `schedule()` function returns a `Job<>` interface that contains a `inboundBus` member of type
221`Observer<JobInboundMessage>`. All messages sent _to_ the job goes through this stream. The `kind`
222member of the `JobInboundMessage` interface dictates what kind of message it is sending:
223
2241. `JobInboundMessageKind.Ping`. A simple message that should be answered with
225 `JobOutboundMessageKind.Pong` when the job is responsive. The `id` field of the message should
226 be used when returning `Pong`.
2271. `JobInboundMessageKind.Stop`. The job should be stopped. This is used when
228 cancelling/unsubscribing from the `output` (or by calling `stop()`). Any inputs or outputs
229 after this message will be ignored.
2301. `JobInboundMessageKind.Input` is used when sending inputs to a job. These correspond to the
231 `next` methods of an `Observer` and are reported to the job through its `context.input`
232 Observable. There is no way to communicate an error to the job.
233
234Using the `createJobHandler()` helper, all those messages are automatically handled by the
235boilerplate code. If you need direct access to raw inputs, you should subscribe to the
236`context.inboundBus` Observable.
237
238### Raw Output Stream
239
240The `Job<>` interface also contains a `outboundBus` member (of type
241`Observable<JobOutboundMessage<O>>` where `O` is the typed output of the job) which is the output
242complement of `inboundBus`. All messages sent _from_ the job goes through this stream. The `kind`
243member of the `JobOutboundMessage<O>` interface dictates what kind of message it is sending:
244
2451. `JobOutboundMessageKind.Create`. The `Job<>` was created, its dependencies are done, and the
246 library is validating Argument and calling the internal job code.
2471. `JobOutboundMessageKind.Start`. The job code itself should send that message when started.
248 `createJobHandler()` will do it automatically.
2491. `JobOutboundMessageKind.End`. The job has ended. This is done by the job itself and should always
250 be sent when completed. The scheduler will listen to this message to set the state and unblock
251 dependent jobs. `createJobHandler()` automatically send this message.
2521. `JobOutboundMessageKind.Pong`. The job should answer a `JobInboundMessageKind.Ping` message with
253 this. Automatically done by `createJobHandler()`.
2541. `JobOutboundMessageKind.Output`. An `Output` has been generated by the job.
2551. `JobOutboundMessageKind.ChannelMessage`, `JobOutboundMessageKind.ChannelError` and
256 `JobOutboundMessageKind.ChannelComplete` are used for output channels. These correspond to the
257 `next`, `error` and `complete` methods of an `Observer` and are available to the callee through
258 the `job.channels` map of Observable.
259
260Those messages can be accessed directly through the `job.outboundBus` member. The job itself should
261return an `Observable<JobOutboundMessage<O>>`. The `createJobHandler()` helper handles most of use
262cases of this and makes it easier for jobs to handle this.
263
264## Job Dispatchers
265
266Dispatchers are a helper that redirect to different jobs given conditions. To create a job
267dispatcher, use the `createDispatcher()` function:
268
269```typescript
270import { jobs } from '@angular-devkit/core';
271
272// A dispatcher that installs node modules given a user's preference.
273const dispatcher = jobs.createDispatcher({
274 name: 'node-install',
275 argument: { properties: { moduleName: { type: 'string' } } },
276 output: { type: 'boolean' },
277});
278
279const npmInstall = jobs.createJobHandler(/* ... */, { name: 'npm-install' });
280const yarnInstall = jobs.createJobHandler(/* ... */, { name: 'yarn-install' });
281const pnpmInstall = jobs.createJobHandler(/* ... */, { name: 'pnpm-install' });
282
283declare const registry: jobs.SimpleJobRegistry;
284registry.register(dispatcher);
285registry.register(npmInstall);
286registry.register(yarnInstall);
287registry.register(pnpmInstall);
288
289// Default to npm.
290dispatcher.setDefaultDelegate(npmInstall.name);
291// If the user is asking for yarn over npm, uses it.
292dispatcher.addConditionalDelegate(() => userWantsYarn, yarnInstall.name);
293```
294
295## Execution Strategy
296
297Jobs are always run in parallel and will always start, but many helper functions are provided
298when creating a job to help you control the execution strategy;
299
3001. `serialize()`. Multiple runs of this job will be queued with each others.
3011. `memoize(replayMessages = false)` will create a job, or reuse the same job when inputs are
302 matching. If the inputs don't match, a new job will be started and its outputs will be stored.
303
304These strategies can be used when creating the job:
305
306```typescript
307// Same input and output as above.
308
309export const add = jobs.strategy.memoize()(
310 jobs.createJobHandler<number[], number>((argument) =>
311 argument.reduce((total, curr) => total + curr, 0),
312 ),
313);
314```
315
316Strategies can be reused to synchronize between jobs. For example, given jobs `jobA` and `jobB`,
317you can reuse the strategy to serialize both jobs together;
318
319```typescript
320const strategy = jobs.strategy.serialize();
321const jobA = strategy(jobs.createJobHandler(...));
322const jobB = strategy(jobs.createJobHandler(...));
323```
324
325Even further, we can have package A and package B run in serialization, and B and C also be
326serialized. Running A and C will run in parallel, while running B will wait for both A and C
327to finish.
328
329```typescript
330const strategy1 = jobs.strategy.serialize();
331const strategy2 = jobs.strategy.serialize();
332const jobA = strategy1(jobs.createJobHandler(...));
333const jobB = strategy1(strategy2(jobs.createJobHandler(...)));
334const jobC = strategy2(jobs.createJobHandler(...));
335```
336
337# Scheduling Jobs
338
339Jobs can be scheduled using a `Scheduler` interface, which contains a `schedule()` method:
340
341```typescript
342interface Scheduler {
343 /**
344 * Schedule a job to be run, using its name.
345 * @param name The name of job to be run.
346 * @param argument The argument to send to the job when starting it.
347 * @param options Scheduling options.
348 * @returns The Job being run.
349 */
350 schedule<I extends MinimumInputValueT, O extends MinimumOutputValueT>(
351 name: JobName,
352 argument: I,
353 options?: ScheduleJobOptions,
354 ): Job<JsonValue, O>;
355}
356```
357
358The scheduler also has a `getDescription()` method to get a `JobDescription` object for a certain
359name; that description contains schemas for the argument, input, output, and other channels:
360
361```typescript
362interface Scheduler {
363 /**
364 * Get a job description for a named job.
365 *
366 * @param name The name of the job.
367 * @returns A description, or null if the job cannot be scheduled.
368 */
369 getDescription(name: JobName): JobDescription | null;
370
371 /**
372 * Returns true if the job name has been registered.
373 * @param name The name of the job.
374 * @returns True if the job exists, false otherwise.
375 */
376 has(name: JobName): boolean;
377}
378```
379
380Finally, the scheduler interface has a `pause()` method to stop scheduling. This will queue all
381jobs and wait for the unpause function to be called before unblocking all the jobs scheduled.
382This does not affect already running jobs.
383
384```typescript
385interface Scheduler {
386 /**
387 * Pause the scheduler, temporary queueing _new_ jobs. Returns a resume function that should be
388 * used to resume execution. If multiple `pause()` were called, all their resume functions must
389 * be called before the Scheduler actually starts new jobs. Additional calls to the same resume
390 * function will have no effect.
391 *
392 * Jobs already running are NOT paused. This is pausing the scheduler only.
393 *
394 * @returns A function that can be run to resume the scheduler. If multiple `pause()` calls
395 * were made, all their return function must be called (in any order) before the
396 * scheduler can resume.
397 */
398 pause(): () => void;
399}
400```
401
402## <a name="Dependencies"></a>Synchronizing and Dependencies
403
404When scheduling jobs, it is often necessary to run jobs after certain other jobs are finished.
405This is done through the `dependencies` options in the `schedule()` method.
406
407These jobs will also be passed to the job being scheduled, through its context. This can be
408useful if, for example, the output of those jobs are of a known type, or have known side channels.
409
410An example of this would be a compiler that needs to know the output directory of other compilers
411before it, in a tool chain.
412
413### Dependencies
414
415When scheduling jobs, the user can add a `dependencies` field to the scheduling options. The
416scheduler will wait for those dependencies to finish before running the job, and pass those jobs
417in the context of the job.
418
419### Accessing Dependencies
420
421Jobs are called with a `JobHandlerContext` as a second argument, which contains a
422`dependencies: Job<JsonValue>[]` member which contains all dependencies that were used when
423scheduling the job. Those aren't fully typed as they are determined by the user, and not the job
424itself. They also can contain jobs that are not finished, and the job should use the `state`
425member of the job itself before trying to access its content.
426
427### Scheduler Sub Jobs
428
429The `JobHandlerContext` also contains a `scheduler` member which can be used to schedule jobs
430using the same scheduler that was used for the job. This allows jobs to call other jobs
431and wait for them to end.
432
433## Available Schedulers
434
435The Core Angular DevKit library provides 2 implementations for the `Scheduler` interface:
436
437## SimpleJobRegistry
438
439Available in the jobs namespace. A registry that accept job registration, and can also schedule
440jobs.
441
442```typescript
443import { jobs } from '@angular-devkit/core';
444
445const add = jobs.createJobHandler<number[], number>((argument) =>
446 argument.reduce((total, curr) => total + curr, 0),
447);
448
449// Register the job in a SimpleJobRegistry. Different registries have different API.
450const registry = new jobs.SimpleJobRegistry();
451const scheduler = new SimpleJobScheduler(registry);
452
453registry.register(add, {
454 name: 'add',
455 argument: { type: 'array', items: { type: 'number' } },
456 output: { type: 'number' },
457});
458
459scheduler.schedule('add', [1, 2, 3, 4]);
460```
461
462## NodeModuleJobRegistry
463
464Available through `@angular-devkit/core/node`.
465
466A scheduler that loads jobs using their node package names. These jobs need to use the
467`createJobHandler()` helper and report their argument/input/output schemas that way.
468
469```typescript
470declare const registry: NodeModuleJobRegistry;
471const scheduler = new SimpleJobScheduler(registry);
472
473scheduler.schedule('some-node-package#someExport', 'input');
474```
475
476# Gotchas
477
4781. Deadlocking Dependencies
479 It is impossible to add dependencies to an already running job, but it is entirely possible to
480 get locked between jobs. Be aware of your own dependencies.
481
4821. Using `job.promise`
483 `job.promise` waits for the job to ends. Don't rely on it unless you know the job is not
484 watching and running for a long time. If you aren't sure, use
485 `job.output.pipe(first()).toPromise()` instead which will return the first next output,
486 regardless of whether the job watches and rerun or not.
487
488# FAQ
489
4901. Laziness
491 A job is lazy until executed, but its messages will be replayed when resubscribed.
492
4931. Serialize Strategy vs Dependencies
494 Strategies are functions that transform the execution of a job, and can be used when
495 declaring the job, or registering it. Dependencies, on the other hand, are listed when
496 scheduling a job to order jobs during scheduling.
497
498 A job has no control over the way it's scheduled, and its dependencies. It can, however,
499 declare that it shouldn't run at the same time as itself. Alternatively, a user could
500 schedule a job twice and imply that the second run should wait for the first to finish. In
501 practice, this would be equivalent to having the job be serialized, but the important detail
502 is in _whom_ is defining the rules; using the `serialize()` strategy, the job implementation
503 is, while when using dependencies, the user is.
504
505 The user does not need to know how to job needs to synchronize with itself, and the job does
506 not need to know how it synchronizes with other jobs that it doesn't know about. That's part
507 of the strength of this system as every job can be developed in a vacuum, only caring about
508 its contracts (argument, input and output) and its own synchronization.