1 | "use strict";
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | Object.defineProperty(exports, "__esModule", { value: true });
|
19 | exports.RetryingCall = exports.MessageBufferTracker = exports.RetryThrottler = void 0;
|
20 | const constants_1 = require("./constants");
|
21 | const metadata_1 = require("./metadata");
|
22 | const logging = require("./logging");
|
23 | const TRACER_NAME = 'retrying_call';
|
24 | class RetryThrottler {
|
25 | constructor(maxTokens, tokenRatio, previousRetryThrottler) {
|
26 | this.maxTokens = maxTokens;
|
27 | this.tokenRatio = tokenRatio;
|
28 | if (previousRetryThrottler) {
|
29 | |
30 |
|
31 | this.tokens = previousRetryThrottler.tokens * (maxTokens / previousRetryThrottler.maxTokens);
|
32 | }
|
33 | else {
|
34 | this.tokens = maxTokens;
|
35 | }
|
36 | }
|
37 | addCallSucceeded() {
|
38 | this.tokens = Math.max(this.tokens + this.tokenRatio, this.maxTokens);
|
39 | }
|
40 | addCallFailed() {
|
41 | this.tokens = Math.min(this.tokens - 1, 0);
|
42 | }
|
43 | canRetryCall() {
|
44 | return this.tokens > this.maxTokens / 2;
|
45 | }
|
46 | }
|
47 | exports.RetryThrottler = RetryThrottler;
|
48 | class MessageBufferTracker {
|
49 | constructor(totalLimit, limitPerCall) {
|
50 | this.totalLimit = totalLimit;
|
51 | this.limitPerCall = limitPerCall;
|
52 | this.totalAllocated = 0;
|
53 | this.allocatedPerCall = new Map();
|
54 | }
|
55 | allocate(size, callId) {
|
56 | var _a;
|
57 | const currentPerCall = (_a = this.allocatedPerCall.get(callId)) !== null && _a !== void 0 ? _a : 0;
|
58 | if (this.limitPerCall - currentPerCall < size || this.totalLimit - this.totalAllocated < size) {
|
59 | return false;
|
60 | }
|
61 | this.allocatedPerCall.set(callId, currentPerCall + size);
|
62 | this.totalAllocated += size;
|
63 | return true;
|
64 | }
|
65 | free(size, callId) {
|
66 | var _a;
|
67 | if (this.totalAllocated < size) {
|
68 | throw new Error(`Invalid buffer allocation state: call ${callId} freed ${size} > total allocated ${this.totalAllocated}`);
|
69 | }
|
70 | this.totalAllocated -= size;
|
71 | const currentPerCall = (_a = this.allocatedPerCall.get(callId)) !== null && _a !== void 0 ? _a : 0;
|
72 | if (currentPerCall < size) {
|
73 | throw new Error(`Invalid buffer allocation state: call ${callId} freed ${size} > allocated for call ${currentPerCall}`);
|
74 | }
|
75 | this.allocatedPerCall.set(callId, currentPerCall - size);
|
76 | }
|
77 | freeAll(callId) {
|
78 | var _a;
|
79 | const currentPerCall = (_a = this.allocatedPerCall.get(callId)) !== null && _a !== void 0 ? _a : 0;
|
80 | if (this.totalAllocated < currentPerCall) {
|
81 | throw new Error(`Invalid buffer allocation state: call ${callId} allocated ${currentPerCall} > total allocated ${this.totalAllocated}`);
|
82 | }
|
83 | this.totalAllocated -= currentPerCall;
|
84 | this.allocatedPerCall.delete(callId);
|
85 | }
|
86 | }
|
87 | exports.MessageBufferTracker = MessageBufferTracker;
|
88 | const PREVIONS_RPC_ATTEMPTS_METADATA_KEY = 'grpc-previous-rpc-attempts';
|
89 | class RetryingCall {
|
90 | constructor(channel, callConfig, methodName, host, credentials, deadline, callNumber, bufferTracker, retryThrottler) {
|
91 | this.channel = channel;
|
92 | this.callConfig = callConfig;
|
93 | this.methodName = methodName;
|
94 | this.host = host;
|
95 | this.credentials = credentials;
|
96 | this.deadline = deadline;
|
97 | this.callNumber = callNumber;
|
98 | this.bufferTracker = bufferTracker;
|
99 | this.retryThrottler = retryThrottler;
|
100 | this.listener = null;
|
101 | this.initialMetadata = null;
|
102 | this.underlyingCalls = [];
|
103 | this.writeBuffer = [];
|
104 | this.transparentRetryUsed = false;
|
105 | |
106 |
|
107 |
|
108 | this.attempts = 0;
|
109 | this.hedgingTimer = null;
|
110 | this.committedCallIndex = null;
|
111 | this.initialRetryBackoffSec = 0;
|
112 | this.nextRetryBackoffSec = 0;
|
113 | if (callConfig.methodConfig.retryPolicy) {
|
114 | this.state = 'RETRY';
|
115 | const retryPolicy = callConfig.methodConfig.retryPolicy;
|
116 | this.nextRetryBackoffSec = this.initialRetryBackoffSec = Number(retryPolicy.initialBackoff.substring(0, retryPolicy.initialBackoff.length - 1));
|
117 | }
|
118 | else if (callConfig.methodConfig.hedgingPolicy) {
|
119 | this.state = 'HEDGING';
|
120 | }
|
121 | else {
|
122 | this.state = 'TRANSPARENT_ONLY';
|
123 | }
|
124 | }
|
125 | getCallNumber() {
|
126 | return this.callNumber;
|
127 | }
|
128 | trace(text) {
|
129 | logging.trace(constants_1.LogVerbosity.DEBUG, TRACER_NAME, '[' + this.callNumber + '] ' + text);
|
130 | }
|
131 | reportStatus(statusObject) {
|
132 | this.trace('ended with status: code=' + statusObject.code + ' details="' + statusObject.details + '"');
|
133 | process.nextTick(() => {
|
134 | var _a;
|
135 | (_a = this.listener) === null || _a === void 0 ? void 0 : _a.onReceiveStatus(statusObject);
|
136 | });
|
137 | }
|
138 | cancelWithStatus(status, details) {
|
139 | this.trace('cancelWithStatus code: ' + status + ' details: "' + details + '"');
|
140 | this.reportStatus({ code: status, details, metadata: new metadata_1.Metadata() });
|
141 | for (const { call } of this.underlyingCalls) {
|
142 | call.cancelWithStatus(status, details);
|
143 | }
|
144 | }
|
145 | getPeer() {
|
146 | if (this.committedCallIndex !== null) {
|
147 | return this.underlyingCalls[this.committedCallIndex].call.getPeer();
|
148 | }
|
149 | else {
|
150 | return 'unknown';
|
151 | }
|
152 | }
|
153 | commitCall(index) {
|
154 | if (this.state === 'COMMITTED') {
|
155 | return;
|
156 | }
|
157 | if (this.underlyingCalls[index].state === 'COMPLETED') {
|
158 | return;
|
159 | }
|
160 | this.trace('Committing call [' + this.underlyingCalls[index].call.getCallNumber() + '] at index ' + index);
|
161 | this.state = 'COMMITTED';
|
162 | this.committedCallIndex = index;
|
163 | for (let i = 0; i < this.underlyingCalls.length; i++) {
|
164 | if (i === index) {
|
165 | continue;
|
166 | }
|
167 | if (this.underlyingCalls[i].state === 'COMPLETED') {
|
168 | continue;
|
169 | }
|
170 | this.underlyingCalls[i].state = 'COMPLETED';
|
171 | this.underlyingCalls[i].call.cancelWithStatus(constants_1.Status.CANCELLED, 'Discarded in favor of other hedged attempt');
|
172 | }
|
173 | for (let messageIndex = 0; messageIndex < this.underlyingCalls[index].nextMessageToSend - 1; messageIndex += 1) {
|
174 | const bufferEntry = this.writeBuffer[messageIndex];
|
175 | if (bufferEntry.entryType === 'MESSAGE') {
|
176 | this.bufferTracker.free(bufferEntry.message.message.length, this.callNumber);
|
177 | this.writeBuffer[messageIndex] = {
|
178 | entryType: 'FREED'
|
179 | };
|
180 | }
|
181 | }
|
182 | }
|
183 | commitCallWithMostMessages() {
|
184 | let mostMessages = -1;
|
185 | let callWithMostMessages = -1;
|
186 | for (const [index, childCall] of this.underlyingCalls.entries()) {
|
187 | if (childCall.nextMessageToSend > mostMessages) {
|
188 | mostMessages = childCall.nextMessageToSend;
|
189 | callWithMostMessages = index;
|
190 | }
|
191 | }
|
192 | this.commitCall(callWithMostMessages);
|
193 | }
|
194 | isStatusCodeInList(list, code) {
|
195 | return list.some((value => value === code || value.toString().toLowerCase() === constants_1.Status[code].toLowerCase()));
|
196 | }
|
197 | getNextRetryBackoffMs() {
|
198 | var _a;
|
199 | const retryPolicy = (_a = this.callConfig) === null || _a === void 0 ? void 0 : _a.methodConfig.retryPolicy;
|
200 | if (!retryPolicy) {
|
201 | return 0;
|
202 | }
|
203 | const nextBackoffMs = Math.random() * this.nextRetryBackoffSec * 1000;
|
204 | const maxBackoffSec = Number(retryPolicy.maxBackoff.substring(0, retryPolicy.maxBackoff.length - 1));
|
205 | this.nextRetryBackoffSec = Math.min(this.nextRetryBackoffSec * retryPolicy.backoffMultiplier, maxBackoffSec);
|
206 | return nextBackoffMs;
|
207 | }
|
208 | maybeRetryCall(pushback, callback) {
|
209 | if (this.state !== 'RETRY') {
|
210 | callback(false);
|
211 | return;
|
212 | }
|
213 | const retryPolicy = this.callConfig.methodConfig.retryPolicy;
|
214 | if (this.attempts >= retryPolicy.maxAttempts) {
|
215 | callback(false);
|
216 | return;
|
217 | }
|
218 | let retryDelayMs;
|
219 | if (pushback === null) {
|
220 | retryDelayMs = this.getNextRetryBackoffMs();
|
221 | }
|
222 | else if (pushback < 0) {
|
223 | this.state = 'TRANSPARENT_ONLY';
|
224 | callback(false);
|
225 | return;
|
226 | }
|
227 | else {
|
228 | retryDelayMs = pushback;
|
229 | this.nextRetryBackoffSec = this.initialRetryBackoffSec;
|
230 | }
|
231 | setTimeout(() => {
|
232 | var _a, _b;
|
233 | if (this.state !== 'RETRY') {
|
234 | callback(false);
|
235 | return;
|
236 | }
|
237 | if ((_b = (_a = this.retryThrottler) === null || _a === void 0 ? void 0 : _a.canRetryCall()) !== null && _b !== void 0 ? _b : true) {
|
238 | callback(true);
|
239 | this.attempts += 1;
|
240 | this.startNewAttempt();
|
241 | }
|
242 | }, retryDelayMs);
|
243 | }
|
244 | countActiveCalls() {
|
245 | let count = 0;
|
246 | for (const call of this.underlyingCalls) {
|
247 | if ((call === null || call === void 0 ? void 0 : call.state) === 'ACTIVE') {
|
248 | count += 1;
|
249 | }
|
250 | }
|
251 | return count;
|
252 | }
|
253 | handleProcessedStatus(status, callIndex, pushback) {
|
254 | var _a, _b;
|
255 | switch (this.state) {
|
256 | case 'COMMITTED':
|
257 | case 'TRANSPARENT_ONLY':
|
258 | this.commitCall(callIndex);
|
259 | this.reportStatus(status);
|
260 | break;
|
261 | case 'HEDGING':
|
262 | if (this.isStatusCodeInList(this.callConfig.methodConfig.hedgingPolicy.nonFatalStatusCodes, status.code)) {
|
263 | (_a = this.retryThrottler) === null || _a === void 0 ? void 0 : _a.addCallFailed();
|
264 | let delayMs;
|
265 | if (pushback === null) {
|
266 | delayMs = 0;
|
267 | }
|
268 | else if (pushback < 0) {
|
269 | this.state = 'TRANSPARENT_ONLY';
|
270 | this.commitCall(callIndex);
|
271 | this.reportStatus(status);
|
272 | return;
|
273 | }
|
274 | else {
|
275 | delayMs = pushback;
|
276 | }
|
277 | setTimeout(() => {
|
278 | this.maybeStartHedgingAttempt();
|
279 |
|
280 | if (this.countActiveCalls() === 0) {
|
281 | this.commitCall(callIndex);
|
282 | this.reportStatus(status);
|
283 | }
|
284 | }, delayMs);
|
285 | }
|
286 | else {
|
287 | this.commitCall(callIndex);
|
288 | this.reportStatus(status);
|
289 | }
|
290 | break;
|
291 | case 'RETRY':
|
292 | if (this.isStatusCodeInList(this.callConfig.methodConfig.retryPolicy.retryableStatusCodes, status.code)) {
|
293 | (_b = this.retryThrottler) === null || _b === void 0 ? void 0 : _b.addCallFailed();
|
294 | this.maybeRetryCall(pushback, (retried) => {
|
295 | if (!retried) {
|
296 | this.commitCall(callIndex);
|
297 | this.reportStatus(status);
|
298 | }
|
299 | });
|
300 | }
|
301 | else {
|
302 | this.commitCall(callIndex);
|
303 | this.reportStatus(status);
|
304 | }
|
305 | break;
|
306 | }
|
307 | }
|
308 | getPushback(metadata) {
|
309 | const mdValue = metadata.get('grpc-retry-pushback-ms');
|
310 | if (mdValue.length === 0) {
|
311 | return null;
|
312 | }
|
313 | try {
|
314 | return parseInt(mdValue[0]);
|
315 | }
|
316 | catch (e) {
|
317 | return -1;
|
318 | }
|
319 | }
|
320 | handleChildStatus(status, callIndex) {
|
321 | var _a;
|
322 | if (this.underlyingCalls[callIndex].state === 'COMPLETED') {
|
323 | return;
|
324 | }
|
325 | this.underlyingCalls[callIndex].state = 'COMPLETED';
|
326 | if (status.code === constants_1.Status.OK) {
|
327 | (_a = this.retryThrottler) === null || _a === void 0 ? void 0 : _a.addCallSucceeded();
|
328 | this.commitCall(callIndex);
|
329 | this.reportStatus(status);
|
330 | return;
|
331 | }
|
332 | if (this.state === 'COMMITTED') {
|
333 | this.reportStatus(status);
|
334 | return;
|
335 | }
|
336 | const pushback = this.getPushback(status.metadata);
|
337 | switch (status.progress) {
|
338 | case 'NOT_STARTED':
|
339 |
|
340 | this.startNewAttempt();
|
341 | break;
|
342 | case 'REFUSED':
|
343 |
|
344 | if (this.transparentRetryUsed) {
|
345 | this.handleProcessedStatus(status, callIndex, pushback);
|
346 | }
|
347 | else {
|
348 | this.transparentRetryUsed = true;
|
349 | this.startNewAttempt();
|
350 | }
|
351 | ;
|
352 | break;
|
353 | case 'DROP':
|
354 | this.commitCall(callIndex);
|
355 | this.reportStatus(status);
|
356 | break;
|
357 | case 'PROCESSED':
|
358 | this.handleProcessedStatus(status, callIndex, pushback);
|
359 | break;
|
360 | }
|
361 | }
|
362 | maybeStartHedgingAttempt() {
|
363 | if (this.state !== 'HEDGING') {
|
364 | return;
|
365 | }
|
366 | if (!this.callConfig.methodConfig.hedgingPolicy) {
|
367 | return;
|
368 | }
|
369 | const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy;
|
370 | if (this.attempts >= hedgingPolicy.maxAttempts) {
|
371 | return;
|
372 | }
|
373 | this.attempts += 1;
|
374 | this.startNewAttempt();
|
375 | this.maybeStartHedgingTimer();
|
376 | }
|
377 | maybeStartHedgingTimer() {
|
378 | var _a, _b, _c;
|
379 | if (this.hedgingTimer) {
|
380 | clearTimeout(this.hedgingTimer);
|
381 | }
|
382 | if (this.state !== 'HEDGING') {
|
383 | return;
|
384 | }
|
385 | if (!this.callConfig.methodConfig.hedgingPolicy) {
|
386 | return;
|
387 | }
|
388 | const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy;
|
389 | if (this.attempts >= hedgingPolicy.maxAttempts) {
|
390 | return;
|
391 | }
|
392 | const hedgingDelayString = (_a = hedgingPolicy.hedgingDelay) !== null && _a !== void 0 ? _a : '0s';
|
393 | const hedgingDelaySec = Number(hedgingDelayString.substring(0, hedgingDelayString.length - 1));
|
394 | this.hedgingTimer = setTimeout(() => {
|
395 | this.maybeStartHedgingAttempt();
|
396 | }, hedgingDelaySec * 1000);
|
397 | (_c = (_b = this.hedgingTimer).unref) === null || _c === void 0 ? void 0 : _c.call(_b);
|
398 | }
|
399 | startNewAttempt() {
|
400 | const child = this.channel.createLoadBalancingCall(this.callConfig, this.methodName, this.host, this.credentials, this.deadline);
|
401 | this.trace('Created child call [' + child.getCallNumber() + '] for attempt ' + this.attempts);
|
402 | const index = this.underlyingCalls.length;
|
403 | this.underlyingCalls.push({ state: 'ACTIVE', call: child, nextMessageToSend: 0 });
|
404 | const previousAttempts = this.attempts - 1;
|
405 | const initialMetadata = this.initialMetadata.clone();
|
406 | if (previousAttempts > 0) {
|
407 | initialMetadata.set(PREVIONS_RPC_ATTEMPTS_METADATA_KEY, `${previousAttempts}`);
|
408 | }
|
409 | let receivedMetadata = false;
|
410 | child.start(initialMetadata, {
|
411 | onReceiveMetadata: metadata => {
|
412 | this.commitCall(index);
|
413 | receivedMetadata = true;
|
414 | if (previousAttempts > 0) {
|
415 | metadata.set(PREVIONS_RPC_ATTEMPTS_METADATA_KEY, `${previousAttempts}`);
|
416 | }
|
417 | if (this.underlyingCalls[index].state === 'ACTIVE') {
|
418 | this.listener.onReceiveMetadata(metadata);
|
419 | }
|
420 | },
|
421 | onReceiveMessage: message => {
|
422 | this.commitCall(index);
|
423 | if (this.underlyingCalls[index].state === 'ACTIVE') {
|
424 | this.listener.onReceiveMessage(message);
|
425 | }
|
426 | },
|
427 | onReceiveStatus: status => {
|
428 | if (!receivedMetadata && previousAttempts > 0) {
|
429 | status.metadata.set(PREVIONS_RPC_ATTEMPTS_METADATA_KEY, `${previousAttempts}`);
|
430 | }
|
431 | this.commitCall(index);
|
432 | this.handleChildStatus(status, index);
|
433 | }
|
434 | });
|
435 | }
|
436 | start(metadata, listener) {
|
437 | this.trace('start called');
|
438 | this.listener = listener;
|
439 | this.initialMetadata = metadata;
|
440 | this.attempts += 1;
|
441 | this.startNewAttempt();
|
442 | this.maybeStartHedgingTimer();
|
443 | }
|
444 | sendNextChildMessage(childIndex) {
|
445 | const childCall = this.underlyingCalls[childIndex];
|
446 | if (childCall.state === 'COMPLETED') {
|
447 | return;
|
448 | }
|
449 | if (this.writeBuffer[childCall.nextMessageToSend]) {
|
450 | const bufferEntry = this.writeBuffer[childCall.nextMessageToSend];
|
451 | switch (bufferEntry.entryType) {
|
452 | case 'MESSAGE':
|
453 | childCall.call.sendMessageWithContext({
|
454 | callback: (error) => {
|
455 |
|
456 | childCall.nextMessageToSend += 1;
|
457 | this.sendNextChildMessage(childIndex);
|
458 | }
|
459 | }, bufferEntry.message.message);
|
460 | break;
|
461 | case 'HALF_CLOSE':
|
462 | childCall.nextMessageToSend += 1;
|
463 | childCall.call.halfClose();
|
464 | break;
|
465 | case 'FREED':
|
466 |
|
467 | break;
|
468 | }
|
469 | }
|
470 | }
|
471 | sendMessageWithContext(context, message) {
|
472 | var _a;
|
473 | this.trace('write() called with message of length ' + message.length);
|
474 | const writeObj = {
|
475 | message,
|
476 | flags: context.flags,
|
477 | };
|
478 | const messageIndex = this.writeBuffer.length;
|
479 | const bufferEntry = {
|
480 | entryType: 'MESSAGE',
|
481 | message: writeObj
|
482 | };
|
483 | this.writeBuffer[messageIndex] = bufferEntry;
|
484 | if (this.bufferTracker.allocate(message.length, this.callNumber)) {
|
485 | (_a = context.callback) === null || _a === void 0 ? void 0 : _a.call(context);
|
486 | for (const [callIndex, call] of this.underlyingCalls.entries()) {
|
487 | if (call.state === 'ACTIVE' && call.nextMessageToSend === messageIndex) {
|
488 | call.call.sendMessageWithContext({
|
489 | callback: (error) => {
|
490 |
|
491 | call.nextMessageToSend += 1;
|
492 | this.sendNextChildMessage(callIndex);
|
493 | }
|
494 | }, message);
|
495 | }
|
496 | }
|
497 | }
|
498 | else {
|
499 | this.commitCallWithMostMessages();
|
500 | bufferEntry.callback = context.callback;
|
501 | }
|
502 | }
|
503 | startRead() {
|
504 | this.trace('startRead called');
|
505 | for (const underlyingCall of this.underlyingCalls) {
|
506 | if ((underlyingCall === null || underlyingCall === void 0 ? void 0 : underlyingCall.state) === 'ACTIVE') {
|
507 | underlyingCall.call.startRead();
|
508 | }
|
509 | }
|
510 | }
|
511 | halfClose() {
|
512 | this.trace('halfClose called');
|
513 | const halfCloseIndex = this.writeBuffer.length;
|
514 | this.writeBuffer[halfCloseIndex] = {
|
515 | entryType: 'HALF_CLOSE'
|
516 | };
|
517 | for (const call of this.underlyingCalls) {
|
518 | if ((call === null || call === void 0 ? void 0 : call.state) === 'ACTIVE' && call.nextMessageToSend === halfCloseIndex) {
|
519 | call.nextMessageToSend += 1;
|
520 | call.call.halfClose();
|
521 | }
|
522 | }
|
523 | }
|
524 | setCredentials(newCredentials) {
|
525 | throw new Error("Method not implemented.");
|
526 | }
|
527 | getMethod() {
|
528 | return this.methodName;
|
529 | }
|
530 | getHost() {
|
531 | return this.host;
|
532 | }
|
533 | }
|
534 | exports.RetryingCall = RetryingCall;
|
535 |
|
\ | No newline at end of file |