UNPKG

6.19 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.BullExplorer = void 0;
4const tslib_1 = require("tslib");
5const bull_shared_1 = require("@nestjs/bull-shared");
6const common_1 = require("@nestjs/common");
7const core_1 = require("@nestjs/core");
8const injector_1 = require("@nestjs/core/injector/injector");
9const bull_metadata_accessor_1 = require("./bull-metadata.accessor");
10let BullExplorer = class BullExplorer {
11 constructor(moduleRef, discoveryService, metadataAccessor, metadataScanner) {
12 this.moduleRef = moduleRef;
13 this.discoveryService = discoveryService;
14 this.metadataAccessor = metadataAccessor;
15 this.metadataScanner = metadataScanner;
16 this.logger = new common_1.Logger('BullModule');
17 this.injector = new injector_1.Injector();
18 }
19 onModuleInit() {
20 this.explore();
21 }
22 explore() {
23 const providers = this.discoveryService
24 .getProviders()
25 .filter((wrapper) => {
26 var _a;
27 return this.metadataAccessor.isQueueComponent(
28 // NOTE: Regarding the ternary statement below,
29 // - The condition `!wrapper.metatype` is because when we use `useValue`
30 // the value of `wrapper.metatype` will be `null`.
31 // - The condition `wrapper.inject` is needed here because when we use
32 // `useFactory`, the value of `wrapper.metatype` will be the supplied
33 // factory function.
34 // For both cases, we should use `wrapper.instance.constructor` instead
35 // of `wrapper.metatype` to resolve processor's class properly.
36 // But since calling `wrapper.instance` could degrade overall performance
37 // we must defer it as much we can. But there's no other way to grab the
38 // right class that could be annotated with `@Processor()` decorator
39 // without using this property.
40 !wrapper.metatype || wrapper.inject
41 ? (_a = wrapper.instance) === null || _a === void 0 ? void 0 : _a.constructor
42 : wrapper.metatype);
43 });
44 providers.forEach((wrapper) => {
45 const { instance, metatype } = wrapper;
46 const isRequestScoped = !wrapper.isDependencyTreeStatic();
47 const { name: queueName } = this.metadataAccessor.getQueueComponentMetadata(
48 // NOTE: We are relying on `instance.constructor` to properly support
49 // `useValue` and `useFactory` providers besides `useClass`.
50 instance.constructor || metatype);
51 const queueToken = (0, bull_shared_1.getQueueToken)(queueName);
52 const bullQueue = this.getQueue(queueToken, queueName);
53 this.metadataScanner.scanFromPrototype(instance, Object.getPrototypeOf(instance), (key) => {
54 if (this.metadataAccessor.isProcessor(instance[key])) {
55 const metadata = this.metadataAccessor.getProcessMetadata(instance[key]);
56 this.handleProcessor(instance, key, bullQueue, wrapper.host, isRequestScoped, metadata);
57 }
58 else if (this.metadataAccessor.isListener(instance[key])) {
59 const metadata = this.metadataAccessor.getListenerMetadata(instance[key]);
60 this.handleListener(instance, key, wrapper, bullQueue, metadata);
61 }
62 });
63 });
64 }
65 getQueue(queueToken, queueName) {
66 try {
67 return this.moduleRef.get(queueToken, { strict: false });
68 }
69 catch (err) {
70 this.logger.error((0, bull_shared_1.NO_QUEUE_FOUND)(queueName));
71 throw err;
72 }
73 }
74 handleProcessor(instance, key, queue, moduleRef, isRequestScoped, options) {
75 let args = [options === null || options === void 0 ? void 0 : options.name, options === null || options === void 0 ? void 0 : options.concurrency];
76 if (isRequestScoped) {
77 const callback = async (...args) => {
78 const contextId = (0, core_1.createContextId)();
79 if (this.moduleRef.registerRequestByContextId) {
80 // Additional condition to prevent breaking changes in
81 // applications that use @nestjs/bull older than v7.4.0.
82 const jobRef = args[0];
83 this.moduleRef.registerRequestByContextId(jobRef, contextId);
84 }
85 const contextInstance = await this.injector.loadPerContext(instance, moduleRef, moduleRef.providers, contextId);
86 return contextInstance[key].call(contextInstance, ...args);
87 };
88 args.push(callback);
89 }
90 else {
91 args.push(instance[key].bind(instance));
92 }
93 args = args.filter((item) => item !== undefined);
94 queue.process.call(queue, ...args);
95 }
96 handleListener(instance, key, wrapper, queue, options) {
97 if (!wrapper.isDependencyTreeStatic()) {
98 this.logger.warn(`Warning! "${wrapper.name}" class is request-scoped and it defines an event listener ("${wrapper.name}#${key}"). Since event listeners cannot be registered on scoped providers, this handler will be ignored.`);
99 return;
100 }
101 if (options.name || options.id) {
102 queue.on(options.eventName, async (jobOrJobId, ...args) => {
103 const job = typeof jobOrJobId === 'string'
104 ? (await queue.getJob(jobOrJobId)) || { name: false, id: false }
105 : jobOrJobId;
106 if (job.name === options.name || job.id === options.id) {
107 return instance[key].apply(instance, [job, ...args]);
108 }
109 });
110 }
111 else {
112 queue.on(options.eventName, instance[key].bind(instance));
113 }
114 }
115};
116BullExplorer = tslib_1.__decorate([
117 (0, common_1.Injectable)(),
118 tslib_1.__metadata("design:paramtypes", [core_1.ModuleRef,
119 core_1.DiscoveryService,
120 bull_metadata_accessor_1.BullMetadataAccessor,
121 core_1.MetadataScanner])
122], BullExplorer);
123exports.BullExplorer = BullExplorer;