UNPKG

11.9 kBJavaScriptView Raw
1// Copyright © 2017, 2019 IBM Corp. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14'use strict';
15
16const async = require('async');
17const concat = require('concat-stream');
18const debug = require('debug')('cloudant:client');
19const EventRelay = require('./eventrelay.js');
20const path = require('path');
21const PassThroughDuplex = require('./passthroughduplex.js');
22const pkg = require('../package.json');
23const utils = require('./clientutils.js');
24
// Default client configuration. User-supplied configuration is merged over a
// copy of these values, so the object itself is frozen to guard against
// accidental mutation of the shared defaults.
const DEFAULTS = Object.freeze({
  maxAttempt: 3 // initial value of `request.state.maxAttempt`
});
28
/**
 * Create a Cloudant client for managing requests.
 *
 * Configuration keys read by this class:
 *  - `maxAttempt`      copied into each request's state (default: 3).
 *  - `creds`           `creds.iamApiKey` enables the 'iamauth' plugin;
 *                      `creds.outUrl` is passed to plugins as `serverUrl`.
 *  - `plugins`         a plugin or array of plugins (see `_addPlugins`).
 *  - `https`           set to `false` to use the 'http' agent.
 *  - `requestDefaults` options shallow-merged over the built-in defaults.
 *
 * @param {Object} cfg - Request client configuration.
 */
class CloudantClient {
  /**
   * @param {Object} [cfg] - Request client configuration.
   * @throws {Error} If more than one legacy plugin is supplied, or if a
   *   plugin configuration is invalid or cannot be loaded.
   */
  constructor(cfg) {
    this._cfg = Object.assign({}, DEFAULTS, cfg || {});

    this._plugins = [];
    this._pluginIds = [];
    this.useLegacyPlugin = false;

    let client; // stays undefined unless a legacy plugin is supplied

    // Build plugin array.
    let plugins = [];

    if (this._cfg.creds && this._cfg.creds.iamApiKey) {
      // => Found IAM API key in VCAP - Add 'iamauth' plugin.
      plugins = [ { iamauth: { iamApiKey: this._cfg.creds.iamApiKey } } ];
    } else if (typeof this._cfg.plugins === 'undefined') {
      // => No plugins specified - Add 'cookieauth' plugin.
      plugins = [ { cookieauth: { errorOnNoCreds: false } } ];
    }

    // Add user specified plugins.
    if (typeof this._cfg.plugins !== 'undefined') {
      [].concat(this._cfg.plugins).forEach((plugin) => {
        if (typeof plugin !== 'function' || plugin.pluginVersion >= 2) {
          plugins.push(plugin);
        } else if (this.useLegacyPlugin) {
          throw new Error('Using multiple legacy plugins is not permitted');
        } else {
          // A function without `pluginVersion >= 2` is treated as a legacy
          // plugin and replaces the internal client.
          this.useLegacyPlugin = true;
          client = plugin;
        }
      });
    }

    // initialize the internal client
    this._initClient(client);

    // add plugins
    this._addPlugins(plugins);
  }

  /**
   * Instantiate plugins and register them on this client.
   *
   * Each entry may be:
   *  1. a function (custom plugin class), instantiated with an empty config;
   *  2. an object with exactly one key: `{ pluginName: { ...config } }`;
   *  3. a string plugin name ('base', 'default' and 'promises' are no-ops);
   *  4. `undefined`, which is ignored.
   *
   * A plugin whose `id` is already registered is not added again.
   *
   * @param {*} plugins - A single plugin entry or an array of entries.
   * @throws {Error} On an invalid plugin configuration (including `null`).
   */
  _addPlugins(plugins) {
    const pluginList = Array.isArray(plugins) ? plugins : [ plugins ];

    pluginList.forEach((plugin) => {
      let cfg;
      let Plugin;

      switch (typeof plugin) {
        // 1). Custom plugin
        case 'function':
          debug(`Found custom plugin: '${plugin.id}'`);
          Plugin = plugin;
          cfg = {};
          break;

        // 2). Plugin (with configuration): { 'pluginName': { 'configKey1': 'configValue1', ... } }
        case 'object': {
          // `typeof null` is 'object'; reject it here so callers get the
          // regular configuration error rather than a TypeError from
          // `Object.keys(null)`.
          if (plugin === null || Array.isArray(plugin) || Object.keys(plugin).length !== 1) {
            throw new Error(`Invalid plugin configuration: '${plugin}'`);
          }

          const pluginName = Object.keys(plugin)[0];
          Plugin = this._importPlugin(pluginName);

          cfg = plugin[pluginName];
          if (typeof cfg !== 'object' || Array.isArray(cfg)) {
            throw new Error(`Invalid plugin configuration: '${plugin}'`);
          }
          break;
        }

        // 3). Plugin (no configuration): 'pluginName'
        case 'string':
          if (plugin === 'base' || plugin === 'default' || plugin === 'promises') {
            return; // noop
          }

          Plugin = this._importPlugin(plugin);
          cfg = {};
          break;

        // 4). Noop
        case 'undefined':
          return;

        default:
          throw new Error(`Invalid plugin configuration: '${plugin}'`);
      }

      if (this._pluginIds.indexOf(Plugin.id) !== -1) {
        debug(`Not adding duplicate plugin: '${Plugin.id}'`);
        return;
      }

      debug(`Adding plugin: '${Plugin.id}'`);
      const creds = this._cfg.creds || {};
      this._plugins.push(
        // instantiate plugin; plugin config overrides the default serverUrl
        new Plugin(this._client, Object.assign({ serverUrl: creds.outUrl }, cfg))
      );
      this._pluginIds.push(Plugin.id);
    });
  }

  /**
   * Resolve a plugin name or path to a string suitable for `require`.
   *
   * @param {string} name - Bare plugin name, absolute path or relative path.
   * @returns {string} '../plugins/<name>' for a bare name, the path itself
   *   when absolute, otherwise the path joined onto `process.cwd()`.
   */
  _buildPluginPath(name) {
    // Only a plugin name was provided: use plugin directory
    if (path.basename(name) === name) {
      return '../plugins/' + name;
    }

    // An absolute path was provided
    if (path.isAbsolute(name)) {
      return name;
    }

    // A relative path was provided
    return path.join(process.cwd(), name);
  }

  /**
   * Load a plugin module by name.
   *
   * @param {string} pluginName - Built-in plugin name, or a name/path
   *   resolved through `_buildPluginPath`.
   * @returns {*} The value exported by the plugin module.
   * @throws {Error} 'Failed to load plugin - <reason>' if a non built-in
   *   plugin cannot be loaded.
   */
  _importPlugin(pluginName) {
    switch (pluginName) {
      // Note: All built-in plugins are individually listed here to ensure they
      //       are included in a webpack bundle.
      case 'cookieauth':
        return require('../plugins/cookieauth');
      case 'iamauth':
        return require('../plugins/iamauth');
      case 'retry':
        return require('../plugins/retry');
      default:
        // Warning: Custom plugins will not be included in a webpack bundle
        //          by default because the exact module is not known on compile
        //          time.
        try {
          // Use template literal to suppress 'dependency is an expression'
          // webpack compilation warning.
          return require(`${this._buildPluginPath(pluginName)}`);
        } catch (e) {
          throw new Error(`Failed to load plugin - ${e.message}`);
        }
    }
  }

  /**
   * Set `this._client`, the function used to perform HTTP requests.
   *
   * When `client` is supplied it is used as-is. Otherwise a `request`
   * instance is created with a keep-alive agent, gzip, a library User-Agent
   * header and no cookie jar; `cfg.requestDefaults` (if any) is shallow-merged
   * over those defaults and the result is exposed as `this.requestDefaults`.
   *
   * @param {Function} [client] - Custom (legacy plugin) client.
   */
  _initClient(client) {
    if (typeof client !== 'undefined') {
      debug('Using custom client.');
      this._client = client;
      return;
    }

    const protocol = (this._cfg && this._cfg.https === false)
      ? require('http')
      : require('https'); // default to https

    const agent = new protocol.Agent({
      keepAlive: true,
      keepAliveMsecs: 30000,
      maxSockets: 6
    });

    let requestDefaults = {
      agent: agent,
      gzip: true,
      headers: {
        // set library UA header
        'User-Agent': `nodejs-cloudant/${pkg.version} (Node.js ${process.version})`
      },
      jar: false
    };

    if (this._cfg.requestDefaults) {
      // allow user to override defaults (shallow merge: a user supplied
      // `headers` object replaces the default headers entirely)
      requestDefaults = Object.assign({}, requestDefaults, this._cfg.requestDefaults);
    }

    debug('Using request options: %j', requestDefaults);

    this.requestDefaults = requestDefaults; // expose request defaults
    this._client = require('request').defaults(requestDefaults);
  }

  /**
   * Submit a single request attempt through the internal client and wire up
   * the 'onResponse' / 'onError' plugin hooks.
   *
   * @param {Object} request - Internal request context built by `request()`.
   * @param {Function} done - Callback that ends the current attempt.
   */
  _executeRequest(request, done) {
    debug('Submitting request: %j', request.options);

    request.response = this._client(
      request.options, utils.wrapCallback(request, done));

    // define new source on event relay
    request.eventRelay.setSource(request.response);

    request.response.on('response', (response) => {
      // pause the response while the hooks run
      request.response.pause();
      utils.runHooks('onResponse', request, response, () => {
        utils.processState(request, done); // process response hook results
      });
    });

    if (typeof request.clientCallback === 'undefined') {
      // Without a client callback the error hooks are driven from the
      // 'error' event instead.
      debug('No client callback specified.');
      request.response.on('error', (error) => {
        utils.runHooks('onError', request, error, () => {
          utils.processState(request, done); // process error hook results
        });
      });
    }
  }

  // public

  /**
   * Get a client plugin instance.
   *
   * @param {string} pluginId
   * @returns {Object|undefined} The plugin instance registered under
   *   `pluginId`, or `undefined` if no such plugin was added.
   */
  getPlugin(pluginId) {
    return this._plugins[this._pluginIds.indexOf(pluginId)];
  }

  /**
   * Perform a request using this Cloudant client.
   *
   * The request is attempted repeatedly (via `async.forever`) until an
   * attempt ends with an error/stop value; each attempt runs the 'onRequest'
   * plugin hooks before the request is sent.
   *
   * @param {Object|string} options - HTTP options, or a URL string which is
   *   treated as a GET request.
   * @param {requestCallback} callback - The callback that handles the response.
   * @returns {PassThroughDuplex} Stream returned to the client.
   */
  request(options, callback) {
    const baseOptions = (typeof options === 'string')
      ? { method: 'GET', url: options } // default GET
      : options;

    const request = {
      abort: false,
      clientCallback: callback,
      clientStream: new PassThroughDuplex()
    };

    request.clientStream.on('error', (err) => {
      debug(err);
    });
    request.clientStream.on('pipe', () => {
      debug('Request body is being piped.');
      request.pipedRequest = true;
    });

    request.eventRelay = new EventRelay(request.clientStream);

    request.plugins = this._plugins;

    // init state
    request.state = {
      attempt: 0,
      maxAttempt: this._cfg.maxAttempt,
      // following are editable by plugin hooks during execution
      abortWithResponse: undefined,
      retry: false,
      retryDelayMsecs: 0
    };

    // add plugin stash
    request.plugin_stash = {};
    request.plugins.forEach((plugin) => {
      // allow plugin hooks to share data via the request state
      request.plugin_stash[plugin.id] = {};
    });

    request.clientStream.abort = () => {
      // aborts response during hook execution phase.
      // note that once a "good" request is made, this abort function is
      // monkey-patched with `request.abort()`.
      request.abort = true;
    };

    async.forever((done) => {
      request.doneCallback = done;
      request.done = false;

      // Fixes an intermittent bug where the `done` callback is executed
      // multiple times: only the first call per attempt is forwarded.
      const finish = (error) => {
        if (request.done) {
          debug('Callback was already called.');
          return;
        }
        request.done = true;
        return request.doneCallback(error);
      };

      request.options = Object.assign({}, baseOptions); // new copy
      request.response = undefined;

      // update state
      request.state.attempt++;
      request.state.retry = false;
      request.state.sending = false;

      debug(`Request attempt: ${request.state.attempt}`);
      debug(`Delaying request for ${request.state.retryDelayMsecs} Msecs.`);

      setTimeout(() => {
        utils.runHooks('onRequest', request, request.options, () => {
          utils.processState(request, (stop) => {
            if (request.state.retry) {
              debug('The onRequest hook issued retry.');
              return finish();
            }
            if (stop) {
              debug(`The onRequest hook issued abort: ${stop}`);
              return finish(stop);
            }
            if (request.abort) {
              debug('Client issued abort during plugin execution.');
              return finish(new Error('Client issued abort'));
            }

            request.state.sending = true; // indicates onRequest hooks completed

            if (!request.pipedRequest) {
              this._executeRequest(request, finish);
            } else if (typeof request.pipedRequestBuffer !== 'undefined' && request.state.attempt > 1) {
              // retry: replay the body buffered during an earlier attempt
              request.options.body = request.pipedRequestBuffer;
              this._executeRequest(request, finish);
            } else {
              // copy stream contents to buffer for possible retry
              const concatStream = concat({ encoding: 'buffer' }, (buffer) => {
                request.options.body = request.pipedRequestBuffer = buffer;
                this._executeRequest(request, finish);
              });
              request.clientStream.passThroughWritable
                .on('error', (error) => {
                  debug(error);
                  this._executeRequest(request, finish);
                })
                .pipe(concatStream);
            }
          });
        });
      }, request.state.retryDelayMsecs);
    }, (err) => { debug(err.message); });

    return request.clientStream; // return stream to client
  }
}
387
388module.exports = CloudantClient;