UNPKG

6.14 kBJavaScriptView Raw
1"use strict";
2/**
3 * @license
4 * Copyright Google LLC All Rights Reserved.
5 *
6 * Use of this source code is governed by an MIT-style license that can be
7 * found in the LICENSE file at https://angular.io/license
8 */
9Object.defineProperty(exports, "__esModule", { value: true });
10exports.createLoggerJob = exports.createJobFactory = exports.createJobHandler = exports.ChannelAlreadyExistException = void 0;
11const rxjs_1 = require("rxjs");
12const operators_1 = require("rxjs/operators");
13const index_1 = require("../../exception/index");
14const index_2 = require("../../utils/index");
15const api_1 = require("./api");
/**
 * Exception thrown when a channel is requested under a name that is already registered.
 */
class ChannelAlreadyExistException extends index_1.BaseException {
    /**
     * @param name The channel name that was requested again; it is JSON-encoded in the message.
     */
    constructor(name) {
        const quotedName = JSON.stringify(name);
        super(`Channel ${quotedName} already exist.`);
    }
}
exports.ChannelAlreadyExistException = ChannelAlreadyExistException;
/**
 * Make a simple job handler that emits Start and End messages around a plain function.
 *
 * The function may return a plain value, a promise or an observable. Whatever it returns is
 * normalized to an observable, and every value that observable emits is forwarded as an
 * Output message.
 *
 * @param fn The function to create a handler for. It is called with the job argument and a
 *     copy of the handler context extended with `input` and `createChannel`.
 * @param options An optional set of properties to set on the handler. Some fields might be
 *     required by registry or schedulers.
 * @returns The handler function, with `options` attached as its `jobDescription` property.
 */
function createJobHandler(fn, options = {}) {
    const handler = (argument, context) => {
        const description = context.description;
        const inboundBus = context.inboundBus;
        const inputChannel = new rxjs_1.Subject();
        return new rxjs_1.Observable((subject) => {
            // Owns every subscription opened for this run. It is created before anything else
            // (and once per subscribe call) so that channels opened synchronously by `fn`, and
            // the inbound listener when `fn` throws, are released as well.
            const teardown = new rxjs_1.Subscription();
            function complete() {
                teardown.unsubscribe();
                subject.next({ kind: api_1.JobOutboundMessageKind.End, description });
                subject.complete();
                inputChannel.complete();
            }
            // Handle input.
            teardown.add(inboundBus.subscribe((message) => {
                switch (message.kind) {
                    case api_1.JobInboundMessageKind.Ping:
                        subject.next({ kind: api_1.JobOutboundMessageKind.Pong, description, id: message.id });
                        break;
                    case api_1.JobInboundMessageKind.Stop:
                        // There's no way to cancel a promise or a synchronous function, but we do cancel
                        // observables where possible.
                        complete();
                        break;
                    case api_1.JobInboundMessageKind.Input:
                        inputChannel.next(message.value);
                        break;
                }
            }));
            // Execute the function with the additional context.
            const channels = new Map();
            const newContext = {
                ...context,
                input: inputChannel.asObservable(),
                createChannel(name) {
                    if (channels.has(name)) {
                        throw new ChannelAlreadyExistException(name);
                    }
                    const channelSubject = new rxjs_1.Subject();
                    const channelSub = channelSubject.subscribe({
                        next: (message) => {
                            subject.next({
                                kind: api_1.JobOutboundMessageKind.ChannelMessage,
                                description,
                                name,
                                message,
                            });
                        },
                        error: (error) => {
                            subject.next({ kind: api_1.JobOutboundMessageKind.ChannelError, description, name, error });
                            // This can be reopened.
                            channels.delete(name);
                        },
                        complete: () => {
                            subject.next({ kind: api_1.JobOutboundMessageKind.ChannelComplete, description, name });
                            // This can be reopened.
                            channels.delete(name);
                        },
                    });
                    channels.set(name, channelSubject);
                    // Adding to an already-closed Subscription unsubscribes the child right away,
                    // so a channel opened after the job ended does not linger.
                    teardown.add(channelSub);
                    return channelSubject;
                },
            };
            subject.next({ kind: api_1.JobOutboundMessageKind.Start, description });
            let result;
            try {
                result = fn(argument, newContext);
            }
            catch (error) {
                // The teardown is never handed back to the Observable when `fn` throws, so release
                // what was opened here and let the Observable machinery report the error as before.
                teardown.unsubscribe();
                throw error;
            }
            // If the result is a promise, simply wait for it to complete before reporting the result.
            if (index_2.isPromise(result)) {
                result = rxjs_1.from(result);
            }
            else if (!rxjs_1.isObservable(result)) {
                result = rxjs_1.of(result);
            }
            teardown.add(result.subscribe({
                next: (value) => subject.next({ kind: api_1.JobOutboundMessageKind.Output, description, value }),
                error: (error) => subject.error(error),
                complete: () => complete(),
            }));
            return teardown;
        });
    };
    return Object.assign(handler, { jobDescription: options });
}
exports.createJobHandler = createJobHandler;
/**
 * Build a job handler whose real implementation is obtained on demand from a loader.
 *
 * Each call of the returned handler invokes `loader()`, converts what it returns into an
 * observable, and switches to the stream produced by calling the loaded handler with the
 * same argument and context.
 *
 * @param loader A factory function that returns a promise/observable of a JobHandler.
 * @param options Same options as createJob; attached to the handler as `jobDescription`.
 */
function createJobFactory(loader, options = {}) {
    const handler = (argument, context) => {
        const loadedHandler$ = rxjs_1.from(loader());
        const runLoadedHandler = operators_1.switchMap((jobHandler) => jobHandler(argument, context));
        return loadedHandler$.pipe(runLoadedHandler);
    };
    const extras = { jobDescription: options };
    return Object.assign(handler, extras);
}
exports.createJobFactory = createJobFactory;
/**
 * Creates a job that logs out input/output messages of another Job. The messages are still
 * propagated to the other job.
 *
 * @param job The job handler to wrap. Its own enumerable properties are copied onto the
 *     returned handler.
 * @param logger Object whose `info` method receives inbound messages, outbound messages and
 *     completion, and whose `warn` method receives errors.
 * @returns A handler that calls `job` with the same argument and context.
 */
function createLoggerJob(job, logger) {
    const handler = (argument, context) => {
        // Listen on the inbound bus purely for logging. The subscription is kept so it can be
        // released instead of staying attached to the bus forever.
        const inputLogSub = context.inboundBus
            .pipe(operators_1.tap((message) => logger.info(`Input: ${JSON.stringify(message)}`)))
            .subscribe();
        let output;
        try {
            output = job(argument, context);
        }
        catch (error) {
            // No stream exists to finalize if the wrapped job throws, so release the listener here.
            inputLogSub.unsubscribe();
            throw error;
        }
        return output.pipe(operators_1.tap({
            next: (message) => logger.info(`Message: ${JSON.stringify(message)}`),
            error: (error) => logger.warn(`Error: ${JSON.stringify(error)}`),
            complete: () => logger.info(`Completed`),
        }),
        // Stop logging inbound messages once the job's stream errors, completes or is unsubscribed.
        operators_1.finalize(() => inputLogSub.unsubscribe()));
    };
    return Object.assign(handler, job);
}
exports.createLoggerJob = createLoggerJob;