UNPKG

21 kBJavaScriptView Raw
1"use strict";
/*
 * Copyright 2022 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18Object.defineProperty(exports, "__esModule", { value: true });
19exports.RetryingCall = exports.MessageBufferTracker = exports.RetryThrottler = void 0;
20const constants_1 = require("./constants");
21const metadata_1 = require("./metadata");
22const logging = require("./logging");
23const TRACER_NAME = 'retrying_call';
/**
 * Token bucket that decides whether retry attempts are currently allowed.
 *
 * The bucket starts full. Each failed call removes one token, each
 * successful call adds `tokenRatio` tokens, and the count is always kept
 * within `[0, maxTokens]`. Retries are allowed only while the bucket is more
 * than half full.
 */
class RetryThrottler {
    /**
     * @param {number} maxTokens Capacity of the bucket.
     * @param {number} tokenRatio Tokens returned to the bucket per successful call.
     * @param {RetryThrottler} [previousRetryThrottler] Throttler whose current
     *     fill level should be carried over, rescaled to the new capacity.
     */
    constructor(maxTokens, tokenRatio, previousRetryThrottler) {
        this.maxTokens = maxTokens;
        this.tokenRatio = tokenRatio;
        if (previousRetryThrottler) {
            /* When carrying over tokens from a previous config, rescale them to the
             * new max value */
            this.tokens = previousRetryThrottler.tokens * (maxTokens / previousRetryThrottler.maxTokens);
        }
        else {
            this.tokens = maxTokens;
        }
    }
    /**
     * Records a successful call: adds `tokenRatio` tokens, capped at
     * `maxTokens`.
     */
    addCallSucceeded() {
        // Math.min is the cap; Math.max would let the bucket grow without bound.
        this.tokens = Math.min(this.tokens + this.tokenRatio, this.maxTokens);
    }
    /**
     * Records a failed call: removes one token, never going below zero.
     */
    addCallFailed() {
        // Math.max is the floor; Math.min would empty the bucket on the first failure.
        this.tokens = Math.max(this.tokens - 1, 0);
    }
    /**
     * @returns {boolean} True while the bucket is more than half full.
     */
    canRetryCall() {
        return this.tokens > this.maxTokens / 2;
    }
}
47exports.RetryThrottler = RetryThrottler;
/**
 * Accounts for buffered message bytes against a global limit and a per-call
 * limit. The tracker only keeps counters; it never holds the messages
 * themselves.
 */
class MessageBufferTracker {
    /**
     * @param {number} totalLimit Maximum bytes that may be reserved across all calls.
     * @param {number} limitPerCall Maximum bytes that may be reserved by one call.
     */
    constructor(totalLimit, limitPerCall) {
        this.totalLimit = totalLimit;
        this.limitPerCall = limitPerCall;
        this.totalAllocated = 0;
        this.allocatedPerCall = new Map();
    }
    /**
     * Returns the number of bytes currently reserved for `callId` (0 when the
     * call has no entry).
     */
    getAllocatedForCall(callId) {
        const current = this.allocatedPerCall.get(callId);
        return current === null || current === undefined ? 0 : current;
    }
    /**
     * Tries to reserve `size` bytes for `callId`.
     *
     * @returns {boolean} True if the reservation fit within both limits and
     *     was recorded; false if it was rejected (nothing is changed).
     */
    allocate(size, callId) {
        const currentPerCall = this.getAllocatedForCall(callId);
        if (this.limitPerCall - currentPerCall < size || this.totalLimit - this.totalAllocated < size) {
            return false;
        }
        this.allocatedPerCall.set(callId, currentPerCall + size);
        this.totalAllocated += size;
        return true;
    }
    /**
     * Releases `size` bytes previously reserved for `callId`.
     *
     * Both counters are validated before either is changed, so a rejected
     * request leaves the tracker exactly as it was.
     *
     * @throws {Error} If `size` exceeds the total reserved or the amount
     *     reserved for this call.
     */
    free(size, callId) {
        if (this.totalAllocated < size) {
            throw new Error(`Invalid buffer allocation state: call ${callId} freed ${size} > total allocated ${this.totalAllocated}`);
        }
        const currentPerCall = this.getAllocatedForCall(callId);
        if (currentPerCall < size) {
            throw new Error(`Invalid buffer allocation state: call ${callId} freed ${size} > allocated for call ${currentPerCall}`);
        }
        this.totalAllocated -= size;
        const remaining = currentPerCall - size;
        if (remaining === 0) {
            // Drop empty entries so the map does not accumulate finished calls.
            this.allocatedPerCall.delete(callId);
        }
        else {
            this.allocatedPerCall.set(callId, remaining);
        }
    }
    /**
     * Releases everything reserved for `callId` and forgets the call.
     *
     * @throws {Error} If the call's reservation exceeds the total reserved.
     */
    freeAll(callId) {
        const currentPerCall = this.getAllocatedForCall(callId);
        if (this.totalAllocated < currentPerCall) {
            throw new Error(`Invalid buffer allocation state: call ${callId} allocated ${currentPerCall} > total allocated ${this.totalAllocated}`);
        }
        this.totalAllocated -= currentPerCall;
        this.allocatedPerCall.delete(callId);
    }
}
87exports.MessageBufferTracker = MessageBufferTracker;
/** Metadata key carrying the number of attempts made before the current one. */
const PREVIOUS_RPC_ATTEMPTS_METADATA_KEY = 'grpc-previous-rpc-attempts';
/**
 * Returns the numeric part of a duration string that consists of a number
 * followed by a one-character unit suffix (for example "0.5s" gives 0.5).
 */
function parseDurationSeconds(duration) {
    return Number(duration.substring(0, duration.length - 1));
}
/**
 * A call that runs one or more child attempts ("underlying calls") created
 * through `channel.createLoadBalancingCall` and presents them to the listener
 * as a single call.
 *
 * `state` is one of:
 *  - 'RETRY':            the method config has a retry policy; a failed
 *                        attempt may be followed by another one.
 *  - 'HEDGING':          the method config has a hedging policy; several
 *                        attempts may run at the same time.
 *  - 'TRANSPARENT_ONLY': no policy applies (or it was switched off); only
 *                        the NOT_STARTED / REFUSED paths start new attempts.
 *  - 'COMMITTED':        one attempt has been chosen; no policy-driven
 *                        attempts are started any more.
 *
 * Outgoing writes are kept in `writeBuffer` so that they can be replayed to
 * attempts that start later. Each entry is `{entryType: 'MESSAGE', message,
 * callback?}`, `{entryType: 'HALF_CLOSE'}` or `{entryType: 'FREED'}`.
 */
class RetryingCall {
    constructor(channel, callConfig, methodName, host, credentials, deadline, callNumber, bufferTracker, retryThrottler) {
        this.channel = channel;
        this.callConfig = callConfig;
        this.methodName = methodName;
        this.host = host;
        this.credentials = credentials;
        this.deadline = deadline;
        this.callNumber = callNumber;
        this.bufferTracker = bufferTracker;
        this.retryThrottler = retryThrottler;
        this.listener = null;
        this.initialMetadata = null;
        this.underlyingCalls = [];
        this.writeBuffer = [];
        this.transparentRetryUsed = false;
        /**
         * Number of attempts so far
         */
        this.attempts = 0;
        this.hedgingTimer = null;
        this.committedCallIndex = null;
        /**
         * Whether startRead() has been requested, so that attempts created
         * afterwards can be asked to read as well.
         */
        this.readStarted = false;
        this.initialRetryBackoffSec = 0;
        this.nextRetryBackoffSec = 0;
        if (callConfig.methodConfig.retryPolicy) {
            this.state = 'RETRY';
            const retryPolicy = callConfig.methodConfig.retryPolicy;
            this.initialRetryBackoffSec = parseDurationSeconds(retryPolicy.initialBackoff);
            this.nextRetryBackoffSec = this.initialRetryBackoffSec;
        }
        else if (callConfig.methodConfig.hedgingPolicy) {
            this.state = 'HEDGING';
        }
        else {
            this.state = 'TRANSPARENT_ONLY';
        }
    }
    getCallNumber() {
        return this.callNumber;
    }
    /** Emits a debug trace line prefixed with this call's number. */
    trace(text) {
        logging.trace(constants_1.LogVerbosity.DEBUG, TRACER_NAME, '[' + this.callNumber + '] ' + text);
    }
    /**
     * Marks every buffered message as freed and returns this call's whole
     * reservation to the buffer tracker.
     */
    releaseWriteBuffer() {
        this.bufferTracker.freeAll(this.callNumber);
        for (let i = 0; i < this.writeBuffer.length; i++) {
            if (this.writeBuffer[i].entryType === 'MESSAGE') {
                // Keep the slot so message indices stay stable.
                this.writeBuffer[i] = { entryType: 'FREED' };
            }
        }
    }
    /**
     * Delivers the final status to the listener on the next tick and
     * releases the buffered writes, which are no longer needed once a final
     * status exists.
     */
    reportStatus(statusObject) {
        this.trace('ended with status: code=' + statusObject.code + ' details="' + statusObject.details + '"');
        this.releaseWriteBuffer();
        process.nextTick(() => {
            if (this.listener) {
                this.listener.onReceiveStatus(statusObject);
            }
        });
    }
    /**
     * Reports the given status to the listener and cancels every child
     * attempt with the same status.
     */
    cancelWithStatus(status, details) {
        this.trace('cancelWithStatus code: ' + status + ' details: "' + details + '"');
        this.reportStatus({ code: status, details, metadata: new metadata_1.Metadata() });
        // NOTE(review): statuses that the children emit in response are still
        // processed by handleChildStatus, so the listener may see a second
        // status and a retry may start after cancel - confirm whether the
        // listener tolerates that.
        for (const { call } of this.underlyingCalls) {
            call.cancelWithStatus(status, details);
        }
    }
    /** Returns the committed attempt's peer, or 'unknown' before commit. */
    getPeer() {
        if (this.committedCallIndex !== null) {
            return this.underlyingCalls[this.committedCallIndex].call.getPeer();
        }
        else {
            return 'unknown';
        }
    }
    /**
     * Chooses the attempt at `index` as the one whose results are surfaced.
     * Every other attempt that has not completed is marked COMPLETED and
     * cancelled, and buffered messages the chosen attempt has already moved
     * past are released. Does nothing if an attempt is already committed.
     *
     * The chosen attempt may itself already be COMPLETED: handleChildStatus
     * marks an attempt COMPLETED before committing it with its final status.
     */
    commitCall(index) {
        if (this.state === 'COMMITTED') {
            return;
        }
        const committed = this.underlyingCalls[index];
        this.trace('Committing call [' + committed.call.getCallNumber() + '] at index ' + index);
        this.state = 'COMMITTED';
        this.committedCallIndex = index;
        for (let i = 0; i < this.underlyingCalls.length; i++) {
            if (i === index) {
                continue;
            }
            if (this.underlyingCalls[i].state === 'COMPLETED') {
                continue;
            }
            this.underlyingCalls[i].state = 'COMPLETED';
            this.underlyingCalls[i].call.cancelWithStatus(constants_1.Status.CANCELLED, 'Discarded in favor of other hedged attempt');
        }
        for (let messageIndex = 0; messageIndex < committed.nextMessageToSend - 1; messageIndex += 1) {
            const bufferEntry = this.writeBuffer[messageIndex];
            if (bufferEntry.entryType === 'MESSAGE') {
                this.bufferTracker.free(bufferEntry.message.message.length, this.callNumber);
                this.writeBuffer[messageIndex] = {
                    entryType: 'FREED'
                };
            }
        }
    }
    /**
     * Commits the ACTIVE attempt that has progressed furthest through the
     * write buffer. If no attempt is active, policy-driven attempts are
     * switched off instead (state becomes 'TRANSPARENT_ONLY').
     */
    commitCallWithMostMessages() {
        if (this.state === 'COMMITTED') {
            return;
        }
        let mostMessages = -1;
        let callWithMostMessages = -1;
        for (const [index, childCall] of this.underlyingCalls.entries()) {
            if (childCall.state === 'ACTIVE' && childCall.nextMessageToSend > mostMessages) {
                mostMessages = childCall.nextMessageToSend;
                callWithMostMessages = index;
            }
        }
        if (callWithMostMessages === -1) {
            // Nothing to commit to (e.g. no attempt started yet, or between attempts).
            this.state = 'TRANSPARENT_ONLY';
        }
        else {
            this.commitCall(callWithMostMessages);
        }
    }
    /**
     * Returns true if `code` appears in `list`, either as the same value or
     * as the case-insensitive name of the status code.
     */
    isStatusCodeInList(list, code) {
        return list.some((value => value === code || value.toString().toLowerCase() === constants_1.Status[code].toLowerCase()));
    }
    /**
     * Returns a random delay in milliseconds between 0 and the current
     * backoff, then grows the backoff by the policy's multiplier, capped at
     * the policy's maximum. Returns 0 when there is no retry policy.
     */
    getNextRetryBackoffMs() {
        const retryPolicy = this.callConfig ? this.callConfig.methodConfig.retryPolicy : undefined;
        if (!retryPolicy) {
            return 0;
        }
        const nextBackoffMs = Math.random() * this.nextRetryBackoffSec * 1000;
        const maxBackoffSec = parseDurationSeconds(retryPolicy.maxBackoff);
        this.nextRetryBackoffSec = Math.min(this.nextRetryBackoffSec * retryPolicy.backoffMultiplier, maxBackoffSec);
        return nextBackoffMs;
    }
    /**
     * Starts another attempt if the retry policy allows it.
     *
     * @param {number|null} pushback Delay requested through the pushback
     *     metadata in milliseconds; null if absent; negative means "do not
     *     retry".
     * @param {function(boolean): void} callback Always invoked exactly once
     *     with true if a new attempt is started and false otherwise.
     */
    maybeRetryCall(pushback, callback) {
        if (this.state !== 'RETRY') {
            callback(false);
            return;
        }
        const retryPolicy = this.callConfig.methodConfig.retryPolicy;
        if (this.attempts >= retryPolicy.maxAttempts) {
            callback(false);
            return;
        }
        let retryDelayMs;
        if (pushback === null) {
            retryDelayMs = this.getNextRetryBackoffMs();
        }
        else if (pushback < 0) {
            this.state = 'TRANSPARENT_ONLY';
            callback(false);
            return;
        }
        else {
            retryDelayMs = pushback;
            this.nextRetryBackoffSec = this.initialRetryBackoffSec;
        }
        setTimeout(() => {
            if (this.state !== 'RETRY') {
                callback(false);
                return;
            }
            const allowed = this.retryThrottler ? this.retryThrottler.canRetryCall() : true;
            if (allowed) {
                callback(true);
                this.attempts += 1;
                this.startNewAttempt();
            }
            else {
                // Without this the caller would never learn the outcome and
                // the call would never receive a final status.
                this.trace('Retry attempt denied by throttling policy');
                callback(false);
            }
        }, retryDelayMs);
    }
    /** Returns how many child attempts are currently ACTIVE. */
    countActiveCalls() {
        let count = 0;
        for (const call of this.underlyingCalls) {
            if (call && call.state === 'ACTIVE') {
                count += 1;
            }
        }
        return count;
    }
    /**
     * Handles a non-OK final status from an attempt, either by starting
     * another attempt according to the active policy or by committing the
     * attempt and reporting its status.
     */
    handleProcessedStatus(status, callIndex, pushback) {
        switch (this.state) {
            case 'COMMITTED':
            case 'TRANSPARENT_ONLY':
                this.commitCall(callIndex);
                this.reportStatus(status);
                break;
            case 'HEDGING':
                if (this.isStatusCodeInList(this.callConfig.methodConfig.hedgingPolicy.nonFatalStatusCodes, status.code)) {
                    if (this.retryThrottler) {
                        this.retryThrottler.addCallFailed();
                    }
                    let delayMs;
                    if (pushback === null) {
                        delayMs = 0;
                    }
                    else if (pushback < 0) {
                        this.state = 'TRANSPARENT_ONLY';
                        this.commitCall(callIndex);
                        this.reportStatus(status);
                        return;
                    }
                    else {
                        delayMs = pushback;
                    }
                    setTimeout(() => {
                        this.maybeStartHedgingAttempt();
                        // If after trying to start a call there are no active calls, this was the last one
                        if (this.countActiveCalls() === 0) {
                            this.commitCall(callIndex);
                            this.reportStatus(status);
                        }
                    }, delayMs);
                }
                else {
                    this.commitCall(callIndex);
                    this.reportStatus(status);
                }
                break;
            case 'RETRY':
                if (this.isStatusCodeInList(this.callConfig.methodConfig.retryPolicy.retryableStatusCodes, status.code)) {
                    if (this.retryThrottler) {
                        this.retryThrottler.addCallFailed();
                    }
                    this.maybeRetryCall(pushback, (retried) => {
                        if (!retried) {
                            this.commitCall(callIndex);
                            this.reportStatus(status);
                        }
                    });
                }
                else {
                    this.commitCall(callIndex);
                    this.reportStatus(status);
                }
                break;
        }
    }
    /**
     * Reads the 'grpc-retry-pushback-ms' metadata value.
     *
     * @returns {number|null} null if the key is absent; the parsed integer
     *     if present; -1 if the value cannot be parsed, so that it is handled
     *     like a negative pushback.
     */
    getPushback(metadata) {
        const mdValue = metadata.get('grpc-retry-pushback-ms');
        if (mdValue.length === 0) {
            return null;
        }
        // parseInt signals failure by returning NaN, not by throwing.
        const parsed = Number.parseInt(String(mdValue[0]), 10);
        return Number.isNaN(parsed) ? -1 : parsed;
    }
    /**
     * Handles the final status of the attempt at `callIndex`. Statuses from
     * attempts already marked COMPLETED (for example discarded hedges) are
     * ignored.
     */
    handleChildStatus(status, callIndex) {
        if (this.underlyingCalls[callIndex].state === 'COMPLETED') {
            return;
        }
        this.underlyingCalls[callIndex].state = 'COMPLETED';
        if (status.code === constants_1.Status.OK) {
            if (this.retryThrottler) {
                this.retryThrottler.addCallSucceeded();
            }
            this.commitCall(callIndex);
            this.reportStatus(status);
            return;
        }
        if (this.state === 'COMMITTED') {
            this.reportStatus(status);
            return;
        }
        const pushback = this.getPushback(status.metadata);
        switch (status.progress) {
            case 'NOT_STARTED':
                // RPC never leaves the client, always safe to retry
                this.startNewAttempt();
                break;
            case 'REFUSED':
                // RPC reaches the server library, but not the server application logic
                if (this.transparentRetryUsed) {
                    this.handleProcessedStatus(status, callIndex, pushback);
                }
                else {
                    this.transparentRetryUsed = true;
                    this.startNewAttempt();
                }
                break;
            case 'DROP':
                this.commitCall(callIndex);
                this.reportStatus(status);
                break;
            case 'PROCESSED':
                this.handleProcessedStatus(status, callIndex, pushback);
                break;
        }
    }
    /**
     * Starts one more hedged attempt if the call is still hedging and the
     * policy's attempt limit has not been reached, then re-arms the timer.
     */
    maybeStartHedgingAttempt() {
        if (this.state !== 'HEDGING') {
            return;
        }
        if (!this.callConfig.methodConfig.hedgingPolicy) {
            return;
        }
        const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy;
        if (this.attempts >= hedgingPolicy.maxAttempts) {
            return;
        }
        this.attempts += 1;
        this.startNewAttempt();
        this.maybeStartHedgingTimer();
    }
    /**
     * Clears any pending hedging timer and, if further hedged attempts are
     * still allowed, schedules the next one after the policy's hedging delay
     * ('0s' when unspecified).
     */
    maybeStartHedgingTimer() {
        if (this.hedgingTimer) {
            clearTimeout(this.hedgingTimer);
        }
        if (this.state !== 'HEDGING') {
            return;
        }
        if (!this.callConfig.methodConfig.hedgingPolicy) {
            return;
        }
        const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy;
        if (this.attempts >= hedgingPolicy.maxAttempts) {
            return;
        }
        const configuredDelay = hedgingPolicy.hedgingDelay;
        const hedgingDelayString = configuredDelay === null || configuredDelay === undefined ? '0s' : configuredDelay;
        const hedgingDelaySec = parseDurationSeconds(hedgingDelayString);
        this.hedgingTimer = setTimeout(() => {
            this.maybeStartHedgingAttempt();
        }, hedgingDelaySec * 1000);
        // Do not let the timer alone keep the process alive.
        if (typeof this.hedgingTimer.unref === 'function') {
            this.hedgingTimer.unref();
        }
    }
    /**
     * Creates and starts a new child attempt, then brings it up to date with
     * what the caller has already requested: buffered writes are replayed
     * and, if reading was requested, the attempt is asked to read.
     */
    startNewAttempt() {
        const child = this.channel.createLoadBalancingCall(this.callConfig, this.methodName, this.host, this.credentials, this.deadline);
        this.trace('Created child call [' + child.getCallNumber() + '] for attempt ' + this.attempts);
        const index = this.underlyingCalls.length;
        this.underlyingCalls.push({ state: 'ACTIVE', call: child, nextMessageToSend: 0 });
        const previousAttempts = this.attempts - 1;
        const initialMetadata = this.initialMetadata.clone();
        if (previousAttempts > 0) {
            initialMetadata.set(PREVIOUS_RPC_ATTEMPTS_METADATA_KEY, `${previousAttempts}`);
        }
        let receivedMetadata = false;
        child.start(initialMetadata, {
            onReceiveMetadata: metadata => {
                this.commitCall(index);
                receivedMetadata = true;
                if (previousAttempts > 0) {
                    metadata.set(PREVIOUS_RPC_ATTEMPTS_METADATA_KEY, `${previousAttempts}`);
                }
                if (this.underlyingCalls[index].state === 'ACTIVE') {
                    this.listener.onReceiveMetadata(metadata);
                }
            },
            onReceiveMessage: message => {
                this.commitCall(index);
                if (this.underlyingCalls[index].state === 'ACTIVE') {
                    this.listener.onReceiveMessage(message);
                }
            },
            onReceiveStatus: status => {
                if (!receivedMetadata && previousAttempts > 0) {
                    status.metadata.set(PREVIOUS_RPC_ATTEMPTS_METADATA_KEY, `${previousAttempts}`);
                }
                // Do not commit here: handleChildStatus decides whether this
                // status is final or whether another attempt should be made.
                this.handleChildStatus(status, index);
            }
        });
        // Replay whatever the caller has already written / requested.
        this.sendNextChildMessage(index);
        if (this.readStarted && this.underlyingCalls[index].state === 'ACTIVE') {
            child.startRead();
        }
    }
    /**
     * Records the listener and initial metadata and starts the first attempt.
     */
    start(metadata, listener) {
        this.trace('start called');
        this.listener = listener;
        this.initialMetadata = metadata;
        this.attempts += 1;
        this.startNewAttempt();
        this.maybeStartHedgingTimer();
    }
    /**
     * Sends the next not-yet-sent write-buffer entry to the attempt at
     * `childIndex`, and continues with the following entry once the attempt
     * reports that the write completed. Does nothing when the attempt is
     * COMPLETED or has caught up with the buffer.
     */
    sendNextChildMessage(childIndex) {
        const childCall = this.underlyingCalls[childIndex];
        if (childCall.state === 'COMPLETED') {
            return;
        }
        const bufferEntry = this.writeBuffer[childCall.nextMessageToSend];
        if (!bufferEntry) {
            return;
        }
        switch (bufferEntry.entryType) {
            case 'MESSAGE':
                childCall.call.sendMessageWithContext({
                    callback: () => {
                        // Any write error is ignored here
                        childCall.nextMessageToSend += 1;
                        // A write whose buffer reservation failed is only
                        // acknowledged once an attempt has actually taken it.
                        const deferredCallback = bufferEntry.callback;
                        if (deferredCallback) {
                            bufferEntry.callback = undefined;
                            deferredCallback();
                        }
                        this.sendNextChildMessage(childIndex);
                    }
                }, bufferEntry.message.message);
                break;
            case 'HALF_CLOSE':
                childCall.nextMessageToSend += 1;
                childCall.call.halfClose();
                break;
            case 'FREED':
                // Should not be possible
                break;
        }
    }
    /**
     * Buffers an outgoing message and forwards it to every ACTIVE attempt
     * that is ready for it.
     *
     * If the buffer tracker accepts the reservation, the write is
     * acknowledged immediately. Otherwise the call commits to the most
     * advanced attempt and the write is acknowledged only when an attempt
     * has taken the message.
     */
    sendMessageWithContext(context, message) {
        this.trace('write() called with message of length ' + message.length);
        const writeObj = {
            message,
            flags: context.flags,
        };
        const messageIndex = this.writeBuffer.length;
        const bufferEntry = {
            entryType: 'MESSAGE',
            message: writeObj
        };
        this.writeBuffer[messageIndex] = bufferEntry;
        if (this.bufferTracker.allocate(message.length, this.callNumber)) {
            if (context.callback) {
                context.callback();
            }
            for (const [callIndex, call] of this.underlyingCalls.entries()) {
                if (call.state === 'ACTIVE' && call.nextMessageToSend === messageIndex) {
                    this.sendNextChildMessage(callIndex);
                }
            }
        }
        else {
            this.commitCallWithMostMessages();
            bufferEntry.callback = context.callback;
            if (this.committedCallIndex !== null) {
                const committed = this.underlyingCalls[this.committedCallIndex];
                // If the committed attempt is still working through earlier
                // entries it will reach this one on its own.
                if (committed.state === 'ACTIVE' && committed.nextMessageToSend === messageIndex) {
                    this.sendNextChildMessage(this.committedCallIndex);
                }
            }
        }
    }
    /**
     * Asks every ACTIVE attempt to read, and remembers the request for
     * attempts that start later.
     */
    startRead() {
        this.trace('startRead called');
        this.readStarted = true;
        for (const underlyingCall of this.underlyingCalls) {
            if (underlyingCall && underlyingCall.state === 'ACTIVE') {
                underlyingCall.call.startRead();
            }
        }
    }
    /**
     * Buffers a half-close marker and forwards it to every ACTIVE attempt
     * that has already sent everything before it.
     */
    halfClose() {
        this.trace('halfClose called');
        const halfCloseIndex = this.writeBuffer.length;
        this.writeBuffer[halfCloseIndex] = {
            entryType: 'HALF_CLOSE'
        };
        for (const call of this.underlyingCalls) {
            if (call && call.state === 'ACTIVE' && call.nextMessageToSend === halfCloseIndex) {
                call.nextMessageToSend += 1;
                call.call.halfClose();
            }
        }
    }
    setCredentials(newCredentials) {
        throw new Error("Method not implemented.");
    }
    getMethod() {
        return this.methodName;
    }
    getHost() {
        return this.host;
    }
}
534exports.RetryingCall = RetryingCall;
535//# sourceMappingURL=retrying-call.js.map
\No newline at end of file