1 | const amqp = require('amqplib'),
|
2 | Promise = require('bluebird'),
|
3 | __ = require('underscore'),
|
4 | logger = require('../app/Logger');
|
5 |
|
6 | let __conn;
|
7 | let __connTimes = 0
|
8 | let __publishes = {}
|
9 |
|
10 | class __Publish {
|
11 | constructor(name) {
|
12 | this.__ex = name
|
13 | }
|
14 |
|
15 | publish(type, msg) {
|
16 | const payload = Buffer.from(JSON.stringify(msg))
|
17 | let channel
|
18 | let ex = this.__ex
|
19 | return __conn.createConfirmChannel()
|
20 | .then((ch) => {
|
21 | channel = ch
|
22 | return ch.assertExchange(ex, 'topic', {
|
23 | durable: false
|
24 | })
|
25 | })
|
26 | .then(function () {
|
27 | return channel.publish(ex, type, payload, {}, (err, ok) => {
|
28 | if (err !== null)
|
29 | return Promise.reject(err)
|
30 | else
|
31 | return ok
|
32 | });
|
33 | });
|
34 | }
|
35 | }
|
36 |
|
37 | const __createQueue = (ch, ex, name, config) => {
|
38 | let queue
|
39 | return ch.assertQueue(name, {
|
40 | durable: false
|
41 | })
|
42 | .then((q) => {
|
43 | queue = q.queue
|
44 | return ch.bindQueue(queue, ex, config.topic)
|
45 | })
|
46 | .then(() => {
|
47 | return ch.consume(queue, (msg) => {
|
48 | let payload = JSON.parse(msg.content.toString())
|
49 | return config.consumer(payload)
|
50 | .then((ok) => {
|
51 | if (ok === true || ok === false) {
|
52 | if (!ok)
|
53 | logger.warn('The message consumer decide to requeue the message!')
|
54 | return ok ? ch.ack(msg) : ch.nack(msg)
|
55 | }
|
56 | logger.warn('You should ack the message by true or false!')
|
57 | return ch.ack(msg)
|
58 | })
|
59 | .catch((err) => {
|
60 | logger.warn('the consumer has rejected message:\r\n' + JSON.stringify(payload) +
|
61 | '\r\nError:' + JSON.stringify(err))
|
62 | return ch.nack(msg, false, false)
|
63 | })
|
64 | })
|
65 | })
|
66 | }
|
67 |
|
68 | const __createExchange = (ch, name, config) => {
|
69 | let queues = []
|
70 | return ch.assertExchange(name, 'topic', {
|
71 | durable: false
|
72 | })
|
73 | .then(() => {
|
74 | __.each(config.queues, (element, key) => {
|
75 | queues.push(__createQueue(ch, name, key, element))
|
76 | })
|
77 | return Promise.all(queues)
|
78 | })
|
79 | .then(() => {
|
80 | __publishes[name] = new __Publish(name)
|
81 | })
|
82 | }
|
83 |
|
84 | const __start = (config) => {
|
85 | logger.debug('MQ connection: ' + config.connect)
|
86 | return amqp.connect(config.connect)
|
87 | .then((conn) => {
|
88 | logger.debug('MQ connected successfully!')
|
89 | __connTimes = 0
|
90 | __conn = conn
|
91 | return __conn.createChannel()
|
92 | })
|
93 | .then((ch) => {
|
94 | let exchanges = []
|
95 | __.each(config.exchanges, (element, key) => {
|
96 | exchanges.push(__createExchange(ch, key, element))
|
97 | })
|
98 | return Promise.all(exchanges)
|
99 | })
|
100 | .catch(err => {
|
101 | ++__connTimes
|
102 | logger.error('Times: ' + __connTimes)
|
103 | logger.error('Failed to connect to MQ:\n\r' + JSON.stringify(err, null, 2))
|
104 | setTimeout(__start, 2000, config);
|
105 | })
|
106 | }
|
107 |
|
108 | const rabbitMessageCenter = {
|
109 | start: (config) => {
|
110 | return __start(config)
|
111 | },
|
112 |
|
113 | publish: (name, type, msg) => {
|
114 | return __publishes[name].publish(type, msg)
|
115 | }
|
116 | }
|
117 | module.exports = rabbitMessageCenter |
\ | No newline at end of file |