UNPKG

5.13 kBJavaScriptView Raw
1"use strict"
2
3const EVERY_30_MINUTES = 60 * 1000 * 30
4
5class BgProcessingInitializer {
6 constructor(logger, applicationContext, Queue, createRedisClient) {
7 this.logger = logger
8 this.jobDefinitions = {}
9 this.jobsCallback = async () => {}
10 this.applicationContext = applicationContext
11 this.Queue = Queue
12 this.createRedisClient = createRedisClient
13 this.queues = {}
14 this.redisClients = {
15 __default: []
16 }
17 this.cleanupTimeout = null
18 }
19
20 async start() {
21 await this._initializeJobs()
22 await this._initializeProcessors()
23 this._startCleanup()
24 }
25
26 async stop() {
27 this._stopCleanup()
28 await this._closeQueues()
29 await this._disconnectRedisClients()
30 }
31
32 async initializeQueues(jobDefinitions) {
33 this.jobDefinitions = jobDefinitions
34
35 let Queue = this.Queue
36 let jobNames = Object.keys(this.jobDefinitions)
37 for (let name of jobNames) {
38 this.queues[name] = new Queue(name, this._queueOptions)
39 }
40 }
41
42 async initializeJobs(jobsCallback) {
43 this.jobsCallback = jobsCallback
44 }
45
46 async runCleanup() {
47 await Promise.all(
48 Object.values(this.queues).map((queue) => {
49 if (queue.clean) {
50 return queue.clean(100).catch(e => this.logger.error(e))
51 }
52
53 return Promise.reslove()
54 })
55 )
56 }
57
58 async fetchStatus() {
59 let statuses = await Promise.all(
60 Object.values(this.queues).map((queue) => {
61 return queue.getJobCounts().then((counts) => [queue.name, counts])
62 })
63 )
64
65 return statuses.reduce((acc, status) => {
66 acc[status[0]] = status[1]
67 return acc
68 }, {})
69 }
70
71 async _initializeJobs() {
72 await this.jobsCallback.call(this, this.queues)
73 }
74
75 async _initializeProcessors() {
76 let jobNames = Object.keys(this.jobDefinitions)
77 for (let name of jobNames) {
78 let queue = this.queues[name]
79 let processor = this.jobDefinitions[name]
80 let concurrency = 1
81 if (Array.isArray(processor)) {
82 concurrency = processor[0]
83 processor = processor[1]
84 }
85 this._assignEventHandlers(queue)
86 this._assignProcessor(queue, processor, concurrency)
87 }
88 }
89
90 _assignProcessor(queue, process, concurency) {
91 queue.process("*", concurency, async (job) => await process(this.applicationContext, job))
92 }
93
94 _prettyPrintJob(job) {
95 return `Job[${job.id}] { ${Object.keys(job.data).map(k => `${k}: ${job.data[k]}`).join(", ")} }`
96 }
97
98 _assignEventHandlers(queue) {
99 let logger = this.logger.child(queue.name)
100 queue
101 .on("error", (error) => logger.error("Queue error", error))
102 .on("paused", () => logger.info("Queue paused"))
103 .on("resumed", () => logger.info("Queue resumed"))
104 .on("drained", () => logger.debug("Queue drained"))
105 .on("cleaned", (jobIds, type) => jobIds.length && logger.debug(`Jobs(${type}) cleaned: ${jobIds}`))
106 //waiting is internal event and cause warns
107 //.on("waiting", (jobId) => logger.debug(`Job[${jobId}] waiting`))
108 .on("active", (job) => logger.debug(`${this._prettyPrintJob(job)} started`))
109 .on("stalled", (job) => logger.error(`${this._prettyPrintJob(job)} stalled`))
110 .on("progress", (job, progress) => logger.debug(`${this._prettyPrintJob(job)} progress ${progress * 100}`))
111 .on("completed", (job) => logger.info( `${this._prettyPrintJob(job)} completed`))
112 .on("failed", (job, err) => logger.error(`${this._prettyPrintJob(job)} failed`, err))
113 .on("removed", (job) => logger.debug(`${this._prettyPrintJob(job)} removed`))
114 }
115
116 async _closeQueues() {
117 await Promise.all(
118 Object.values(this.queues).map((queue) => queue.close())
119 )
120 }
121
122 async _disconnectRedisClients() {
123 let disconnects = []
124
125
126 if (this.redisClients.client) disconnects.push(this._disconnectRedisClient(this.redisClients.client))
127 if (this.redisClients.subscriber) disconnects.push(this._disconnectRedisClient(this.redisClients.subscriber))
128 this.redisClients.__default.forEach((client) => {
129 disconnects.push( this._disconnectRedisClient(client) )
130 })
131
132 await Promise.all(disconnects)
133 }
134
135 _disconnectRedisClient(client) {
136 return client
137 .quit()
138 .catch((e) => this.logger.debug(e))
139 }
140
141 _startCleanup() {
142 this.cleanupTimeout = setTimeout(async () => {
143 await this.runCleanup()
144 this._startCleanup()
145 }, EVERY_30_MINUTES)
146 }
147
148 _stopCleanup() {
149 clearTimeout(this.cleanupTimeout)
150 }
151
152 get _queueOptions() {
153 return {
154 createClient: this._createClient.bind(this)
155 }
156 }
157
158 _createClient(type) {
159 if (type === "client" || type === "subscriber") {
160 if (!this.redisClients[type]) {
161 this.redisClients[type] = this.createRedisClient()
162 }
163 return this.redisClients[type]
164 } else {
165 let client = this.createRedisClient()
166 this.redisClients.__default.push(client)
167 return client
168 }
169 }
170}
171
172module.exports = BgProcessingInitializer