UNPKG

5.88 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3const ChannelManager_1 = require("./ChannelManager");
4const uuid = require("node-uuid");
5const util = require("util");
6const EXCHANGE_PREFIX = "nimbus:jobs:";
7const EXCHANGE_OPTIONS = { durable: true, autoDelete: false };
8const JOB_QUEUE_PREFIX = "nimbus:jobs:queue:";
9const JOB_QUEUE_OPTIONS = { durable: true, autoDelete: false };
10const debug = util.debuglog("amqptools");
11class Task {
12 constructor(type, params) {
13 this.onReconnect = () => {
14 if (this.taskCallback) {
15 debug("Trying to re establish consuming on task queue %s", this.queueName);
16 this.consume();
17 }
18 };
19 this.onFinalize = () => {
20 if (this.taskCallback) {
21 debug("Cancel consuming on task queue %s because of the finalization process", this.queueName);
22 this.cancel();
23 }
24 };
25 this.uuid = uuid.v4();
26 this.type = type;
27 this.params = params;
28 ChannelManager_1.channelManager.on("reconnect", this.onReconnect);
29 ChannelManager_1.channelManager.on("finalize", this.onFinalize);
30 }
31 get exchangeName() {
32 return EXCHANGE_PREFIX + Task.taskManager.service;
33 }
34 get queueName() {
35 return JOB_QUEUE_PREFIX + this.type;
36 }
37 start(cb) {
38 return this.submit(cb);
39 }
40 submit(cb) {
41 if (!this.params)
42 return;
43 ChannelManager_1.channelManager.getChannel()
44 .then(() => this.assertExchange())
45 .then(() => this.assertQueue())
46 .then(() => this.bindQueue())
47 .then((channel) => {
48 let params = JSON.parse(JSON.stringify(this.params));
49 params['uuid'] = this.uuid;
50 var eventData = new Buffer(JSON.stringify(params));
51 channel.publish(this.exchangeName, this.type, eventData);
52 if (cb)
53 cb();
54 });
55 return this;
56 }
57 assertExchange() {
58 return ChannelManager_1.channelManager.getChannel().then((channel) => {
59 return new Promise((resolve, reject) => {
60 channel.assertExchange(this.exchangeName, 'direct', EXCHANGE_OPTIONS, (err) => {
61 if (err)
62 return reject(err);
63 resolve(channel);
64 });
65 });
66 });
67 }
68 assertQueue() {
69 return ChannelManager_1.channelManager.getChannel().then((channel) => {
70 return new Promise((resolve, reject) => {
71 channel.assertQueue(this.queueName, JOB_QUEUE_OPTIONS, (err) => {
72 if (err)
73 return reject(err);
74 resolve(channel);
75 });
76 });
77 });
78 }
79 bindQueue() {
80 return ChannelManager_1.channelManager.getChannel().then((channel) => {
81 channel.bindQueue(this.queueName, this.exchangeName, this.type);
82 return channel;
83 });
84 }
85 purgeQueue() {
86 return ChannelManager_1.channelManager.getChannel().then((channel) => {
87 return new Promise((resolve, reject) => {
88 channel.checkQueue(this.queueName, (err, ok) => {
89 if (err)
90 return resolve(null);
91 if (ok) {
92 return channel.purgeQueue(this.queueName, (err, reply) => {
93 if (err)
94 return reject(err);
95 resolve(reply);
96 });
97 }
98 resolve(null);
99 });
100 });
101 });
102 }
103 consume() {
104 return ChannelManager_1.channelManager.getChannel()
105 .then(() => this.assertQueue())
106 .then((channel) => {
107 channel.prefetch(this.opts.prefetchCount);
108 debug("Attaching task listener for %s, prefetch=%d", this.type, this.opts.prefetchCount);
109 channel.consume(this.queueName, (msg) => {
110 try {
111 var taskData = JSON.parse(msg.content.toString());
112 Task.taskManager.onStartProcesTask(taskData);
113 this.taskCallback(taskData, errRes => {
114 Task.taskManager.onEndProcessTask(taskData, errRes);
115 if (errRes && errRes.nack) {
116 channel.nack(msg, false, false);
117 }
118 else {
119 channel.ack(msg);
120 }
121 });
122 }
123 catch (err) {
124 Task.taskManager.onEndProcessTask(taskData, err);
125 console.error('Malformed message', msg.content.toString(), err);
126 channel.ack(msg);
127 }
128 }, { noAck: false }, (err, ok) => {
129 this.consumerTag = ok.consumerTag;
130 });
131 });
132 }
133 cancel() {
134 ChannelManager_1.channelManager.getChannel().then((channel) => {
135 channel.cancel(this.consumerTag);
136 ChannelManager_1.channelManager.removeListener("reconnect", this.onReconnect);
137 ChannelManager_1.channelManager.removeListener("finalize", this.onFinalize);
138 });
139 }
140 processTask(opts, taskCallback) {
141 if (this.taskCallback) {
142 throw new Error("Task callback already set");
143 }
144 if (typeof opts === "function") {
145 taskCallback = opts;
146 opts = {};
147 }
148 this.taskCallback = taskCallback;
149 opts = opts || {};
150 opts.prefetchCount = opts.prefetchCount || 1;
151 this.opts = opts;
152 return this.consume();
153 }
154}
155exports.Task = Task;
156//# sourceMappingURL=Task.js.map
\No newline at end of file