1 | "use strict";
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 | Object.defineProperty(exports, "__esModule", { value: true });
|
10 | exports.SimpleScheduler = exports.JobOutputSchemaValidationError = exports.JobInboundMessageSchemaValidationError = exports.JobArgumentSchemaValidationError = void 0;
|
11 | const core_1 = require("@angular-devkit/core");
|
12 | const rxjs_1 = require("rxjs");
|
13 | const operators_1 = require("rxjs/operators");
|
14 | const api_1 = require("./api");
|
15 | const exception_1 = require("./exception");
|
16 | class JobArgumentSchemaValidationError extends core_1.schema.SchemaValidationException {
|
17 | constructor(errors) {
|
18 | super(errors, 'Job Argument failed to validate. Errors: ');
|
19 | }
|
20 | }
|
21 | exports.JobArgumentSchemaValidationError = JobArgumentSchemaValidationError;
|
22 | class JobInboundMessageSchemaValidationError extends core_1.schema.SchemaValidationException {
|
23 | constructor(errors) {
|
24 | super(errors, 'Job Inbound Message failed to validate. Errors: ');
|
25 | }
|
26 | }
|
27 | exports.JobInboundMessageSchemaValidationError = JobInboundMessageSchemaValidationError;
|
28 | class JobOutputSchemaValidationError extends core_1.schema.SchemaValidationException {
|
29 | constructor(errors) {
|
30 | super(errors, 'Job Output failed to validate. Errors: ');
|
31 | }
|
32 | }
|
33 | exports.JobOutputSchemaValidationError = JobOutputSchemaValidationError;
|
34 | function _jobShare() {
|
35 |
|
36 |
|
37 | return (source) => {
|
38 | let refCount = 0;
|
39 | let subject;
|
40 | let hasError = false;
|
41 | let isComplete = false;
|
42 | let subscription;
|
43 | return new rxjs_1.Observable((subscriber) => {
|
44 | let innerSub;
|
45 | refCount++;
|
46 | if (!subject) {
|
47 | subject = new rxjs_1.Subject();
|
48 | innerSub = subject.subscribe(subscriber);
|
49 | subscription = source.subscribe({
|
50 | next(value) {
|
51 | subject.next(value);
|
52 | },
|
53 | error(err) {
|
54 | hasError = true;
|
55 | subject.error(err);
|
56 | },
|
57 | complete() {
|
58 | isComplete = true;
|
59 | subject.complete();
|
60 | },
|
61 | });
|
62 | }
|
63 | else {
|
64 | innerSub = subject.subscribe(subscriber);
|
65 | }
|
66 | return () => {
|
67 | refCount--;
|
68 | innerSub.unsubscribe();
|
69 | if (subscription && refCount === 0 && (isComplete || hasError)) {
|
70 | subscription.unsubscribe();
|
71 | }
|
72 | };
|
73 | });
|
74 | };
|
75 | }
|
76 |
|
77 |
|
78 |
|
79 | class SimpleScheduler {
|
80 | constructor(_jobRegistry, _schemaRegistry = new core_1.schema.CoreSchemaRegistry()) {
|
81 | this._jobRegistry = _jobRegistry;
|
82 | this._schemaRegistry = _schemaRegistry;
|
83 | this._internalJobDescriptionMap = new Map();
|
84 | this._queue = [];
|
85 | this._pauseCounter = 0;
|
86 | }
|
87 | _getInternalDescription(name) {
|
88 | const maybeHandler = this._internalJobDescriptionMap.get(name);
|
89 | if (maybeHandler !== undefined) {
|
90 | return (0, rxjs_1.of)(maybeHandler);
|
91 | }
|
92 | const handler = this._jobRegistry.get(name);
|
93 | return handler.pipe((0, operators_1.switchMap)((handler) => {
|
94 | if (handler === null) {
|
95 | return (0, rxjs_1.of)(null);
|
96 | }
|
97 | const description = {
|
98 |
|
99 | ...JSON.parse(JSON.stringify(handler.jobDescription)),
|
100 | name: handler.jobDescription.name || name,
|
101 | argument: handler.jobDescription.argument || true,
|
102 | input: handler.jobDescription.input || true,
|
103 | output: handler.jobDescription.output || true,
|
104 | channels: handler.jobDescription.channels || {},
|
105 | };
|
106 | const handlerWithExtra = Object.assign(handler.bind(undefined), {
|
107 | jobDescription: description,
|
108 | argumentV: this._schemaRegistry.compile(description.argument).pipe((0, operators_1.shareReplay)(1)),
|
109 | inputV: this._schemaRegistry.compile(description.input).pipe((0, operators_1.shareReplay)(1)),
|
110 | outputV: this._schemaRegistry.compile(description.output).pipe((0, operators_1.shareReplay)(1)),
|
111 | });
|
112 | this._internalJobDescriptionMap.set(name, handlerWithExtra);
|
113 | return (0, rxjs_1.of)(handlerWithExtra);
|
114 | }));
|
115 | }
|
116 | |
117 |
|
118 |
|
119 |
|
120 |
|
121 |
|
122 | getDescription(name) {
|
123 | return (0, rxjs_1.concat)(this._getInternalDescription(name).pipe((0, operators_1.map)((x) => x && x.jobDescription)), (0, rxjs_1.of)(null)).pipe((0, operators_1.first)());
|
124 | }
|
125 | |
126 |
|
127 |
|
128 |
|
129 |
|
130 | has(name) {
|
131 | return this.getDescription(name).pipe((0, operators_1.map)((x) => x !== null));
|
132 | }
|
133 | |
134 |
|
135 |
|
136 |
|
137 |
|
138 |
|
139 |
|
140 |
|
141 | pause() {
|
142 | let called = false;
|
143 | this._pauseCounter++;
|
144 | return () => {
|
145 | if (!called) {
|
146 | called = true;
|
147 | if (--this._pauseCounter == 0) {
|
148 |
|
149 | const q = this._queue;
|
150 | this._queue = [];
|
151 | q.forEach((fn) => fn());
|
152 | }
|
153 | }
|
154 | };
|
155 | }
|
156 | |
157 |
|
158 |
|
159 |
|
160 |
|
161 |
|
162 |
|
163 | schedule(name, argument, options) {
|
164 | if (this._pauseCounter > 0) {
|
165 | const waitable = new rxjs_1.Subject();
|
166 | this._queue.push(() => waitable.complete());
|
167 | return this._scheduleJob(name, argument, options || {}, waitable);
|
168 | }
|
169 | return this._scheduleJob(name, argument, options || {}, rxjs_1.EMPTY);
|
170 | }
|
171 | |
172 |
|
173 |
|
174 |
|
175 | _filterJobOutboundMessages(message, state) {
|
176 | switch (message.kind) {
|
177 | case api_1.JobOutboundMessageKind.OnReady:
|
178 | return state == api_1.JobState.Queued;
|
179 | case api_1.JobOutboundMessageKind.Start:
|
180 | return state == api_1.JobState.Ready;
|
181 | case api_1.JobOutboundMessageKind.End:
|
182 | return state == api_1.JobState.Started || state == api_1.JobState.Ready;
|
183 | }
|
184 | return true;
|
185 | }
|
186 | |
187 |
|
188 |
|
189 |
|
190 | _updateState(message, state) {
|
191 | switch (message.kind) {
|
192 | case api_1.JobOutboundMessageKind.OnReady:
|
193 | return api_1.JobState.Ready;
|
194 | case api_1.JobOutboundMessageKind.Start:
|
195 | return api_1.JobState.Started;
|
196 | case api_1.JobOutboundMessageKind.End:
|
197 | return api_1.JobState.Ended;
|
198 | }
|
199 | return state;
|
200 | }
|
201 | |
202 |
|
203 |
|
204 |
|
205 | _createJob(name, argument, handler, inboundBus, outboundBus) {
|
206 | const schemaRegistry = this._schemaRegistry;
|
207 | const channelsSubject = new Map();
|
208 | const channels = new Map();
|
209 | let state = api_1.JobState.Queued;
|
210 | let pingId = 0;
|
211 |
|
212 | const input = new rxjs_1.Subject();
|
213 | input
|
214 | .pipe((0, operators_1.concatMap)((message) => handler.pipe((0, operators_1.switchMap)((handler) => {
|
215 | if (handler === null) {
|
216 | throw new exception_1.JobDoesNotExistException(name);
|
217 | }
|
218 | else {
|
219 | return handler.inputV.pipe((0, operators_1.switchMap)((validate) => validate(message)));
|
220 | }
|
221 | }))), (0, operators_1.filter)((result) => result.success), (0, operators_1.map)((result) => result.data))
|
222 | .subscribe((value) => inboundBus.next({ kind: api_1.JobInboundMessageKind.Input, value }));
|
223 | outboundBus = (0, rxjs_1.concat)(outboundBus,
|
224 |
|
225 |
|
226 | handler.pipe((0, operators_1.switchMap)((handler) => {
|
227 | if (handler) {
|
228 | return (0, rxjs_1.of)({
|
229 | kind: api_1.JobOutboundMessageKind.End,
|
230 | description: handler.jobDescription,
|
231 | });
|
232 | }
|
233 | else {
|
234 | return rxjs_1.EMPTY;
|
235 | }
|
236 | }))).pipe((0, operators_1.filter)((message) => this._filterJobOutboundMessages(message, state)),
|
237 |
|
238 | (0, operators_1.tap)((message) => {
|
239 |
|
240 | state = this._updateState(message, state);
|
241 | switch (message.kind) {
|
242 | case api_1.JobOutboundMessageKind.ChannelCreate: {
|
243 | const maybeSubject = channelsSubject.get(message.name);
|
244 |
|
245 | if (!maybeSubject) {
|
246 | const s = new rxjs_1.Subject();
|
247 | channelsSubject.set(message.name, s);
|
248 | channels.set(message.name, s.asObservable());
|
249 | }
|
250 | break;
|
251 | }
|
252 | case api_1.JobOutboundMessageKind.ChannelMessage: {
|
253 | const maybeSubject = channelsSubject.get(message.name);
|
254 | if (maybeSubject) {
|
255 | maybeSubject.next(message.message);
|
256 | }
|
257 | break;
|
258 | }
|
259 | case api_1.JobOutboundMessageKind.ChannelComplete: {
|
260 | const maybeSubject = channelsSubject.get(message.name);
|
261 | if (maybeSubject) {
|
262 | maybeSubject.complete();
|
263 | channelsSubject.delete(message.name);
|
264 | }
|
265 | break;
|
266 | }
|
267 | case api_1.JobOutboundMessageKind.ChannelError: {
|
268 | const maybeSubject = channelsSubject.get(message.name);
|
269 | if (maybeSubject) {
|
270 | maybeSubject.error(message.error);
|
271 | channelsSubject.delete(message.name);
|
272 | }
|
273 | break;
|
274 | }
|
275 | }
|
276 | }, () => {
|
277 | state = api_1.JobState.Errored;
|
278 | }),
|
279 |
|
280 |
|
281 | (0, operators_1.concatMap)((message) => {
|
282 | if (message.kind !== api_1.JobOutboundMessageKind.Output) {
|
283 | return (0, rxjs_1.of)(message);
|
284 | }
|
285 | return handler.pipe((0, operators_1.switchMap)((handler) => {
|
286 | if (handler === null) {
|
287 | throw new exception_1.JobDoesNotExistException(name);
|
288 | }
|
289 | else {
|
290 | return handler.outputV.pipe((0, operators_1.switchMap)((validate) => validate(message.value)), (0, operators_1.switchMap)((output) => {
|
291 | if (!output.success) {
|
292 | throw new JobOutputSchemaValidationError(output.errors);
|
293 | }
|
294 | return (0, rxjs_1.of)({
|
295 | ...message,
|
296 | output: output.data,
|
297 | });
|
298 | }));
|
299 | }
|
300 | }));
|
301 | }), _jobShare());
|
302 | const output = outboundBus.pipe((0, operators_1.filter)((x) => x.kind == api_1.JobOutboundMessageKind.Output), (0, operators_1.map)((x) => x.value), (0, operators_1.shareReplay)(1));
|
303 |
|
304 | return {
|
305 | get state() {
|
306 | return state;
|
307 | },
|
308 | argument,
|
309 | description: handler.pipe((0, operators_1.switchMap)((handler) => {
|
310 | if (handler === null) {
|
311 | throw new exception_1.JobDoesNotExistException(name);
|
312 | }
|
313 | else {
|
314 | return (0, rxjs_1.of)(handler.jobDescription);
|
315 | }
|
316 | })),
|
317 | output,
|
318 | getChannel(name, schema = true) {
|
319 | let maybeObservable = channels.get(name);
|
320 | if (!maybeObservable) {
|
321 | const s = new rxjs_1.Subject();
|
322 | channelsSubject.set(name, s);
|
323 | channels.set(name, s.asObservable());
|
324 | maybeObservable = s.asObservable();
|
325 | }
|
326 | return maybeObservable.pipe(
|
327 |
|
328 | (0, operators_1.concatMap)((message) => {
|
329 | return schemaRegistry.compile(schema).pipe((0, operators_1.switchMap)((validate) => validate(message)), (0, operators_1.filter)((x) => x.success), (0, operators_1.map)((x) => x.data));
|
330 | }));
|
331 | },
|
332 | ping() {
|
333 | const id = pingId++;
|
334 | inboundBus.next({ kind: api_1.JobInboundMessageKind.Ping, id });
|
335 | return outboundBus.pipe((0, operators_1.filter)((x) => x.kind === api_1.JobOutboundMessageKind.Pong && x.id == id), (0, operators_1.first)(), (0, operators_1.ignoreElements)());
|
336 | },
|
337 | stop() {
|
338 | inboundBus.next({ kind: api_1.JobInboundMessageKind.Stop });
|
339 | },
|
340 | input,
|
341 | inboundBus,
|
342 | outboundBus,
|
343 | };
|
344 | }
|
345 | _scheduleJob(name, argument, options, waitable) {
|
346 |
|
347 | const handler = this._getInternalDescription(name);
|
348 | const optionsDeps = (options && options.dependencies) || [];
|
349 | const dependencies = Array.isArray(optionsDeps) ? optionsDeps : [optionsDeps];
|
350 | const inboundBus = new rxjs_1.Subject();
|
351 | const outboundBus = (0, rxjs_1.concat)(
|
352 |
|
353 |
|
354 | (0, rxjs_1.merge)(...dependencies.map((x) => x.outboundBus)).pipe((0, operators_1.ignoreElements)()),
|
355 |
|
356 | waitable, (0, rxjs_1.from)(handler).pipe((0, operators_1.switchMap)((handler) => new rxjs_1.Observable((subscriber) => {
|
357 | if (!handler) {
|
358 | throw new exception_1.JobDoesNotExistException(name);
|
359 | }
|
360 |
|
361 | return handler.argumentV
|
362 | .pipe((0, operators_1.switchMap)((validate) => validate(argument)), (0, operators_1.switchMap)((output) => {
|
363 | if (!output.success) {
|
364 | throw new JobArgumentSchemaValidationError(output.errors);
|
365 | }
|
366 | const argument = output.data;
|
367 | const description = handler.jobDescription;
|
368 | subscriber.next({ kind: api_1.JobOutboundMessageKind.OnReady, description });
|
369 | const context = {
|
370 | description,
|
371 | dependencies: [...dependencies],
|
372 | inboundBus: inboundBus.asObservable(),
|
373 | scheduler: this,
|
374 | };
|
375 | return handler(argument, context);
|
376 | }))
|
377 | .subscribe(subscriber);
|
378 | }))));
|
379 | return this._createJob(name, argument, handler, inboundBus, outboundBus);
|
380 | }
|
381 | }
|
382 | exports.SimpleScheduler = SimpleScheduler;
|