1 | 'use strict'
|
2 |
|
3 | const _ = require('lodash')
|
4 | const debug = require('debug')('loopback:component:mq')
|
5 | const loopback = require('loopback')
|
6 | const setupModel = require('./setup-model')
|
7 | const Rabbit = require('./rabbit')
|
// Module-level Rabbit instance. Stays null until the exported component
// function finds the configured data source and constructs a `Rabbit` from it.
8 | let rabbit = null
|
// Queue handle most recently returned by `rabbit.exchange.queue(...)` in
// setupQueue. It is overwritten on every setupQueue call, and
// setupQueueConsumer attaches its handler to whatever is stored here.
9 | let msqQueue = null
|
10 |
|
/**
 * Declares a durable queue on the shared exchange and stores the returned
 * handle in the module-level `msqQueue`, replacing any previous one.
 *
 * @param {string} name - name of the queue to declare
 * @returns {void}
 */
function setupQueue(name) {
  debug('setupQueue: queue: %s', name)

  const queueOptions = { name, durable: true }
  const { exchange } = rabbit

  msqQueue = exchange.queue(queueOptions)
}
|
15 |
|
/**
 * Registers a model's static method as the consumer of the current queue.
 *
 * Resolves `definition.model` in `app.models` and `definition.method` on that
 * model, then passes the method to `consume()` on the module-level `msqQueue`
 * (the queue most recently created by `setupQueue`).
 *
 * @param {Object} app - application object exposing a `models` map
 * @param {string} queue - queue name; only used for log output here
 * @param {Object} definition - handler description with `model` and `method` names
 * @returns {void}
 * @throws {Error} when `definition.model` is not present in `app.models`
 */
function setupQueueConsumer(app, queue, definition) {
  const modelName = definition.model
  const methodName = definition.method
  const Model = app.models[modelName]

  if (!Model) {
    throw new Error(`setupQueueConsumer: Model not found: ${modelName}`)
  }

  const Method = Model[methodName]

  if (typeof Method !== 'function') {
    console.warn(`setupQueueConsumer: Method not found: ${modelName}.${methodName}`)
    // A missing handler is only a warning, so skip registration instead of
    // handing `undefined` (or a non-callable value) to the queue.
    return
  }

  // Bind to the model so `this` inside the handler is the model, exactly as
  // it would be for a direct `Model.method(...)` call.
  msqQueue.consume(Method.bind(Model))

  debug('setupQueueConsumer: queue: %s, model: %s, method: %s', queue, modelName, methodName)
}
|
38 |
|
/**
 * Installs a publishing method on a model.
 *
 * After resolving `definition.model` in `app.models`, assigns a function to
 * `Model[definition.method]` that publishes its argument through
 * `rabbit.exchange.publish` with `queue` as the `key` option. Any existing
 * property of that name on the model is overwritten.
 *
 * @param {Object} app - application object exposing a `models` map
 * @param {string} queue - value passed as the publish `key`
 * @param {Object} definition - producer description with `model` and `method` names
 * @returns {void}
 * @throws {Error} when `definition.model` is not present in `app.models`
 */
function setupQueueProducer(app, queue, definition) {
  const { model: modelName, method: methodName } = definition
  const Model = app.models[modelName]

  if (!Model) {
    throw new Error(`setupQueueProducer: Model not found: ${modelName}`)
  }

  debug('setupQueueProducer: queue: %s, model: %s, method: %s', queue, modelName, methodName)

  // Both names are fixed for the lifetime of the producer, so the log format
  // string is built once rather than on every call.
  const callFormat = `${modelName}.${methodName}(%o)`

  Model[methodName] = function queueProducer(payload) {
    debug(callFormat, payload)
    rabbit.exchange.publish(payload, { key: queue })
  }
}
|
54 |
|
55 | module.exports = function loopbackComponentMq(app, config) {
|
56 | const options = config.options || {}
|
57 | const topology = config.topology || {}
|
58 |
|
59 | debug('options: %o', options)
|
60 | debug('topology: %o', topology)
|
61 |
|
62 | if (!options.dataSource) {
|
63 | debug('options.dataSource not set, using default value \'rabbit\'')
|
64 | options.dataSource = 'rabbit'
|
65 | }
|
66 |
|
67 | const ds = app.dataSources[options.dataSource]
|
68 |
|
69 | if (ds) {
|
70 | rabbit = new Rabbit(ds.settings.options || {})
|
71 |
|
72 |
|
73 | _.forEach(topology, (handlers, queue) => {
|
74 |
|
75 |
|
76 | setupQueue(queue)
|
77 |
|
78 |
|
79 | if (handlers.consumer) {
|
80 | setupQueueConsumer(app, queue, handlers.consumer)
|
81 | }
|
82 |
|
83 |
|
84 | if (handlers.producer) {
|
85 | setupQueueProducer(app, queue, handlers.producer)
|
86 | }
|
87 |
|
88 | })
|
89 |
|
90 | setupModel(app, ds, rabbit, topology)
|
91 | }
|
92 | else {
|
93 | debug(`DataSource ${options.dataSource} not found`)
|
94 | const memDs = loopback.createDataSource({ connector: 'memory' })
|
95 |
|
96 | setupModel(app, memDs, {}, {})
|
97 | }
|
98 |
|
99 | }
|