1 | "use strict";
|
2 | Object.defineProperty(exports, "__esModule", { value: true });
|
3 | exports.BullExplorer = void 0;
|
4 | const tslib_1 = require("tslib");
|
5 | const bull_shared_1 = require("@nestjs/bull-shared");
|
6 | const common_1 = require("@nestjs/common");
|
7 | const core_1 = require("@nestjs/core");
|
8 | const injector_1 = require("@nestjs/core/injector/injector");
|
9 | const bull_metadata_accessor_1 = require("./bull-metadata.accessor");
|
/**
 * Discovers providers flagged as Bull queue components and wires their
 * processor methods and event-listener methods onto the matching queue.
 *
 * Discovery runs once, from `onModuleInit`.
 */
let BullExplorer = class BullExplorer {
    /**
     * @param moduleRef Used to look up queue instances by token and, when the
     *   method exists, to register a job as the request of a context id.
     * @param discoveryService Source of the provider wrappers to inspect.
     * @param metadataAccessor Reads queue/processor/listener metadata.
     * @param metadataScanner Enumerates method names on a provider prototype.
     */
    constructor(moduleRef, discoveryService, metadataAccessor, metadataScanner) {
        this.moduleRef = moduleRef;
        this.discoveryService = discoveryService;
        this.metadataAccessor = metadataAccessor;
        this.metadataScanner = metadataScanner;
        this.logger = new common_1.Logger('BullModule');
        this.injector = new injector_1.Injector();
    }
    /** Lifecycle hook: starts provider discovery. */
    onModuleInit() {
        this.explore();
    }
    /**
     * Finds every queue-component provider, resolves its queue and registers
     * each processor / listener method found on the provider's prototype.
     *
     * @throws Whatever `getQueue` rethrows when a queue token cannot be resolved.
     */
    explore() {
        // Pick the class to test for queue-component metadata. When the wrapper
        // has no `metatype`, or declares `inject`, the class is taken from the
        // instance instead. `wrapper.instance` is read only in that case.
        // NOTE(review): presumably this covers value/factory providers, whose
        // `metatype` is not the annotated class — confirm.
        const resolveTargetClass = (wrapper) => {
            if (!wrapper.metatype || wrapper.inject) {
                const candidate = wrapper.instance;
                return candidate === null || candidate === undefined
                    ? undefined
                    : candidate.constructor;
            }
            return wrapper.metatype;
        };
        const providers = this.discoveryService
            .getProviders()
            .filter((wrapper) => this.metadataAccessor.isQueueComponent(resolveTargetClass(wrapper)));
        providers.forEach((wrapper) => {
            const { instance, metatype } = wrapper;
            const isRequestScoped = !wrapper.isDependencyTreeStatic();
            const { name: queueName } = this.metadataAccessor.getQueueComponentMetadata(instance.constructor || metatype);
            const queueToken = (0, bull_shared_1.getQueueToken)(queueName);
            const bullQueue = this.getQueue(queueToken, queueName);
            this.metadataScanner.scanFromPrototype(instance, Object.getPrototypeOf(instance), (key) => {
                const member = instance[key];
                // A method is treated as a processor first; listener metadata is
                // only consulted when it is not a processor.
                if (this.metadataAccessor.isProcessor(member)) {
                    const metadata = this.metadataAccessor.getProcessMetadata(member);
                    this.handleProcessor(instance, key, bullQueue, wrapper.host, isRequestScoped, metadata);
                }
                else if (this.metadataAccessor.isListener(member)) {
                    const metadata = this.metadataAccessor.getListenerMetadata(member);
                    this.handleListener(instance, key, wrapper, bullQueue, metadata);
                }
            });
        });
    }
    /**
     * Resolves a queue instance from the module reference (non-strict lookup).
     *
     * @param queueToken Injection token of the queue.
     * @param queueName Queue name, used only for the error log message.
     * @returns The value registered under `queueToken`.
     * @throws Rethrows the lookup error after logging it.
     */
    getQueue(queueToken, queueName) {
        try {
            return this.moduleRef.get(queueToken, { strict: false });
        }
        catch (err) {
            this.logger.error((0, bull_shared_1.NO_QUEUE_FOUND)(queueName));
            throw err;
        }
    }
    /**
     * Registers `instance[key]` as a processor on `queue`.
     *
     * `queue.process` receives, in order, the processor name, the concurrency
     * and the handler; name and concurrency are omitted when undefined.
     *
     * @param instance Provider instance that owns the method.
     * @param key Name of the processor method.
     * @param queue Queue to register on.
     * @param moduleRef Host module of the provider; passed to the injector for
     *   request-scoped providers.
     * @param isRequestScoped When true, a per-call instance is loaded through
     *   the injector for each invocation instead of using `instance` directly.
     * @param options Optional processor metadata (`name`, `concurrency`).
     */
    handleProcessor(instance, key, queue, moduleRef, isRequestScoped, options) {
        const hasOptions = options !== null && options !== undefined;
        const processorName = hasOptions ? options.name : undefined;
        const concurrency = hasOptions ? options.concurrency : undefined;
        let handler;
        if (isRequestScoped) {
            handler = async (...jobArgs) => {
                const contextId = (0, core_1.createContextId)();
                // Feature-detected: only call the method when the module
                // reference provides it.
                if (this.moduleRef.registerRequestByContextId) {
                    const jobRef = jobArgs[0];
                    this.moduleRef.registerRequestByContextId(jobRef, contextId);
                }
                const contextInstance = await this.injector.loadPerContext(instance, moduleRef, moduleRef.providers, contextId);
                return contextInstance[key].call(contextInstance, ...jobArgs);
            };
        }
        else {
            // NOTE(review): `bind` keeps the method's declared arity; the queue
            // library presumably inspects it — confirm before replacing this
            // with a wrapper function.
            handler = instance[key].bind(instance);
        }
        const processArgs = [processorName, concurrency, handler].filter((item) => item !== undefined);
        queue.process.call(queue, ...processArgs);
    }
    /**
     * Registers `instance[key]` as a listener for `options.eventName` on `queue`.
     *
     * Request-scoped providers are skipped with a warning. When `options.name`
     * or `options.id` is set, the handler only runs for payloads matching one
     * of the configured criteria; a string payload is treated as a job id and
     * resolved through `queue.getJob` first.
     *
     * @param instance Provider instance that owns the method.
     * @param key Name of the listener method.
     * @param wrapper Provider wrapper; supplies scope information and the name
     *   used in the warning.
     * @param queue Queue to subscribe to.
     * @param options Listener metadata: `eventName` plus optional `name` / `id`.
     */
    handleListener(instance, key, wrapper, queue, options) {
        if (!wrapper.isDependencyTreeStatic()) {
            this.logger.warn(`Warning! "${wrapper.name}" class is request-scoped and it defines an event listener ("${wrapper.name}#${key}"). Since event listeners cannot be registered on scoped providers, this handler will be ignored.`);
            return;
        }
        if (!(options.name || options.id)) {
            // No filter configured: subscribe the method directly.
            queue.on(options.eventName, instance[key].bind(instance));
            return;
        }
        const isSet = (criterion) => criterion !== undefined && criterion !== null;
        queue.on(options.eventName, async (jobOrJobId, ...args) => {
            // The `false` placeholders can never equal a configured name or id,
            // so a job that can no longer be fetched never matches.
            const job = typeof jobOrJobId === 'string'
                ? (await queue.getJob(jobOrJobId)) || { name: false, id: false }
                : jobOrJobId;
            if (job === null || job === undefined) {
                // Nothing to match against; a filtered listener cannot apply.
                return;
            }
            // Compare only the criteria that were configured. Otherwise an unset
            // criterion (`undefined`) equals a field missing from the payload
            // and the handler would fire for payloads that are not jobs.
            const matchesName = isSet(options.name) && job.name === options.name;
            const matchesId = isSet(options.id) && job.id === options.id;
            if (matchesName || matchesId) {
                return instance[key].apply(instance, [job, ...args]);
            }
        });
    }
};
|
// Apply the class decorators to BullExplorer: `Injectable()` plus the
// `design:paramtypes` metadata, whose entries follow the constructor's
// parameter order (moduleRef, discoveryService, metadataAccessor,
// metadataScanner). The decorated result replaces the class binding.
BullExplorer = tslib_1.__decorate([
    (0, common_1.Injectable)(),
    tslib_1.__metadata("design:paramtypes", [core_1.ModuleRef,
        core_1.DiscoveryService,
        bull_metadata_accessor_1.BullMetadataAccessor,
        core_1.MetadataScanner])
], BullExplorer);
// Publish the decorated class as this module's `BullExplorer` export
// (the export was initialised to `void 0` near the top of the file).
exports.BullExplorer = BullExplorer;
|