UNPKG

2.71 kBJavaScriptView Raw
1'use strict'
2
3const _ = require('lodash')
4const debug = require('debug')('loopback:component:mq')
5const loopback = require('loopback')
6const setupModel = require('./setup-model')
7const Rabbit = require('./rabbit')
8let rabbit = null
9let msqQueue = null
10
/**
 * Declare a durable queue on the shared exchange and keep the returned
 * handle in the module-level `msqQueue`, replacing whatever handle was stored
 * there before.
 *
 * @param {string} name - Name of the queue to declare.
 */
function setupQueue(name) {
  debug('setupQueue: queue: %s', name)

  const declaration = { name: name, durable: true }
  const exchange = rabbit.exchange

  msqQueue = exchange.queue(declaration)
}
15
/**
 * Register a model's static method as the consumer of the queue currently
 * held in the module-level `msqQueue`.
 *
 * @param {object} app - LoopBack application; `app.models` is searched for the model.
 * @param {string} queue - Queue name, used for logging only.
 * @param {object} definition - Consumer definition.
 * @param {string} definition.model - Name of the model that owns the handler.
 * @param {string} definition.method - Name of the handler method on that model.
 * @throws {Error} When the model is not present on `app.models`.
 */
function setupQueueConsumer(app, queue, definition) {
  const modelName = definition.model
  const methodName = definition.method
  const Model = app.models[modelName]

  // Check if the model exists
  if (!Model) {
    throw new Error(`setupQueueConsumer: Model not found: ${modelName}`)
  }

  const Method = Model[methodName]

  // A missing handler is reported but is not fatal. Do not subscribe in that
  // case: passing a non-function to consume() leaves the queue with nothing
  // that could ever handle a message.
  if (typeof Method !== 'function') {
    console.warn(`setupQueueConsumer: Method not found: ${modelName}.${methodName}`) // eslint-disable-line no-console
    return
  }

  // Start consuming the queue
  msqQueue.consume(Method)

  debug('setupQueueConsumer: queue: %s, model: %s, method: %s', queue, modelName, methodName)
}
38
/**
 * Install a producer on a model: the named method is overwritten with a
 * function that publishes its argument to the shared exchange, using the
 * queue name as the routing key.
 *
 * @param {object} app - LoopBack application; `app.models` is searched for the model.
 * @param {string} queue - Queue name, passed as the `key` publish option.
 * @param {object} definition - Producer definition.
 * @param {string} definition.model - Name of the model that receives the method.
 * @param {string} definition.method - Name of the method to install on the model.
 * @throws {Error} When the model is not present on `app.models`.
 */
function setupQueueProducer(app, queue, definition) {
  const { model: modelName, method: methodName } = definition
  const target = app.models[modelName]

  if (target === undefined || target === null || !target) {
    throw new Error(`setupQueueProducer: Model not found: ${modelName}`)
  }

  debug('setupQueueProducer: queue: %s, model: %s, method: %s', queue, modelName, methodName)

  // Any existing method of the same name is replaced; the producer itself
  // returns nothing to its caller.
  const producer = function queueProducer(params) {
    debug(`${modelName}.${methodName}(%o)`, params)
    const publishOptions = { key: queue }
    rabbit.exchange.publish(params, publishOptions)
  }

  target[methodName] = producer
}
54
55module.exports = function loopbackComponentMq(app, config) {
56 const options = config.options || {}
57 const topology = config.topology || {}
58
59 debug('options: %o', options)
60 debug('topology: %o', topology)
61
62 if (!options.dataSource) {
63 debug('options.dataSource not set, using default value \'rabbit\'')
64 options.dataSource = 'rabbit'
65 }
66
67 const ds = app.dataSources[options.dataSource]
68
69 if (ds) {
70 rabbit = new Rabbit(ds.settings.options || {})
71
72 // Loop through all the defined queues
73 _.forEach(topology, (handlers, queue) => {
74
75 // Setup the actual queue on RabbitMQ
76 setupQueue(queue)
77
78 // Setup the consumer of this queue
79 if (handlers.consumer) {
80 setupQueueConsumer(app, queue, handlers.consumer)
81 }
82
83 // Setup the producers of this queue
84 if (handlers.producer) {
85 setupQueueProducer(app, queue, handlers.producer)
86 }
87
88 })
89
90 setupModel(app, ds, rabbit, topology)
91 }
92 else {
93 debug(`DataSource ${options.dataSource} not found`)
94 const memDs = loopback.createDataSource({ connector: 'memory' })
95
96 setupModel(app, memDs, {}, {})
97 }
98
99}