1 | "use strict";
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 | Object.defineProperty(exports, "__esModule", { value: true });
|
10 | exports.createLoggerJob = exports.createJobFactory = exports.createJobHandler = exports.ChannelAlreadyExistException = void 0;
|
11 | const rxjs_1 = require("rxjs");
|
12 | const operators_1 = require("rxjs/operators");
|
13 | const index_1 = require("../../exception/index");
|
14 | const index_2 = require("../../utils/index");
|
15 | const api_1 = require("./api");
|
/**
 * Exception thrown by a job context's `createChannel` when the requested
 * channel name is already registered for that job.
 */
class ChannelAlreadyExistException extends index_1.BaseException {
    /**
     * @param name Name of the channel; it is JSON-encoded into the message.
     */
    constructor(name) {
        // Computed before super(): legal because `this` is not touched here.
        const quotedName = JSON.stringify(name);
        super(`Channel ${quotedName} already exist.`);
    }
}
|
21 | exports.ChannelAlreadyExistException = ChannelAlreadyExistException;
|
22 |
|
23 |
|
24 |
|
25 |
|
26 |
|
27 |
|
28 |
|
/**
 * Wraps a plain function into a job handler.
 *
 * The returned handler produces an Observable of outbound job messages:
 * `Start` first, one `Output` per value produced by `fn`, `Pong` in reply to
 * each inbound `Ping`, channel messages for every channel created through the
 * context, and finally `End` followed by completion (either when the result
 * completes or when an inbound `Stop` is received). Inbound `Input` messages
 * are forwarded to `context.input`.
 *
 * @param fn Called as `fn(argument, context)`. May return a Promise, an
 *     Observable, or a plain value (which is emitted as a single output).
 * @param options Stored as-is on the handler as `jobDescription`.
 * @returns The handler function with a `jobDescription` property.
 */
function createJobHandler(fn, options = {}) {
    const handler = (argument, context) => {
        const description = context.description;
        const inboundBus = context.inboundBus;
        const inputChannel = new rxjs_1.Subject();
        return new rxjs_1.Observable((subject) => {
            // One root teardown per subscriber. Everything opened on behalf of this
            // run (inbound listener, channels, the job result) is attached to it, so
            // nothing depends on the result subscription already existing.
            const teardown = new rxjs_1.Subscription();
            function complete() {
                // Release resources first so nothing else is forwarded after `End`.
                teardown.unsubscribe();
                subject.next({ kind: api_1.JobOutboundMessageKind.End, description });
                subject.complete();
                inputChannel.complete();
            }
            // Handle inbound messages.
            teardown.add(inboundBus.subscribe({
                next: (message) => {
                    switch (message.kind) {
                        case api_1.JobInboundMessageKind.Ping:
                            subject.next({ kind: api_1.JobOutboundMessageKind.Pong, description, id: message.id });
                            break;
                        case api_1.JobInboundMessageKind.Stop:
                            // There is no way to cancel a promise or a synchronous function,
                            // but an Observable result is unsubscribed by complete().
                            complete();
                            break;
                        case api_1.JobInboundMessageKind.Input:
                            inputChannel.next(message.value);
                            break;
                    }
                },
            }));
            // Channels created by the job, keyed by name.
            const channels = new Map();
            const newContext = {
                ...context,
                input: inputChannel.asObservable(),
                createChannel(name) {
                    if (channels.has(name)) {
                        throw new ChannelAlreadyExistException(name);
                    }
                    const channelSubject = new rxjs_1.Subject();
                    const channelSub = channelSubject.subscribe({
                        next: (message) => {
                            subject.next({
                                kind: api_1.JobOutboundMessageKind.ChannelMessage,
                                description,
                                name,
                                message,
                            });
                        },
                        error: (error) => {
                            subject.next({ kind: api_1.JobOutboundMessageKind.ChannelError, description, name, error });
                            // A channel that errored can be created again under the same name.
                            channels.delete(name);
                        },
                        complete: () => {
                            subject.next({ kind: api_1.JobOutboundMessageKind.ChannelComplete, description, name });
                            // A completed channel can be created again under the same name.
                            channels.delete(name);
                        },
                    });
                    channels.set(name, channelSubject);
                    // Always tracked, including channels created while `fn` is still
                    // running synchronously. If the run is already over, add() releases
                    // the subscription immediately.
                    teardown.add(channelSub);
                    return channelSubject;
                },
            };
            subject.next({ kind: api_1.JobOutboundMessageKind.Start, description });
            let result;
            try {
                result = fn(argument, newContext);
            }
            catch (error) {
                // No teardown is returned when the subscribe function throws, so the
                // inbound listener and any channels must be released here.
                teardown.unsubscribe();
                throw error;
            }
            // If the result is a promise, simply wait for it to complete before reporting the result.
            if ((0, index_2.isPromise)(result)) {
                result = (0, rxjs_1.from)(result);
            }
            else if (!(0, rxjs_1.isObservable)(result)) {
                result = (0, rxjs_1.of)(result);
            }
            teardown.add(result.subscribe({
                next: (value) => subject.next({ kind: api_1.JobOutboundMessageKind.Output, description, value }),
                error: (error) => subject.error(error),
                complete: () => complete(),
            }));
            return teardown;
        });
    };
    return Object.assign(handler, { jobDescription: options });
}
|
109 | exports.createJobHandler = createJobHandler;
|
110 |
|
111 |
|
112 |
|
113 |
|
114 |
|
/**
 * Builds a job handler whose real implementation is obtained from `loader`.
 *
 * `loader()` is invoked each time the handler is called; whatever it yields
 * is converted with `from`, and every handler function it produces is then
 * called with the same `(argument, context)` and switched to.
 *
 * @param loader Zero-argument function returning the input that resolves to
 *     the actual handler function.
 * @param options Stored as-is on the handler as `jobDescription`.
 * @returns The handler function with a `jobDescription` property.
 */
function createJobFactory(loader, options = {}) {
    const handler = (argument, context) => {
        const { from } = rxjs_1;
        const { switchMap } = operators_1;
        const loadedHandler$ = from(loader());
        const runLoadedHandler = switchMap((loadedHandler) => loadedHandler(argument, context));
        return loadedHandler$.pipe(runLoadedHandler);
    };
    const metadata = { jobDescription: options };
    return Object.assign(handler, metadata);
}
|
121 | exports.createJobFactory = createJobFactory;
|
122 |
|
123 |
|
124 |
|
125 |
|
/**
 * Wraps a job handler so that its inbound and outbound traffic is logged.
 *
 * Every message seen on `context.inboundBus` is logged with `logger.info`
 * ("Input: ..."), every outbound message with `logger.info` ("Message: ..."),
 * an outbound error with `logger.warn` ("Error: ...") and completion with
 * `logger.info` ("Completed"). Own enumerable properties of `job` (such as
 * `jobDescription`) are copied onto the returned handler.
 *
 * The inbound logging subscription starts when the handler is called and is
 * released when the output stream terminates or is unsubscribed, or when
 * `job` throws synchronously. If the output is subscribed several times,
 * inbound logging stops as soon as the first of those subscriptions ends.
 *
 * @param job The job handler to wrap.
 * @param logger Object providing `info(message)` and `warn(message)`.
 * @returns A handler with the same call signature as `job`.
 */
function createLoggerJob(job, logger) {
    const handler = (argument, context) => {
        const inboundLogging = context.inboundBus
            .pipe((0, operators_1.tap)((message) => logger.info(`Input: ${JSON.stringify(message)}`)))
            .subscribe();
        let output;
        try {
            output = job(argument, context);
        }
        catch (error) {
            // The job never produced a stream, so nothing else would release this.
            inboundLogging.unsubscribe();
            throw error;
        }
        return output.pipe((0, operators_1.tap)({
            next: (message) => logger.info(`Message: ${JSON.stringify(message)}`),
            // NOTE(review): JSON.stringify of an Error instance yields "{}", so this
            // line carries little detail for real exceptions. Left unchanged.
            error: (error) => logger.warn(`Error: ${JSON.stringify(error)}`),
            complete: () => logger.info(`Completed`),
        }), 
        // Tie the inbound logger's lifetime to the output stream.
        (0, operators_1.finalize)(() => inboundLogging.unsubscribe()));
    };
    return Object.assign(handler, job);
}
|
135 | exports.createLoggerJob = createLoggerJob;
|