1 | # @anchan828/nest-bull
|
2 |
|
3 | ![npm](https://img.shields.io/npm/v/@anchan828/nest-bull.svg)
|
4 | ![NPM](https://img.shields.io/npm/l/@anchan828/nest-bull.svg)
|
5 |
|
6 | ## Description
|
7 |
|
8 | The [Bull](https://github.com/OptimalBits/bull) module for [Nest](https://github.com/nestjs/nest).
|
9 |
|
10 | ## Installation
|
11 |
|
12 | ```bash
|
13 | $ npm i --save @anchan828/nest-bull bull
|
14 | $ npm i --save-dev @types/bull
|
15 | ```
|
16 |
|
17 | ## Quick Start
|
18 |
|
19 | ### Importing BullModule and Queue component
|
20 |
|
21 | ```ts
|
22 | import { BullModule } from "@anchan828/nest-bull";
|
23 | import { Module } from "@nestjs/common";
|
24 | import { AppController } from "./app.controller";
|
25 | import { AppQueue } from "./app.queue";
|
26 | import { AppService } from "./app.service";
|
27 |
|
28 | @Module({
|
29 | imports: [
|
30 | BullModule.forRoot({
|
31 | queues: [__dirname + "/**/*.queue{.ts,.js}"],
|
32 | options: {
|
33 | redis: {
|
34 | host: "127.0.0.1",
|
35 | },
|
36 | },
|
37 | }),
|
38 | ],
|
39 | controllers: [AppController],
|
40 | providers: [AppService, AppQueue],
|
41 | })
|
42 | export class AppModule {}
|
43 | ```
|
44 |
|
45 | ### Creating queue class
|
46 |
|
47 | ```ts
|
48 | import { BullQueue, BullQueueProcess } from "@anchan828/nest-bull";
|
49 | import { Job } from "bull";
|
50 | import { APP_QUEUE } from "./app.constants";
|
51 | import { AppService } from "./app.service";
|
52 |
|
53 | @BullQueue({ name: APP_QUEUE })
|
54 | export class AppQueue {
|
55 | constructor(private readonly service: AppService) {}
|
56 |
|
57 | @BullQueueProcess()
|
58 | public async process(job: Job) {
|
59 | console.log("called process", job.data, this.service.root());
|
60 | }
|
61 | }
|
62 | ```
|
63 |
|
64 | ### Adding job
|
65 |
|
66 | ```ts
|
67 | import { Controller, Get, Inject } from "@nestjs/common";
|
68 | import { JobId, Queue } from "bull";
|
69 | import { APP_QUEUE } from "./app.constants";
|
70 | import { BullQueueInject } from "@anchan828/nest-bull";
|
71 |
|
72 | @Controller()
|
73 | export class AppController {
|
74 | constructor(
|
75 | @BullQueueInject(APP_QUEUE)
|
76 | private readonly queue: Queue,
|
77 | ) {}
|
78 |
|
79 | @Get()
|
80 | async root(): Promise<JobId> {
|
81 | const job = await this.queue.add({ text: "text" });
|
82 | return job.id;
|
83 | }
|
84 | }
|
85 | ```
|
86 |
|
87 | ### Override queue settings per queue
|
88 |
|
89 | ```ts
|
90 | @BullQueue({
|
91 | name: APP_QUEUE,
|
92 | options: {
|
93 | redis: {
|
94 | db: 3,
|
95 | },
|
96 | },
|
97 | })
|
98 | export class AppQueue {
|
99 | // queue.add('processorName1', data);
|
100 | @BullQueueProcess({
|
101 | name: "processorName1",
|
102 | concurrency: 3,
|
103 | })
|
104 | async process1(job: Job) {
|
105 | throw new Error(`throw error ${JSON.stringify(job.data)}`);
|
106 | }
|
107 |
|
108 | // queue.add('processorName2', data);
|
109 | @BullQueueProcess({
|
110 | name: "processorName2",
|
111 | })
|
112 | async process2(job: Job) {
|
113 | throw new Error(`throw error ${JSON.stringify(job.data)}`);
|
114 | }
|
115 | }
|
116 | ```
|
117 |
|
118 | ### Handling events
|
119 |
|
120 | ```ts
|
121 | @BullQueue({ name: APP_QUEUE })
|
122 | export class AppQueue {
|
123 | constructor(private readonly service: AppService) {}
|
124 |
|
125 | @BullQueueProcess()
|
126 | public async process(job: Job) {
|
127 | console.log("called process", job.data, this.service.root());
|
128 | }
|
129 |
|
130 | @BullQueueEventProgress()
|
131 | public async progress(job: Job, progress: number) {
|
132 | console.log("progress", job.id, progress);
|
133 | }
|
134 |
|
135 | @BullQueueEventCompleted()
|
136 | public async completed(job: Job, result: any) {
|
137 | console.log("completed", job.id, result);
|
138 | }
|
139 |
|
140 | @BullQueueEventFailed()
|
141 | public async failed(job: Job, error: Error) {
|
142 | console.error("failed", job.id, error);
|
143 | }
|
144 | }
|
145 | ```
|
146 |
|
147 | ### Getting Queue using BullService
|
148 |
|
149 | ```ts
|
150 | import { Controller, Get, Inject } from "@nestjs/common";
|
151 | import { JobId, Queue } from "bull";
|
152 | import { APP_QUEUE } from "./app.constants";
|
153 | import { BullService, BULL_MODULE_SERVICE } from "@anchan828/nest-bull";
|
154 |
|
155 | @Controller()
|
156 | export class AppController {
|
157 | constructor(
|
158 | @Inject(BULL_MODULE_SERVICE)
|
159 | private readonly service: BullService,
|
160 | ) {}
|
161 |
|
162 | @Get()
|
163 | async root(): Promise<JobId> {
|
164 | const job = await this.service.getQueue(APP_QUEUE).add({ text: "text" });
|
165 | return job.id;
|
166 | }
|
167 | }
|
168 | ```
|
169 |
|
170 | ### forRootAsync
|
171 |
|
172 | This package supports `forRootAsync`. However, you can only use `BullService` if you want to use `forRootAsync`.
|
173 |
|
174 | ### More examples...
|
175 |
|
176 | See example app: https://github.com/anchan828/nest-bull-example
|
177 |
|
178 | And more: https://github.com/anchan828/nest-bull/tree/master/src/examples
|
179 |
|
180 | ### Extra
|
181 |
|
182 | There are extra options.
|
183 |
|
184 | ```ts
|
185 | export interface BullQueueExtraOptions {
|
186 | defaultProcessorOptions?: {
|
187 | /**
|
188 | * Bull will then call your handler in parallel respecting this maximum value.
|
189 | */
|
190 | concurrency?: number;
|
191 |
|
192 | /**
|
193 | * Skip calling this processor if true.
|
194 | */
|
195 | skip?: boolean;
|
196 | };
|
197 |
|
198 | defaultJobOptions?: {
|
199 | /**
|
200 | * Set TTL when the job is completed. (Default: -1)
|
201 | */
|
202 | setTTLOnComplete?: number;
|
203 | /**
|
204 | * Set TTL when the job has failed. (Default: -1)
|
205 | */
|
206 | setTTLOnFail?: number;
|
207 | };
|
208 | }
|
209 | ```
|
210 |
|
211 | You can set these options at the module level and per queue.
|
212 |
|
213 | ```ts
|
214 | @Module({
|
215 | imports: [
|
216 | BullModule.forRoot({
|
217 | queues: [__dirname + "/**/*.queue{.ts,.js}"],
|
218 | options: {
|
219 | redis: {
|
220 | host: "127.0.0.1",
|
221 | },
|
222 | },
|
223 | extra: {
|
224 | defaultProcessorOptions: {
|
225 | concurrency: 3,
|
226 | },
|
227 | defaultJobOptions: {
|
228 | setTTLOnComplete: 30,
|
229 | },
|
230 | },
|
231 | }),
|
232 | ],
|
233 | controllers: [AppController],
|
234 | providers: [AppService, AppQueue],
|
235 | })
|
236 | export class AppModule {}
|
237 | ```
|
238 |
|
239 | ```ts
|
240 | @BullQueue({
|
241 | name: APP_QUEUE,
|
242 | extra: {
|
243 | defaultJobOptions: {
|
244 | setTTLOnComplete: 300,
|
245 | },
|
246 | },
|
247 | })
|
248 | export class AppQueue {
|
249 | @BullQueueProcess()
|
250 | public async process(job: Job) {
|
251 | return Promise.resolve();
|
252 | }
|
253 | }
|
254 | ```
|
255 |
|
256 | ## Testing
|
257 |
|
258 | Example for TestingModule
|
259 |
|
260 | Set `mock: true` if you don't want to create a Queue instance.
|
261 | BullModule creates a mock instance instead of a Queue.
|
262 |
|
263 | ```ts
|
264 | @Module({
|
265 | imports: [
|
266 | BullModule.forRoot({
|
267 | queues: [__filename],
|
268 | mock: true,
|
269 | }),
|
270 | ],
|
271 | })
|
272 | export class ApplicationModule {}
|
273 | ```
|
274 |
|
275 | Or you can use `createTestBullProvider`:
|
276 |
|
277 | ```ts
|
278 | import { BullQueueInject } from "@anchan828/nest-bull";
|
279 |
|
280 | @Injectable()
|
281 | export class Service {
|
282 | constructor(
|
283 | @BullQueueInject("Queue name")
|
284 | private readonly queue: Queue,
|
285 | ) {}
|
286 |
|
287 | public async someMethod() {
|
288 | await this.queue.add({ key: "value" });
|
289 | }
|
290 | }
|
291 | ```
|
292 |
|
293 | ```ts
|
294 | import { createTestBullProvider } from "@anchan828/nest-bull/dist/testing";
|
295 | const app: TestingModule = await Test.createTestingModule({
|
296 | providers: [Service, createTestBullProvider("Queue name")],
|
297 | }).compile();
|
298 | ```
|
299 |
|
300 | ## License
|
301 |
|
302 | [MIT](LICENSE).
|