1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 | 'use strict';
|
15 |
|
16 | const async = require('async');
|
17 | const concat = require('concat-stream');
|
18 | const debug = require('debug')('cloudant:client');
|
19 | const EventRelay = require('./eventrelay.js');
|
20 | const PassThroughDuplex = require('./passthroughduplex.js');
|
21 | const pkg = require('../package.json');
|
22 | const utils = require('./clientutils.js');
|
23 |
|
24 | const DEFAULTS = {
|
25 | maxAttempt: 3
|
26 | };
|
27 |
|
28 |
|
29 |
|
30 |
|
31 |
|
32 |
|
33 | class CloudantClient {
|
34 | constructor(cfg) {
|
35 | var self = this;
|
36 |
|
37 | cfg = cfg || {};
|
38 | self._cfg = Object.assign({}, DEFAULTS, cfg);
|
39 |
|
40 | var client;
|
41 | self._plugins = [];
|
42 | self._pluginIds = [];
|
43 | self.useLegacyPlugin = false;
|
44 |
|
45 |
|
46 | var plugins = [];
|
47 |
|
48 | if (self._cfg.creds && self._cfg.creds.iamApiKey) {
|
49 |
|
50 | plugins = [ { iamauth: { iamApiKey: self._cfg.creds.iamApiKey } } ];
|
51 | } else if (typeof this._cfg.plugins === 'undefined') {
|
52 |
|
53 | plugins = [ 'cookieauth' ];
|
54 | }
|
55 |
|
56 |
|
57 | if (typeof this._cfg.plugins !== 'undefined') {
|
58 | [].concat(self._cfg.plugins).forEach(function(plugin) {
|
59 | if (typeof plugin !== 'function' || plugin.pluginVersion >= 2) {
|
60 | plugins.push(plugin);
|
61 | } else if (self.useLegacyPlugin) {
|
62 | throw new Error('Using multiple legacy plugins is not permitted');
|
63 | } else {
|
64 | self.useLegacyPlugin = true;
|
65 | client = plugin;
|
66 | }
|
67 | });
|
68 | }
|
69 |
|
70 |
|
71 | self._initClient(client);
|
72 |
|
73 |
|
74 | self._addPlugins(plugins);
|
75 | }
|
76 |
|
77 | _addPlugins(plugins) {
|
78 | var self = this;
|
79 |
|
80 | if (!Array.isArray(plugins)) {
|
81 | plugins = [ plugins ];
|
82 | }
|
83 |
|
84 | plugins.forEach(function(plugin) {
|
85 | var cfg, Plugin;
|
86 |
|
87 | switch (typeof plugin) {
|
88 |
|
89 | case 'function':
|
90 | debug(`Found custom plugin: '${plugin.id}'`);
|
91 | Plugin = plugin;
|
92 | cfg = {};
|
93 | break;
|
94 |
|
95 |
|
96 | case 'object':
|
97 | if (Array.isArray(plugin) || Object.keys(plugin).length !== 1) {
|
98 | throw new Error(`Invalid plugin configuration: '${plugin}'`);
|
99 | }
|
100 |
|
101 | var pluginName = Object.keys(plugin)[0];
|
102 |
|
103 | try {
|
104 | Plugin = require('../plugins/' + pluginName);
|
105 | } catch (e) {
|
106 | throw new Error(`Failed to load plugin - ${e.message}`);
|
107 | }
|
108 |
|
109 | cfg = plugin[pluginName];
|
110 | if (typeof cfg !== 'object' || Array.isArray(cfg)) {
|
111 | throw new Error(`Invalid plugin configuration: '${plugin}'`);
|
112 | }
|
113 | break;
|
114 |
|
115 |
|
116 | case 'string':
|
117 | if (plugin === 'base' || plugin === 'default' || plugin === 'promises') {
|
118 | return;
|
119 | }
|
120 |
|
121 | try {
|
122 | Plugin = require('../plugins/' + plugin);
|
123 | } catch (e) {
|
124 | throw new Error(`Failed to load plugin - ${e.message}`);
|
125 | }
|
126 |
|
127 | cfg = {};
|
128 | break;
|
129 |
|
130 |
|
131 | case 'undefined':
|
132 | return;
|
133 | default:
|
134 | throw new Error(`Invalid plugin configuration: '${plugin}'`);
|
135 | }
|
136 |
|
137 | if (self._pluginIds.indexOf(Plugin.id) !== -1) {
|
138 | debug(`Not adding duplicate plugin: '${Plugin.id}'`);
|
139 | } else {
|
140 | debug(`Adding plugin: '${Plugin.id}'`);
|
141 | self._plugins.push(
|
142 |
|
143 | new Plugin(self._client, Object.assign({}, cfg))
|
144 | );
|
145 | self._pluginIds.push(Plugin.id);
|
146 | }
|
147 | });
|
148 | }
|
149 |
|
150 | _initClient(client) {
|
151 | if (typeof client !== 'undefined') {
|
152 | debug('Using custom client.');
|
153 | this._client = client;
|
154 | return;
|
155 | }
|
156 |
|
157 | var protocol;
|
158 | if (this._cfg && this._cfg.https === false) {
|
159 | protocol = require('http');
|
160 | } else {
|
161 | protocol = require('https');
|
162 | }
|
163 |
|
164 | var agent = new protocol.Agent({
|
165 | keepAlive: true,
|
166 | keepAliveMsecs: 30000,
|
167 | maxSockets: 6
|
168 | });
|
169 | var requestDefaults = {
|
170 | agent: agent,
|
171 | gzip: true,
|
172 | headers: {
|
173 |
|
174 | 'User-Agent': `nodejs-cloudant/${pkg.version} (Node.js ${process.version})`
|
175 | },
|
176 | jar: false
|
177 | };
|
178 |
|
179 | if (this._cfg.requestDefaults) {
|
180 |
|
181 | requestDefaults = Object.assign({}, requestDefaults, this._cfg.requestDefaults);
|
182 | }
|
183 |
|
184 | debug('Using request options: %j', requestDefaults);
|
185 |
|
186 | this.requestDefaults = requestDefaults;
|
187 | this._client = require('request').defaults(requestDefaults);
|
188 | }
|
189 |
|
190 | _executeRequest(request, done) {
|
191 | debug('Submitting request: %j', request.options);
|
192 |
|
193 | request.response = this._client(
|
194 | request.options, utils.wrapCallback(request, done));
|
195 |
|
196 |
|
197 | request.eventRelay.setSource(request.response);
|
198 |
|
199 | request.response
|
200 | .on('response', function(response) {
|
201 | request.response.pause();
|
202 | utils.runHooks('onResponse', request, response, function() {
|
203 | utils.processState(request, done);
|
204 | });
|
205 | });
|
206 |
|
207 | if (typeof request.clientCallback === 'undefined') {
|
208 | debug('No client callback specified.');
|
209 | request.response
|
210 | .on('error', function(error) {
|
211 | utils.runHooks('onError', request, error, function() {
|
212 | utils.processState(request, done);
|
213 | });
|
214 | });
|
215 | }
|
216 | }
|
217 |
|
218 |
|
219 |
|
220 | |
221 |
|
222 |
|
223 |
|
224 |
|
225 |
|
226 | request(options, callback) {
|
227 | var self = this;
|
228 |
|
229 | if (typeof options === 'string') {
|
230 | options = { method: 'GET', url: options };
|
231 | }
|
232 |
|
233 | var request = {};
|
234 | request.abort = false;
|
235 | request.clientCallback = callback;
|
236 |
|
237 | request.clientStream = new PassThroughDuplex();
|
238 |
|
239 | request.clientStream.on('error', function(err) {
|
240 | debug(err);
|
241 | });
|
242 | request.clientStream.on('pipe', function() {
|
243 | debug('Request body is being piped.');
|
244 | request.pipedRequest = true;
|
245 | });
|
246 |
|
247 | request.eventRelay = new EventRelay(request.clientStream);
|
248 |
|
249 | request.plugins = self._plugins;
|
250 |
|
251 |
|
252 | request.state = {
|
253 | attempt: 0,
|
254 | maxAttempt: self._cfg.maxAttempt,
|
255 |
|
256 | abortWithResponse: undefined,
|
257 | retry: false,
|
258 | retryDelayMsecs: 0
|
259 | };
|
260 |
|
261 |
|
262 | request.plugin_stash = {};
|
263 | request.plugins.forEach(function(plugin) {
|
264 |
|
265 | request.plugin_stash[plugin.id] = {};
|
266 | });
|
267 |
|
268 | request.clientStream.abort = function() {
|
269 |
|
270 |
|
271 |
|
272 | request.abort = true;
|
273 | };
|
274 |
|
275 | async.forever(function(done) {
|
276 | request.doneCallback = done;
|
277 | request.done = false;
|
278 |
|
279 |
|
280 |
|
281 | done = function(error) {
|
282 | if (request.done) {
|
283 | debug('Callback was already called.');
|
284 | return;
|
285 | }
|
286 | request.done = true;
|
287 | return request.doneCallback(error);
|
288 | };
|
289 |
|
290 | request.options = Object.assign({}, options);
|
291 | request.response = undefined;
|
292 |
|
293 |
|
294 | request.state.attempt++;
|
295 | request.state.retry = false;
|
296 | request.state.sending = false;
|
297 |
|
298 | debug(`Request attempt: ${request.state.attempt}`);
|
299 |
|
300 | utils.runHooks('onRequest', request, request.options, function(err) {
|
301 | utils.processState(request, function(stop) {
|
302 | if (request.state.retry) {
|
303 | debug('The onRequest hook issued retry.');
|
304 | return done();
|
305 | }
|
306 | if (stop) {
|
307 | debug(`The onRequest hook issued abort: ${stop}`);
|
308 | return done(stop);
|
309 | }
|
310 |
|
311 | debug(`Delaying request for ${request.state.retryDelayMsecs} Msecs.`);
|
312 |
|
313 | setTimeout(function() {
|
314 | if (request.abort) {
|
315 | debug('Client issued abort during plugin execution.');
|
316 | return done(new Error('Client issued abort'));
|
317 | }
|
318 |
|
319 | request.state.sending = true;
|
320 |
|
321 | if (!request.pipedRequest) {
|
322 | self._executeRequest(request, done);
|
323 | } else {
|
324 | if (typeof request.pipedRequestBuffer !== 'undefined' && request.state.attempt > 1) {
|
325 | request.options.body = request.pipedRequestBuffer;
|
326 | self._executeRequest(request, done);
|
327 | } else {
|
328 |
|
329 | var concatStream = concat({ encoding: 'buffer' }, function(buffer) {
|
330 | request.options.body = request.pipedRequestBuffer = buffer;
|
331 | self._executeRequest(request, done);
|
332 | });
|
333 | request.clientStream.passThroughWritable
|
334 | .on('error', function(error) {
|
335 | debug(error);
|
336 | self._executeRequest(request, done);
|
337 | })
|
338 | .pipe(concatStream);
|
339 | }
|
340 | }
|
341 | }, request.state.retryDelayMsecs);
|
342 | });
|
343 | });
|
344 | }, function(err) { debug(err.message); });
|
345 |
|
346 | return request.clientStream;
|
347 | }
|
348 | }
|
349 |
|
350 | module.exports = CloudantClient;
|