UNPKG

4.63 kBJavaScriptView Raw
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const ChannelManager_1 = require("./ChannelManager");
const uuid = require("node-uuid");
const util = require("util");

// Exchange names are this prefix followed by Task.taskManager.service.
const EXCHANGE_PREFIX = "nimbus:jobs:";
// Options handed to channel.assertExchange() for the job exchange.
const EXCHANGE_OPTIONS = { durable: true, autoDelete: false };
// Queue names are this prefix followed by the task type.
const JOB_QUEUE_PREFIX = "nimbus:jobs:queue:";
// Options handed to channel.assertQueue() for every job queue.
const JOB_QUEUE_OPTIONS = { durable: true, autoDelete: false };
// Diagnostic logger for the "amqptools" section (see util.debuglog).
const debug = util.debuglog("amqptools");
/**
 * A job that is either published to, or consumed from, a per-type job queue.
 *
 * A producer builds `new Task(type, params)` and calls `start()`; a consumer
 * builds `new Task(type)` and calls `processTask()`.
 *
 * The exchange name is derived from `Task.taskManager.service`; that static
 * property is not assigned in this class and must be set before any method
 * that touches the exchange is used.
 */
class Task {
    /**
     * @param {string} type - Task type; used as routing key and queue-name suffix.
     * @param {Object} [params] - JSON-serialisable payload published by `start()`.
     */
    constructor(type, params) {
        // Kept as an instance arrow function so it can be handed to the
        // channel manager as a listener without losing `this`.
        this.onReconnect = () => {
            if (!this.taskCallback) {
                return;
            }
            debug("Trying to re establish consuming on task queue %s", this.queueName);
            // Do not leave the promise floating: a failed re-subscribe would
            // otherwise surface as an unhandled rejection.
            this.consume().catch((err) => {
                debug("Failed to re establish consuming on task queue %s: %s", this.queueName, err);
            });
        };
        this.uuid = uuid.v4();
        this.type = type;
        this.params = params;
        // NOTE: the "reconnect" listener is registered in processTask(), not
        // here. Only consuming tasks react to a reconnect, and registering one
        // listener per published task would leak listeners on the manager.
    }

    /** @returns {string} Name of the exchange jobs are published to. */
    get exchangeName() {
        return EXCHANGE_PREFIX + Task.taskManager.service;
    }

    /** @returns {string} Name of the queue holding jobs of this task's type. */
    get queueName() {
        return JOB_QUEUE_PREFIX + this.type;
    }

    /**
     * Declares the exchange and queue, binds them, and publishes this task's
     * params (plus its uuid) as a JSON message.
     *
     * @param {Function} [cb] - Called with no arguments once the message has
     *   been handed to the channel, or with the error if setup/publishing failed.
     * @returns {Task|undefined} `this`, or `undefined` when the task has no
     *   params (nothing is published and `cb` is not called in that case).
     */
    start(cb) {
        if (!this.params) {
            return;
        }
        ChannelManager_1.channelManager.getChannel()
            .then(() => this.assertExchange())
            .then(() => this.assertQueue())
            .then(() => this.bindQueue())
            .then((channel) => {
                // Round-trip through JSON so the caller's params object is
                // never mutated when the uuid is attached.
                const payload = JSON.parse(JSON.stringify(this.params));
                payload.uuid = this.uuid;
                // Buffer.from replaces the deprecated `new Buffer(string)`.
                channel.publish(this.exchangeName, this.type, Buffer.from(JSON.stringify(payload)));
            })
            .then(() => {
                if (cb) {
                    cb();
                }
            }, (err) => {
                debug("Failed to publish task %s (%s): %s", this.type, this.uuid, err);
                if (cb) {
                    cb(err);
                }
            });
        return this;
    }

    /**
     * Declares the job exchange as a `direct` exchange.
     * @returns {Promise} Resolves with the channel; rejects with the assert error.
     */
    assertExchange() {
        return ChannelManager_1.channelManager.getChannel().then((channel) => {
            return new Promise((resolve, reject) => {
                channel.assertExchange(this.exchangeName, 'direct', EXCHANGE_OPTIONS, (err) => {
                    if (err) {
                        reject(err);
                        return;
                    }
                    resolve(channel);
                });
            });
        });
    }

    /**
     * Declares the job queue for this task type.
     * @returns {Promise} Resolves with the channel; rejects with the assert error.
     */
    assertQueue() {
        return ChannelManager_1.channelManager.getChannel().then((channel) => {
            return new Promise((resolve, reject) => {
                channel.assertQueue(this.queueName, JOB_QUEUE_OPTIONS, (err) => {
                    if (err) {
                        reject(err);
                        return;
                    }
                    resolve(channel);
                });
            });
        });
    }

    /**
     * Binds the job queue to the job exchange using the task type as routing key.
     * @returns {Promise} Resolves with the channel.
     */
    bindQueue() {
        return ChannelManager_1.channelManager.getChannel().then((channel) => {
            // NOTE(review): no completion callback is passed, so a failed bind
            // is not reported through the returned promise - confirm whether
            // the channel API in use accepts one.
            channel.bindQueue(this.queueName, this.exchangeName, this.type);
            return channel;
        });
    }

    /**
     * Removes all messages from the job queue if the queue exists.
     * @returns {Promise} Resolves with the purge reply, or `null` when the
     *   queue check fails or reports nothing; rejects only if the purge fails.
     */
    purgeQueue() {
        return ChannelManager_1.channelManager.getChannel().then((channel) => {
            return new Promise((resolve, reject) => {
                channel.checkQueue(this.queueName, (checkErr, ok) => {
                    // Best effort: a failed check is treated as "nothing to purge".
                    if (checkErr || !ok) {
                        resolve(null);
                        return;
                    }
                    channel.purgeQueue(this.queueName, (purgeErr, reply) => {
                        if (purgeErr) {
                            reject(purgeErr);
                            return;
                        }
                        resolve(reply);
                    });
                });
            });
        });
    }

    /**
     * Subscribes to the job queue and feeds each message to the task callback
     * set by `processTask()`. Messages are acked when the callback reports
     * success and nacked without requeue when it reports `{ nack: true }`.
     *
     * @returns {Promise} Resolves once the consumer has been requested.
     */
    consume() {
        return ChannelManager_1.channelManager.getChannel()
            .then(() => this.assertQueue())
            .then((channel) => {
                channel.prefetch(this.opts.prefetchCount);
                debug("Attaching task listener for %s, prefetch=%d", this.type, this.opts.prefetchCount);
                channel.consume(this.queueName, (msg) => {
                    // NOTE(review): presumably an amqplib channel, which delivers
                    // a null message when the broker cancels the consumer.
                    if (!msg) {
                        debug("Consumer on task queue %s received an empty delivery", this.queueName);
                        return;
                    }
                    let taskData;
                    try {
                        taskData = JSON.parse(msg.content.toString());
                    }
                    catch (err) {
                        // A payload that cannot be parsed will never succeed;
                        // reject it without requeue instead of crashing.
                        debug("Dropping malformed message on task queue %s: %s", this.queueName, err);
                        channel.nack(msg, false, false);
                        return;
                    }
                    // Guard against a callback that signals completion twice,
                    // which would ack/nack the same delivery more than once.
                    let settled = false;
                    this.taskCallback(taskData, (err) => {
                        if (settled) {
                            return;
                        }
                        settled = true;
                        if (err && err.nack) {
                            channel.nack(msg, false, false);
                        }
                        else {
                            channel.ack(msg);
                        }
                    });
                }, { noAck: false });
            });
    }

    /**
     * Turns this task into a consumer for its type.
     *
     * @param {Object|Function} [opts] - Options, or the callback itself.
     *   `opts.prefetchCount` defaults to 1 (any falsy value, including 0,
     *   falls back to 1).
     * @param {Function} taskCallback - Called as `(taskData, done)`; call
     *   `done()` to ack, or `done({ nack: true })` to reject the message.
     * @returns {Promise} The promise returned by `consume()`.
     * @throws {Error} If a task callback has already been set.
     * @throws {TypeError} If no callback function is supplied.
     */
    processTask(opts, taskCallback) {
        if (this.taskCallback) {
            throw new Error("Task callback already set");
        }
        if (typeof opts === "function") {
            taskCallback = opts;
            opts = {};
        }
        if (typeof taskCallback !== "function") {
            throw new TypeError("Task callback must be a function");
        }
        // Copy so the caller's options object is not modified.
        const settings = Object.assign({}, opts);
        settings.prefetchCount = settings.prefetchCount || 1;
        this.taskCallback = taskCallback;
        this.opts = settings;
        // Registered exactly once per consuming task (the guard above prevents
        // a second call from getting this far).
        ChannelManager_1.channelManager.on("reconnect", this.onReconnect);
        return this.consume();
    }
}
// Public export of the Task class (CommonJS).
exports.Task = Task;
//# sourceMappingURL=Task.js.map
\No newline at end of file