// UNPKG · 6.13 kB · JavaScript · View Raw
"use strict";
/**
 * @license
 * Copyright Google LLC All Rights Reserved.
 *
 * Use of this source code is governed by an MIT-style license that can be
 * found in the LICENSE file at https://angular.io/license
 */
// Flag the CommonJS exports object with `__esModule` (the usual transpiler interop marker).
Object.defineProperty(exports, "__esModule", { value: true });
// Declare every exported name up front; each one is assigned right after its definition.
exports.createLoggerJob = exports.createJobFactory = exports.createJobHandler = exports.ChannelAlreadyExistException = void 0;
const core_1 = require("@angular-devkit/core");
const rxjs_1 = require("rxjs");
const operators_1 = require("rxjs/operators");
const api_1 = require("./api");
/**
 * Exception thrown by a job context's `createChannel` when the requested
 * channel name is already present in that context's channel map.
 */
class ChannelAlreadyExistException extends core_1.BaseException {
    /**
     * @param name The channel name that was requested twice; it is JSON-encoded
     *     into the exception message.
     */
    constructor(name) {
        const quotedName = JSON.stringify(name);
        super(`Channel ${quotedName} already exist.`);
    }
}
exports.ChannelAlreadyExistException = ChannelAlreadyExistException;
/**
 * Make a simple job handler that sets start and end from a function that's synchronous.
 *
 * The returned handler wraps `fn` in an Observable of outbound job messages: it emits
 * `Start`, then one `Output` per value produced by `fn` (a plain value, a promise or an
 * observable), then `End`. While running it answers `Ping` with `Pong`, forwards `Input`
 * messages to `context.input`, and ends the job when a `Stop` message arrives.
 *
 * @param fn The function to create a handler for. It is called with the job argument and
 *     a context extended with `input` and `createChannel`.
 * @param options An optional set of properties to set on the handler. Some fields might be
 *     required by registry or schedulers.
 * @returns The handler function with `options` attached as `jobDescription`.
 */
function createJobHandler(fn, options = {}) {
    const handler = (argument, context) => {
        const description = context.description;
        const inboundBus = context.inboundBus;
        return new rxjs_1.Observable((subject) => {
            // All mutable state is per subscription, so a re-subscription never sees a
            // completed input subject or a closed subscription left over from an earlier run.
            const inputChannel = new rxjs_1.Subject();
            // Subscription to the job's result; stays undefined while `fn` is still running.
            let subscription;
            // Channel subscriptions opened before `subscription` exists. They are attached
            // to it as soon as it is created so they are torn down with the job.
            const earlyChannelSubs = [];
            function complete() {
                if (subscription) {
                    subscription.unsubscribe();
                }
                subject.next({ kind: api_1.JobOutboundMessageKind.End, description });
                subject.complete();
                inputChannel.complete();
            }
            // Handle input.
            const inboundSub = inboundBus.subscribe((message) => {
                switch (message.kind) {
                    case api_1.JobInboundMessageKind.Ping:
                        subject.next({ kind: api_1.JobOutboundMessageKind.Pong, description, id: message.id });
                        break;
                    case api_1.JobInboundMessageKind.Stop:
                        // There's no way to cancel a promise or a synchronous function, but we do cancel
                        // observables where possible.
                        complete();
                        break;
                    case api_1.JobInboundMessageKind.Input:
                        inputChannel.next(message.value);
                        break;
                }
            });
            // Execute the function with the additional context.
            const channels = new Map();
            const newContext = {
                ...context,
                input: inputChannel.asObservable(),
                createChannel(name) {
                    if (channels.has(name)) {
                        throw new ChannelAlreadyExistException(name);
                    }
                    const channelSubject = new rxjs_1.Subject();
                    const channelSub = channelSubject.subscribe({
                        next: (message) => {
                            subject.next({
                                kind: api_1.JobOutboundMessageKind.ChannelMessage,
                                description,
                                name,
                                message,
                            });
                        },
                        error: (error) => {
                            subject.next({ kind: api_1.JobOutboundMessageKind.ChannelError, description, name, error });
                            // This can be reopened.
                            channels.delete(name);
                        },
                        complete: () => {
                            subject.next({ kind: api_1.JobOutboundMessageKind.ChannelComplete, description, name });
                            // This can be reopened.
                            channels.delete(name);
                        },
                    });
                    channels.set(name, channelSubject);
                    if (subscription) {
                        subscription.add(channelSub);
                    }
                    else {
                        earlyChannelSubs.push(channelSub);
                    }
                    return channelSubject;
                },
            };
            subject.next({ kind: api_1.JobOutboundMessageKind.Start, description });
            let result;
            try {
                result = fn(argument, newContext);
            }
            catch (error) {
                // No teardown has been returned yet, so release what was opened so far and
                // rethrow so the error still reaches the subscriber the same way as before.
                inboundSub.unsubscribe();
                for (const channelSub of earlyChannelSubs) {
                    channelSub.unsubscribe();
                }
                throw error;
            }
            // If the result is a promise, simply wait for it to complete before reporting the result.
            if ((0, core_1.isPromise)(result)) {
                result = (0, rxjs_1.from)(result);
            }
            else if (!(0, rxjs_1.isObservable)(result)) {
                result = (0, rxjs_1.of)(result);
            }
            subscription = result.subscribe({
                next: (value) => subject.next({ kind: api_1.JobOutboundMessageKind.Output, description, value }),
                error: (error) => subject.error(error),
                complete: () => complete(),
            });
            // Adding to an already closed subscription unsubscribes the child immediately,
            // so this also cleans up after a job that finished synchronously.
            subscription.add(inboundSub);
            for (const channelSub of earlyChannelSubs) {
                subscription.add(channelSub);
            }
            return subscription;
        });
    };
    return Object.assign(handler, { jobDescription: options });
}
exports.createJobHandler = createJobHandler;
/**
 * Build a job handler whose real implementation is obtained from a loader function.
 * @param loader A factory function that returns a promise/observable of a JobHandler.
 * @param options Properties attached to the returned handler as `jobDescription`.
 * @returns A handler that calls `loader` on each invocation and forwards the argument and
 *     context to the handler it yields.
 */
function createJobFactory(loader, options = {}) {
    const handler = (argument, context) => {
        // The loader is invoked right away; its result is adapted to an observable.
        const loadedHandlers = (0, rxjs_1.from)(loader());
        // Once the real handler is available, switch over to its output stream.
        const runLoadedHandler = (0, operators_1.switchMap)((loadedHandler) => loadedHandler(argument, context));
        return loadedHandlers.pipe(runLoadedHandler);
    };
    const description = { jobDescription: options };
    return Object.assign(handler, description);
}
exports.createJobFactory = createJobFactory;
/**
 * Creates a job that logs out input/output messages of another Job. The messages are still
 * propagated to the other job.
 *
 * @param job The job handler to wrap. Its own properties are copied onto the returned handler.
 * @param logger Receives `info` calls for inbound messages, outbound messages and completion,
 *     and a `warn` call when the job's output errors.
 * @returns A handler with the same call signature as `job`.
 */
function createLoggerJob(job, logger) {
    const handler = (argument, context) => {
        // Log inbound messages from the moment the handler is invoked. The subscription is
        // kept so it can be released when the job's output ends, instead of staying attached
        // to the inbound bus forever.
        const inputLogSub = context.inboundBus
            .pipe((0, operators_1.tap)((message) => logger.info(`Input: ${JSON.stringify(message)}`)))
            .subscribe();
        let output;
        try {
            output = job(argument, context);
        }
        catch (error) {
            // The wrapped job never started; do not leave the input logger subscribed.
            inputLogSub.unsubscribe();
            throw error;
        }
        return output.pipe((0, operators_1.tap)({
            next: (message) => logger.info(`Message: ${JSON.stringify(message)}`),
            // NOTE(review): JSON.stringify of an Error instance yields "{}", so this line
            // carries little detail for thrown Errors. The text is left unchanged here.
            error: (error) => logger.warn(`Error: ${JSON.stringify(error)}`),
            complete: () => logger.info(`Completed`),
        }),
        // Runs on completion, error or unsubscription of the output stream.
        (0, operators_1.finalize)(() => inputLogSub.unsubscribe()));
    };
    return Object.assign(handler, job);
}
exports.createLoggerJob = createLoggerJob;