# Overview

Jobs is a higher-order API that adds inputs, runtime type checking, sequencing, and other
functionality on top of RxJS `Observable`s.

# Background

At a high level, an `Observable` is a function that receives a `Subscriber`, outputs multiple
values, and finishes once it calls the `Subscriber.prototype.complete()` method. In JavaScript:

```javascript
const { Observable } = require('rxjs');

const output1To10EverySecond = function (subscriber) {
  let t = 0;
  const i = setInterval(() => {
    t++;
    subscriber.next(t);
    if (t === 10) {
      // complete() takes no arguments; the last value was already sent with next().
      subscriber.complete();
    }
  }, 1000);
  // Teardown: called on completion or unsubscription, it stops the timer.
  return () => clearInterval(i);
};

const stream$ = new Observable(output1To10EverySecond);
// Start the function, and output 1 to 10, one per line.
stream$.subscribe((x) => console.log(x));
```

This, of course, can be typed in TypeScript, but those types are not enforced at runtime.
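To make the gap concrete, here is a small, dependency-free sketch (not part of the jobs API) that adds a runtime check to an observable-style subscribe function, the same `subscriber => teardown` shape passed to `new Observable(...)` above. `checkedOutputs` is a hypothetical helper, and its `isValid` predicate stands in for the JSON Schema validation that jobs actually use.

```javascript
// Hypothetical helper: wraps a subscribe function (subscriber => teardown) so that
// every emitted value is checked at runtime. `isValid` stands in for a schema validator.
function checkedOutputs(subscribeFn, isValid) {
  return function (subscriber) {
    let closed = false;
    return subscribeFn({
      next(value) {
        if (closed) return;
        if (!isValid(value)) {
          // A static type annotation cannot catch this; the check happens here.
          closed = true;
          subscriber.error(new TypeError(`Invalid output: ${JSON.stringify(value)}`));
          return;
        }
        subscriber.next(value);
      },
      error(err) {
        if (closed) return;
        closed = true;
        subscriber.error(err);
      },
      complete() {
        if (closed) return;
        closed = true;
        subscriber.complete();
      },
    });
  };
}

// Usage: a source that emits a string where numbers were expected.
const emitsBadValue = (subscriber) => {
  subscriber.next(1);
  subscriber.next('two');
  subscriber.complete(); // Ignored: the stream already errored.
};

const seen = [];
checkedOutputs(emitsBadValue, (v) => typeof v === 'number')({
  next: (v) => seen.push(v),
  error: (e) => seen.push(e.message),
  complete: () => seen.push('complete'),
});
console.log(seen); // [ 1, 'Invalid output: "two"' ]
```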

# Glossary

- `job handler`. The function that implements the job's logic.
- `raw input`. The input observable sending messages to the job. These messages are of type
  `JobInboundMessage`.
- `raw output`. The output observable returned from the `job handler`. Messages on this
  observable are of type `JobOutboundMessage`.

# Description

A `JobHandler`, similar to observables, is a function that receives an argument and a context, and
returns an `Observable` of messages, which can include outputs that are typed at runtime (using a
JSON Schema):

```javascript
const { Observable } = require('rxjs');
// `JobOutboundMessageKind`, `SimpleJobRegistry` and `SimpleScheduler` are provided by the
// jobs library; their import is omitted here.

const output1ToXEverySecond = function (x, context) {
  return new Observable((subscriber) => {
    let t = 0;

    // Notify our users that the actual work has started.
    subscriber.next({ kind: JobOutboundMessageKind.Start });
    const i = setInterval(() => {
      t++;
      subscriber.next({ kind: JobOutboundMessageKind.Output, value: t });
      if (t === x) {
        subscriber.next({ kind: JobOutboundMessageKind.End });
        subscriber.complete();
      }
    }, 1000);

    return () => {
      clearInterval(i);
    };
  });
};

// For now, jobs cannot be called without a registry and scheduler.
const registry = new SimpleJobRegistry();
registry.register('output-from-1-to-x', output1ToXEverySecond, {
  argument: { type: 'number' },
  output: { type: 'number' },
});
const scheduler = new SimpleScheduler(registry);

// Use the same name the job was registered under.
// Count from 1 to 10.
const job = scheduler.schedule('output-from-1-to-x', 10);

// A Job<> instance has more members, but we only want the output values here.
job.output.subscribe((x) => console.log(x));
```

This seems like a lot of boilerplate in comparison, but there are a few advantages:

1. Lifecycle. Jobs can tell when they start doing work and when work is done.
1. Everything is typed, even at runtime.
1. The context also contains an input Observable that receives typed input messages, including
   input values and stop requests.
1. Jobs can also schedule other jobs and wait for them, even if they don't know whether a job is
   implemented in the system.
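As a rough illustration of what "typed, even at runtime" means, the sketch below checks a value against schemas like the `{ type: 'number' }` passed to `registry.register()` above. It is a hypothetical, deliberately tiny validator that only understands boolean schemas and the JSON Schema `type` keyword; the library itself is assumed to rely on a complete JSON Schema validator.

```javascript
// Maps a JavaScript value to its JSON Schema type name.
function jsonType(value) {
  if (value === null) return 'null';
  if (Array.isArray(value)) return 'array';
  if (typeof value === 'number') return Number.isInteger(value) ? 'integer' : 'number';
  return typeof value; // 'string', 'boolean', 'object' (or non-JSON types like 'function')
}

// Hypothetical check supporting only boolean schemas and the `type` keyword.
function matchesType(value, schema) {
  if (typeof schema === 'boolean') return schema; // `true` accepts anything, `false` nothing.
  if (schema.type === undefined) return true; // No type constraint.
  const allowed = Array.isArray(schema.type) ? schema.type : [schema.type];
  const actual = jsonType(value);
  // In JSON Schema, every integer is also a valid "number".
  return allowed.some((t) => t === actual || (t === 'number' && actual === 'integer'));
}

console.log(matchesType(10, { type: 'number' })); // true
console.log(matchesType('10', { type: 'number' })); // false
console.log(matchesType(1.5, { type: 'integer' })); // false
```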

## Diagram

A simpler way to think about jobs, in contrast to observables, is that jobs are closer to a Unix
process. A job has an argument (command-line flags), receives inputs (STDIN and interrupt
signals), and outputs values (STDOUT) as well as diagnostics (STDERR). Jobs can be plugged into
one another (piping), and can be transformed, synchronized and scheduled (fork, exec, cron).

```plain
- given A the type of the argument
- given I the type of the input
- given O the type of the output

                         ,______________________,
JobInboundMessage<I> --> | handler(argument: A) | --> JobOutboundMessage<O>
                         '----------------------'
                                                      - JobOutboundMessageKind.Output
                                                      - ...
```

`JobInboundMessage` includes:

1. `JobInboundMessageKind.Ping`. A simple message that should be answered with
   `JobOutboundMessageKind.Pong` when the job is responsive. The `id` field of the message should
   be used when returning `Pong`.
1. `JobInboundMessageKind.Stop`. The job should be stopped. This is used when
   cancelling/unsubscribing from the `output` (or by calling `stop()`). Any inputs or outputs
   after this message will be ignored.
1. `JobInboundMessageKind.Input`. Used when sending inputs to a job. These correspond to the
   `next` method of an `Observer` and are reported to the job through its `context.input`
   Observable. There is no way to communicate an error to the job.
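The inbound rules above can be sketched as a plain function that computes a job's reaction to each message. The `Inbound`/`Outbound` string constants, the `reactToInbound` name and the `state` object are assumptions made for illustration; a real handler would receive these messages through an observable rather than as direct calls.

```javascript
// Hypothetical stand-ins for the library's message kinds; the real enum values may differ.
const Inbound = { Ping: 'ping', Stop: 'stop', Input: 'input' };
const Outbound = { Pong: 'pong' };

// Returns the outbound reply for an inbound message, or null when there is none.
// `state` records received inputs and whether Stop has arrived.
function reactToInbound(message, state) {
  if (state.stopped) return null; // Anything after Stop is ignored.
  switch (message.kind) {
    case Inbound.Ping:
      // Reuse the Ping's id so the caller can match this Pong to its Ping.
      return { kind: Outbound.Pong, id: message.id };
    case Inbound.Stop:
      state.stopped = true;
      return null;
    case Inbound.Input:
      state.inputs.push(message.value);
      return null;
    default:
      throw new Error(`Unknown inbound message kind: ${message.kind}`);
  }
}

const state = { stopped: false, inputs: [] };
console.log(reactToInbound({ kind: Inbound.Ping, id: 7 }, state)); // { kind: 'pong', id: 7 }
reactToInbound({ kind: Inbound.Input, value: 'a' }, state);
reactToInbound({ kind: Inbound.Stop }, state);
reactToInbound({ kind: Inbound.Input, value: 'b' }, state); // Ignored: arrives after Stop.
console.log(state.inputs); // [ 'a' ]
```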

`JobOutboundMessage` includes:

1. `JobOutboundMessageKind.Ready`. The `Job<>` was created, its dependencies are done, and the
   library is validating the argument and calling the internal job code.
1. `JobOutboundMessageKind.Start`. The job code itself should send this message when started.
   `createJobHandler()` will do it automatically.
1. `JobOutboundMessageKind.End`. The job has ended. This is sent by the job itself and should
   always be sent on completion. The scheduler listens for this message to set the state and
   unblock dependent jobs. `createJobHandler()` automatically sends this message.
1. `JobOutboundMessageKind.Pong`. The job should answer a `JobInboundMessageKind.Ping` message with
   this. Automatically done by `createJobHandler()`.
1. `JobOutboundMessageKind.Output`. An `Output` has been generated by the job.
1. `JobOutboundMessageKind.ChannelMessage`, `JobOutboundMessageKind.ChannelError` and
   `JobOutboundMessageKind.ChannelComplete` are used for output channels. These correspond to
   the `next`, `error` and `complete` methods of an `Observer` and are available to the caller
   through the `job.channels` map of Observables.
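To illustrate how a scheduler could use these messages, the sketch below derives a lifecycle state from the outbound messages received so far. The state names (`queued`, `ready`, `started`, `ended`) and the string message kinds are hypothetical and may not match the library's own values.

```javascript
// Hypothetical message kinds; the library's real enum values may differ.
const Outbound = { Ready: 'ready', Start: 'start', Output: 'output', End: 'end', Pong: 'pong' };

// Derives a job's lifecycle state from the outbound messages seen so far, the way a
// scheduler could before unblocking dependent jobs. Output and Pong do not change it.
function lifecycleState(messages) {
  let state = 'queued'; // Nothing received yet.
  for (const message of messages) {
    switch (message.kind) {
      case Outbound.Ready:
        state = 'ready';
        break;
      case Outbound.Start:
        state = 'started';
        break;
      case Outbound.End:
        state = 'ended';
        break;
      default:
        break;
    }
  }
  return state;
}

const messages = [
  { kind: Outbound.Ready },
  { kind: Outbound.Start },
  { kind: Outbound.Output, value: 1 },
];
console.log(lifecycleState(messages)); // started
console.log(lifecycleState([...messages, { kind: Outbound.End }])); // ended
```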

As a convenience for the user, utilities should filter these messages and dispatch them to
separate observables. An example of this is the `Job.prototype.output` observable, which only
contains the values carried by messages of type `JobOutboundMessageKind.Output`.
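A dispatching utility of this kind can be sketched over a plain array of messages. The message kinds and the `value`/`name`/`message` fields are assumptions; a real implementation would work on the outbound observable instead (for example with RxJS's `filter` and `map` operators), but the routing logic is the same.

```javascript
// Hypothetical message kinds and field names; the real ones may differ.
const Outbound = {
  Start: 'start',
  Output: 'output',
  End: 'end',
  ChannelMessage: 'channel-message',
  ChannelError: 'channel-error',
  ChannelComplete: 'channel-complete',
};
const channelKinds = new Set([
  Outbound.ChannelMessage,
  Outbound.ChannelError,
  Outbound.ChannelComplete,
]);

// Splits outbound messages into plain output values (like `job.output`) and
// per-channel message lists keyed by channel name (like `job.channels`).
function dispatch(messages) {
  const outputs = [];
  const channels = new Map();
  for (const message of messages) {
    if (message.kind === Outbound.Output) {
      outputs.push(message.value);
    } else if (channelKinds.has(message.kind)) {
      if (!channels.has(message.name)) channels.set(message.name, []);
      channels.get(message.name).push(message);
    }
    // Lifecycle messages (Start, End, ...) are not forwarded to either view.
  }
  return { outputs, channels };
}

const { outputs, channels } = dispatch([
  { kind: Outbound.Start },
  { kind: Outbound.Output, value: 1 },
  { kind: Outbound.ChannelMessage, name: 'log', message: 'copying' },
  { kind: Outbound.Output, value: 2 },
  { kind: Outbound.End },
]);
console.log(outputs); // [ 1, 2 ]
console.log(channels.get('log').length); // 1
```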

# Higher Order Jobs

Because jobs are expected to be pure functions, they can be composed or transformed to create
more complex behaviour, similar to how RxJS operators can transform observables.

```javascript
const { Observable } = require('rxjs');

// Runs a job on the hour, every hour, regardless of how long the job takes.
// This creates a job function that can be registered by itself.
function scheduleJobOnTheHour(jobFunction) {
  return function (argument, context) {
    return new Observable((observer) => {
      let timeout = 0;
      let subscription = null;

      function _timeoutToNextHour() {
        // Just wait until the next hour. setTimeout() expects milliseconds.
        const t = new Date();
        const secondsToNextHour = 3600 - t.getSeconds() - t.getMinutes() * 60;
        timeout = setTimeout(_scheduleJobAndWaitAnHour, secondsToNextHour * 1000);
      }

      function _scheduleJobAndWaitAnHour() {
        subscription = jobFunction(argument, context).subscribe({
          next: (message) => observer.next(message),
          error: (error) => observer.error(error),
          // Do not forward completion, but use it to schedule the next job run.
          complete: () => _timeoutToNextHour(),
        });
      }

      // Kick off by waiting for next hour.
      _timeoutToNextHour();

      // Teardown: cancel the pending timer and any run that is still in progress.
      return () => {
        clearTimeout(timeout);
        if (subscription) {
          subscription.unsubscribe();
        }
      };
    });
  };
}
```
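The delay computation in `_timeoutToNextHour()` is easy to get wrong: `setTimeout()` takes its delay in milliseconds, so a value computed in seconds must be multiplied by 1000. One way to make the arithmetic testable is to move it into a pure function. `msToNextHour` below is a hypothetical helper; it also subtracts the current milliseconds, so the timer fires at the top of the hour instead of up to a second late.

```javascript
// Hypothetical helper: milliseconds from `date` until the next full hour, in local time.
// setTimeout() takes milliseconds, so every component is converted to ms.
function msToNextHour(date) {
  const elapsedInHourMs =
    date.getMinutes() * 60 * 1000 + date.getSeconds() * 1000 + date.getMilliseconds();
  return 60 * 60 * 1000 - elapsedInHourMs;
}

console.log(msToNextHour(new Date(2024, 0, 1, 10, 59, 30))); // 30000
console.log(msToNextHour(new Date(2024, 0, 1, 10, 15, 0))); // 2700000
```

Inside the job, `setTimeout(_scheduleJobAndWaitAnHour, msToNextHour(new Date()))` would then replace the inline computation.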

Another way to compose jobs is for one job to schedule other jobs by name.

```javascript
const { Observable } = require('rxjs');

// Runs a job on the hour, every hour, regardless of how long the job takes.
// This creates a higher-order job by getting a job name and an argument, and scheduling the job
// every hour.
function scheduleJobOnTheHour(job, context) {
  const { name, argument } = job; // Destructure our argument.

  return new Observable((observer) => {
    let timeout = 0;
    let subscription = null;

    function _timeoutToNextHour() {
      // Just wait until the next hour. setTimeout() expects milliseconds.
      const t = new Date();
      const secondsToNextHour = 3600 - t.getSeconds() - t.getMinutes() * 60;
      timeout = setTimeout(_scheduleJobAndWaitAnHour, secondsToNextHour * 1000);
    }

    function _scheduleJobAndWaitAnHour() {
      const subJob = context.scheduler.schedule(name, argument);
      // We do not forward the input to the sub-job but that would be a valid example as well.
      subscription = subJob.outboundBus.subscribe({
        next: (message) => observer.next(message),
        error: (error) => observer.error(error),
        // Do not forward completion, but use it to schedule the next job run.
        complete: () => _timeoutToNextHour(),
      });
    }

    // Kick off by waiting for next hour.
    _timeoutToNextHour();

    // Teardown: cancel the pending timer and stop listening to a run still in progress.
    return () => {
      clearTimeout(timeout);
      if (subscription) {
        subscription.unsubscribe();
      }
    };
  });
}

const registry = new SimpleJobRegistry();
registry.register('schedule-job-on-the-hour', scheduleJobOnTheHour, {
  argument: {
    properties: {
      name: { type: 'string' },
      // `true` is the JSON Schema that accepts any value.
      argument: true,
    },
  },
});

// Implementation left to the reader.
registry.register('copy-files-from-a-to-b', require('some-package/copy-job'));

const scheduler = new SimpleScheduler(registry);

// A rudimentary backup system.
const job = scheduler.schedule('schedule-job-on-the-hour', {
  name: 'copy-files-from-a-to-b',
  argument: {
    from: '/some-directory/to/backup',
    to: '/volumes/usb-key',
  },
});
job.output.subscribe((x) => console.log(x));
```
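Scheduling by name works because the handler is looked up only when a job is scheduled, so the calling job needs nothing but a string. The hypothetical `TinyRegistry` below isolates that lookup. It is not `SimpleJobRegistry`: to stay dependency-free, its handlers return plain values instead of observables, and it passes itself to handlers as `context.registry` (the example above uses `context.scheduler` instead).

```javascript
// A hypothetical, minimal name-based registry (not the library's SimpleJobRegistry).
class TinyRegistry {
  constructor() {
    this.handlers = new Map();
  }

  register(name, handler) {
    if (this.handlers.has(name)) {
      throw new Error(`Job "${name}" is already registered.`);
    }
    this.handlers.set(name, handler);
  }

  // The handler is resolved only at scheduling time, so callers need just the name.
  schedule(name, argument) {
    const handler = this.handlers.get(name);
    if (!handler) {
      throw new Error(`Job "${name}" is not registered.`);
    }
    // Pass the registry in the context so handlers can schedule other jobs by name.
    return handler(argument, { registry: this });
  }
}

const registry = new TinyRegistry();
registry.register('double', (n) => n * 2);
// A higher-order job: it knows nothing about 'double' except its name.
registry.register('run-by-name', ({ name, argument }, context) =>
  context.registry.schedule(name, argument),
);

console.log(registry.schedule('run-by-name', { name: 'double', argument: 21 })); // 42
```

Because the lookup is deferred, `'run-by-name'` could just as well be registered before `'double'`.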

# Limitations

A job's input, output and argument must be serializable to JSON. This is a big limitation in
usage, but it comes with the benefit that jobs can be serialized and called across memory
boundaries. For example, an operator could take a module path and run the job from that path in
a separate process, or even on a separate server using HTTP calls.
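Before sending a value across such a boundary, it can be useful to check that it survives a JSON round trip. The hypothetical `isJsonSafe` helper below serializes a value, parses it back and compares the two structurally, so `Date` objects, `Map`s, `undefined` properties, functions, `NaN`, `BigInt`s and circular structures are all rejected.

```javascript
// Structural equality for JSON-like values (plain objects, arrays, primitives).
function deepEqual(a, b) {
  if (Object.is(a, b)) return true;
  if (typeof a !== 'object' || typeof b !== 'object' || a === null || b === null) return false;
  // A Map or class instance does not come back from JSON.parse() with the same prototype.
  if (Object.getPrototypeOf(a) !== Object.getPrototypeOf(b)) return false;
  if (Array.isArray(a)) {
    if (a.length !== b.length) return false;
    // Index loop (not every()) so holes in sparse arrays are compared too.
    for (let i = 0; i < a.length; i++) {
      if (!deepEqual(a[i], b[i])) return false;
    }
    return true;
  }
  const keys = Object.keys(a);
  return (
    keys.length === Object.keys(b).length &&
    keys.every((key) => Object.prototype.hasOwnProperty.call(b, key) && deepEqual(a[key], b[key]))
  );
}

// Hypothetical helper: true if `value` is unchanged by JSON.stringify() then JSON.parse().
function isJsonSafe(value) {
  let text;
  try {
    text = JSON.stringify(value);
  } catch {
    return false; // Circular structures and BigInt values throw.
  }
  if (text === undefined) return false; // Functions, symbols and undefined.
  return deepEqual(value, JSON.parse(text));
}

console.log(isJsonSafe({ from: '/some-directory/to/backup', to: '/volumes/usb-key' })); // true
console.log(isJsonSafe({ when: new Date() })); // false
```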

Another limitation is that the boilerplate is complex. Manually managing the start/end lifecycle
and other messages such as ping/pong is tedious and requires a lot of code. A good way to keep
this limitation under control is to provide helpers that create `JobHandler`s which manage those
messages for the developer. A simple helper could take a `Promise` and automatically emit its
resolved value as the job's output.
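A minimal sketch of such a helper, under the assumption that plain subscriber objects are enough (no RxJS): the hypothetical `fromPromise` takes a function returning a `Promise` and, for a given argument, produces a subscribe function that emits Start, one Output carrying the resolved value, and End. The message kinds are placeholder strings, and this is not the library's `createJobHandler()`.

```javascript
// Hypothetical message kinds; the library's real enum values may differ.
const Outbound = { Start: 'start', Output: 'output', End: 'end' };

// Turns `(argument) => Promise<value>` into `(argument) => subscribeFn`, where
// subscribeFn(subscriber) has the shape expected by `new Observable(...)`.
function fromPromise(fn) {
  return (argument) => (subscriber) => {
    let cancelled = false;
    subscriber.next({ kind: Outbound.Start });
    // Calling fn inside then() turns a synchronous throw into a rejection.
    Promise.resolve()
      .then(() => fn(argument))
      .then(
        (value) => {
          if (cancelled) return;
          subscriber.next({ kind: Outbound.Output, value });
          subscriber.next({ kind: Outbound.End });
          subscriber.complete();
        },
        (error) => {
          if (!cancelled) subscriber.error(error);
        },
      );
    // Teardown: drop the result if the caller unsubscribes before it arrives.
    return () => {
      cancelled = true;
    };
  };
}

const messages = [];
fromPromise(async (n) => n * 2)(21)({
  next: (message) => messages.push(message),
  error: (error) => messages.push(error),
  complete: () => messages.push('complete'),
});
```

With RxJS available, `new Observable(fromPromise(fn)(argument))` would give the observable a job handler returns; answering pings would still be left to the caller of this sketch.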