UNPKG

3.64 kBJavaScriptView Raw
1const amqp = require('amqplib'),
2 Promise = require('bluebird'),
3 __ = require('underscore'),
4 logger = require('../app/Logger');
5
// Active AMQP connection; assigned by __start() once amqp.connect() resolves.
let __conn;
// Count of consecutive failed start attempts; __start() resets it to 0 after a
// successful connect and increments it on every failure.
let __connTimes = 0;
// Exchange name -> __Publish instance, filled in by __createExchange().
// Created without a prototype so that exchange names such as "constructor" or
// "toString" never resolve to members inherited from Object.prototype.
const __publishes = Object.create(null);
9
/**
 * Publisher bound to one topic exchange on the shared module connection.
 */
class __Publish {
  /**
   * @param {string} name - Name of the exchange this publisher writes to.
   */
  constructor(name) {
    this.__ex = name;
  }

  /**
   * Publishes `msg`, serialised as JSON, to the bound exchange on a dedicated
   * confirm channel and waits for the broker's confirmation.
   *
   * @param {string} type - Routing key passed to the channel's publish().
   * @param {*} msg - Any JSON-serialisable value.
   * @returns {Promise} Fulfils with the value the confirm callback reports once
   *   the broker has confirmed the message; rejects when the broker nacks it
   *   or when opening the channel / asserting the exchange fails.
   * @throws Synchronously if `msg` cannot be serialised or there is no connection.
   */
  publish(type, msg) {
    const payload = Buffer.from(JSON.stringify(msg));
    const exchange = this.__ex;
    let channel;

    return Promise.resolve(__conn.createConfirmChannel())
      .then((ch) => {
        channel = ch;
        return ch.assertExchange(exchange, 'topic', {
          durable: false
        });
      })
      // The confirmation arrives through a node-style callback, so it has to be
      // adapted into the promise chain; returning values from inside that
      // callback would simply be discarded.
      .then(() => new Promise((resolve, reject) => {
        channel.publish(exchange, type, payload, {}, (err, ok) => {
          if (err) {
            reject(err);
          } else {
            resolve(ok);
          }
        });
      }))
      // One channel is opened per publish, so it must be released on both the
      // success and the failure path or channels pile up on the connection.
      .finally(() => {
        if (!channel) {
          return undefined;
        }
        return Promise.resolve()
          .then(() => channel.close())
          .catch((closeErr) => {
            // A failed close must not mask the outcome of the publish itself.
            logger.warn('Failed to close the publish channel: ' +
              (closeErr && closeErr.message ? closeErr.message : String(closeErr)));
          });
      });
  }
}
36
/**
 * Declares a non-durable queue, binds it to `ex` with `config.topic`, and
 * starts consuming it with `config.consumer`.
 *
 * Each delivery is JSON-parsed and handed to `config.consumer(payload)`:
 *  - result `true`  -> the message is acked;
 *  - result `false` -> the message is nacked with the channel's default
 *    arguments (the warning text describes this as a requeue);
 *  - any other result -> a warning is logged and the message is acked;
 *  - a thrown error, a rejected promise or an unparsable body -> the message
 *    is nacked with `requeue = false`.
 *
 * @param {object} ch - Channel used for assertQueue/bindQueue/consume/ack/nack.
 * @param {string} ex - Exchange to bind the queue to.
 * @param {string} name - Queue name.
 * @param {{topic: string, consumer: function(*): (Promise<boolean>|boolean)}} config
 * @returns {Promise} Resolves with the result of `ch.consume`.
 */
const __createQueue = (ch, ex, name, config) => {
  // JSON.stringify(new Error(...)) yields "{}", so Error objects are rendered
  // through their stack/message instead.
  const describeError = (err) => (
    err instanceof Error ? (err.stack || err.message) : JSON.stringify(err)
  );

  const settle = (msg, ok) => {
    if (ok === true || ok === false) {
      if (!ok) {
        logger.warn('The message consumer decide to requeue the message!');
      }
      return ok ? ch.ack(msg) : ch.nack(msg);
    }
    logger.warn('You should ack the message by true or false!');
    return ch.ack(msg);
  };

  const handleMessage = (msg) => {
    // amqplib invokes the consumer callback with null when the broker cancels
    // the consumer; there is nothing to parse, ack or nack in that case.
    if (msg === null || msg === undefined) {
      logger.warn('The consumer of queue ' + name + ' has been cancelled!');
      return Promise.resolve();
    }

    let payload;
    // Parsing and the consumer call run inside the chain so that a malformed
    // body, a synchronous throw or a non-promise return value all end up in
    // the same catch handler instead of escaping the consume callback.
    return Promise.resolve()
      .then(() => {
        payload = JSON.parse(msg.content.toString());
        return config.consumer(payload);
      })
      .then((ok) => settle(msg, ok))
      .catch((err) => {
        // payload stays undefined only when JSON.parse itself failed; log the
        // raw body then so the offending message can still be identified.
        const shown = payload === undefined ? String(msg.content) : JSON.stringify(payload);
        logger.warn('the consumer has rejected message:\r\n' + shown +
          '\r\nError:' + describeError(err));
        return ch.nack(msg, false, false);
      });
  };

  let queue;
  return ch.assertQueue(name, {
    durable: false
  })
    .then((q) => {
      queue = q.queue;
      return ch.bindQueue(queue, ex, config.topic);
    })
    .then(() => ch.consume(queue, handleMessage));
};
67
/**
 * Declares the non-durable topic exchange `name`, sets up every queue listed
 * in `config.queues` (queue name -> queue config) against it, and finally
 * registers a __Publish instance for the exchange in `__publishes`.
 *
 * @param {object} ch - Channel on which the exchange and queues are declared.
 * @param {string} name - Exchange name.
 * @param {{queues: Object<string, object>}} config - Exchange configuration.
 * @returns {Promise<undefined>} Settles once the publisher has been registered.
 */
const __createExchange = (ch, name, config) => {
  const exchangeOptions = {
    durable: false
  };

  // Queues are only created after the exchange exists, because each of them
  // is bound to it.
  const setUpQueues = () => Promise.all(
    __.map(config.queues, (queueConfig, queueName) => (
      __createQueue(ch, name, queueName, queueConfig)
    ))
  );

  const registerPublisher = () => {
    __publishes[name] = new __Publish(name);
  };

  return ch.assertExchange(name, 'topic', exchangeOptions)
    .then(setUpQueues)
    .then(registerPublisher);
};
83
/**
 * Connects to the broker at `config.connect`, opens a channel and declares
 * every exchange in `config.exchanges` (exchange name -> exchange config).
 *
 * On any failure the error is logged, `__connTimes` is incremented and a new
 * attempt is scheduled 2 seconds later; the returned promise never rejects.
 *
 * @param {{connect: string, exchanges: Object<string, object>}} config
 * @returns {Promise<Array|undefined>} The results of the exchange setup on
 *   success, `undefined` when this attempt failed and a retry was scheduled.
 */
const __start = (config) => {
  // JSON.stringify(new Error(...)) yields "{}", so Error objects are rendered
  // through their stack/message instead.
  const describeError = (err) => (
    err instanceof Error ? (err.stack || err.message) : JSON.stringify(err, null, 2)
  );

  // Connection opened by THIS attempt, if any; needed so a failed setup can
  // release it before the retry opens another one.
  let connection;

  logger.debug('MQ connection: ' + config.connect);
  return amqp.connect(config.connect)
    .then((conn) => {
      logger.debug('MQ connected successfully!');
      connection = conn;
      // An 'error' event without a listener is thrown by the emitter and
      // would take the whole process down.
      conn.on('error', (err) => {
        logger.error('MQ connection error:\n\r' + describeError(err));
      });
      __connTimes = 0;
      __conn = conn;
      return conn.createChannel();
    })
    .then((ch) => Promise.all(
      __.map(config.exchanges, (exchangeConfig, exchangeName) => (
        __createExchange(ch, exchangeName, exchangeConfig)
      ))
    ))
    .catch((err) => {
      ++__connTimes;
      logger.error('Times: ' + __connTimes);
      logger.error('Failed to connect to MQ:\n\r' + describeError(err));
      setTimeout(__start, 2000, config);

      if (!connection) {
        return undefined;
      }
      // The connect succeeded but the channel/exchange setup did not. The
      // retry opens a fresh connection, so this one has to be closed or it
      // would stay open unused. __conn keeps pointing at it until the retry
      // replaces it.
      return Promise.resolve()
        .then(() => connection.close())
        .catch((closeErr) => {
          logger.error('Failed to close the MQ connection:\n\r' + describeError(closeErr));
        })
        .then(() => undefined);
    });
};
107
/**
 * Public facade of the module.
 */
const rabbitMessageCenter = {
  /**
   * Connects to the broker and declares the configured exchanges and queues.
   *
   * @param {object} config - Passed through unchanged to __start().
   * @returns {Promise} Whatever __start() returns.
   */
  start: (config) => {
    return __start(config);
  },

  /**
   * Publishes `msg` to the exchange `name` with routing key `type`.
   *
   * @param {string} name - Exchange name; must have been registered in
   *   `__publishes` before this call.
   * @param {string} type - Routing key.
   * @param {*} msg - JSON-serialisable message body.
   * @returns {Promise} The publisher's promise, or a rejection when no
   *   publisher is registered under `name`.
   */
  publish: (name, type, msg) => {
    // An own-property check keeps names such as "constructor" from matching
    // inherited members, and an unknown exchange is reported as a rejection
    // with a clear message rather than an opaque TypeError thrown from a
    // promise-returning API.
    if (!Object.prototype.hasOwnProperty.call(__publishes, name)) {
      return Promise.reject(
        new Error('MQ exchange "' + name + '" is not available for publishing')
      );
    }
    return __publishes[name].publish(type, msg);
  }
};
117module.exports = rabbitMessageCenter
\No newline at end of file