1 | "use strict";
|
2 | var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
|
3 | function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
|
4 | return new (P || (P = Promise))(function (resolve, reject) {
|
5 | function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
|
6 | function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
|
7 | function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
|
8 | step((generator = generator.apply(thisArg, _arguments || [])).next());
|
9 | });
|
10 | };
|
11 | Object.defineProperty(exports, "__esModule", { value: true });
|
12 | const pubsub_async_iterator_1 = require("./pubsub-async-iterator");
|
13 | const rabbitmq_pub_sub_1 = require("rabbitmq-pub-sub");
|
14 | const async_1 = require("async");
|
15 | const child_logger_1 = require("./child-logger");
|
16 | class AmqpPubSub {
|
17 | constructor(options = {}) {
|
18 | this.triggerTransform =
|
19 | options.triggerTransform || (trigger => trigger);
|
20 | const config = options.config || { host: '127.0.0.1', port: 5672 };
|
21 | const { logger } = options;
|
22 | this.logger = child_logger_1.createChildLogger(logger, 'AmqpPubSub');
|
23 | const factory = new rabbitmq_pub_sub_1.RabbitMqSingletonConnectionFactory(logger, config);
|
24 | this.consumer = new rabbitmq_pub_sub_1.RabbitMqSubscriber(logger, factory);
|
25 | this.producer = new rabbitmq_pub_sub_1.RabbitMqPublisher(logger, factory);
|
26 | this.subscriptionMap = {};
|
27 | this.subsRefsMap = {};
|
28 | this.currentSubscriptionId = 0;
|
29 | }
|
30 | publish(trigger, payload) {
|
31 | return __awaiter(this, void 0, void 0, function* () {
|
32 | this.logger.trace("publishing for queue '%s' (%j)", trigger, payload);
|
33 | this.producer.publish(trigger, payload);
|
34 | });
|
35 | }
|
36 | subscribe(trigger, onMessage, options) {
|
37 | const triggerName = this.triggerTransform(trigger, options);
|
38 | const id = this.currentSubscriptionId++;
|
39 | this.subscriptionMap[id] = [triggerName, onMessage];
|
40 | let refs = this.subsRefsMap[triggerName];
|
41 | if (refs && refs.length > 0) {
|
42 | const newRefs = [...refs, id];
|
43 | this.subsRefsMap[triggerName] = newRefs;
|
44 | this.logger.trace("subscriber exist, adding triggerName '%s' to saved list.", triggerName);
|
45 | return Promise.resolve(id);
|
46 | }
|
47 | else {
|
48 | return new Promise((resolve, reject) => {
|
49 | this.logger.trace("trying to subscribe to queue '%s'", triggerName);
|
50 | this.consumer
|
51 | .subscribe(triggerName, msg => this.onMessage(triggerName, msg))
|
52 | .then(disposer => {
|
53 | this.subsRefsMap[triggerName] = [
|
54 | ...(this.subsRefsMap[triggerName] || []),
|
55 | id,
|
56 | ];
|
57 | this.unsubscribeChannel = disposer;
|
58 | return resolve(id);
|
59 | })
|
60 | .catch(err => {
|
61 | this.logger.error(err, "failed to recieve message from queue '%s'", triggerName);
|
62 | reject(id);
|
63 | });
|
64 | });
|
65 | }
|
66 | }
|
67 | unsubscribe(subId) {
|
68 | const [triggerName = null] = this.subscriptionMap[subId] || [];
|
69 | const refs = this.subsRefsMap[triggerName];
|
70 | if (!refs) {
|
71 | this.logger.error("There is no subscription of id '%s'", subId);
|
72 | throw new Error(`There is no subscription of id "{subId}"`);
|
73 | }
|
74 | let newRefs;
|
75 | if (refs.length === 1) {
|
76 | newRefs = [];
|
77 | this.unsubscribeChannel()
|
78 | .then(() => {
|
79 | this.logger.trace("cancelled channel from subscribing to queue '%s'", triggerName);
|
80 | })
|
81 | .catch(err => {
|
82 | this.logger.error(err, "channel cancellation failed from queue '%j'", triggerName);
|
83 | });
|
84 | }
|
85 | else {
|
86 | const index = refs.indexOf(subId);
|
87 | if (index !== -1) {
|
88 | newRefs = [...refs.slice(0, index), ...refs.slice(index + 1)];
|
89 | }
|
90 | this.logger.trace("removing triggerName from listening '%s' ", triggerName);
|
91 | }
|
92 | this.subsRefsMap[triggerName] = newRefs;
|
93 | delete this.subscriptionMap[subId];
|
94 | this.logger.trace("list of subscriptions still available '(%j)'", this.subscriptionMap);
|
95 | }
|
96 | asyncIterator(triggers) {
|
97 | return new pubsub_async_iterator_1.PubSubAsyncIterator(this, triggers);
|
98 | }
|
99 | onMessage(channel, message) {
|
100 | const subscribers = this.subsRefsMap[channel];
|
101 |
|
102 | if (!subscribers || !subscribers.length) {
|
103 | return;
|
104 | }
|
105 | this.logger.trace("sending message to subscriber callback function '(%j)'", message);
|
106 | async_1.each(subscribers, (subId, cb) => {
|
107 |
|
108 | const [triggerName, listener] = this.subscriptionMap[subId];
|
109 | listener(message);
|
110 | cb();
|
111 | });
|
112 | }
|
113 | }
|
114 | exports.AmqpPubSub = AmqpPubSub;
|