1 | "use strict"
|
2 |
|
// Delay between two periodic queue cleanup passes, in milliseconds (30 minutes).
const EVERY_30_MINUTES = 30 * 60 * 1000
|
4 |
|
/**
 * Wires up background job processing: creates one queue per job definition,
 * attaches processors and logging event handlers, shares Redis connections
 * between queues and periodically cleans the queues.
 *
 * NOTE(review): the `Queue` constructor and the queue methods used here
 * (`process`, `clean`, `getJobCounts`, `close`, `on`) look like the Bull API —
 * confirm against the injected implementation.
 */
class BgProcessingInitializer {
  /**
   * @param {object} logger - Logger exposing `error`, `info`, `debug` and `child(name)`.
   * @param {*} applicationContext - Passed as the first argument to every job processor.
   * @param {Function} Queue - Queue constructor, invoked as `new Queue(name, options)`.
   * @param {Function} createRedisClient - Zero-argument factory returning a Redis client.
   */
  constructor(logger, applicationContext, Queue, createRedisClient) {
    this.logger = logger
    this.jobDefinitions = {}
    this.jobsCallback = async () => {}
    this.applicationContext = applicationContext
    this.Queue = Queue
    this.createRedisClient = createRedisClient
    this.queues = {}
    this.redisClients = {
      __default: []
    }
    this.cleanupTimeout = null
    // True whenever the periodic cleanup must not (re)schedule itself.
    this.cleanupStopped = true
  }

  /**
   * Runs the registered jobs callback, attaches processors to all queues and
   * starts the periodic cleanup.
   *
   * @returns {Promise<void>}
   */
  async start() {
    await this._initializeJobs()
    await this._initializeProcessors()
    this._startCleanup()
  }

  /**
   * Stops the periodic cleanup, closes every queue and then quits every
   * Redis client created through this instance.
   *
   * @returns {Promise<void>}
   */
  async stop() {
    this._stopCleanup()
    await this._closeQueues()
    await this._disconnectRedisClients()
  }

  /**
   * Stores the job definitions and creates one queue per definition key.
   *
   * @param {Object<string, Function|Array>} jobDefinitions - Maps a queue name
   *   to either a processor function or a `[concurrency, processor]` pair.
   * @returns {Promise<void>}
   */
  async initializeQueues(jobDefinitions) {
    this.jobDefinitions = jobDefinitions

    const Queue = this.Queue
    for (const name of Object.keys(this.jobDefinitions)) {
      this.queues[name] = new Queue(name, this._queueOptions)
    }
  }

  /**
   * Registers the callback that `start()` invokes with the created queues.
   *
   * @param {Function} jobsCallback - Called as `jobsCallback.call(this, queues)`.
   * @returns {Promise<void>}
   */
  async initializeJobs(jobsCallback) {
    this.jobsCallback = jobsCallback
  }

  /**
   * Calls `clean(100)` on every queue that provides a `clean` method.
   * Queues without one are skipped. A failure of a single queue is logged
   * and does not prevent the remaining queues from being cleaned.
   *
   * @returns {Promise<void>}
   */
  async runCleanup() {
    await Promise.all(
      Object.values(this.queues).map(async (queue) => {
        if (!queue.clean) {
          return
        }

        try {
          // presumably 100 is the grace period in ms (Bull `Queue#clean`) — confirm
          await queue.clean(100)
        } catch (e) {
          // Also covers a synchronous throw from `clean`.
          this.logger.error(e)
        }
      })
    )
  }

  /**
   * Collects the job counts of all queues.
   *
   * @returns {Promise<Object<string, *>>} Object keyed by `queue.name` whose
   *   values are whatever `queue.getJobCounts()` resolved to.
   */
  async fetchStatus() {
    const statuses = await Promise.all(
      Object.values(this.queues).map((queue) => {
        return queue.getJobCounts().then((counts) => [queue.name, counts])
      })
    )

    return statuses.reduce((acc, [name, counts]) => {
      acc[name] = counts
      return acc
    }, {})
  }

  /**
   * Invokes the registered jobs callback with `this` bound to the initializer.
   *
   * @returns {Promise<void>}
   */
  async _initializeJobs() {
    await this.jobsCallback.call(this, this.queues)
  }

  /**
   * Attaches event handlers and a processor to each queue. A definition given
   * as an array is read as `[concurrency, processor]`; otherwise the
   * concurrency is 1.
   *
   * @returns {Promise<void>}
   */
  async _initializeProcessors() {
    for (const name of Object.keys(this.jobDefinitions)) {
      const queue = this.queues[name]
      let processor = this.jobDefinitions[name]
      let concurrency = 1
      if (Array.isArray(processor)) {
        [concurrency, processor] = processor
      }
      this._assignEventHandlers(queue)
      this._assignProcessor(queue, processor, concurrency)
    }
  }

  /**
   * Registers `process` on the queue so that it receives the application
   * context followed by the job.
   *
   * @param {object} queue - Queue to attach the processor to.
   * @param {Function} process - Called as `process(applicationContext, job)`.
   * @param {number} concurrency - Concurrency value forwarded to `queue.process`.
   */
  _assignProcessor(queue, process, concurrency) {
    // presumably "*" subscribes to every named job of the queue (Bull) — confirm
    queue.process("*", concurrency, async (job) => await process(this.applicationContext, job))
  }

  /**
   * Formats a job for log output as `Job[<id>] { key: value, ... }`.
   *
   * @param {{id: *, data: (object|null|undefined)}} job
   * @returns {string}
   */
  _prettyPrintJob(job) {
    // A job without a payload must not make a logging event handler throw.
    const data = job.data || {}
    const fields = Object.keys(data).map((key) => `${key}: ${data[key]}`)
    return `Job[${job.id}] { ${fields.join(", ")} }`
  }

  /**
   * Subscribes logging handlers to the queue's events, using a child logger
   * created from the queue name.
   *
   * @param {object} queue - Queue exposing `name` and a chainable `on`.
   */
  _assignEventHandlers(queue) {
    const logger = this.logger.child(queue.name)
    queue
      // Queue-level events
      .on("error", (error) => logger.error("Queue error", error))
      .on("paused", () => logger.info("Queue paused"))
      .on("resumed", () => logger.info("Queue resumed"))
      .on("drained", () => logger.debug("Queue drained"))
      .on("cleaned", (jobIds, type) => jobIds.length && logger.debug(`Jobs(${type}) cleaned: ${jobIds}`))

      // Job-level events
      .on("active", (job) => logger.debug(`${this._prettyPrintJob(job)} started`))
      .on("stalled", (job) => logger.error(`${this._prettyPrintJob(job)} stalled`))
      .on("progress", (job, progress) => logger.debug(`${this._prettyPrintJob(job)} progress ${progress * 100}`))
      .on("completed", (job) => logger.info( `${this._prettyPrintJob(job)} completed`))
      .on("failed", (job, err) => logger.error(`${this._prettyPrintJob(job)} failed`, err))
      .on("removed", (job) => logger.debug(`${this._prettyPrintJob(job)} removed`))
  }

  /**
   * Closes all queues in parallel.
   *
   * @returns {Promise<void>}
   */
  async _closeQueues() {
    await Promise.all(
      Object.values(this.queues).map((queue) => queue.close())
    )
  }

  /**
   * Quits the shared "client" and "subscriber" connections (when they were
   * created) and every additional client, in parallel.
   *
   * @returns {Promise<void>}
   */
  async _disconnectRedisClients() {
    const disconnects = []

    if (this.redisClients.client) disconnects.push(this._disconnectRedisClient(this.redisClients.client))
    if (this.redisClients.subscriber) disconnects.push(this._disconnectRedisClient(this.redisClients.subscriber))
    this.redisClients.__default.forEach((client) => {
      disconnects.push(this._disconnectRedisClient(client))
    })

    await Promise.all(disconnects)
  }

  /**
   * Quits a single Redis client. A rejected `quit()` is only logged at debug
   * level, so one failing client does not abort the shutdown.
   *
   * @param {{quit: Function}} client
   * @returns {Promise<*>}
   */
  _disconnectRedisClient(client) {
    return client
      .quit()
      .catch((e) => this.logger.debug(e))
  }

  /**
   * Schedules the next cleanup pass after EVERY_30_MINUTES. When the pass has
   * finished, the next one is scheduled unless the cleanup was stopped in the
   * meantime.
   */
  _startCleanup() {
    this.cleanupStopped = false
    // Never keep two timers alive, e.g. when start() is called twice.
    clearTimeout(this.cleanupTimeout)
    this.cleanupTimeout = setTimeout(async () => {
      try {
        await this.runCleanup()
      } catch (e) {
        // A failed pass must not end the cycle or surface as an unhandled rejection.
        this.logger.error(e)
      }
      // clearTimeout cannot cancel a timer that already fired, so a stop()
      // issued while the pass was running is detected through the flag.
      if (!this.cleanupStopped) {
        this._startCleanup()
      }
    }, EVERY_30_MINUTES)
  }

  /**
   * Cancels the pending cleanup timer and prevents an in-flight cleanup pass
   * from scheduling another one.
   */
  _stopCleanup() {
    this.cleanupStopped = true
    clearTimeout(this.cleanupTimeout)
    this.cleanupTimeout = null
  }

  /**
   * Options handed to every queue constructor.
   *
   * @returns {{createClient: Function}}
   */
  get _queueOptions() {
    return {
      createClient: this._createClient.bind(this)
    }
  }

  /**
   * Redis client factory given to the queues. Clients of type "client" and
   * "subscriber" are created once and reused; any other type gets a fresh
   * client that is remembered so `stop()` can quit it.
   *
   * @param {string} type - Connection type requested by the queue.
   * @returns {object} Redis client.
   */
  _createClient(type) {
    if (type === "client" || type === "subscriber") {
      if (!this.redisClients[type]) {
        this.redisClients[type] = this.createRedisClient()
      }
      return this.redisClients[type]
    }

    const client = this.createRedisClient()
    this.redisClients.__default.push(client)
    return client
  }
}
|
171 |
|
// CommonJS export: the module's sole export is the initializer class itself.
module.exports = BgProcessingInitializer
|