UNPKG

4.01 kBPlain TextView Raw
1import { channelManager } from './ChannelManager'
2import { TaskManager } from "./TaskManager"
3import { Channel } from "amqplib/callback_api"
4
5import uuid = require("node-uuid")
6import util = require("util")
7
/** Prefix of the exchange name; `Task#exchangeName` appends the task manager's service to it. */
const EXCHANGE_PREFIX = "nimbus:jobs:";

/** Options handed to `assertExchange` when the jobs exchange is declared. */
const EXCHANGE_OPTIONS = {
  durable: true,
  autoDelete: false
};

/** Prefix of the queue name; `Task#queueName` appends the task type to it. */
const JOB_QUEUE_PREFIX = "nimbus:jobs:queue:";

/** Options handed to `assertQueue` when a job queue is declared. */
const JOB_QUEUE_OPTIONS = {
  durable: true,
  autoDelete: false
};

/** Logger for this module, created through `util.debuglog` under the "amqptools" section. */
const debug = util.debuglog("amqptools");
15
/**
 * Payload of a task. `Task#start` serialises it to JSON (adding the task's
 * `uuid`) and publishes the result as the message body.
 */
export interface TaskParams {
  /** Title of the task. */
  title: string;
  /** Task-specific data; it goes through a JSON round trip before publishing. */
  data: any;
}
20
/**
 * A job that can be published to, or consumed from, a durable AMQP queue.
 *
 * Producer side: construct with `params` and call `start()`.
 * Consumer side: construct with just a `type` and call `processTask()`.
 *
 * `Task.taskManager` must be assigned before `exchangeName` is read, because
 * the exchange name is derived from `Task.taskManager.service`.
 */
export class Task {
  /** Identifier generated with `uuid.v4()` in the constructor; published with the payload as `uuid`. */
  uuid: string;
  /** Task type; used both as the routing key and as the suffix of the queue name. */
  type: string;
  /** Payload to publish. When it is missing, `start()` does nothing. */
  params: TaskParams;
  /** Handler installed by `processTask()`; invoked as `taskCallback(taskData, done)`. */
  taskCallback: any;
  /** Consumer options stored by `processTask()`; `prefetchCount` is read by `consume()`. */
  opts: any;
  static taskManager: TaskManager;

  /**
   * @param type   Task type (routing key / queue name suffix).
   * @param params Optional payload; only needed when the task will be published.
   */
  constructor(type: string, params?: TaskParams) {
    this.uuid = uuid.v4();
    this.type = type;
    this.params = params;

    // NOTE(review): this listener is never removed by this class, so every
    // Task instance stays subscribed to channelManager for its lifetime.
    channelManager.on("reconnect", this.onReconnect);
  }

  /**
   * "reconnect" handler: re-attaches the consumer when a task callback is
   * registered. Failures are logged instead of being left as unhandled
   * promise rejections.
   */
  onReconnect = () => {
    if (this.taskCallback) {
      debug("Trying to re establish consuming on task queue %s", this.queueName);
      this.consume().catch((err) => {
        debug("Failed to re establish consuming on task queue %s: %s", this.queueName, err);
      });
    }
  }

  /** Exchange name: `EXCHANGE_PREFIX` followed by `Task.taskManager.service`. */
  get exchangeName() {
    return EXCHANGE_PREFIX + Task.taskManager.service;
  }

  /** Queue name: `JOB_QUEUE_PREFIX` followed by the task type. */
  get queueName() {
    return JOB_QUEUE_PREFIX + this.type;
  }

  /**
   * Declares the exchange and queue, binds them, and publishes the task
   * payload (a JSON copy of `params` plus `uuid`) with the task type as
   * routing key.
   *
   * @param cb Optional callback. Called with no arguments once the message has
   *           been handed to the channel, or with the error if any step fails.
   * @returns `this` when publishing was started; `undefined` when the task has
   *          no `params` (nothing is published and `cb` is not called).
   */
  start(cb?: (err?: any) => void) {
    if (!this.params) return;

    channelManager.getChannel()
      .then(() => this.assertExchange())
      .then(() => this.assertQueue())
      .then(() => this.bindQueue())
      .then((channel) => {
        // The JSON round trip copies the payload so adding `uuid` does not
        // mutate the caller's params object.
        const payload = JSON.parse(JSON.stringify(this.params));
        payload['uuid'] = this.uuid;
        const eventData = Buffer.from(JSON.stringify(payload));

        channel.publish(this.exchangeName, this.type, eventData);
      })
      // Two-argument `then` so an exception thrown by `cb` in the success
      // path cannot cause `cb` to be invoked a second time with that error.
      .then(
        () => {
          if (cb) cb();
        },
        (err) => {
          debug("Failed to publish task %s (%s): %s", this.type, this.uuid, err);
          if (cb) cb(err);
        }
      );

    return this;
  }

  /** Declares the direct exchange; resolves with the channel, rejects with the broker error. */
  private assertExchange() {
    return channelManager.getChannel().then((channel) => {
      return new Promise<Channel>((resolve, reject) => {
        channel.assertExchange(this.exchangeName, 'direct', EXCHANGE_OPTIONS, (err) => {
          if (err) return reject(err);
          resolve(channel);
        });
      });
    });
  }

  /** Declares the job queue; resolves with the channel, rejects with the broker error. */
  private assertQueue() {
    return channelManager.getChannel().then((channel) => {
      return new Promise<Channel>((resolve, reject) => {
        channel.assertQueue(this.queueName, JOB_QUEUE_OPTIONS, (err) => {
          if (err) return reject(err);
          resolve(channel);
        });
      });
    });
  }

  /**
   * Binds the job queue to the exchange using the task type as routing key.
   * Waits for the broker's confirmation so that a failed bind rejects the
   * promise instead of being silently ignored.
   */
  private bindQueue() {
    return channelManager.getChannel().then((channel) => {
      return new Promise<Channel>((resolve, reject) => {
        channel.bindQueue(this.queueName, this.exchangeName, this.type, {}, (err) => {
          if (err) return reject(err);
          resolve(channel);
        });
      });
    });
  }

  /**
   * Purges the job queue.
   *
   * @returns A promise resolving with the purge reply, or with `null` when
   *          `checkQueue` reports an error or yields no result. It rejects
   *          only when the purge itself fails.
   */
  purgeQueue() {
    return channelManager.getChannel().then((channel) => {
      return new Promise((resolve, reject) => {
        channel.checkQueue(this.queueName, (checkErr, ok) => {
          if (checkErr || !ok) return resolve(null);

          channel.purgeQueue(this.queueName, (purgeErr, reply) => {
            if (purgeErr) return reject(purgeErr);
            resolve(reply);
          });
        });
      });
    });
  }

  /**
   * Declares the job queue and attaches a consumer that passes every parsed
   * message to `taskCallback`. Expects `processTask()` to have stored `opts`
   * and `taskCallback` beforehand.
   *
   * The callback's `done(err)` acknowledges the message, unless `err.nack` is
   * truthy, in which case the message is rejected without requeue.
   */
  consume() {
    return channelManager.getChannel()
      .then(() => this.assertQueue())
      .then((channel) => {
        channel.prefetch(this.opts.prefetchCount);
        debug("Attaching task listener for %s, prefetch=%d", this.type, this.opts.prefetchCount);
        channel.consume(this.queueName, (msg) => {
          // amqplib invokes the callback with null when the broker cancels
          // the consumer; there is nothing to parse or acknowledge then.
          if (!msg) {
            debug("Consumer on task queue %s was cancelled by the broker", this.queueName);
            return;
          }

          let taskData: any;
          try {
            taskData = JSON.parse(msg.content.toString());
          } catch (parseErr) {
            // A malformed body can never be processed: reject it without
            // requeue rather than letting the exception escape the callback.
            debug("Discarding malformed message on task queue %s: %s", this.queueName, parseErr);
            channel.nack(msg, false, false);
            return;
          }

          this.taskCallback(taskData, err => {
            if (err && err.nack) {
              // reject without requeue (dead letter the message)
              channel.nack(msg, false, false);
            } else {
              channel.ack(msg);
            }
          });
        }, {noAck: false});
      });
  }

  /**
   * Registers the handler for this task type and starts consuming.
   *
   * @param opts         Consumer options, or the callback itself when no
   *                     options are given. `prefetchCount` falls back to 1
   *                     when it is falsy; the object is updated in place.
   * @param taskCallback Handler invoked as `taskCallback(taskData, done)`.
   * @returns The promise returned by `consume()`.
   * @throws Error if a task callback has already been registered.
   */
  processTask(opts: any, taskCallback?: any) {
    if (this.taskCallback) {
      throw new Error("Task callback already set");
    }
    if (typeof opts === "function") {
      taskCallback = opts;
      opts = {};
    }
    this.taskCallback = taskCallback;
    opts = opts || {};
    opts.prefetchCount = opts.prefetchCount || 1;
    this.opts = opts;
    return this.consume();
  }
}
152
\No newline at end of file