1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 | 'use strict';
|
15 |
|
16 | const async = require('async');
|
17 | const concat = require('concat-stream');
|
18 | const debug = require('debug')('cloudant:client');
|
19 | const EventRelay = require('./eventrelay.js');
|
20 | const path = require('path');
|
21 | const PassThroughDuplex = require('./passthroughduplex.js');
|
22 | const pkg = require('../package.json');
|
23 | const utils = require('./clientutils.js');
|
24 |
|
// Default client configuration. The constructor layers the caller-supplied
// configuration on top of these values.
const DEFAULTS = {
  // Copied into every request's `state.maxAttempt` by `request()`.
  maxAttempt: 3
};
|
28 |
|
29 |
|
30 |
|
31 |
|
32 |
|
33 |
|
/**
 * HTTP client wrapper that routes every request through a list of plugin
 * instances and re-submits attempts for as long as the shared request state
 * asks for a retry.
 */
class CloudantClient {
  /**
   * Build a client from the supplied configuration.
   *
   * Configuration keys read by this class:
   *  - `creds.iamApiKey`: when set, the `iamauth` plugin is added.
   *  - `creds.outUrl`: passed to every plugin as `serverUrl`.
   *  - `plugins`: a single plugin or an array of plugins (see `_addPlugins`
   *    for the accepted shapes). When left undefined and no IAM API key is
   *    given, the `cookieauth` plugin is added.
   *  - `https`: `false` selects the `http` module for the agent.
   *  - `requestDefaults`: merged over the built-in request defaults.
   *  - `maxAttempt`: copied into each request's state.
   *
   * @param {Object} [cfg] - Client configuration.
   * @throws {Error} If more than one legacy plugin is supplied, or if a
   *   plugin configuration is invalid or cannot be loaded.
   */
  constructor(cfg) {
    this._cfg = Object.assign({}, DEFAULTS, cfg || {});

    let client;
    this._plugins = [];
    this._pluginIds = [];
    this.useLegacyPlugin = false;

    // Built-in authentication plugin implied by the configuration.
    let plugins = [];

    if (this._cfg.creds && this._cfg.creds.iamApiKey) {
      plugins = [ { iamauth: { iamApiKey: this._cfg.creds.iamApiKey } } ];
    } else if (typeof this._cfg.plugins === 'undefined') {
      plugins = [ { cookieauth: { errorOnNoCreds: false } } ];
    }

    if (typeof this._cfg.plugins !== 'undefined') {
      [].concat(this._cfg.plugins).forEach((plugin) => {
        if (typeof plugin !== 'function' || plugin.pluginVersion >= 2) {
          plugins.push(plugin);
        } else if (this.useLegacyPlugin) {
          throw new Error('Using multiple legacy plugins is not permitted');
        } else {
          // A function without `pluginVersion >= 2` is treated as a legacy
          // plugin and is used as the underlying client itself.
          this.useLegacyPlugin = true;
          client = plugin;
        }
      });
    }

    // The client must exist before plugins are constructed because each
    // plugin receives it as its first constructor argument.
    this._initClient(client);
    this._addPlugins(plugins);
  }

  /**
   * Instantiate and register plugins.
   *
   * Each entry may be:
   *  - a function: used directly as the plugin constructor;
   *  - an object with exactly one key: `{ pluginName: { ...config } }`;
   *  - a string: a plugin name or path (`'base'`, `'default'` and
   *    `'promises'` are silently ignored);
   *  - `undefined`: silently ignored.
   *
   * A plugin whose `id` is already registered is not added again.
   *
   * @param {*|Array} plugins - One plugin or an array of plugins.
   * @throws {Error} On an invalid plugin configuration or a load failure.
   */
  _addPlugins(plugins) {
    if (!Array.isArray(plugins)) {
      plugins = [ plugins ];
    }

    plugins.forEach((plugin) => {
      let cfg;
      let Plugin;

      switch (typeof plugin) {
        case 'function':
          debug(`Found custom plugin: '${plugin.id}'`);
          Plugin = plugin;
          cfg = {};
          break;

        case 'object': {
          // `typeof null` is 'object'; reject it here so the caller gets the
          // same error as for any other malformed entry rather than a raw
          // TypeError from Object.keys(null).
          if (plugin === null || Array.isArray(plugin) || Object.keys(plugin).length !== 1) {
            throw new Error(`Invalid plugin configuration: '${plugin}'`);
          }

          const pluginName = Object.keys(plugin)[0];
          Plugin = this._importPlugin(pluginName);

          // A null configuration passes this check; Object.assign below
          // ignores null sources, so it behaves like an empty configuration.
          cfg = plugin[pluginName];
          if (typeof cfg !== 'object' || Array.isArray(cfg)) {
            throw new Error(`Invalid plugin configuration: '${plugin}'`);
          }
          break;
        }

        case 'string':
          if (plugin === 'base' || plugin === 'default' || plugin === 'promises') {
            return; // ignored names; nothing to load
          }

          Plugin = this._importPlugin(plugin);
          cfg = {};
          break;

        case 'undefined':
          return;

        default:
          throw new Error(`Invalid plugin configuration: '${plugin}'`);
      }

      if (this._pluginIds.indexOf(Plugin.id) !== -1) {
        debug(`Not adding duplicate plugin: '${Plugin.id}'`);
        return;
      }

      debug(`Adding plugin: '${Plugin.id}'`);
      const creds = this._cfg.creds || {};
      // The plugin's own configuration takes precedence over `serverUrl`.
      this._plugins.push(
        new Plugin(this._client, Object.assign({ serverUrl: creds.outUrl }, cfg))
      );
      this._pluginIds.push(Plugin.id);
    });
  }

  /**
   * Turn a plugin name or path into a string suitable for `require`.
   *
   * @param {string} name - Bare plugin name, absolute path or relative path.
   * @returns {string} `../plugins/<name>` for a bare name, the path itself
   *   when absolute, otherwise the path resolved against the current
   *   working directory.
   */
  _buildPluginPath(name) {
    // No directory component: look in this package's plugins directory.
    if (path.basename(name) === name) {
      return '../plugins/' + name;
    }

    if (path.isAbsolute(name)) {
      return name;
    }

    return path.join(process.cwd(), name);
  }

  /**
   * Load a plugin module by name or path.
   *
   * @param {string} pluginName - Plugin name or path.
   * @returns {*} Whatever the plugin module exports.
   * @throws {Error} `Failed to load plugin - <reason>` when a plugin other
   *   than the three named below cannot be loaded; the underlying error is
   *   available as `cause`.
   */
  _importPlugin(pluginName) {
    switch (pluginName) {
      // Literal require calls for the plugins named explicitly.
      case 'cookieauth':
        return require('../plugins/cookieauth');
      case 'iamauth':
        return require('../plugins/iamauth');
      case 'retry':
        return require('../plugins/retry');
      default:
        try {
          return require(`${this._buildPluginPath(pluginName)}`);
        } catch (e) {
          const loadError = new Error(`Failed to load plugin - ${e.message}`);
          // Keep the original exception (and its stack) reachable.
          loadError.cause = e;
          throw loadError;
        }
    }
  }

  /**
   * Set `this._client`, either to the supplied custom client or to a
   * `request` instance created with default options.
   *
   * When a custom client is supplied, `this.requestDefaults` is not set.
   *
   * @param {Function} [client] - Custom client to use as-is.
   */
  _initClient(client) {
    if (typeof client !== 'undefined') {
      debug('Using custom client.');
      this._client = client;
      return;
    }

    // HTTPS unless the configuration explicitly sets `https: false`.
    const protocol = (this._cfg && this._cfg.https === false)
      ? require('http')
      : require('https');

    const agent = new protocol.Agent({
      keepAlive: true,
      keepAliveMsecs: 30000,
      maxSockets: 6
    });

    let requestDefaults = {
      agent: agent,
      gzip: true,
      headers: {
        'User-Agent': `nodejs-cloudant/${pkg.version} (Node.js ${process.version})`
      },
      jar: false
    };

    if (this._cfg.requestDefaults) {
      // Shallow merge: a user-supplied top-level key (including `headers`)
      // replaces the corresponding default wholesale.
      requestDefaults = Object.assign({}, requestDefaults, this._cfg.requestDefaults);
    }

    debug('Using request options: %j', requestDefaults);

    this.requestDefaults = requestDefaults;
    this._client = require('request').defaults(requestDefaults);
  }

  /**
   * Submit a single attempt through the underlying client and wire up the
   * response-side hooks.
   *
   * @param {Object} request - Per-request bookkeeping object built by
   *   `request()`.
   * @param {Function} done - Callback that ends the current attempt.
   */
  _executeRequest(request, done) {
    debug('Submitting request: %j', request.options);

    request.response = this._client(
      request.options, utils.wrapCallback(request, done));

    // Point the relay at the object returned for this attempt.
    request.eventRelay.setSource(request.response);

    request.response.on('response', (response) => {
      // Pause while the onResponse hooks run and the state is processed.
      request.response.pause();
      utils.runHooks('onResponse', request, response, () => {
        utils.processState(request, done);
      });
    });

    if (typeof request.clientCallback === 'undefined') {
      debug('No client callback specified.');
      // NOTE(review): presumably errors reach the hooks through
      // utils.wrapCallback when a client callback exists - confirm.
      request.response.on('error', (error) => {
        utils.runHooks('onError', request, error, () => {
          utils.processState(request, done);
        });
      });
    }
  }

  /**
   * Look up a registered plugin instance by plugin id.
   *
   * @param {string} pluginId - Id the plugin was registered under.
   * @returns {Object|undefined} The plugin instance, or `undefined` when no
   *   plugin with that id is registered.
   */
  getPlugin(pluginId) {
    const index = this._pluginIds.indexOf(pluginId);
    return index === -1 ? undefined : this._plugins[index];
  }

  /**
   * Perform a request, running plugin hooks and repeating attempts until an
   * attempt ends with a stop value.
   *
   * @param {Object|string} options - Request options; a string is treated
   *   as the URL of a GET request.
   * @param {Function} [callback] - Client callback, stored on the request
   *   as `clientCallback`.
   * @returns {Object} The `PassThroughDuplex` client stream for this
   *   request. Its `abort()` method flags the request as aborted.
   */
  request(options, callback) {
    const self = this;

    if (typeof options === 'string') {
      options = { method: 'GET', url: options };
    }

    const request = {};
    request.abort = false;
    request.clientCallback = callback;

    request.clientStream = new PassThroughDuplex();

    request.clientStream.on('error', function(err) {
      debug(err);
    });
    request.clientStream.on('pipe', function() {
      debug('Request body is being piped.');
      request.pipedRequest = true;
    });

    request.eventRelay = new EventRelay(request.clientStream);

    request.plugins = self._plugins;

    // State shared between this method, the hooks and utils.processState.
    request.state = {
      attempt: 0,
      maxAttempt: self._cfg.maxAttempt,
      abortWithResponse: undefined,
      retry: false,
      retryDelayMsecs: 0
    };

    // One private scratch object per plugin, keyed by the instance's id.
    request.plugin_stash = {};
    request.plugins.forEach(function(plugin) {
      request.plugin_stash[plugin.id] = {};
    });

    // Only sets a flag; the flag is checked after the onRequest hooks run.
    request.clientStream.abort = function() {
      request.abort = true;
    };

    // Each iteration is one attempt; async.forever repeats until `done` is
    // called with an error.
    async.forever(function(done) {
      request.doneCallback = done;
      request.done = false;

      // Ensure the attempt's completion callback fires at most once.
      const finish = function(error) {
        if (request.done) {
          debug('Callback was already called.');
          return;
        }
        request.done = true;
        return request.doneCallback(error);
      };

      // Fresh shallow copy of the caller's options for every attempt.
      request.options = Object.assign({}, options);
      request.response = undefined;

      request.state.attempt++;
      request.state.retry = false;
      request.state.sending = false;

      debug(`Request attempt: ${request.state.attempt}`);
      debug(`Delaying request for ${request.state.retryDelayMsecs} Msecs.`);

      setTimeout(function() {
        utils.runHooks('onRequest', request, request.options, function(err) {
          utils.processState(request, function(stop) {
            if (request.state.retry) {
              debug('The onRequest hook issued retry.');
              return finish();
            }
            if (stop) {
              debug(`The onRequest hook issued abort: ${stop}`);
              return finish(stop);
            }
            if (request.abort) {
              debug('Client issued abort during plugin execution.');
              return finish(new Error('Client issued abort'));
            }

            request.state.sending = true;

            if (!request.pipedRequest) {
              self._executeRequest(request, finish);
              return;
            }

            if (typeof request.pipedRequestBuffer !== 'undefined' && request.state.attempt > 1) {
              // Later attempt: reuse the body buffered on an earlier one.
              request.options.body = request.pipedRequestBuffer;
              self._executeRequest(request, finish);
              return;
            }

            // Buffer the piped body so it can be reused on later attempts.
            const concatStream = concat({ encoding: 'buffer' }, function(buffer) {
              request.options.body = request.pipedRequestBuffer = buffer;
              self._executeRequest(request, finish);
            });
            request.clientStream.passThroughWritable
              .on('error', function(error) {
                debug(error);
                self._executeRequest(request, finish);
              })
              .pipe(concatStream);
          });
        });
      }, request.state.retryDelayMsecs);
    }, function(err) { debug(err.message); });

    return request.clientStream;
  }
}
|
387 |
|
388 | module.exports = CloudantClient;
|