UNPKG

16.7 kBJavaScriptView Raw
1"use strict";
2/**
3 * @license
4 * Copyright Google LLC All Rights Reserved.
5 *
6 * Use of this source code is governed by an MIT-style license that can be
7 * found in the LICENSE file at https://angular.io/license
8 */
9Object.defineProperty(exports, "__esModule", { value: true });
10exports.SimpleScheduler = exports.JobOutputSchemaValidationError = exports.JobInboundMessageSchemaValidationError = exports.JobArgumentSchemaValidationError = void 0;
11const core_1 = require("@angular-devkit/core");
12const rxjs_1 = require("rxjs");
13const operators_1 = require("rxjs/operators");
14const api_1 = require("./api");
15const exception_1 = require("./exception");
16class JobArgumentSchemaValidationError extends core_1.schema.SchemaValidationException {
17 constructor(errors) {
18 super(errors, 'Job Argument failed to validate. Errors: ');
19 }
20}
21exports.JobArgumentSchemaValidationError = JobArgumentSchemaValidationError;
22class JobInboundMessageSchemaValidationError extends core_1.schema.SchemaValidationException {
23 constructor(errors) {
24 super(errors, 'Job Inbound Message failed to validate. Errors: ');
25 }
26}
27exports.JobInboundMessageSchemaValidationError = JobInboundMessageSchemaValidationError;
28class JobOutputSchemaValidationError extends core_1.schema.SchemaValidationException {
29 constructor(errors) {
30 super(errors, 'Job Output failed to validate. Errors: ');
31 }
32}
33exports.JobOutputSchemaValidationError = JobOutputSchemaValidationError;
34function _jobShare() {
35 // This is the same code as a `shareReplay()` operator, but uses a dumber Subject rather than a
36 // ReplaySubject.
37 return (source) => {
38 let refCount = 0;
39 let subject;
40 let hasError = false;
41 let isComplete = false;
42 let subscription;
43 return new rxjs_1.Observable((subscriber) => {
44 let innerSub;
45 refCount++;
46 if (!subject) {
47 subject = new rxjs_1.Subject();
48 innerSub = subject.subscribe(subscriber);
49 subscription = source.subscribe({
50 next(value) {
51 subject.next(value);
52 },
53 error(err) {
54 hasError = true;
55 subject.error(err);
56 },
57 complete() {
58 isComplete = true;
59 subject.complete();
60 },
61 });
62 }
63 else {
64 innerSub = subject.subscribe(subscriber);
65 }
66 return () => {
67 refCount--;
68 innerSub.unsubscribe();
69 if (subscription && refCount === 0 && (isComplete || hasError)) {
70 subscription.unsubscribe();
71 }
72 };
73 });
74 };
75}
76/**
77 * Simple scheduler. Should be the base of all registries and schedulers.
78 */
79class SimpleScheduler {
80 constructor(_jobRegistry, _schemaRegistry = new core_1.schema.CoreSchemaRegistry()) {
81 this._jobRegistry = _jobRegistry;
82 this._schemaRegistry = _schemaRegistry;
83 this._internalJobDescriptionMap = new Map();
84 this._queue = [];
85 this._pauseCounter = 0;
86 }
87 _getInternalDescription(name) {
88 const maybeHandler = this._internalJobDescriptionMap.get(name);
89 if (maybeHandler !== undefined) {
90 return (0, rxjs_1.of)(maybeHandler);
91 }
92 const handler = this._jobRegistry.get(name);
93 return handler.pipe((0, operators_1.switchMap)((handler) => {
94 if (handler === null) {
95 return (0, rxjs_1.of)(null);
96 }
97 const description = {
98 // Make a copy of it to be sure it's proper JSON.
99 ...JSON.parse(JSON.stringify(handler.jobDescription)),
100 name: handler.jobDescription.name || name,
101 argument: handler.jobDescription.argument || true,
102 input: handler.jobDescription.input || true,
103 output: handler.jobDescription.output || true,
104 channels: handler.jobDescription.channels || {},
105 };
106 const handlerWithExtra = Object.assign(handler.bind(undefined), {
107 jobDescription: description,
108 argumentV: this._schemaRegistry.compile(description.argument).pipe((0, operators_1.shareReplay)(1)),
109 inputV: this._schemaRegistry.compile(description.input).pipe((0, operators_1.shareReplay)(1)),
110 outputV: this._schemaRegistry.compile(description.output).pipe((0, operators_1.shareReplay)(1)),
111 });
112 this._internalJobDescriptionMap.set(name, handlerWithExtra);
113 return (0, rxjs_1.of)(handlerWithExtra);
114 }));
115 }
116 /**
117 * Get a job description for a named job.
118 *
119 * @param name The name of the job.
120 * @returns A description, or null if the job is not registered.
121 */
122 getDescription(name) {
123 return (0, rxjs_1.concat)(this._getInternalDescription(name).pipe((0, operators_1.map)((x) => x && x.jobDescription)), (0, rxjs_1.of)(null)).pipe((0, operators_1.first)());
124 }
125 /**
126 * Returns true if the job name has been registered.
127 * @param name The name of the job.
128 * @returns True if the job exists, false otherwise.
129 */
130 has(name) {
131 return this.getDescription(name).pipe((0, operators_1.map)((x) => x !== null));
132 }
133 /**
134 * Pause the scheduler, temporary queueing _new_ jobs. Returns a resume function that should be
135 * used to resume execution. If multiple `pause()` were called, all their resume functions must
136 * be called before the Scheduler actually starts new jobs. Additional calls to the same resume
137 * function will have no effect.
138 *
139 * Jobs already running are NOT paused. This is pausing the scheduler only.
140 */
141 pause() {
142 let called = false;
143 this._pauseCounter++;
144 return () => {
145 if (!called) {
146 called = true;
147 if (--this._pauseCounter == 0) {
148 // Resume the queue.
149 const q = this._queue;
150 this._queue = [];
151 q.forEach((fn) => fn());
152 }
153 }
154 };
155 }
156 /**
157 * Schedule a job to be run, using its name.
158 * @param name The name of job to be run.
159 * @param argument The argument to send to the job when starting it.
160 * @param options Scheduling options.
161 * @returns The Job being run.
162 */
163 schedule(name, argument, options) {
164 if (this._pauseCounter > 0) {
165 const waitable = new rxjs_1.Subject();
166 this._queue.push(() => waitable.complete());
167 return this._scheduleJob(name, argument, options || {}, waitable);
168 }
169 return this._scheduleJob(name, argument, options || {}, rxjs_1.EMPTY);
170 }
171 /**
172 * Filter messages.
173 * @private
174 */
175 _filterJobOutboundMessages(message, state) {
176 switch (message.kind) {
177 case api_1.JobOutboundMessageKind.OnReady:
178 return state == api_1.JobState.Queued;
179 case api_1.JobOutboundMessageKind.Start:
180 return state == api_1.JobState.Ready;
181 case api_1.JobOutboundMessageKind.End:
182 return state == api_1.JobState.Started || state == api_1.JobState.Ready;
183 }
184 return true;
185 }
186 /**
187 * Return a new state. This is just to simplify the reading of the _createJob method.
188 * @private
189 */
190 _updateState(message, state) {
191 switch (message.kind) {
192 case api_1.JobOutboundMessageKind.OnReady:
193 return api_1.JobState.Ready;
194 case api_1.JobOutboundMessageKind.Start:
195 return api_1.JobState.Started;
196 case api_1.JobOutboundMessageKind.End:
197 return api_1.JobState.Ended;
198 }
199 return state;
200 }
201 /**
202 * Create the job.
203 * @private
204 */
205 _createJob(name, argument, handler, inboundBus, outboundBus) {
206 const schemaRegistry = this._schemaRegistry;
207 const channelsSubject = new Map();
208 const channels = new Map();
209 let state = api_1.JobState.Queued;
210 let pingId = 0;
211 // Create the input channel by having a filter.
212 const input = new rxjs_1.Subject();
213 input
214 .pipe((0, operators_1.concatMap)((message) => handler.pipe((0, operators_1.switchMap)((handler) => {
215 if (handler === null) {
216 throw new exception_1.JobDoesNotExistException(name);
217 }
218 else {
219 return handler.inputV.pipe((0, operators_1.switchMap)((validate) => validate(message)));
220 }
221 }))), (0, operators_1.filter)((result) => result.success), (0, operators_1.map)((result) => result.data))
222 .subscribe((value) => inboundBus.next({ kind: api_1.JobInboundMessageKind.Input, value }));
223 outboundBus = (0, rxjs_1.concat)(outboundBus,
224 // Add an End message at completion. This will be filtered out if the job actually send an
225 // End.
226 handler.pipe((0, operators_1.switchMap)((handler) => {
227 if (handler) {
228 return (0, rxjs_1.of)({
229 kind: api_1.JobOutboundMessageKind.End,
230 description: handler.jobDescription,
231 });
232 }
233 else {
234 return rxjs_1.EMPTY;
235 }
236 }))).pipe((0, operators_1.filter)((message) => this._filterJobOutboundMessages(message, state)),
237 // Update internal logic and Job<> members.
238 (0, operators_1.tap)((message) => {
239 // Update the state.
240 state = this._updateState(message, state);
241 switch (message.kind) {
242 case api_1.JobOutboundMessageKind.ChannelCreate: {
243 const maybeSubject = channelsSubject.get(message.name);
244 // If it doesn't exist or it's closed on the other end.
245 if (!maybeSubject) {
246 const s = new rxjs_1.Subject();
247 channelsSubject.set(message.name, s);
248 channels.set(message.name, s.asObservable());
249 }
250 break;
251 }
252 case api_1.JobOutboundMessageKind.ChannelMessage: {
253 const maybeSubject = channelsSubject.get(message.name);
254 if (maybeSubject) {
255 maybeSubject.next(message.message);
256 }
257 break;
258 }
259 case api_1.JobOutboundMessageKind.ChannelComplete: {
260 const maybeSubject = channelsSubject.get(message.name);
261 if (maybeSubject) {
262 maybeSubject.complete();
263 channelsSubject.delete(message.name);
264 }
265 break;
266 }
267 case api_1.JobOutboundMessageKind.ChannelError: {
268 const maybeSubject = channelsSubject.get(message.name);
269 if (maybeSubject) {
270 maybeSubject.error(message.error);
271 channelsSubject.delete(message.name);
272 }
273 break;
274 }
275 }
276 }, () => {
277 state = api_1.JobState.Errored;
278 }),
279 // Do output validation (might include default values so this might have side
280 // effects). We keep all messages in order.
281 (0, operators_1.concatMap)((message) => {
282 if (message.kind !== api_1.JobOutboundMessageKind.Output) {
283 return (0, rxjs_1.of)(message);
284 }
285 return handler.pipe((0, operators_1.switchMap)((handler) => {
286 if (handler === null) {
287 throw new exception_1.JobDoesNotExistException(name);
288 }
289 else {
290 return handler.outputV.pipe((0, operators_1.switchMap)((validate) => validate(message.value)), (0, operators_1.switchMap)((output) => {
291 if (!output.success) {
292 throw new JobOutputSchemaValidationError(output.errors);
293 }
294 return (0, rxjs_1.of)({
295 ...message,
296 output: output.data,
297 });
298 }));
299 }
300 }));
301 }), _jobShare());
302 const output = outboundBus.pipe((0, operators_1.filter)((x) => x.kind == api_1.JobOutboundMessageKind.Output), (0, operators_1.map)((x) => x.value), (0, operators_1.shareReplay)(1));
303 // Return the Job.
304 return {
305 get state() {
306 return state;
307 },
308 argument,
309 description: handler.pipe((0, operators_1.switchMap)((handler) => {
310 if (handler === null) {
311 throw new exception_1.JobDoesNotExistException(name);
312 }
313 else {
314 return (0, rxjs_1.of)(handler.jobDescription);
315 }
316 })),
317 output,
318 getChannel(name, schema = true) {
319 let maybeObservable = channels.get(name);
320 if (!maybeObservable) {
321 const s = new rxjs_1.Subject();
322 channelsSubject.set(name, s);
323 channels.set(name, s.asObservable());
324 maybeObservable = s.asObservable();
325 }
326 return maybeObservable.pipe(
327 // Keep the order of messages.
328 (0, operators_1.concatMap)((message) => {
329 return schemaRegistry.compile(schema).pipe((0, operators_1.switchMap)((validate) => validate(message)), (0, operators_1.filter)((x) => x.success), (0, operators_1.map)((x) => x.data));
330 }));
331 },
332 ping() {
333 const id = pingId++;
334 inboundBus.next({ kind: api_1.JobInboundMessageKind.Ping, id });
335 return outboundBus.pipe((0, operators_1.filter)((x) => x.kind === api_1.JobOutboundMessageKind.Pong && x.id == id), (0, operators_1.first)(), (0, operators_1.ignoreElements)());
336 },
337 stop() {
338 inboundBus.next({ kind: api_1.JobInboundMessageKind.Stop });
339 },
340 input,
341 inboundBus,
342 outboundBus,
343 };
344 }
345 _scheduleJob(name, argument, options, waitable) {
346 // Get handler first, since this can error out if there's no handler for the job name.
347 const handler = this._getInternalDescription(name);
348 const optionsDeps = (options && options.dependencies) || [];
349 const dependencies = Array.isArray(optionsDeps) ? optionsDeps : [optionsDeps];
350 const inboundBus = new rxjs_1.Subject();
351 const outboundBus = (0, rxjs_1.concat)(
352 // Wait for dependencies, make sure to not report messages from dependencies. Subscribe to
353 // all dependencies at the same time so they run concurrently.
354 (0, rxjs_1.merge)(...dependencies.map((x) => x.outboundBus)).pipe((0, operators_1.ignoreElements)()),
355 // Wait for pause() to clear (if necessary).
356 waitable, (0, rxjs_1.from)(handler).pipe((0, operators_1.switchMap)((handler) => new rxjs_1.Observable((subscriber) => {
357 if (!handler) {
358 throw new exception_1.JobDoesNotExistException(name);
359 }
360 // Validate the argument.
361 return handler.argumentV
362 .pipe((0, operators_1.switchMap)((validate) => validate(argument)), (0, operators_1.switchMap)((output) => {
363 if (!output.success) {
364 throw new JobArgumentSchemaValidationError(output.errors);
365 }
366 const argument = output.data;
367 const description = handler.jobDescription;
368 subscriber.next({ kind: api_1.JobOutboundMessageKind.OnReady, description });
369 const context = {
370 description,
371 dependencies: [...dependencies],
372 inboundBus: inboundBus.asObservable(),
373 scheduler: this,
374 };
375 return handler(argument, context);
376 }))
377 .subscribe(subscriber);
378 }))));
379 return this._createJob(name, argument, handler, inboundBus, outboundBus);
380 }
381}
382exports.SimpleScheduler = SimpleScheduler;