UNPKG

10.6 kBJavaScriptView Raw
1// Copyright © 2017, 2018 IBM Corp. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14'use strict';
15
16const async = require('async');
17const concat = require('concat-stream');
18const debug = require('debug')('cloudant:client');
19const EventRelay = require('./eventrelay.js');
20const PassThroughDuplex = require('./passthroughduplex.js');
21const pkg = require('../package.json');
22const utils = require('./clientutils.js');
23
24const DEFAULTS = {
25 maxAttempt: 3
26};
27
28/**
29 * Create a Cloudant client for managing requests.
30 *
31 * @param {Object} cfg - Request client configuration.
32 */
33class CloudantClient {
34 constructor(cfg) {
35 var self = this;
36
37 cfg = cfg || {};
38 self._cfg = Object.assign({}, DEFAULTS, cfg);
39
40 var client;
41 self._plugins = [];
42 self._pluginIds = [];
43 self.useLegacyPlugin = false;
44
45 // Build plugin array.
46 var plugins = [];
47
48 if (self._cfg.creds && self._cfg.creds.iamApiKey) {
49 // => Found IAM API key in VCAP - Add 'iamauth' plugin.
50 plugins = [ { iamauth: { iamApiKey: self._cfg.creds.iamApiKey } } ];
51 } else if (typeof this._cfg.plugins === 'undefined') {
52 // => No plugins specified - Add 'cookieauth' plugin.
53 plugins = [ 'cookieauth' ];
54 }
55
56 // Add user specified plugins.
57 if (typeof this._cfg.plugins !== 'undefined') {
58 [].concat(self._cfg.plugins).forEach(function(plugin) {
59 if (typeof plugin !== 'function' || plugin.pluginVersion >= 2) {
60 plugins.push(plugin);
61 } else if (self.useLegacyPlugin) {
62 throw new Error('Using multiple legacy plugins is not permitted');
63 } else {
64 self.useLegacyPlugin = true;
65 client = plugin; // use legacy plugin as client
66 }
67 });
68 }
69
70 // initialize the internal client
71 self._initClient(client);
72
73 // add plugins
74 self._addPlugins(plugins);
75 }
76
77 _addPlugins(plugins) {
78 var self = this;
79
80 if (!Array.isArray(plugins)) {
81 plugins = [ plugins ];
82 }
83
84 plugins.forEach(function(plugin) {
85 var cfg, Plugin;
86
87 switch (typeof plugin) {
88 // 1). Custom plugin
89 case 'function':
90 debug(`Found custom plugin: '${plugin.id}'`);
91 Plugin = plugin;
92 cfg = {};
93 break;
94
95 // 2). Plugin (with configuration): { 'pluginName': { 'configKey1': 'configValue1', ... } }
96 case 'object':
97 if (Array.isArray(plugin) || Object.keys(plugin).length !== 1) {
98 throw new Error(`Invalid plugin configuration: '${plugin}'`);
99 }
100
101 var pluginName = Object.keys(plugin)[0];
102
103 try {
104 Plugin = require('../plugins/' + pluginName);
105 } catch (e) {
106 throw new Error(`Failed to load plugin - ${e.message}`);
107 }
108
109 cfg = plugin[pluginName];
110 if (typeof cfg !== 'object' || Array.isArray(cfg)) {
111 throw new Error(`Invalid plugin configuration: '${plugin}'`);
112 }
113 break;
114
115 // 3). Plugin (no configuration): 'pluginName'
116 case 'string':
117 if (plugin === 'base' || plugin === 'default' || plugin === 'promises') {
118 return; // noop
119 }
120
121 try {
122 Plugin = require('../plugins/' + plugin);
123 } catch (e) {
124 throw new Error(`Failed to load plugin - ${e.message}`);
125 }
126
127 cfg = {};
128 break;
129
130 // 4). Noop
131 case 'undefined':
132 return;
133 default:
134 throw new Error(`Invalid plugin configuration: '${plugin}'`);
135 }
136
137 if (self._pluginIds.indexOf(Plugin.id) !== -1) {
138 debug(`Not adding duplicate plugin: '${Plugin.id}'`);
139 } else {
140 debug(`Adding plugin: '${Plugin.id}'`);
141 self._plugins.push(
142 // instantiate plugin
143 new Plugin(self._client, Object.assign({}, cfg))
144 );
145 self._pluginIds.push(Plugin.id);
146 }
147 });
148 }
149
150 _initClient(client) {
151 if (typeof client !== 'undefined') {
152 debug('Using custom client.');
153 this._client = client;
154 return;
155 }
156
157 var protocol;
158 if (this._cfg && this._cfg.https === false) {
159 protocol = require('http');
160 } else {
161 protocol = require('https'); // default to https
162 }
163
164 var agent = new protocol.Agent({
165 keepAlive: true,
166 keepAliveMsecs: 30000,
167 maxSockets: 6
168 });
169 var requestDefaults = {
170 agent: agent,
171 gzip: true,
172 headers: {
173 // set library UA header
174 'User-Agent': `nodejs-cloudant/${pkg.version} (Node.js ${process.version})`
175 },
176 jar: false
177 };
178
179 if (this._cfg.requestDefaults) {
180 // allow user to override defaults
181 requestDefaults = Object.assign({}, requestDefaults, this._cfg.requestDefaults);
182 }
183
184 debug('Using request options: %j', requestDefaults);
185
186 this.requestDefaults = requestDefaults; // expose request defaults
187 this._client = require('request').defaults(requestDefaults);
188 }
189
190 _executeRequest(request, done) {
191 debug('Submitting request: %j', request.options);
192
193 request.response = this._client(
194 request.options, utils.wrapCallback(request, done));
195
196 // define new source on event relay
197 request.eventRelay.setSource(request.response);
198
199 request.response
200 .on('response', function(response) {
201 request.response.pause();
202 utils.runHooks('onResponse', request, response, function() {
203 utils.processState(request, done); // process response hook results
204 });
205 });
206
207 if (typeof request.clientCallback === 'undefined') {
208 debug('No client callback specified.');
209 request.response
210 .on('error', function(error) {
211 utils.runHooks('onError', request, error, function() {
212 utils.processState(request, done); // process error hook results
213 });
214 });
215 }
216 }
217
218 // public
219
220 /**
221 * Perform a request using this Cloudant client.
222 *
223 * @param {Object} options - HTTP options.
224 * @param {requestCallback} callback - The callback that handles the response.
225 */
226 request(options, callback) {
227 var self = this;
228
229 if (typeof options === 'string') {
230 options = { method: 'GET', url: options }; // default GET
231 }
232
233 var request = {};
234 request.abort = false;
235 request.clientCallback = callback;
236
237 request.clientStream = new PassThroughDuplex();
238
239 request.clientStream.on('error', function(err) {
240 debug(err);
241 });
242 request.clientStream.on('pipe', function() {
243 debug('Request body is being piped.');
244 request.pipedRequest = true;
245 });
246
247 request.eventRelay = new EventRelay(request.clientStream);
248
249 request.plugins = self._plugins;
250
251 // init state
252 request.state = {
253 attempt: 0,
254 maxAttempt: self._cfg.maxAttempt,
255 // following are editable by plugin hooks during execution
256 abortWithResponse: undefined,
257 retry: false,
258 retryDelayMsecs: 0
259 };
260
261 // add plugin stash
262 request.plugin_stash = {};
263 request.plugins.forEach(function(plugin) {
264 // allow plugin hooks to share data via the request state
265 request.plugin_stash[plugin.id] = {};
266 });
267
268 request.clientStream.abort = function() {
269 // aborts response during hook execution phase.
270 // note that once a "good" request is made, this abort function is
271 // monkey-patched with `request.abort()`.
272 request.abort = true;
273 };
274
275 async.forever(function(done) {
276 request.doneCallback = done;
277 request.done = false;
278
279 // Fixes an intermittent bug where the `done` callback is executed
280 // multiple times.
281 done = function(error) {
282 if (request.done) {
283 debug('Callback was already called.');
284 return;
285 }
286 request.done = true;
287 return request.doneCallback(error);
288 };
289
290 request.options = Object.assign({}, options); // new copy
291 request.response = undefined;
292
293 // update state
294 request.state.attempt++;
295 request.state.retry = false;
296 request.state.sending = false;
297
298 debug(`Request attempt: ${request.state.attempt}`);
299
300 utils.runHooks('onRequest', request, request.options, function(err) {
301 utils.processState(request, function(stop) {
302 if (request.state.retry) {
303 debug('The onRequest hook issued retry.');
304 return done();
305 }
306 if (stop) {
307 debug(`The onRequest hook issued abort: ${stop}`);
308 return done(stop);
309 }
310
311 debug(`Delaying request for ${request.state.retryDelayMsecs} Msecs.`);
312
313 setTimeout(function() {
314 if (request.abort) {
315 debug('Client issued abort during plugin execution.');
316 return done(new Error('Client issued abort'));
317 }
318
319 request.state.sending = true; // indicates onRequest hooks completed
320
321 if (!request.pipedRequest) {
322 self._executeRequest(request, done);
323 } else {
324 if (typeof request.pipedRequestBuffer !== 'undefined' && request.state.attempt > 1) {
325 request.options.body = request.pipedRequestBuffer;
326 self._executeRequest(request, done);
327 } else {
328 // copy stream contents to buffer for possible retry
329 var concatStream = concat({ encoding: 'buffer' }, function(buffer) {
330 request.options.body = request.pipedRequestBuffer = buffer;
331 self._executeRequest(request, done);
332 });
333 request.clientStream.passThroughWritable
334 .on('error', function(error) {
335 debug(error);
336 self._executeRequest(request, done);
337 })
338 .pipe(concatStream);
339 }
340 }
341 }, request.state.retryDelayMsecs);
342 });
343 });
344 }, function(err) { debug(err.message); });
345
346 return request.clientStream; // return stream to client
347 }
348}
349
350module.exports = CloudantClient;