// Retrieved from the UNPKG file viewer (2.71 kB, JavaScript).
var through = require('through2').obj;
var request = require('request-promise');
3
/**
 * Creates a stream (built with through2's `.obj` helper) that forwards every
 * written command to a remote task service and, when the stream is flushed,
 * waits for the task to finish and pushes the final task body downstream.
 *
 * Protocol, as far as this function shows it:
 *  - commands are POSTed to `<baseUrl>/tasks` until a response supplies a
 *    `pid`, and to `<baseUrl>/tasks/<pid>` afterwards;
 *  - each POST body is `{ commands: [...], end: <boolean> }`;
 *  - completion is detected by GETting `<baseUrl>/tasks/<pid>` until the
 *    returned body's `running` flag is falsy.
 *
 * @param {string} baseUrl - prefix prepended to the `/tasks` paths.
 * @returns {Object} the stream returned by `through(transform, flush)`.
 */
function client(baseUrl){
  // Consecutive failures tolerated before a send or a poll gives up.
  var MAX_RETRIES = 10;

  var queue = [],       // commands not yet accepted by the server
      ended = false,    // true once a request carrying end:true has succeeded
      ending = false,   // true once the stream has been flushed
      retries = 0,      // consecutive failed POSTs
      errorCount = 0,   // number of task errors already emitted
      pid, req, task, stream;

  /**
   * Sends the queued commands (and the end signal, when pending).
   * At most one POST is in flight; concurrent callers share that request.
   *
   * @returns {Promise} settles once the queue has been drained, or rejects
   *   after MAX_RETRIES consecutive failures.
   */
  function send() {
    // Nothing queued and no unacknowledged end signal: nothing to do.
    if (!queue.length && (!ending || ended)) {
      return Promise.resolve();
    }
    if (req) {
      return req;
    }

    var commands = queue;
    queue = [];

    var url = pid ? '/tasks/'+pid : '/tasks';
    var end = ending;

    // The two-argument form of then() is deliberate: an exception thrown
    // while handling a successful response must not be mistaken for a failed
    // request, otherwise already-accepted commands would be re-sent.
    req = request.post({
      url: baseUrl+url,
      json: true,
      body: {
        commands: commands,
        end: end
      }
    }).then(function(body){
      // Clear the in-flight marker first so a throw below cannot wedge send().
      req = null;
      retries = 0;

      task = body;
      pid = task.pid;
      // Only a successful request may mark the end signal as delivered;
      // a failed one must stay eligible for retry.
      ended = end;

      // NOTE(review): assumes task.errors is the cumulative error list for
      // the task, so only entries past errorCount are new - confirm against
      // the server.
      var errors = task.errors || [];
      var fresh = errors.slice(errorCount);
      errorCount = errors.length;
      fresh.forEach(function(err){
        stream.emit('error', err);
      });

      // Drain anything queued while this request was in flight, and let
      // callers observe failures of that follow-up request.
      return send();
    }, function(err){
      // Put the unsent batch back in front of anything queued meanwhile.
      queue = commands.concat(queue);
      req = null;

      retries++;
      if (retries >= MAX_RETRIES) {
        console.log('Failed to run commands after '+MAX_RETRIES+' retries\n '+commands.join('\n '));
        throw err;
      }

      return send();
    });

    return req;
  }

  /**
   * Marks the stream as finishing and delivers the end signal.
   *
   * @returns {Promise} the promise returned by send().
   */
  function end() {
    ending = true;
    return send();
  }

  var pollRetries = 0,  // consecutive failed GETs
      polls = 0;        // delayed polls scheduled so far

  /**
   * Polls the task until its `running` flag is falsy.
   *
   * @param {boolean} [delayed] - when truthy, wait before polling. The wait
   *   is 10 + 2^polls ms for the first nine delayed polls and 500 ms after.
   * @returns {Promise} resolves with the final task body, or rejects after
   *   MAX_RETRIES consecutive failed GETs.
   */
  function pollForCompletion(delayed) {
    if (delayed) {
      return new Promise(function(resolve){
        var delay = 500;
        if (polls <= 8) {
          delay = 10+Math.pow(2, polls);
        }
        polls++;

        setTimeout(resolve, delay);
      }).then(function(){
        return pollForCompletion();
      });
    }

    // Two-argument then(): the failure handler must only see failures of
    // this GET. With a trailing catch() the give-up error thrown by a nested
    // poll would be caught here and polling would silently resume.
    return request.get({
      url: baseUrl+'/tasks/'+pid,
      json: true
    }).then(function(body){
      pollRetries = 0;
      task = body;
      if (task.running) {
        return pollForCompletion(true);
      }
      return task;
    }, function(err){
      pollRetries++;

      if (pollRetries >= MAX_RETRIES) {
        throw err;
      }

      return pollForCompletion(true);
    });
  }

  stream = through(function(command, enc, cb){
    queue.push(command);
    // Success and failure are wired separately so cb is invoked exactly once.
    send().then(function(){
      cb();
    }, cb);
  }, function(cb){
    var self = this;
    end()
      .then(function(){
        return pollForCompletion();
      })
      .then(function(){
        self.push(task);
      })
      .then(function(){
        cb();
      }, cb);
  });

  return stream;
}
138
// The module's only export is the stream factory.
module.exports = client;