UNPKG

25 kBJavaScriptView Raw
1"use strict";
2/*
3 * Copyright 2022 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18Object.defineProperty(exports, "__esModule", { value: true });
19exports.RetryingCall = exports.MessageBufferTracker = exports.RetryThrottler = void 0;
20const constants_1 = require("./constants");
21const metadata_1 = require("./metadata");
22const logging = require("./logging");
23const TRACER_NAME = 'retrying_call';
/**
 * Token bucket used to throttle retries.
 *
 * The bucket starts full (`maxTokens`). Every failed call removes one token
 * and every successful call adds `tokenRatio` tokens. The token count is
 * always kept within the range [0, maxTokens]. Retries are only permitted
 * while more than half of the tokens remain.
 */
class RetryThrottler {
    /**
     * @param maxTokens Upper bound for the token count.
     * @param tokenRatio Number of tokens returned to the bucket per successful call.
     * @param previousRetryThrottler Optional throttler whose current fill level
     *     is carried over, rescaled to the new `maxTokens`.
     */
    constructor(maxTokens, tokenRatio, previousRetryThrottler) {
        this.maxTokens = maxTokens;
        this.tokenRatio = tokenRatio;
        if (previousRetryThrottler) {
            /* When carrying over tokens from a previous config, rescale them to the
             * new max value */
            this.tokens =
                previousRetryThrottler.tokens *
                    (maxTokens / previousRetryThrottler.maxTokens);
        }
        else {
            this.tokens = maxTokens;
        }
    }
    /**
     * Records a successful call: adds `tokenRatio` tokens, never exceeding
     * `maxTokens`.
     */
    addCallSucceeded() {
        this.tokens = Math.min(this.tokens + this.tokenRatio, this.maxTokens);
    }
    /**
     * Records a failed call: removes one token, never going below zero.
     */
    addCallFailed() {
        this.tokens = Math.max(this.tokens - 1, 0);
    }
    /**
     * @returns true when more than half of `maxTokens` is still available.
     */
    canRetryCall() {
        return this.tokens > this.maxTokens / 2;
    }
}
49exports.RetryThrottler = RetryThrottler;
/**
 * Accounting for buffered message bytes, enforcing both a limit shared by all
 * calls (`totalLimit`) and a limit for each individual call (`limitPerCall`).
 * Calls are identified by the `callId` passed to each method.
 */
class MessageBufferTracker {
    /**
     * @param totalLimit Maximum size that may be allocated across all calls.
     * @param limitPerCall Maximum size that may be allocated by a single call.
     */
    constructor(totalLimit, limitPerCall) {
        this.totalLimit = totalLimit;
        this.limitPerCall = limitPerCall;
        this.totalAllocated = 0;
        this.allocatedPerCall = new Map();
    }
    /**
     * Tries to reserve `size` for `callId`.
     *
     * @returns true if the reservation fit within both limits and was
     *     recorded, false if it was rejected (nothing is recorded then).
     */
    allocate(size, callId) {
        const recorded = this.allocatedPerCall.get(callId);
        const currentPerCall = recorded == null ? 0 : recorded;
        if (this.limitPerCall - currentPerCall < size ||
            this.totalLimit - this.totalAllocated < size) {
            return false;
        }
        this.allocatedPerCall.set(callId, currentPerCall + size);
        this.totalAllocated += size;
        return true;
    }
    /**
     * Releases `size` previously reserved for `callId`.
     *
     * Both consistency checks run before any counter is modified, so a
     * rejected free leaves the tracker exactly as it was.
     *
     * @throws Error if `size` exceeds the total allocation or the allocation
     *     recorded for `callId`.
     */
    free(size, callId) {
        if (this.totalAllocated < size) {
            throw new Error(`Invalid buffer allocation state: call ${callId} freed ${size} > total allocated ${this.totalAllocated}`);
        }
        const recorded = this.allocatedPerCall.get(callId);
        const currentPerCall = recorded == null ? 0 : recorded;
        if (currentPerCall < size) {
            throw new Error(`Invalid buffer allocation state: call ${callId} freed ${size} > allocated for call ${currentPerCall}`);
        }
        this.totalAllocated -= size;
        this.allocatedPerCall.set(callId, currentPerCall - size);
    }
    /**
     * Releases everything reserved for `callId` and forgets the call.
     *
     * @throws Error if the call's recorded allocation exceeds the total.
     */
    freeAll(callId) {
        const recorded = this.allocatedPerCall.get(callId);
        const currentPerCall = recorded == null ? 0 : recorded;
        if (this.totalAllocated < currentPerCall) {
            throw new Error(`Invalid buffer allocation state: call ${callId} allocated ${currentPerCall} > total allocated ${this.totalAllocated}`);
        }
        this.totalAllocated -= currentPerCall;
        this.allocatedPerCall.delete(callId);
    }
}
90exports.MessageBufferTracker = MessageBufferTracker;
91const PREVIONS_RPC_ATTEMPTS_METADATA_KEY = 'grpc-previous-rpc-attempts';
/**
 * A call that fans out to one or more child calls ("attempts") created via
 * `channel.createLoadBalancingCall()` and presents them to its listener as a
 * single call.
 *
 * `state` is one of:
 *  - 'RETRY': the method config has a retry policy; failed attempts with a
 *    retryable status may be followed by a new attempt.
 *  - 'HEDGING': the method config has a hedging policy; additional attempts
 *    may be started on a timer or after a non-fatal failure.
 *  - 'TRANSPARENT_ONLY': no policy driven retries; only attempts whose status
 *    progress is 'NOT_STARTED' or (once) 'REFUSED' are restarted.
 *  - 'COMMITTED': one attempt has been selected; all others were cancelled.
 *
 * Outgoing messages are kept in `writeBuffer` so that they can be replayed on
 * later attempts, with their size accounted for in `bufferTracker`.
 */
class RetryingCall {
    /**
     * @param channel Provides `createLoadBalancingCall()` for new attempts.
     * @param callConfig Call configuration; `methodConfig.retryPolicy` /
     *     `methodConfig.hedgingPolicy` select the initial state.
     * @param methodName Method name passed on to every attempt.
     * @param host Host passed on to every attempt.
     * @param credentials Credentials passed on to every attempt.
     * @param deadline Deadline passed on to every attempt.
     * @param callNumber Identifier used in trace output and buffer accounting.
     * @param bufferTracker Tracker with `allocate`, `free` and `freeAll`.
     * @param retryThrottler Optional throttler with `addCallSucceeded`,
     *     `addCallFailed` and `canRetryCall`.
     */
    constructor(channel, callConfig, methodName, host, credentials, deadline, callNumber, bufferTracker, retryThrottler) {
        this.channel = channel;
        this.callConfig = callConfig;
        this.methodName = methodName;
        this.host = host;
        this.credentials = credentials;
        this.deadline = deadline;
        this.callNumber = callNumber;
        this.bufferTracker = bufferTracker;
        this.retryThrottler = retryThrottler;
        this.listener = null;
        this.initialMetadata = null;
        this.underlyingCalls = [];
        this.writeBuffer = [];
        /**
         * The offset of message indices in the writeBuffer. For example, if
         * writeBufferOffset is 10, message 10 is in writeBuffer[0] and message 15
         * is in writeBuffer[5].
         */
        this.writeBufferOffset = 0;
        /**
         * Tracks whether a read has been started, so that we know whether to start
         * reads on new child calls. This only matters for the first read, because
         * once a message comes in the child call becomes committed and there will
         * be no new child calls.
         */
        this.readStarted = false;
        this.transparentRetryUsed = false;
        /**
         * Number of attempts so far
         */
        this.attempts = 0;
        this.hedgingTimer = null;
        this.committedCallIndex = null;
        this.initialRetryBackoffSec = 0;
        this.nextRetryBackoffSec = 0;
        if (callConfig.methodConfig.retryPolicy) {
            this.state = 'RETRY';
            const retryPolicy = callConfig.methodConfig.retryPolicy;
            // Durations are strings with a one-character suffix (e.g. "0.1s");
            // strip the suffix and parse the rest as seconds.
            this.nextRetryBackoffSec = this.initialRetryBackoffSec = Number(retryPolicy.initialBackoff.substring(0, retryPolicy.initialBackoff.length - 1));
        }
        else if (callConfig.methodConfig.hedgingPolicy) {
            this.state = 'HEDGING';
        }
        else {
            this.state = 'TRANSPARENT_ONLY';
        }
    }
    /** @returns The call number given to the constructor. */
    getCallNumber() {
        return this.callNumber;
    }
    /** Emits a DEBUG trace line prefixed with this call's number. */
    trace(text) {
        logging.trace(constants_1.LogVerbosity.DEBUG, TRACER_NAME, '[' + this.callNumber + '] ' + text);
    }
    /**
     * Ends the call: releases all buffered messages and delivers the status to
     * the listener on the next tick.
     */
    reportStatus(statusObject) {
        this.trace('ended with status: code=' +
            statusObject.code +
            ' details="' +
            statusObject.details +
            '"');
        this.bufferTracker.freeAll(this.callNumber);
        // Keep message indices monotonic even though the buffer is dropped.
        this.writeBufferOffset = this.writeBufferOffset + this.writeBuffer.length;
        this.writeBuffer = [];
        process.nextTick(() => {
            if (this.listener == null) {
                return;
            }
            // Explicitly construct status object to remove progress field
            this.listener.onReceiveStatus({
                code: statusObject.code,
                details: statusObject.details,
                metadata: statusObject.metadata,
            });
        });
    }
    /**
     * Reports the given status to the listener and cancels every child call
     * with the same status.
     */
    cancelWithStatus(status, details) {
        this.trace('cancelWithStatus code: ' + status + ' details: "' + details + '"');
        this.reportStatus({ code: status, details, metadata: new metadata_1.Metadata() });
        for (const { call } of this.underlyingCalls) {
            call.cancelWithStatus(status, details);
        }
    }
    /** @returns The committed child's peer, or 'unknown' before commitment. */
    getPeer() {
        if (this.committedCallIndex !== null) {
            return this.underlyingCalls[this.committedCallIndex].call.getPeer();
        }
        else {
            return 'unknown';
        }
    }
    /**
     * @returns The buffer entry for an absolute message index, or a
     *     placeholder 'FREED' entry if that index is not in the buffer.
     */
    getBufferEntry(messageIndex) {
        const entry = this.writeBuffer[messageIndex - this.writeBufferOffset];
        if (entry == null) {
            return {
                entryType: 'FREED',
                allocated: false,
            };
        }
        return entry;
    }
    /** @returns The absolute index the next buffered entry will receive. */
    getNextBufferIndex() {
        return this.writeBufferOffset + this.writeBuffer.length;
    }
    /**
     * Once committed, drops every buffered entry the committed child has
     * already moved past and returns its accounted size to the tracker.
     */
    clearSentMessages() {
        if (this.state !== 'COMMITTED') {
            return;
        }
        const earliestNeededMessageIndex = this.underlyingCalls[this.committedCallIndex].nextMessageToSend;
        for (let messageIndex = this.writeBufferOffset; messageIndex < earliestNeededMessageIndex; messageIndex++) {
            const bufferEntry = this.getBufferEntry(messageIndex);
            if (bufferEntry.allocated) {
                this.bufferTracker.free(bufferEntry.message.message.length, this.callNumber);
            }
        }
        this.writeBuffer = this.writeBuffer.slice(earliestNeededMessageIndex - this.writeBufferOffset);
        this.writeBufferOffset = earliestNeededMessageIndex;
    }
    /**
     * Commits to the child at `index` and cancels all other children that are
     * not yet completed. Does nothing if already committed or if that child
     * has already completed.
     */
    commitCall(index) {
        if (this.state === 'COMMITTED') {
            return;
        }
        if (this.underlyingCalls[index].state === 'COMPLETED') {
            return;
        }
        this.trace('Committing call [' +
            this.underlyingCalls[index].call.getCallNumber() +
            '] at index ' +
            index);
        this.state = 'COMMITTED';
        this.committedCallIndex = index;
        for (let i = 0; i < this.underlyingCalls.length; i++) {
            if (i === index) {
                continue;
            }
            if (this.underlyingCalls[i].state === 'COMPLETED') {
                continue;
            }
            this.underlyingCalls[i].state = 'COMPLETED';
            this.underlyingCalls[i].call.cancelWithStatus(constants_1.Status.CANCELLED, 'Discarded in favor of other hedged attempt');
        }
        this.clearSentMessages();
    }
    /**
     * Commits to the active child that has progressed furthest through the
     * write buffer. With no active child, switches to 'TRANSPARENT_ONLY'.
     */
    commitCallWithMostMessages() {
        if (this.state === 'COMMITTED') {
            return;
        }
        let mostMessages = -1;
        let callWithMostMessages = -1;
        for (const [index, childCall] of this.underlyingCalls.entries()) {
            if (childCall.state === 'ACTIVE' &&
                childCall.nextMessageToSend > mostMessages) {
                mostMessages = childCall.nextMessageToSend;
                callWithMostMessages = index;
            }
        }
        if (callWithMostMessages === -1) {
            /* There are no active calls, disable retries to force the next call that
             * is started to be committed. */
            this.state = 'TRANSPARENT_ONLY';
        }
        else {
            this.commitCall(callWithMostMessages);
        }
    }
    /**
     * @returns true if `list` contains `code` either as the same value or as
     *     the status name (compared case-insensitively).
     */
    isStatusCodeInList(list, code) {
        return list.some(value => value === code ||
            value.toString().toLowerCase() === constants_1.Status[code].toLowerCase());
    }
    /**
     * Picks a random delay in [0, nextRetryBackoffSec) seconds, returned in
     * milliseconds, then grows the stored backoff by the policy's multiplier,
     * capped at the policy's maxBackoff.
     *
     * @returns The delay in milliseconds, or 0 when there is no retry policy.
     */
    getNextRetryBackoffMs() {
        const retryPolicy = this.callConfig == null
            ? undefined
            : this.callConfig.methodConfig.retryPolicy;
        if (!retryPolicy) {
            return 0;
        }
        const nextBackoffMs = Math.random() * this.nextRetryBackoffSec * 1000;
        const maxBackoffSec = Number(retryPolicy.maxBackoff.substring(0, retryPolicy.maxBackoff.length - 1));
        this.nextRetryBackoffSec = Math.min(this.nextRetryBackoffSec * retryPolicy.backoffMultiplier, maxBackoffSec);
        return nextBackoffMs;
    }
    /**
     * Decides whether to start another retry attempt.
     *
     * `callback` is invoked exactly once: with `true` when a new attempt is
     * being started, with `false` in every other case (not in 'RETRY' state,
     * attempt limit reached, negative pushback, or denied by the throttler).
     *
     * @param pushback null for "no server pushback", a negative number for
     *     "do not retry", otherwise the delay in milliseconds to use.
     * @param callback Receives whether a retry attempt was started.
     */
    maybeRetryCall(pushback, callback) {
        if (this.state !== 'RETRY') {
            callback(false);
            return;
        }
        const retryPolicy = this.callConfig.methodConfig.retryPolicy;
        // The number of attempts is capped at 5 regardless of the policy.
        if (this.attempts >= Math.min(retryPolicy.maxAttempts, 5)) {
            callback(false);
            return;
        }
        let retryDelayMs;
        if (pushback === null) {
            retryDelayMs = this.getNextRetryBackoffMs();
        }
        else if (pushback < 0) {
            this.state = 'TRANSPARENT_ONLY';
            callback(false);
            return;
        }
        else {
            retryDelayMs = pushback;
            this.nextRetryBackoffSec = this.initialRetryBackoffSec;
        }
        setTimeout(() => {
            if (this.state !== 'RETRY') {
                callback(false);
                return;
            }
            const throttler = this.retryThrottler;
            const verdict = throttler == null ? undefined : throttler.canRetryCall();
            const retryAllowed = verdict == null ? true : verdict;
            if (retryAllowed) {
                callback(true);
                this.attempts += 1;
                this.startNewAttempt();
            }
            else {
                // Without this the caller would never learn that no retry is
                // coming and the final status would never be reported.
                this.trace('Retry attempt denied by throttling policy');
                callback(false);
            }
        }, retryDelayMs);
    }
    /** @returns The number of child calls currently in the 'ACTIVE' state. */
    countActiveCalls() {
        let count = 0;
        for (const call of this.underlyingCalls) {
            if (call != null && call.state === 'ACTIVE') {
                count += 1;
            }
        }
        return count;
    }
    /**
     * Handles a non-OK child status according to the current state: either
     * commits to that child and reports the status, or (for retryable /
     * non-fatal codes) tries to start another attempt first.
     */
    handleProcessedStatus(status, callIndex, pushback) {
        switch (this.state) {
            case 'COMMITTED':
            case 'TRANSPARENT_ONLY':
                this.commitCall(callIndex);
                this.reportStatus(status);
                break;
            case 'HEDGING': {
                const configuredCodes = this.callConfig.methodConfig.hedgingPolicy.nonFatalStatusCodes;
                const nonFatalStatusCodes = configuredCodes == null ? [] : configuredCodes;
                if (this.isStatusCodeInList(nonFatalStatusCodes, status.code)) {
                    if (this.retryThrottler != null) {
                        this.retryThrottler.addCallFailed();
                    }
                    let delayMs;
                    if (pushback === null) {
                        delayMs = 0;
                    }
                    else if (pushback < 0) {
                        this.state = 'TRANSPARENT_ONLY';
                        this.commitCall(callIndex);
                        this.reportStatus(status);
                        return;
                    }
                    else {
                        delayMs = pushback;
                    }
                    setTimeout(() => {
                        this.maybeStartHedgingAttempt();
                        // If after trying to start a call there are no active calls, this was the last one
                        if (this.countActiveCalls() === 0) {
                            this.commitCall(callIndex);
                            this.reportStatus(status);
                        }
                    }, delayMs);
                }
                else {
                    this.commitCall(callIndex);
                    this.reportStatus(status);
                }
                break;
            }
            case 'RETRY':
                if (this.isStatusCodeInList(this.callConfig.methodConfig.retryPolicy.retryableStatusCodes, status.code)) {
                    if (this.retryThrottler != null) {
                        this.retryThrottler.addCallFailed();
                    }
                    this.maybeRetryCall(pushback, retried => {
                        if (!retried) {
                            this.commitCall(callIndex);
                            this.reportStatus(status);
                        }
                    });
                }
                else {
                    this.commitCall(callIndex);
                    this.reportStatus(status);
                }
                break;
        }
    }
    /**
     * Reads the 'grpc-retry-pushback-ms' metadata entry.
     *
     * @returns null when the entry is absent, the parsed integer when it is
     *     numeric, and -1 (which callers treat as "do not retry") when the
     *     value cannot be parsed.
     */
    getPushback(metadata) {
        const mdValue = metadata.get('grpc-retry-pushback-ms');
        if (mdValue.length === 0) {
            return null;
        }
        try {
            // parseInt signals failure by returning NaN rather than throwing.
            const pushbackMs = Number.parseInt(mdValue[0], 10);
            return Number.isNaN(pushbackMs) ? -1 : pushbackMs;
        }
        catch (e) {
            return -1;
        }
    }
    /**
     * Entry point for a status received from the child at `callIndex`.
     * Ignores children that are already completed; otherwise marks the child
     * completed and either reports the status or starts a further attempt,
     * depending on the status code, the current state and `status.progress`.
     */
    handleChildStatus(status, callIndex) {
        if (this.underlyingCalls[callIndex].state === 'COMPLETED') {
            return;
        }
        this.trace('state=' +
            this.state +
            ' handling status with progress ' +
            status.progress +
            ' from child [' +
            this.underlyingCalls[callIndex].call.getCallNumber() +
            '] in state ' +
            this.underlyingCalls[callIndex].state);
        this.underlyingCalls[callIndex].state = 'COMPLETED';
        if (status.code === constants_1.Status.OK) {
            if (this.retryThrottler != null) {
                this.retryThrottler.addCallSucceeded();
            }
            this.commitCall(callIndex);
            this.reportStatus(status);
            return;
        }
        if (this.state === 'COMMITTED') {
            this.reportStatus(status);
            return;
        }
        const pushback = this.getPushback(status.metadata);
        switch (status.progress) {
            case 'NOT_STARTED':
                // RPC never leaves the client, always safe to retry
                this.startNewAttempt();
                break;
            case 'REFUSED':
                // RPC reaches the server library, but not the server application logic
                if (this.transparentRetryUsed) {
                    this.handleProcessedStatus(status, callIndex, pushback);
                }
                else {
                    this.transparentRetryUsed = true;
                    this.startNewAttempt();
                }
                break;
            case 'DROP':
                this.commitCall(callIndex);
                this.reportStatus(status);
                break;
            case 'PROCESSED':
                this.handleProcessedStatus(status, callIndex, pushback);
                break;
        }
    }
    /**
     * In 'HEDGING' state and below the attempt limit (policy maxAttempts,
     * capped at 5), starts another attempt and re-arms the hedging timer.
     */
    maybeStartHedgingAttempt() {
        if (this.state !== 'HEDGING') {
            return;
        }
        if (!this.callConfig.methodConfig.hedgingPolicy) {
            return;
        }
        const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy;
        if (this.attempts >= Math.min(hedgingPolicy.maxAttempts, 5)) {
            return;
        }
        this.attempts += 1;
        this.startNewAttempt();
        this.maybeStartHedgingTimer();
    }
    /**
     * Clears any pending hedging timer and, if hedging is still possible,
     * schedules the next hedged attempt after the policy's hedgingDelay
     * (default '0s').
     */
    maybeStartHedgingTimer() {
        if (this.hedgingTimer) {
            clearTimeout(this.hedgingTimer);
        }
        if (this.state !== 'HEDGING') {
            return;
        }
        if (!this.callConfig.methodConfig.hedgingPolicy) {
            return;
        }
        const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy;
        if (this.attempts >= Math.min(hedgingPolicy.maxAttempts, 5)) {
            return;
        }
        const hedgingDelayString = hedgingPolicy.hedgingDelay == null ? '0s' : hedgingPolicy.hedgingDelay;
        const hedgingDelaySec = Number(hedgingDelayString.substring(0, hedgingDelayString.length - 1));
        this.hedgingTimer = setTimeout(() => {
            this.maybeStartHedgingAttempt();
        }, hedgingDelaySec * 1000);
        // unref() is called only where the timer object provides it.
        const timer = this.hedgingTimer;
        if (timer.unref != null) {
            timer.unref();
        }
    }
    /**
     * Creates and starts a new child call, replays buffered messages to it
     * and starts a read on it if a read was already requested. From the
     * second counted attempt on, the number of previous attempts is attached
     * to the outgoing metadata and to the metadata handed to the listener.
     */
    startNewAttempt() {
        const child = this.channel.createLoadBalancingCall(this.callConfig, this.methodName, this.host, this.credentials, this.deadline);
        this.trace('Created child call [' +
            child.getCallNumber() +
            '] for attempt ' +
            this.attempts);
        const index = this.underlyingCalls.length;
        this.underlyingCalls.push({
            state: 'ACTIVE',
            call: child,
            nextMessageToSend: 0,
        });
        const previousAttempts = this.attempts - 1;
        const initialMetadata = this.initialMetadata.clone();
        if (previousAttempts > 0) {
            initialMetadata.set(PREVIONS_RPC_ATTEMPTS_METADATA_KEY, `${previousAttempts}`);
        }
        let receivedMetadata = false;
        child.start(initialMetadata, {
            onReceiveMetadata: metadata => {
                this.trace('Received metadata from child [' + child.getCallNumber() + ']');
                this.commitCall(index);
                receivedMetadata = true;
                if (previousAttempts > 0) {
                    metadata.set(PREVIONS_RPC_ATTEMPTS_METADATA_KEY, `${previousAttempts}`);
                }
                if (this.underlyingCalls[index].state === 'ACTIVE') {
                    this.listener.onReceiveMetadata(metadata);
                }
            },
            onReceiveMessage: message => {
                this.trace('Received message from child [' + child.getCallNumber() + ']');
                this.commitCall(index);
                if (this.underlyingCalls[index].state === 'ACTIVE') {
                    this.listener.onReceiveMessage(message);
                }
            },
            onReceiveStatus: status => {
                this.trace('Received status from child [' + child.getCallNumber() + ']');
                // If no initial metadata was seen, carry the attempt count on
                // the status metadata instead.
                if (!receivedMetadata && previousAttempts > 0) {
                    status.metadata.set(PREVIONS_RPC_ATTEMPTS_METADATA_KEY, `${previousAttempts}`);
                }
                this.handleChildStatus(status, index);
            },
        });
        this.sendNextChildMessage(index);
        if (this.readStarted) {
            child.startRead();
        }
    }
    /**
     * Starts the call: stores the listener and initial metadata, starts the
     * first attempt and arms the hedging timer if applicable.
     */
    start(metadata, listener) {
        this.trace('start called');
        this.listener = listener;
        this.initialMetadata = metadata;
        this.attempts += 1;
        this.startNewAttempt();
        this.maybeStartHedgingTimer();
    }
    /**
     * Called when a child finished writing its current message: invokes the
     * callback stored on that buffer entry (if any), trims the buffer, and
     * moves the child on to the next buffered entry.
     */
    handleChildWriteCompleted(childIndex) {
        const childCall = this.underlyingCalls[childIndex];
        const messageIndex = childCall.nextMessageToSend;
        const completedEntry = this.getBufferEntry(messageIndex);
        if (completedEntry.callback != null) {
            completedEntry.callback();
        }
        this.clearSentMessages();
        childCall.nextMessageToSend += 1;
        this.sendNextChildMessage(childIndex);
    }
    /**
     * Sends the child's next pending buffer entry (message or half-close), if
     * the child is not completed and such an entry exists.
     */
    sendNextChildMessage(childIndex) {
        const childCall = this.underlyingCalls[childIndex];
        if (childCall.state === 'COMPLETED') {
            return;
        }
        // getBufferEntry always returns an object ('FREED' when nothing is
        // buffered at that index), so no existence check is needed.
        const bufferEntry = this.getBufferEntry(childCall.nextMessageToSend);
        switch (bufferEntry.entryType) {
            case 'MESSAGE':
                childCall.call.sendMessageWithContext({
                    callback: error => {
                        // Ignore error
                        this.handleChildWriteCompleted(childIndex);
                    },
                }, bufferEntry.message.message);
                break;
            case 'HALF_CLOSE':
                childCall.nextMessageToSend += 1;
                childCall.call.halfClose();
                break;
            case 'FREED':
                // Should not be possible
                break;
        }
    }
    /**
     * Buffers an outgoing message and forwards it to the children that are
     * ready for it.
     *
     * If the tracker accepts the message, the write callback fires
     * immediately and every active child waiting on this index receives the
     * message. If the tracker rejects it, the call commits to the furthest
     * advanced active child; the write callback is then stored on the buffer
     * entry and fires when that child completes the write.
     */
    sendMessageWithContext(context, message) {
        this.trace('write() called with message of length ' + message.length);
        const writeObj = {
            message,
            flags: context.flags,
        };
        const messageIndex = this.getNextBufferIndex();
        const bufferEntry = {
            entryType: 'MESSAGE',
            message: writeObj,
            allocated: this.bufferTracker.allocate(message.length, this.callNumber),
        };
        this.writeBuffer.push(bufferEntry);
        if (bufferEntry.allocated) {
            if (context.callback != null) {
                context.callback();
            }
            for (const [callIndex, call] of this.underlyingCalls.entries()) {
                if (call.state === 'ACTIVE' &&
                    call.nextMessageToSend === messageIndex) {
                    call.call.sendMessageWithContext({
                        callback: error => {
                            // Ignore error
                            this.handleChildWriteCompleted(callIndex);
                        },
                    }, message);
                }
            }
        }
        else {
            this.commitCallWithMostMessages();
            // commitCallWithMostMessages can fail if we are between ping attempts
            if (this.committedCallIndex === null) {
                return;
            }
            const call = this.underlyingCalls[this.committedCallIndex];
            bufferEntry.callback = context.callback;
            if (call.state === 'ACTIVE' && call.nextMessageToSend === messageIndex) {
                call.call.sendMessageWithContext({
                    callback: error => {
                        // Ignore error
                        this.handleChildWriteCompleted(this.committedCallIndex);
                    },
                }, message);
            }
        }
    }
    /**
     * Records that reading has begun and starts a read on every active child.
     */
    startRead() {
        this.trace('startRead called');
        this.readStarted = true;
        for (const underlyingCall of this.underlyingCalls) {
            if (underlyingCall != null && underlyingCall.state === 'ACTIVE') {
                underlyingCall.call.startRead();
            }
        }
    }
    /**
     * Buffers a half-close marker and half-closes every active child that has
     * already sent everything before it.
     */
    halfClose() {
        this.trace('halfClose called');
        const halfCloseIndex = this.getNextBufferIndex();
        this.writeBuffer.push({
            entryType: 'HALF_CLOSE',
            allocated: false,
        });
        for (const call of this.underlyingCalls) {
            if (call != null &&
                call.state === 'ACTIVE' &&
                call.nextMessageToSend === halfCloseIndex) {
                call.nextMessageToSend += 1;
                call.call.halfClose();
            }
        }
    }
    /**
     * Not supported on this call type.
     *
     * @throws Error always.
     */
    setCredentials(newCredentials) {
        throw new Error('Method not implemented.');
    }
    /** @returns The method name given to the constructor. */
    getMethod() {
        return this.methodName;
    }
    /** @returns The host given to the constructor. */
    getHost() {
        return this.host;
    }
}
638exports.RetryingCall = RetryingCall;
639//# sourceMappingURL=retrying-call.js.map
\No newline at end of file