UNPKG

5.24 kBJavaScriptView Raw
1"use strict";
/**
 * TypeScript-emitted helper that runs a generator function as if it were an
 * async function. Reuses an already-installed `this.__awaiter` when present.
 *
 * @param {*} thisArg - `this` value the generator function is applied with.
 * @param {Array|undefined} _arguments - Arguments for the generator function
 *   (an empty list is used when falsy).
 * @param {Function|undefined} P - Promise constructor to use; falls back to
 *   the global `Promise` when falsy.
 * @param {Function} generator - Generator function whose yielded values are
 *   awaited one after another.
 * @returns {Promise<*>} Resolves with the generator's return value; rejects
 *   with any error that escapes the generator.
 */
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
    // Wrap plain values so that every yielded value can be `.then`-ed.
    function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
    return new (P || (P = Promise))(function (resolve, reject) {
        // Resume the generator with the settled value of the last yield.
        function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
        // Re-throw a rejection inside the generator so its try/catch can see it.
        function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
        // Either finish with the return value or wait for the next yielded value.
        function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
        step((generator = generator.apply(thisArg, _arguments || [])).next());
    });
};
11Object.defineProperty(exports, "__esModule", { value: true });
12const pubsub_async_iterator_1 = require("./pubsub-async-iterator");
13const rabbitmq_pub_sub_1 = require("rabbitmq-pub-sub");
14const async_1 = require("async");
15const child_logger_1 = require("./child-logger");
/**
 * Publish/subscribe engine that forwards to the `RabbitMqPublisher` and
 * `RabbitMqSubscriber` classes of `rabbitmq-pub-sub`.
 *
 * Several local subscriptions to the same trigger share one consumer-side
 * subscription; it is cancelled when the last local subscription is removed.
 */
class AmqpPubSub {
    /**
     * @param {object} [options]
     * @param {Function} [options.triggerTransform] - Maps `(trigger, options)`
     *   to the queue name; defaults to the identity function.
     * @param {object} [options.config] - Connection settings handed to the
     *   connection factory; defaults to `{ host: '127.0.0.1', port: 5672 }`.
     * @param {object} [options.logger] - Parent logger, passed to
     *   `createChildLogger` and to the rabbitmq-pub-sub classes.
     */
    constructor(options = {}) {
        this.triggerTransform =
            options.triggerTransform || (trigger => trigger);
        const config = options.config || { host: '127.0.0.1', port: 5672 };
        const { logger } = options;
        this.logger = child_logger_1.createChildLogger(logger, 'AmqpPubSub');
        const factory = new rabbitmq_pub_sub_1.RabbitMqSingletonConnectionFactory(logger, config);
        this.consumer = new rabbitmq_pub_sub_1.RabbitMqSubscriber(logger, factory);
        this.producer = new rabbitmq_pub_sub_1.RabbitMqPublisher(logger, factory);
        // subscription id -> [triggerName, listener]
        this.subscriptionMap = {};
        // triggerName -> list of subscription ids listening on it
        this.subsRefsMap = {};
        // triggerName -> disposer returned by the consumer for that trigger.
        // Kept per trigger so cancelling one queue never cancels another.
        this.unsubscribeChannels = {};
        this.currentSubscriptionId = 0;
    }

    /**
     * Publishes `payload` on the queue named `trigger`.
     *
     * @param {string} trigger - Queue name.
     * @param {*} payload - Message handed to the producer.
     * @returns {Promise<void>} Settles with whatever `producer.publish`
     *   returns (awaited if it is a promise), so publish failures reach the
     *   caller instead of becoming unhandled rejections.
     */
    publish(trigger, payload) {
        return new Promise(resolve => {
            this.logger.trace("publishing for queue '%s' (%j)", trigger, payload);
            resolve(this.producer.publish(trigger, payload));
        }).then(() => undefined);
    }

    /**
     * Registers `onMessage` for messages arriving on `trigger`.
     *
     * @param {string} trigger - Trigger name, passed through `triggerTransform`.
     * @param {Function} onMessage - Called with each incoming message.
     * @param {object} [options] - Forwarded to `triggerTransform`.
     * @returns {Promise<number>} Resolves with the subscription id. Rejects
     *   with the consumer's error when the queue subscription fails.
     */
    subscribe(trigger, onMessage, options) {
        const triggerName = this.triggerTransform(trigger, options);
        const id = this.currentSubscriptionId++;
        this.subscriptionMap[id] = [triggerName, onMessage];
        const refs = this.subsRefsMap[triggerName];
        if (refs && refs.length > 0) {
            // A consumer for this queue already exists; just add a listener.
            this.subsRefsMap[triggerName] = [...refs, id];
            this.logger.trace("subscriber exist, adding triggerName '%s' to saved list.", triggerName);
            return Promise.resolve(id);
        }
        return new Promise((resolve, reject) => {
            this.logger.trace("trying to subscribe to queue '%s'", triggerName);
            this.consumer
                .subscribe(triggerName, msg => this.onMessage(triggerName, msg))
                .then(disposer => {
                    // Re-read the map: another subscribe for the same trigger
                    // may have completed while this one was in flight.
                    this.subsRefsMap[triggerName] = [
                        ...(this.subsRefsMap[triggerName] || []),
                        id,
                    ];
                    this.unsubscribeChannels[triggerName] = disposer;
                    resolve(id);
                })
                .catch(err => {
                    // Do not leave a listener registered for a subscription
                    // that never came into existence.
                    delete this.subscriptionMap[id];
                    this.logger.error(err, "failed to recieve message from queue '%s'", triggerName);
                    reject(err);
                });
        });
    }

    /**
     * Removes the subscription `subId`; cancels the queue consumer when it
     * was the last subscription on its trigger.
     *
     * @param {number} subId - Id previously resolved by `subscribe`.
     * @throws {Error} When `subId` does not refer to an active subscription.
     */
    unsubscribe(subId) {
        const entry = this.subscriptionMap[subId];
        const triggerName = entry ? entry[0] : null;
        const refs = entry ? this.subsRefsMap[triggerName] : undefined;
        if (!refs) {
            this.logger.error("There is no subscription of id '%s'", subId);
            throw new Error(`There is no subscription of id "${subId}"`);
        }
        const remaining = refs.filter(refId => refId !== subId);
        if (remaining.length === 0) {
            const disposer = this.unsubscribeChannels[triggerName];
            delete this.subsRefsMap[triggerName];
            delete this.unsubscribeChannels[triggerName];
            if (disposer) {
                // The executor runs synchronously, so the channel is cancelled
                // right away while a synchronous throw is still logged below.
                new Promise(resolve => resolve(disposer()))
                    .then(() => {
                        this.logger.trace("cancelled channel from subscribing to queue '%s'", triggerName);
                    })
                    .catch(err => {
                        this.logger.error(err, "channel cancellation failed from queue '%j'", triggerName);
                    });
            }
        }
        else {
            this.subsRefsMap[triggerName] = remaining;
            this.logger.trace("removing triggerName from listening '%s' ", triggerName);
        }
        delete this.subscriptionMap[subId];
        this.logger.trace("list of subscriptions still available '(%j)'", this.subscriptionMap);
    }

    /**
     * @param {string|string[]} triggers - Passed unchanged to `PubSubAsyncIterator`.
     * @returns {object} A new `PubSubAsyncIterator` bound to this instance.
     */
    asyncIterator(triggers) {
        return new pubsub_async_iterator_1.PubSubAsyncIterator(this, triggers);
    }

    /**
     * Delivers `message` to every listener subscribed to `channel`.
     *
     * @param {string} channel - Trigger/queue name the message arrived on.
     * @param {*} message - Message passed to each listener.
     */
    onMessage(channel, message) {
        const subscribers = this.subsRefsMap[channel];
        // Don't work for nothing..
        if (!subscribers || !subscribers.length) {
            return;
        }
        this.logger.trace("sending message to subscriber callback function '(%j)'", message);
        // TODO Support pattern based subscriptions
        for (const subId of subscribers) {
            const entry = this.subscriptionMap[subId];
            // A listener earlier in this loop may have unsubscribed this id.
            if (!entry) {
                continue;
            }
            const listener = entry[1];
            listener(message);
        }
    }
}
114exports.AmqpPubSub = AmqpPubSub;