UNPKG

6.15 kBMarkdownView Raw
1# @anchan828/nest-bull
2
3![npm](https://img.shields.io/npm/v/@anchan828/nest-bull.svg)
4![NPM](https://img.shields.io/npm/l/@anchan828/nest-bull.svg)
5
6## Description
7
8The [Bull](https://github.com/OptimalBits/bull) module for [Nest](https://github.com/nestjs/nest).
9
10## Installation
11
12```bash
13$ npm i --save @anchan828/nest-bull bull
14$ npm i --save-dev @types/bull
15```
16
17## Quick Start
18
19### Importing BullModule and Queue component
20
21```ts
22import { BullModule } from "@anchan828/nest-bull";
23import { Module } from "@nestjs/common";
24import { AppController } from "./app.controller";
25import { AppQueue } from "./app.queue";
26import { AppService } from "./app.service";
27
28@Module({
29 imports: [
30 BullModule.forRoot({
31 queues: [__dirname + "/**/*.queue{.ts,.js}"],
32 options: {
33 redis: {
34 host: "127.0.0.1",
35 },
36 },
37 }),
38 ],
39 controllers: [AppController],
40 providers: [AppService, AppQueue],
41})
42export class AppModule {}
43```
44
45### Creating queue class
46
47```ts
48import { BullQueue, BullQueueProcess } from "@anchan828/nest-bull";
49import { Job } from "bull";
50import { APP_QUEUE } from "./app.constants";
51import { AppService } from "./app.service";
52
53@BullQueue({ name: APP_QUEUE })
54export class AppQueue {
55 constructor(private readonly service: AppService) {}
56
57 @BullQueueProcess()
58 public async process(job: Job) {
59 console.log("called process", job.data, this.service.root());
60 }
61}
62```
63
64### Adding job
65
66```ts
67import { Controller, Get, Inject } from "@nestjs/common";
68import { JobId, Queue } from "bull";
69import { APP_QUEUE } from "./app.constants";
70import { BullQueueInject } from "@anchan828/nest-bull";
71
72@Controller()
73export class AppController {
74 constructor(
75 @BullQueueInject(APP_QUEUE)
76 private readonly queue: Queue,
77 ) {}
78
79 @Get()
80 async root(): Promise<JobId> {
81 const job = await this.queue.add({ text: "text" });
82 return job.id;
83 }
84}
85```
86
87### Override queue settings per queue
88
89```ts
90@BullQueue({
91 name: APP_QUEUE,
92 options: {
93 redis: {
94 db: 3,
95 },
96 },
97})
98export class AppQueue {
99 // queue.add('processorName1', data);
100 @BullQueueProcess({
101 name: "processorName1",
102 concurrency: 3,
103 })
104 async process1(job: Job) {
105 throw new Error(`throw error ${JSON.stringify(job.data)}`);
106 }
107
108 // queue.add('processorName2', data);
109 @BullQueueProcess({
110 name: "processorName2",
111 })
112 async process2(job: Job) {
113 throw new Error(`throw error ${JSON.stringify(job.data)}`);
114 }
115}
116```
117
118Handling events
119
120```ts
121@BullQueue({ name: APP_QUEUE })
122export class AppQueue {
123 constructor(private readonly service: AppService) {}
124
125 @BullQueueProcess()
126 public async process(job: Job) {
127 console.log("called process", job.data, this.service.root());
128 }
129
130 @BullQueueEventProgress()
131 public async progress(job: Job, progress: number) {
132 console.log("progress", job.id, progress);
133 }
134
135 @BullQueueEventCompleted()
136 public async completed(job: Job, result: any) {
137 console.log("completed", job.id, result);
138 }
139
140 @BullQueueEventFailed()
141 public async failed(job: Job, error: Error) {
142 console.error("failed", job.id, error);
143 }
144}
145```
146
147### Getting Queue using BullService
148
149```ts
150import { Controller, Get, Inject } from "@nestjs/common";
151import { JobId, Queue } from "bull";
152import { APP_QUEUE } from "./app.constants";
153import { BullService, BULL_MODULE_SERVICE } from "@anchan828/nest-bull";
154
155@Controller()
156export class AppController {
157 constructor(
158 @Inject(BULL_MODULE_SERVICE)
159 private readonly service: BullService,
160 ) {}
161
162 @Get()
163 async root(): Promise<JobId> {
164 const job = await this.service.getQueue(APP_QUEUE).add({ text: "text" });
165 return job.id;
166 }
167}
168```
169
170### forRootAsync
171
172This package supports forRootAsync. However, you can only BullService if you want to forRootAsync.
173
174### More examples...
175
176See example app: https://github.com/anchan828/nest-bull-example
177
178And more: https://github.com/anchan828/nest-bull/tree/master/src/examples
179
180### Extra
181
182There are extra options.
183
184```ts
185export interface BullQueueExtraOptions {
186 defaultProcessorOptions?: {
187 /**
188 * Bull will then call your handler in parallel respecting this maximum value.
189 */
190 concurrency?: number;
191
192 /**
193 * Skip call this processor if true.
194 */
195 skip?: boolean;
196 };
197
198 defaultJobOptions?: {
199 /**
200 * Set TTL when job in the completed. (Default: -1)
201 */
202 setTTLOnComplete?: number;
203 /**
204 * Set TTL when job in the failed. (Default: -1)
205 */
206 setTTLOnFail?: number;
207 };
208}
209```
210
211You can set options to module and per queue.
212
213```ts
214@Module({
215 imports: [
216 BullModule.forRoot({
217 queues: [__dirname + "/**/*.queue{.ts,.js}"],
218 options: {
219 redis: {
220 host: "127.0.0.1",
221 },
222 },
223 extra: {
224 defaultProcessorOptions: {
225 concurrency: 3,
226 },
227 defaultJobOptions: {
228 setTTLOnComplete: 30,
229 },
230 },
231 }),
232 ],
233 controllers: [AppController],
234 providers: [AppService, AppQueue],
235})
236export class AppModule {}
237```
238
239```ts
240@BullQueue({
241 name: APP_QUEUE,
242 extra: {
243 defaultJobOptions: {
244 setTTLOnComplete: 300,
245 },
246 },
247})
248export class AppQueue {
249 @BullQueueProcess()
250 public async process(job: Job) {
251 return Promise.resolve();
252 }
253}
254```
255
256## Testing
257
258Example for TestingModule
259
260Set `mock: true` if you don't want to create Queue instance.
261BullModule create mock instance instead of Queue.
262
263```ts
264@Module({
265 imports: [
266 BullModule.forRoot({
267 queues: [__filename],
268 mock: true,
269 }),
270 ],
271})
272export class ApplicationModule {}
273```
274
275Or you can use createTestBullProvider
276
277```ts
278import { BullQueueInject } from "@anchan828/nest-bull";
279
280@Injectable()
281export class Service {
282 constructor(
283 @BullQueueInject("Queue name")
284 private readonly queue: Queue,
285 ) {}
286
287 public async someMethod() {
288 await this.queue.add({ key: "value" });
289 }
290}
291```
292
293```ts
294import { createTestBullProvider } from "@anchan828/nest-bull/dist/testing";
295const app: TestingModule = await Test.createTestingModule({
296 providers: [Service, createTestBullProvider("Queue name")],
297}).compile();
298```
299
300## License
301
302[MIT](LICENSE).