# Overview

Jobs is a higher-order API that adds inputs, runtime type checking, sequencing, and other
functionality on top of RxJS `Observable`s.

# Background

At a high level, an `Observable` is a function that receives a `Subscriber`, emits zero or more
values to it, and finishes once it calls the subscriber's `complete()` method. In JavaScript:

```javascript
const { Observable } = require('rxjs');

const output1To10EverySecond = function (subscriber) {
  let t = 0;
  const i = setInterval(() => {
    t++;
    subscriber.next(t);
    if (t === 10) {
      // complete() takes no argument; completing also runs the teardown below.
      subscriber.complete();
    }
  }, 1000);

  // Teardown: stop the timer on completion or unsubscription.
  return () => clearInterval(i);
};

const stream$ = new Observable(output1To10EverySecond);
// Start the function, and output 1 to 10, once per line.
stream$.subscribe((x) => console.log(x));
```

The values can, of course, be typed in TypeScript, but those types are not enforced at runtime.

# Glossary

- `job handler`. The function that implements the job's logic.
- `raw input`. The input observable sending messages to the job. These messages are of type
  `JobInboundMessage`.
- `raw output`. The output observable returned from the `job handler`. Messages on this observable
  are of type `JobOutboundMessage`.

# Description

A `JobHandler` is similar to an observable: it is a function that receives an argument and a
context, and returns an `Observable` of messages. Those messages can include outputs that are
type checked at runtime (using a JSON Schema):

```javascript
const { Observable } = require('rxjs');
// JobOutboundMessageKind, SimpleJobRegistry and SimpleScheduler are provided by this library.

const output1ToXEverySecond = function (x, context) {
  return new Observable((subscriber) => {
    let t = 0;

    // Notify our users that the actual work is started.
    subscriber.next({ kind: JobOutboundMessageKind.Start });
    const i = setInterval(() => {
      t++;
      subscriber.next({ kind: JobOutboundMessageKind.Output, value: t });
      if (t === x) {
        subscriber.next({ kind: JobOutboundMessageKind.End });
        subscriber.complete();
      }
    }, 1000);

    return () => {
      clearInterval(i);
    };
  });
};

// For now, jobs cannot be called without a registry and scheduler.
const registry = new SimpleJobRegistry();
registry.register('output-from-1-to-x', output1ToXEverySecond, {
  argument: { type: 'number' },
  output: { type: 'number' },
});
const scheduler = new SimpleScheduler(registry);

// The name must match the one used when registering the handler.
// Count from 1 to 10.
const job = scheduler.schedule('output-from-1-to-x', 10);

// A Job<> instance has more members, but we only want the output values here.
job.output.subscribe((x) => console.log(x));
```

This seems like a lot of boilerplate in comparison, but there are a few advantages:

1. Lifecycle. Jobs can tell when they start doing work and when work is done.
1. Everything is typed, even at runtime.
1. The context also contains an input Observable that receives typed input messages, including
   input values and stop requests.
1. Jobs can also schedule other jobs and wait for them, even if they don't know whether a job is
   implemented in the system.

## Diagram

A simpler way to think about jobs in contrast to observables is that a job is closer to a Unix
process. It has an argument (command line flags), receives inputs (STDIN and interrupt signals),
and outputs values (STDOUT) as well as diagnostics (STDERR). Jobs can be plugged one into another
(piping), and can be transformed, synchronized and scheduled (fork, exec, cron).

```plain
- given A the type of the argument
- given I the type of the input
- given O the type of the output

                              ,______________________
    JobInboundMessage<I> --> | handler(argument: A) | --> JobOutboundMessage<O>
                                                           - JobOutboundMessageKind.Output
                                                           - ...
```

`JobInboundMessage` includes:

1. `JobInboundMessageKind.Ping`. A simple message that should be answered with
   `JobOutboundMessageKind.Pong` when the job is responsive. The `id` field of the message should
   be used when returning `Pong`.
1. `JobInboundMessageKind.Stop`. The job should be stopped. This is used when
   cancelling/unsubscribing from the `output` (or by calling `stop()`). Any inputs or outputs
   after this message will be ignored.
1. `JobInboundMessageKind.Input`. Used when sending inputs to a job. These correspond to the
   `next` method of an `Observer` and are reported to the job through its `context.input`
   Observable. There is no way to communicate an error to the job.
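
The reactions listed above can be sketched as a plain function from one inbound message to the
outbound messages it should trigger. This is only an illustration under stated assumptions: the
`InboundKind` and `OutboundKind` constants are stand-ins for the library's enums, `reactToInbound`
is a hypothetical helper, the `value` field on input messages is assumed, and answering `Stop`
with `End` is one possible policy rather than documented behaviour.

```javascript
// Stand-in constants for this sketch; the real values come from the library's
// JobInboundMessageKind and JobOutboundMessageKind enums.
const InboundKind = { Ping: 'ping', Stop: 'stop', Input: 'input' };
const OutboundKind = { Pong: 'pong', End: 'end' };

// Hypothetical helper: returns the outbound messages to emit for one inbound message.
// `onInput` is called with the payload of Input messages.
function reactToInbound(message, onInput) {
  switch (message.kind) {
    case InboundKind.Ping:
      // Echo the id so the sender can match the Pong to its Ping.
      return [{ kind: OutboundKind.Pong, id: message.id }];
    case InboundKind.Stop:
      // Assumed policy: announce the end so dependent jobs are unblocked.
      return [{ kind: OutboundKind.End }];
    case InboundKind.Input:
      onInput(message.value);
      return [];
    default:
      return [];
  }
}

const received = [];
const replies = [
  { kind: InboundKind.Ping, id: 7 },
  { kind: InboundKind.Input, value: 'hello' },
  { kind: InboundKind.Stop },
].flatMap((message) => reactToInbound(message, (value) => received.push(value)));

console.log(replies); // a Pong with id 7, then an End
console.log(received); // [ 'hello' ]
```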

`JobOutboundMessage` includes:

1. `JobOutboundMessageKind.Ready`. The `Job<>` was created, its dependencies are done, and the
   library is validating the argument and calling the internal job code.
1. `JobOutboundMessageKind.Start`. The job code itself should send this message when it starts.
   `createJobHandler()` will do it automatically.
1. `JobOutboundMessageKind.End`. The job has ended. This is done by the job itself and should
   always be sent when completed. The scheduler will listen to this message to set the state and
   unblock dependent jobs. `createJobHandler()` automatically sends this message.
1. `JobOutboundMessageKind.Pong`. The job should answer a `JobInboundMessageKind.Ping` message with
   this. Automatically done by `createJobHandler()`.
1. `JobOutboundMessageKind.Output`. An `Output` has been generated by the job.
1. `JobOutboundMessageKind.ChannelMessage`, `JobOutboundMessageKind.ChannelError` and
   `JobOutboundMessageKind.ChannelComplete` are used for output channels. These correspond to
   the `next`, `error` and `complete` methods of an `Observer` and are available to the caller
   through the `job.channels` map of Observables.

Utilities should filter these messages and dispatch them to separate observables, as a convenience
for the user. An example of this is the `Job.prototype.output` observable, which only emits the
values carried by messages of kind `JobOutboundMessageKind.Output`.
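
As a rough illustration of that filtering, the sketch below applies it to a plain array of
messages instead of an observable, so that it runs without any dependency. `OutboundKind` and
`outputValues` are hypothetical names, not part of the library; on a real message stream the same
two steps would typically be expressed with the RxJS `filter` and `map` operators.

```javascript
// Stand-in constants for this sketch; the library defines JobOutboundMessageKind.
const OutboundKind = { Start: 'start', Output: 'output', End: 'end' };

// Hypothetical helper: keep only Output messages, then unwrap their values.
function outputValues(messages) {
  return messages
    .filter((message) => message.kind === OutboundKind.Output)
    .map((message) => message.value);
}

const messages = [
  { kind: OutboundKind.Start },
  { kind: OutboundKind.Output, value: 1 },
  { kind: OutboundKind.Output, value: 2 },
  { kind: OutboundKind.End },
];

console.log(outputValues(messages)); // [ 1, 2 ]
```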

# Higher Order Jobs

Because jobs are expected to be pure functions, they can be composed or transformed to create
more complex behaviour, similar to how RxJS operators can transform observables.

```javascript
const { Observable } = require('rxjs');

// Runs a job at the top of each hour. The next run is scheduled once the current run completes.
// This creates a job function that can be registered by itself.
function scheduleJobOnTheHour(jobFunction) {
  return function (argument, context) {
    return new Observable((observer) => {
      let timeout;
      let subscription;

      function _timeoutToNextHour() {
        // Just wait until the next hour. setTimeout() expects a delay in milliseconds.
        const t = new Date();
        const secondsToNextHour = 3600 - t.getSeconds() - t.getMinutes() * 60;
        timeout = setTimeout(_scheduleJobAndWaitAnHour, secondsToNextHour * 1000);
      }

      function _scheduleJobAndWaitAnHour() {
        subscription = jobFunction(argument, context).subscribe({
          next: (message) => observer.next(message),
          error: (error) => observer.error(error),
          // Do not forward completion, but use it to schedule the next job run.
          complete: () => _timeoutToNextHour(),
        });
      }

      // Kick off by waiting for next hour.
      _timeoutToNextHour();

      // Teardown: cancel the pending timer and any run still in progress.
      return () => {
        clearTimeout(timeout);
        if (subscription) {
          subscription.unsubscribe();
        }
      };
    });
  };
}
```
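
Since `setTimeout` expects its delay in milliseconds, the wait computation is a good candidate for
a small helper that can be checked in isolation. A minimal sketch; `msUntilNextHour` is a
hypothetical name, and it works on local time like the example above. It can be passed directly
as the delay, for instance `setTimeout(callback, msUntilNextHour())`.

```javascript
// Hypothetical helper: milliseconds from `now` until the next full hour (local time).
// Exactly on the hour, it returns a full hour.
function msUntilNextHour(now = new Date()) {
  const elapsedMs =
    now.getMinutes() * 60 * 1000 + now.getSeconds() * 1000 + now.getMilliseconds();
  return 60 * 60 * 1000 - elapsedMs;
}

// 10:59:30.000 local time is 30 seconds before the hour.
console.log(msUntilNextHour(new Date(2024, 0, 1, 10, 59, 30))); // 30000
```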

Another way to compose jobs is to have one job schedule other jobs by name.

```javascript
const { Observable } = require('rxjs');

// Runs a job at the top of each hour. The next run is scheduled once the current run completes.
// This creates a higher-order job by getting a job name and an argument, and scheduling the job
// every hour.
function scheduleJobOnTheHour(job, context) {
  const { name, argument } = job; // Destructure our input.

  return new Observable((observer) => {
    let timeout;
    let subscription;

    function _timeoutToNextHour() {
      // Just wait until the next hour. setTimeout() expects a delay in milliseconds.
      const t = new Date();
      const secondsToNextHour = 3600 - t.getSeconds() - t.getMinutes() * 60;
      timeout = setTimeout(_scheduleJobAndWaitAnHour, secondsToNextHour * 1000);
    }

    function _scheduleJobAndWaitAnHour() {
      const subJob = context.scheduler.schedule(name, argument);
      // We do not forward the input to the sub-job but that would be a valid example as well.
      subscription = subJob.outboundBus.subscribe({
        next: (message) => observer.next(message),
        error: (error) => observer.error(error),
        // Do not forward completion, but use it to schedule the next job run.
        complete: () => _timeoutToNextHour(),
      });
    }

    // Kick off by waiting for next hour.
    _timeoutToNextHour();

    // Teardown: cancel the pending timer and stop listening to any run still in progress.
    return () => {
      clearTimeout(timeout);
      if (subscription) {
        subscription.unsubscribe();
      }
    };
  });
}

const registry = new SimpleJobRegistry();
registry.register('schedule-job-on-the-hour', scheduleJobOnTheHour, {
  argument: {
    properties: {
      name: { type: 'string' },
      // A boolean `true` schema accepts any value.
      argument: true,
    },
  },
});

// Implementation left to the reader.
registry.register('copy-files-from-a-to-b', require('some-package/copy-job'));

const scheduler = new SimpleScheduler(registry);

// A rudimentary backup system.
const job = scheduler.schedule('schedule-job-on-the-hour', {
  name: 'copy-files-from-a-to-b',
  argument: {
    from: '/some-directory/to/backup',
    to: '/volumes/usb-key',
  },
});
job.output.subscribe((x) => console.log(x));
```

# Limitations

A job's input, output and argument must be serializable to JSON. This restricts what jobs can
exchange, but it comes with the benefit that jobs can be serialized and called across memory
boundaries. An example would be an operator that takes a module path and runs the job from that
path in a separate process, or even on a separate server, using HTTP calls.
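
To make the constraint concrete, the sketch below checks whether a value survives a JSON round
trip unchanged, which is roughly what crossing a process boundary requires. It is an illustration
only: `survivesJsonRoundTrip` is a hypothetical helper, not something the library provides, and
the library's own validation relies on JSON Schema rather than on a check like this.

```javascript
const { isDeepStrictEqual } = require('util');

// Hypothetical helper: true when `value` is unchanged by JSON.stringify + JSON.parse.
function survivesJsonRoundTrip(value) {
  let copy;
  try {
    copy = JSON.parse(JSON.stringify(value));
  } catch {
    // Circular structures and BigInt values, among others, cannot be stringified.
    return false;
  }
  return isDeepStrictEqual(copy, value);
}

// Plain data is fine.
console.log(survivesJsonRoundTrip({ from: '/some-directory', to: '/volumes/usb-key' })); // true
// A Date comes back as a string, and a function property is dropped.
console.log(survivesJsonRoundTrip({ when: new Date(0) })); // false
console.log(survivesJsonRoundTrip({ run: () => 1 })); // false
```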

Another limitation is that the boilerplate is complex. Manually managing the start/end life cycle
and other messages such as ping/pong is tedious and requires a lot of code. A good way to keep
this limitation under control is to provide helpers that create `JobHandler`s and manage those
messages for the developer. A simple helper could take a function that returns a `Promise` and
automatically emit the resolved value of that promise as the job's output.
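
A promise-based helper of that kind could look roughly like the sketch below. Everything in it is
an assumption made for illustration: `handlerFromPromise` and `OutboundKind` are hypothetical
names, and to stay dependency-free the handler returns a minimal object with a `subscribe` method
instead of an RxJS `Observable`. The library's own `createJobHandler()` is the helper to use in
practice.

```javascript
// Stand-in constants for this sketch; the library defines JobOutboundMessageKind.
const OutboundKind = { Start: 'start', Output: 'output', End: 'end' };

// Hypothetical helper: wraps a promise-returning function so that Start, Output and End
// messages are emitted around it automatically.
function handlerFromPromise(fn) {
  return (argument, context) => ({
    subscribe(observer) {
      let active = true;
      observer.next({ kind: OutboundKind.Start });
      Promise.resolve()
        .then(() => fn(argument, context))
        .then(
          (value) => {
            if (!active) return;
            observer.next({ kind: OutboundKind.Output, value });
            observer.next({ kind: OutboundKind.End });
            observer.complete();
          },
          (error) => {
            if (active) observer.error(error);
          },
        );
      // Teardown: ignore the promise result after unsubscribing.
      return { unsubscribe: () => { active = false; } };
    },
  });
}

// Usage: the job logic shrinks to a single async function.
const double = handlerFromPromise(async (x) => x * 2);
const seen = [];
const done = new Promise((resolve, reject) => {
  double(21, {}).subscribe({
    next: (message) => seen.push(message),
    error: reject,
    complete: () => resolve(seen),
  });
});
done.then((messages) => console.log(messages)); // Start, Output with value 42, End
```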