UNPKG

4.16 kBPlain TextView Raw
1import { channelManager } from './ChannelManager'
2import { TaskManager } from "./TaskManager"
3import { Channel } from "amqplib/callback_api"
4
5import uuid = require("node-uuid")
6import util = require("util")
7
8const EXCHANGE_PREFIX = "nimbus:jobs:";
9const EXCHANGE_OPTIONS = {durable: true, autoDelete: false};
10
11const JOB_QUEUE_PREFIX = "nimbus:jobs:queue:";
12const JOB_QUEUE_OPTIONS = {durable: true, autoDelete: false};
13
14const debug = util.debuglog("amqptools");
15
16export interface TaskParams {
17 title: string,
18 data: any
19}
20
21export class Task {
22 uuid:string;
23 type:string;
24 params:TaskParams;
25 taskCallback: any;
26 opts: any;
27 static taskManager:TaskManager;
28
29 constructor(type:string, params?:TaskParams) {
30 this.uuid = uuid.v4();
31 this.type = type;
32 this.params = params;
33
34 channelManager.on("reconnect", this.onReconnect);
35 }
36
37 onReconnect = () => {
38 if(this.taskCallback) {
39 debug("Trying to re establish consuming on task queue %s", this.queueName);
40 this.consume();
41 }
42 }
43
44 get exchangeName() {
45 return EXCHANGE_PREFIX + Task.taskManager.service;
46 }
47
48 get queueName() {
49 return JOB_QUEUE_PREFIX + this.type;
50 }
51
52 start(cb?) {
53 if (!this.params) return;
54
55 channelManager.getChannel()
56 .then(() => this.assertExchange())
57 .then(() => this.assertQueue())
58 .then(() => this.bindQueue())
59 .then((channel) => {
60 let params = JSON.parse(JSON.stringify(this.params));
61 params['uuid'] = this.uuid;
62 var eventData = new Buffer(JSON.stringify(params));
63
64 channel.publish(this.exchangeName, this.type, eventData);
65 if (cb) cb();
66 });
67
68 return this;
69 }
70
71 private assertExchange() {
72 return channelManager.getChannel().then((channel) => {
73 return new Promise((resolve, reject) => {
74 channel.assertExchange(this.exchangeName, 'direct', EXCHANGE_OPTIONS, (err) => {
75 if (err) return reject(err);
76 resolve(channel);
77 })
78 });
79 })
80 }
81
82 private assertQueue() {
83 return channelManager.getChannel().then((channel) => {
84 return new Promise<Channel>((resolve, reject) => {
85 channel.assertQueue(this.queueName, JOB_QUEUE_OPTIONS, (err) => {
86 if (err) return reject(err);
87 resolve(channel);
88 })
89 })
90 })
91 }
92
93 private bindQueue() {
94 return channelManager.getChannel().then((channel) => {
95 channel.bindQueue(this.queueName, this.exchangeName, this.type);
96 return channel;
97 });
98 }
99
100 purgeQueue() {
101 return channelManager.getChannel().then((channel) => {
102 return new Promise((resolve, reject) => {
103 channel.checkQueue(this.queueName, (err, ok) => {
104 if (err) return resolve(null);
105 if (ok) {
106 return channel.purgeQueue(this.queueName, (err, reply) => {
107 if (err) return reject(err);
108 resolve(reply)
109 })
110 }
111 resolve(null);
112 })
113 });
114 });
115 }
116
117 consume() {
118 return channelManager.getChannel()
119 .then(() => this.assertQueue())
120 .then((channel) => {
121 channel.prefetch(this.opts.prefetchCount);
122 debug("Attaching task listener for %s, prefetch=%d", this.type, this.opts.prefetchCount);
123 channel.consume(this.queueName, (msg) => {
124 try {
125 var taskData = JSON.parse(msg.content.toString());
126 this.taskCallback(taskData, err => {
127 if (err && err.nack) {
128 // dead letter the message
129 channel.nack(msg, false, false)
130 } else {
131 channel.ack(msg)
132 }
133 })
134 } catch (err) {
135 console.error('Malformed message', msg.content.toString(), err)
136 }
137 }, {noAck: false});
138 });
139 }
140
141 processTask(opts, taskCallback) {
142 if(this.taskCallback) {
143 throw new Error("Task callback already set");
144 }
145 if(typeof opts === "function") {
146 taskCallback = opts;
147 opts = {};
148 }
149 this.taskCallback = taskCallback;
150 opts = opts || {};
151 opts.prefetchCount = opts.prefetchCount || 1;
152 this.opts = opts;
153 return this.consume();
154 }
155}
156
\No newline at end of file