1 | const { join, resolve } = require('path');
|
2 | const _ = require('lodash');
|
3 | const requirePath = require('require-path');
|
4 | const Arena = require('bull-arena');
|
5 | const Queue = require('bull');
|
6 |
|
7 | const Redis = require('ioredis');
|
8 |
|
9 | let client;
|
10 | let subscriber;
|
11 |
|
12 | const JobBase = require('./job_base');
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 |
|
19 |
|
20 |
|
21 |
|
22 |
|
23 |
|
24 |
|
25 | class MicroQueueAdaptor {
|
26 |
|
27 | |
28 |
|
29 |
|
30 |
|
31 |
|
32 |
|
33 |
|
34 |
|
35 |
|
36 |
|
37 | static init(Core) {
|
38 | const opts = createClient();
|
39 |
|
40 | return loadJobs().then(modules => {
|
41 | modules.forEach(module => {
|
42 | const queue = new Queue(module.name, _.merge(opts, { prefix: `{${module.name}}` }));
|
43 |
|
44 | queue.process(module.name, module.concurrency, join(this.QUEUE_ROOT, 'worker.js'));
|
45 |
|
46 | module.queue = queue;
|
47 |
|
48 | this.jobs[module.name] = module;
|
49 | });
|
50 |
|
51 | return Core.AdaptorInitializer.add('Queue', this);
|
52 | });
|
53 | }
|
54 |
|
55 | |
56 |
|
57 |
|
58 |
|
59 | static afterInitialize(Core) {
|
60 | Core.Logger.silly('Queue after initialize: mount queue ui');
|
61 |
|
62 | return mountArena(Core, this.jobs);
|
63 | }
|
64 |
|
65 | |
66 |
|
67 |
|
68 |
|
69 | static connect(App) {
|
70 | const jobs = {};
|
71 |
|
72 | Object.keys(this.jobs).forEach(key => {
|
73 | const Job = this.jobs[key];
|
74 | jobs[key] = new Job(App);
|
75 | });
|
76 |
|
77 | return new this(jobs);
|
78 | }
|
79 |
|
80 | static loadJobs() {
|
81 | return loadJobs().then(modules => {
|
82 | const jobs = {};
|
83 |
|
84 | modules.forEach(module => {
|
85 | jobs[module.name] = module;
|
86 | });
|
87 |
|
88 | return jobs;
|
89 | });
|
90 | }
|
91 |
|
92 | constructor(jobs) {
|
93 | this.jobs = jobs;
|
94 | }
|
95 |
|
96 | |
97 |
|
98 |
|
99 |
|
100 |
|
101 |
|
102 |
|
103 | get(name) {
|
104 | return this.jobs[name];
|
105 | }
|
106 | }
|
107 |
|
108 | function createClient() {
|
109 | const { REDIS_HOST } = process.env;
|
110 | client = client || new Redis(REDIS_HOST);
|
111 | subscriber = subscriber || new Redis(REDIS_HOST);
|
112 |
|
113 | return {
|
114 | createClient(type) {
|
115 | switch (type) {
|
116 | case 'client':
|
117 | return client;
|
118 | case 'subscriber':
|
119 | return subscriber;
|
120 | default:
|
121 | return new Redis(REDIS_HOST);
|
122 | }
|
123 | }
|
124 | };
|
125 | }
|
126 |
|
127 | function mountArena(Core, jobs) {
|
128 | if (!Core.Server) {
|
129 | return Promise.resolve(undefined);
|
130 | }
|
131 |
|
132 | const names = Object.keys(jobs);
|
133 |
|
134 | const arena = Arena({
|
135 | queues: names.map(name => ({
|
136 | name,
|
137 | hostId: 'Queues',
|
138 | prefix: `{${name}}`,
|
139 | redis: { host: process.env.REDIS_HOST }
|
140 | }))
|
141 | }, {
|
142 | disableListen: true
|
143 | });
|
144 |
|
145 | Core.Server.use('/arena', arena);
|
146 |
|
147 | return Promise.resolve('mounted Arena');
|
148 | }
|
149 |
|
150 | function loadJobs() {
|
151 | const { APP_ROOT } = process.env;
|
152 | const jobsPath = join(APP_ROOT, 'app', 'jobs');
|
153 |
|
154 | return requirePath({
|
155 | path: jobsPath,
|
156 | include: ['*.job.js']
|
157 | }).then(modules => _.map(modules, (module, key) => module));
|
158 | }
|
159 |
|
160 | module.exports = MicroQueueAdaptor;
|
161 | module.exports.QUEUE_ROOT = resolve(__dirname);
|
162 | module.exports.jobs = {};
|
163 | module.exports.JobBase = JobBase; |
\ | No newline at end of file |