UNPKG

5.2 kBJavaScriptView Raw
1// Copyright © 2017, 2018 IBM Corp. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14'use strict';
15
16const async = require('async');
17const debug = require('debug')('cloudant:clientutils');
18
/**
 * Hand a plugin-supplied alternative response to the client.
 *
 * `response` is a positional triple `[<error>, <response>, <data>]`. Every
 * truthy entry is emitted on `clientStream` under the matching event name
 * ('error', 'response', 'data' - the data entry is converted to a UTF-8
 * Buffer first). An 'end' event is always emitted afterwards. Finally, if
 * `clientCallback` is a function, it is invoked with the entries of
 * `response` as its arguments.
 *
 * @param {Array} response - `[<error>, <response>, <data>]`.
 * @param {object} clientStream - Object whose `emit` method receives the events.
 * @param {Function} [clientCallback] - Optional client callback.
 */
const sendResponseToClient = (response, clientStream, clientCallback) => {
  debug('An alternative response will be returned to the client');

  const passThrough = (value) => value;
  const asUtf8Buffer = (text) => Buffer.from(text, 'utf8');

  // One entry per slot of `response`, in emission order.
  const slots = [
    ['error', passThrough],
    ['response', passThrough],
    ['data', asUtf8Buffer]
  ];

  slots.forEach(([eventName, convert], index) => {
    if (response[index]) {
      clientStream.emit(eventName, convert(response[index]));
    }
  });
  clientStream.emit('end');

  if (typeof clientCallback === 'function') {
    clientCallback.apply(null, response); // execute client callback
  }
};
38
/**
 * Merge the state reported by a plugin hook into the shared request state.
 *
 * - A missing `newState` (`undefined` or `null`) is treated as "no changes".
 * - `newState.abortWithResponse` requests an immediate abort: retrying is
 *   switched off, the alternative response is recorded on `state`, and
 *   `callback` receives an Error so that no further plugin hooks run.
 * - `newState.retry` can only switch retrying on, never off.
 * - `newState.retryDelayMsecs` is only adopted when it is greater than the
 *   delay already stored on `state`.
 *
 * @param {object} state - Shared request state; mutated in place.
 * @param {object} [newState] - State handed back by the plugin hook.
 * @param {Function} callback - Called with an Error on abort, otherwise with
 *   no arguments.
 * @returns {*} Whatever `callback` returns on the abort / no-change paths,
 *   otherwise `undefined`.
 */
const updateState = (state, newState, callback) => {
  if (newState === undefined || newState === null) {
    // The hook did not report any state; previously this crashed with a
    // TypeError when reading `abortWithResponse` below.
    return callback();
  }
  if (newState.abortWithResponse) {
    // plugin requested immediate abort
    state.retry = false;
    state.abortWithResponse = newState.abortWithResponse;
    return callback(new Error('Plugin issued abort')); // stop plugin hooks
  }
  if (newState.retry) {
    state.retry = true; // plugin requested a retry
  }
  if (newState.retryDelayMsecs > state.retryDelayMsecs) {
    state.retryDelayMsecs = newState.retryDelayMsecs; // set new retry delay
  }
  callback();
};
55
56// public
57
/**
 * Decide what happens to a request once a round of plugin hooks has run.
 *
 * `callback` is invoked exactly once. Calling it without arguments means
 * "carry on" (retry the request, or keep going when the request has not been
 * sent yet); calling it with an Error means "stop, no retry". The cases are
 * checked in this order:
 *
 *   1. `r.abort` is set              -> abort response, stop.
 *   2. `r.state.abortWithResponse`   -> abort response, send the alternative
 *                                       response to the client, stop with an
 *                                       error flagged `skipClientCallback`.
 *   3. retry requested and attempts  -> abort response, carry on (retry).
 *      remain
 *   4. `r.state.sending` is falsy    -> carry on (request not sent yet).
 *   5. otherwise                     -> release the response to the client
 *                                       and stop.
 *
 * @param {object} r - Request context (reads `abort`, `state`, `response`,
 *   `clientStream`, `clientCallback` and `eventRelay`).
 * @param {Function} callback - Receives an Error to stop, nothing to continue.
 */
const processState = (r, callback) => {
  // Abort the current response, if one exists.
  const abortResponse = () => {
    if (r.response) {
      debug('Client issued abort.');
      r.response.abort();
    }
  };

  // [1] => Client has called for the request to be aborted.
  if (r.abort) {
    abortResponse();
    callback(new Error('Client issued abort')); // no retry
    return;
  }

  const state = r.state;

  // [2] => Plugin requested abort and specified alternative response.
  if (state.abortWithResponse) {
    abortResponse();
    sendResponseToClient(state.abortWithResponse, r.clientStream, r.clientCallback);
    const pluginAbort = Object.assign(new Error('Plugin issued abort'), {
      skipClientCallback: true
    });
    callback(pluginAbort); // no retry
    return;
  }

  // [3] => One or more plugins have called for the request to be retried.
  if (state.retry) {
    if (state.attempt < state.maxAttempt) {
      abortResponse();
      debug('Plugin issued a retry.');
      callback(); // retry
      return;
    }
  }

  if (r.response) {
    // Replace the client stream's abort with one that aborts the real
    // response. `r.response` is looked up at call time, not captured here.
    r.clientStream.abort = () => {
      debug('Client issued abort.');
      r.response.abort();
    };
  }

  // A retry was requested but step [3] did not take it: attempts exhausted.
  if (state.retry) {
    debug('Failed to retry request. Too many retry attempts.');
    state.retry = false;
  }

  // [4] => Request has not yet been sent. Still processing 'onRequest' hooks.
  if (!state.sending) {
    callback(); // continue
    return;
  }

  if (r.response) {
    // pass response events/data to awaiting client
    if (r.eventRelay) {
      r.eventRelay.resume();
    }
    const hasDestinations = r.clientStream.destinations.length > 0;
    if (hasDestinations) {
      r.response.pipe(r.clientStream.passThroughReadable);
    }
    r.response.resume();
  }

  // [5] => Return response to awaiting client.
  callback(new Error('No retry requested')); // no retry
};
125
/**
 * Run the hook called `hookName` on every plugin, one plugin at a time and
 * in the order of `r.plugins`.
 *
 * Each hook receives a shallow copy of `r.state` (with the plugin's own
 * stash attached as `stash`), the `data` argument, and a callback. The state
 * the hook passes to that callback is merged into `r.state` via
 * `updateState`. Plugins that do not define the hook are skipped.
 *
 * @param {string} hookName - Name of the plugin method to invoke.
 * @param {object} r - Request context (reads `plugins`, `state` and
 *   `plugin_stash`).
 * @param {*} data - Payload handed to every hook.
 * @param {Function} end - Called when iteration finishes (or immediately
 *   when there are no plugins).
 */
const runHooks = (hookName, r, data, end) => {
  if (r.plugins.length === 0) {
    end(); // no plugins
    return;
  }

  const invokeHook = (plugin, next) => {
    if (typeof plugin[hookName] !== 'function') {
      next(); // no hooks for plugin
      return;
    }

    debug(`Running hook ${hookName} for plugin '${plugin.id}'.`);

    // The hook works on a copy so it cannot mutate the shared state directly.
    const snapshot = Object.assign({}, r.state);
    snapshot.stash = r.plugin_stash[plugin.id]; // add stash

    // Called as a method so `this` inside the hook is the plugin.
    plugin[hookName](snapshot, data, (newState) => {
      updateState(r.state, newState, next);
    });
  };

  async.eachSeries(r.plugins, invokeHook, end);
};
145
/**
 * Wrap the client callback so that plugin 'onError' hooks get a chance to
 * run before an error is reported to the client.
 *
 * When `r.clientCallback` is not a function there is nothing to wrap and
 * `undefined` is returned. (Checking for a function, rather than only for
 * `undefined`, avoids returning a wrapper that would later throw when it
 * tries to call a `null` or otherwise non-callable value.)
 *
 * The returned wrapper behaves as follows:
 *   - no error: the client callback is called straight away; `done` is NOT
 *     called here (it is executed in the response hook).
 *   - error: the 'onError' hooks run, then `processState` decides the
 *     outcome. The client callback is called only if `processState` says
 *     stop and the stop error is not flagged `skipClientCallback`. `done`
 *     is always called with the `processState` result.
 *
 * @param {object} r - Request context (reads `clientCallback`).
 * @param {Function} done - Called with the `processState` result on the
 *   error path.
 * @returns {Function|undefined} The wrapped callback, or `undefined` when
 *   no usable client callback was supplied.
 */
const wrapCallback = (r, done) => {
  if (typeof r.clientCallback !== 'function') {
    return undefined; // noop
  }

  debug('Client callback specified.');

  // return wrapped callback
  return (error, response, body) => {
    if (!error) {
      r.clientCallback(error, response, body);
      return; // execute `done()` in response hook
    }

    runHooks('onError', r, error, () => {
      processState(r, (stop) => {
        if (stop && !stop.skipClientCallback) {
          r.clientCallback(error, response, body);
        }
        done(stop);
      });
    });
  };
};
170
// Public API of this module. `sendResponseToClient` and `updateState` are
// internal helpers and are not exported.
module.exports = { runHooks, processState, wrapCallback };