UNPKG

3.5 kBJavaScriptView Raw
1const { join, resolve } = require('path');
2const _ = require('lodash');
3const requirePath = require('require-path');
4const Arena = require('bull-arena');
5const Queue = require('bull');
6
7const Redis = require('ioredis');
8
9let client;
10let subscriber;
11
12const JobBase = require('./job_base');
13
14/**
15 * Generate a new job
16 *
17 * ```
18 * $ yarn run micro-queue generate {job_name}
19 * ```
20 *
21 * Creates a job inside the /app/jobs/{job_name}.job.js
22 *
23 * @name MicroQueueAdaptor
24*/
25class MicroQueueAdaptor {
26
27 /**
28 * A Micro Core applicaiton calls init() on all adaptors during application
29 * init
30 *
31 * - create the redis client
32 * - load the job classes
33 * - create the job queue instsances
34 * - assign the processor to each queue
35 * - Add this to Core as the Queue Adaptor
36 */
37 static init(Core) {
38 const opts = createClient();
39
40 return loadJobs().then(modules => {
41 modules.forEach(module => {
42 const queue = new Queue(module.name, _.merge(opts, { prefix: `{${module.name}}` }));
43
44 queue.process(module.name, module.concurrency, join(this.QUEUE_ROOT, 'worker.js'));
45
46 module.queue = queue;
47
48 this.jobs[module.name] = module;
49 });
50
51 return Core.AdaptorInitializer.add('Queue', this);
52 });
53 }
54
55 /**
56 * After Micro Core init, load the Arena Queue UI into Core.Server. Only is
57 * Core.Server exists
58 */
59 static afterInitialize(Core) {
60 Core.Logger.silly('Queue after initialize: mount queue ui');
61
62 return mountArena(Core, this.jobs);
63 }
64
65 /**
66 * On App connect, adds the Job instances to the App scoping perform_later to
67 * the App instance;
68 */
69 static connect(App) {
70 const jobs = {};
71
72 Object.keys(this.jobs).forEach(key => {
73 const Job = this.jobs[key];
74 jobs[key] = new Job(App);
75 });
76
77 return new this(jobs);
78 }
79
80 static loadJobs() {
81 return loadJobs().then(modules => {
82 const jobs = {};
83
84 modules.forEach(module => {
85 jobs[module.name] = module;
86 });
87
88 return jobs;
89 });
90 }
91
92 constructor(jobs) {
93 this.jobs = jobs;
94 }
95
96 /**
97 * Get a Job instance from the App instances compiled Job collection
98 *
99 * ```
100 * const Job = App.Queue.get('SomeJobNameJob');
101 * ```
102 */
103 get(name) {
104 return this.jobs[name];
105 }
106}
107
108function createClient() {
109 const { REDIS_HOST } = process.env;
110 client = client || new Redis(REDIS_HOST);
111 subscriber = subscriber || new Redis(REDIS_HOST);
112
113 return {
114 createClient(type) {
115 switch (type) {
116 case 'client':
117 return client;
118 case 'subscriber':
119 return subscriber;
120 default:
121 return new Redis(REDIS_HOST);
122 }
123 }
124 };
125}
126
127function mountArena(Core, jobs) {
128 if (!Core.Server) {
129 return Promise.resolve(undefined);
130 }
131
132 const names = Object.keys(jobs);
133
134 const arena = Arena({
135 queues: names.map(name => ({
136 name,
137 hostId: 'Queues',
138 prefix: `{${name}}`,
139 redis: { host: process.env.REDIS_HOST }
140 }))
141 }, {
142 disableListen: true
143 });
144
145 Core.Server.use('/arena', arena);
146
147 return Promise.resolve('mounted Arena');
148}
149
150function loadJobs() {
151 const { APP_ROOT } = process.env;
152 const jobsPath = join(APP_ROOT, 'app', 'jobs');
153
154 return requirePath({
155 path: jobsPath,
156 include: ['*.job.js']
157 }).then(modules => _.map(modules, (module, key) => module));
158}
159
160module.exports = MicroQueueAdaptor;
161module.exports.QUEUE_ROOT = resolve(__dirname);
162module.exports.jobs = {};
163module.exports.JobBase = JobBase;
\No newline at end of file