1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 | 'use strict';
|
15 |
|
16 | const async = require('async');
|
17 | const debug = require('debug')('cloudant:clientutils');
|
18 |
|
19 |
|
/**
 * Replay a substitute response to the client instead of the real one.
 *
 * `response` is read positionally: index 0 is emitted as an 'error' event,
 * index 1 as a 'response' event and index 2 (converted to a UTF-8 Buffer) as
 * a 'data' event. Falsy entries are skipped. An 'end' event is always emitted
 * afterwards.
 *
 * @param {Array} response - Positional [error, response, body] values.
 * @param {Object} clientStream - Object whose `emit` method receives the events.
 * @param {Function} [clientCallback] - If it is a function, it is called with
 *   the entries of `response` as its arguments after the events are emitted.
 */
var sendResponseToClient = function(response, clientStream, clientCallback) {
  debug('An alternative response will be returned to the client');

  const failure = response[0];
  if (failure) {
    clientStream.emit('error', failure);
  }

  const message = response[1];
  if (message) {
    clientStream.emit('response', message);
  }

  const body = response[2];
  if (body) {
    clientStream.emit('data', Buffer.from(body, 'utf8'));
  }

  // The stream is always terminated, whatever was (or was not) emitted above.
  clientStream.emit('end');

  if (typeof clientCallback !== 'function') {
    return;
  }
  clientCallback.apply(null, response);
};
|
38 |
|
39 |
|
/**
 * Merge the state changes reported by a plugin hook into the shared state.
 *
 * - `abortWithResponse` set: retry is switched off, the abort response is
 *   recorded on `state` and `callback` receives an Error.
 * - `retry` truthy: `state.retry` is set to true (it is never reset here).
 * - `retryDelayMsecs`: only adopted when greater than the current value.
 *
 * A hook that calls back without a state object (`undefined` or `null`) is
 * treated as having reported no changes, rather than throwing a TypeError
 * from inside the plugin's callback.
 *
 * @param {Object} state - Shared state, mutated in place.
 * @param {Object} [newState] - Changes requested by the hook.
 * @param {Function} callback - Called with an Error on abort, otherwise with
 *   no arguments.
 * @returns {*} The callback's return value on abort, otherwise undefined.
 */
var updateState = function(state, newState, callback) {
  if (newState === undefined || newState === null) {
    // Nothing reported by the hook; leave state untouched and carry on.
    callback();
    return;
  }

  if (newState.abortWithResponse) {
    // An abort overrides any retry requested earlier.
    state.retry = false;
    state.abortWithResponse = newState.abortWithResponse;
    return callback(new Error('Plugin issued abort'));
  }

  if (newState.retry) {
    state.retry = true;
  }

  // Keep the longest delay requested so far.
  if (newState.retryDelayMsecs > state.retryDelayMsecs) {
    state.retryDelayMsecs = newState.retryDelayMsecs;
  }

  callback();
};
|
55 |
|
56 |
|
57 |
|
58 |
|
/**
 * Inspect the request record `r` after the hooks have run and report the
 * outcome through `callback`.
 *
 * `callback` is invoked exactly once:
 * - with Error('Client issued abort') when `r.abort` is set;
 * - with Error('Plugin issued abort') (flagged `skipClientCallback`) when
 *   `r.state.abortWithResponse` is set, after the substitute response has
 *   been passed to `sendResponseToClient`;
 * - with no arguments when a retry is requested and attempts remain, or when
 *   `r.state.sending` is falsy;
 * - with Error('No retry requested') otherwise, after the response (if any)
 *   has been resumed and, when destinations exist, piped to the client.
 *
 * @param {Object} r - Request record; the fields read here are `abort`,
 *   `state`, `response`, `clientStream`, `clientCallback` and `eventRelay`.
 * @param {Function} callback - Receives the outcome as described above.
 */
var processState = function(r, callback) {
  // Abort the in-flight response, if one exists. The log text is the same
  // whichever branch below triggers the abort.
  const abortResponse = () => {
    if (!r.response) {
      return;
    }
    debug('Client issued abort.');
    r.response.abort();
  };

  // 1. Abort flagged on the request record itself.
  if (r.abort) {
    abortResponse();
    callback(new Error('Client issued abort'));
    return;
  }

  // 2. A plugin supplied a response to return in place of the real one.
  if (r.state.abortWithResponse) {
    abortResponse();
    sendResponseToClient(r.state.abortWithResponse, r.clientStream, r.clientCallback);
    const pluginAbort = new Error('Plugin issued abort');
    pluginAbort.skipClientCallback = true;
    callback(pluginAbort);
    return;
  }

  // 3. Retry requested and the attempt budget is not exhausted.
  if (r.state.retry && r.state.attempt < r.state.maxAttempt) {
    abortResponse();
    debug('Plugin issued a retry.');
    callback();
    return;
  }

  // From here on the current response is kept, so give the client stream a
  // way to abort it.
  if (r.response) {
    r.clientStream.abort = function() {
      debug('Client issued abort.');
      r.response.abort();
    };
  }

  // A retry was requested but case 3 did not apply: drop the request.
  if (r.state.retry) {
    debug('Failed to retry request. Too many retry attempts.');
    r.state.retry = false;
  }

  if (!r.state.sending) {
    callback();
    return;
  }

  if (r.response) {
    if (r.eventRelay) {
      r.eventRelay.resume();
    }
    if (r.clientStream.destinations.length > 0) {
      r.response.pipe(r.clientStream.passThroughReadable);
    }
    r.response.resume();
  }

  callback(new Error('No retry requested'));
};
|
125 |
|
126 |
|
/**
 * Run the hook called `hookName` on every plugin in `r.plugins`, one after
 * another, then call `end`.
 *
 * Plugins that do not implement the hook, or that are flagged `disabled`,
 * are skipped. Each hook receives a shallow copy of `r.state` (with the
 * plugin's own entry from `r.plugin_stash` attached as `stash`), the hook
 * `data`, and a callback through which it reports a new state; that state is
 * merged into `r.state` by `updateState`.
 *
 * @param {string} hookName - Name of the plugin method to invoke.
 * @param {Object} r - Request record providing `plugins`, `state` and
 *   `plugin_stash`.
 * @param {*} data - Passed through to each hook unchanged.
 * @param {Function} end - Called directly when there are no plugins,
 *   otherwise passed to `async.eachSeries` as its completion callback.
 */
var runHooks = function(hookName, r, data, end) {
  if (r.plugins.length === 0) {
    end();
    return;
  }

  const invokeHook = function(plugin, next) {
    if (typeof plugin[hookName] !== 'function') {
      next();
      return;
    }

    if (plugin.disabled) {
      debug(`Skipping hook ${hookName} for disabled plugin '${plugin.id}'.`);
      next();
      return;
    }

    debug(`Running hook ${hookName} for plugin '${plugin.id}'.`);

    // The hook works on a snapshot; changes only reach r.state via updateState.
    const snapshot = Object.assign({}, r.state);
    snapshot.stash = r.plugin_stash[plugin.id];

    plugin[hookName](snapshot, data, function(newState) {
      updateState(r.state, newState, next);
    });
  };

  async.eachSeries(r.plugins, invokeHook, end);
};
|
148 |
|
149 |
|
/**
 * Build the callback handed to the underlying request on behalf of the
 * client, or return undefined when the client supplied no callback.
 *
 * The returned function behaves as follows:
 * - without an error it forwards its arguments straight to
 *   `r.clientCallback`; `done` is not called on this path;
 * - with an error it runs the 'onError' hooks, then `processState`. The
 *   client callback is invoked only if `processState` reports a stop value
 *   that is not flagged `skipClientCallback`. `done` always receives that
 *   stop value.
 *
 * @param {Object} r - Request record holding `clientCallback`.
 * @param {Function} done - Receives the value reported by `processState`.
 * @returns {Function|undefined} The wrapped callback, or undefined.
 */
var wrapCallback = function(r, done) {
  if (typeof r.clientCallback === 'undefined') {
    return undefined;
  }

  debug('Client callback specified.');

  return function(error, response, body) {
    if (!error) {
      r.clientCallback(error, response, body);
      return;
    }

    runHooks('onError', r, error, function() {
      processState(r, function(stop) {
        const notifyClient = stop && !stop.skipClientCallback;
        if (notifyClient) {
          r.clientCallback(error, response, body);
        }
        done(stop);
      });
    });
  };
};
|
173 |
|
174 | module.exports = {
|
175 | runHooks: runHooks,
|
176 | processState: processState,
|
177 | wrapCallback: wrapCallback
|
178 | };
|