UNPKG

43 kBJavaScriptView Raw
1"use strict";
2var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
3 return new (P || (P = Promise))(function (resolve, reject) {
4 function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
5 function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
6 function step(result) { result.done ? resolve(result.value) : new P(function (resolve) { resolve(result.value); }).then(fulfilled, rejected); }
7 step((generator = generator.apply(thisArg, _arguments || [])).next());
8 });
9};
10Object.defineProperty(exports, "__esModule", { value: true });
11const os = require("os");
12const BluebirdPromise = require("bluebird");
13const semaphore = require("semaphore");
14const SemaphoreUtil_1 = require("../Utility/SemaphoreUtil");
15const LogUtil_1 = require("../Utility/LogUtil");
16const Timer_1 = require("../Primitives/Timer");
17const ServerNode_1 = require("./ServerNode");
18const Topology_1 = require("./Topology");
19const GetDatabaseTopologyCommand_1 = require("../ServerWide/Commands/GetDatabaseTopologyCommand");
20const StatusCode_1 = require("./StatusCode");
21const NodeSelector_1 = require("./NodeSelector");
22const Certificate_1 = require("../Auth/Certificate");
23const HttpCache_1 = require("./HttpCache");
24const Exceptions_1 = require("../Exceptions");
25const GetClientConfigurationOperation_1 = require("../Documents/Operations/Configuration/GetClientConfigurationOperation");
26const Constants_1 = require("../Constants");
27const Stopwatch_1 = require("../Utility/Stopwatch");
28const PromiseUtil = require("../Utility/PromiseUtil");
29const GetStatisticsOperation_1 = require("../Documents/Operations/GetStatisticsOperation");
30const TypeUtil_1 = require("../Utility/TypeUtil");
31const Serializer_1 = require("../Mapping/Json/Serializer");
32const UriUtil_1 = require("../Utility/UriUtil");
33const StreamUtil = require("../Utility/StreamUtil");
34const HttpUtil_1 = require("../Utility/HttpUtil");
35const PromiseUtil_1 = require("../Utility/PromiseUtil");
36const DEFAULT_REQUEST_OPTIONS = {};
37const log = LogUtil_1.getLogger({ module: "RequestExecutor" });
38class IndexAndResponse {
39 constructor(index, response) {
40 this.index = index;
41 this.response = response;
42 }
43}
44class NodeStatus {
45 constructor(nodeIndex, node, nodeStatusCallback) {
46 this.nodeIndex = nodeIndex;
47 this.node = node;
48 this._timerPeriodInMs = 100;
49 this._nodeStatusCallback = nodeStatusCallback;
50 }
51 _nextTimerPeriod() {
52 if (this._timerPeriodInMs <= 5000) {
53 return 5000;
54 }
55 this._timerPeriodInMs = this._timerPeriodInMs + 100;
56 return this._timerPeriodInMs;
57 }
58 startTimer() {
59 const that = this;
60 this._timer = new Timer_1.Timer(function timerActionNodeStatusCallback() {
61 return that._nodeStatusCallback(that);
62 }, this._timerPeriodInMs);
63 }
64 updateTimer() {
65 this._timer.change(this._nextTimerPeriod());
66 }
67 dispose() {
68 this._timer.dispose();
69 }
70}
71exports.NodeStatus = NodeStatus;
72class RequestExecutor {
73 constructor(database, authOptions, conventions) {
74 this._updateDatabaseTopologySemaphore = semaphore();
75 this._updateClientConfigurationSemaphore = semaphore();
76 this._failedNodesTimers = new Map();
77 this._certificate = null;
78 this.aggressiveCaching = null;
79 this.numberOfServerRequests = 0;
80 this._clientConfigurationEtag = 0;
81 this._topologyEtag = 0;
82 this._log = LogUtil_1.getLogger({
83 module: `${this.constructor.name}-${Math.floor(Math.random() * 10000)}`
84 });
85 this._cache = new HttpCache_1.HttpCache(conventions.maxHttpCacheSize);
86 this._readBalanceBehavior = conventions.readBalanceBehavior;
87 this._databaseName = database;
88 this._lastReturnedResponse = new Date();
89 this._conventions = conventions.clone();
90 this._authOptions = authOptions;
91 this._certificate = Certificate_1.Certificate.createFromOptions(this._authOptions);
92 this._setDefaultRequestOptions();
93 }
94 get _firstTopologyUpdatePromise() {
95 return this._firstTopologyUpdatePromiseInternal;
96 }
97 set _firstTopologyUpdatePromise(value) {
98 this._firstTopologyUpdatePromiseInternal = value;
99 if (value) {
100 this._firstTopologyUpdateStatus = PromiseUtil_1.PromiseStatusTracker.track(value);
101 }
102 }
103 get customHttpRequestOptions() {
104 return this._customHttpRequestOptions;
105 }
106 set customHttpRequestOptions(value) {
107 this._customHttpRequestOptions = value;
108 this._setDefaultRequestOptions();
109 }
110 getAuthOptions() {
111 return this._authOptions;
112 }
113 getTopologyEtag() {
114 return this._topologyEtag;
115 }
116 get conventions() {
117 return this._conventions;
118 }
119 getClientConfigurationEtag() {
120 return this._clientConfigurationEtag;
121 }
122 get cache() {
123 return this._cache;
124 }
125 get disposed() {
126 return this._disposed;
127 }
128 getUrl() {
129 if (!this._nodeSelector) {
130 return null;
131 }
132 const preferredNode = this._nodeSelector.getPreferredNode();
133 return preferredNode
134 ? preferredNode.currentNode.url
135 : null;
136 }
137 getTopology() {
138 return this._nodeSelector
139 ? this._nodeSelector.getTopology()
140 : null;
141 }
142 getTopologyNodes() {
143 const topology = this.getTopology();
144 return topology
145 ? [...topology.nodes]
146 : null;
147 }
148 static create(initialUrls, database, opts) {
149 const { authOptions, documentConventions } = opts || {};
150 const executor = new RequestExecutor(database, authOptions, documentConventions);
151 executor._firstTopologyUpdatePromise = executor._firstTopologyUpdate(initialUrls);
152 executor._firstTopologyUpdatePromise.catch(TypeUtil_1.TypeUtil.NOOP);
153 return executor;
154 }
155 static createForSingleNodeWithConfigurationUpdates(url, database, opts) {
156 const executor = this.createForSingleNodeWithoutConfigurationUpdates(url, database, opts);
157 executor._disableClientConfigurationUpdates = false;
158 return executor;
159 }
160 static createForSingleNodeWithoutConfigurationUpdates(url, database, opts) {
161 const { authOptions, documentConventions } = opts;
162 const initialUrls = RequestExecutor._validateUrls([url], authOptions);
163 const executor = new RequestExecutor(database, authOptions, documentConventions);
164 const topology = new Topology_1.Topology();
165 topology.etag = -1;
166 const serverNode = new ServerNode_1.ServerNode({
167 url: initialUrls[0],
168 database
169 });
170 topology.nodes = [serverNode];
171 executor._nodeSelector = new NodeSelector_1.NodeSelector(topology);
172 executor._topologyEtag = -2;
173 executor._disableTopologyUpdates = true;
174 executor._disableClientConfigurationUpdates = true;
175 return executor;
176 }
177 _ensureNodeSelector() {
178 return __awaiter(this, void 0, void 0, function* () {
179 if (this._firstTopologyUpdatePromise
180 && (!this._firstTopologyUpdateStatus.isFullfilled()
181 || this._firstTopologyUpdateStatus.isRejected())) {
182 yield this._firstTopologyUpdatePromise;
183 }
184 if (!this._nodeSelector) {
185 const topology = new Topology_1.Topology(this._topologyEtag, this.getTopologyNodes().slice());
186 this._nodeSelector = new NodeSelector_1.NodeSelector(topology);
187 }
188 });
189 }
190 getPreferredNode() {
191 return __awaiter(this, void 0, void 0, function* () {
192 yield this._ensureNodeSelector();
193 return this._nodeSelector.getPreferredNode();
194 });
195 }
196 getNodeBySessionId(sessionId) {
197 return __awaiter(this, void 0, void 0, function* () {
198 yield this._ensureNodeSelector();
199 return this._nodeSelector.getNodeBySessionId(sessionId);
200 });
201 }
202 getFastestNode() {
203 return __awaiter(this, void 0, void 0, function* () {
204 yield this._ensureNodeSelector();
205 return this._nodeSelector.getFastestNode();
206 });
207 }
208 _updateClientConfigurationInternal() {
209 return __awaiter(this, void 0, void 0, function* () {
210 const oldDisableClientConfigurationUpdates = this._disableClientConfigurationUpdates;
211 this._disableClientConfigurationUpdates = true;
212 try {
213 if (this._disposed) {
214 return;
215 }
216 const command = new GetClientConfigurationOperation_1.GetClientConfigurationCommand();
217 const { currentNode, currentIndex } = this.chooseNodeForRequest(command, null);
218 yield this.execute(command, null, {
219 chosenNode: currentNode,
220 nodeIndex: currentIndex,
221 shouldRetry: false
222 });
223 const clientConfigOpResult = command.result;
224 if (!clientConfigOpResult) {
225 return;
226 }
227 this._conventions.updateFrom(clientConfigOpResult.configuration);
228 this._clientConfigurationEtag = clientConfigOpResult.etag;
229 }
230 catch (err) {
231 this._log.error(err, "Error getting client configuration.");
232 }
233 finally {
234 this._disableClientConfigurationUpdates = oldDisableClientConfigurationUpdates;
235 }
236 });
237 }
238 _updateClientConfiguration() {
239 return __awaiter(this, void 0, void 0, function* () {
240 if (this._disposed) {
241 return;
242 }
243 let semAcquiredContext;
244 try {
245 semAcquiredContext = SemaphoreUtil_1.acquireSemaphore(this._updateClientConfigurationSemaphore);
246 yield semAcquiredContext.promise;
247 yield this._updateClientConfigurationInternal();
248 }
249 finally {
250 if (semAcquiredContext) {
251 semAcquiredContext.dispose();
252 }
253 }
254 });
255 }
256 updateTopology(node, timeout, forceUpdate = false) {
257 if (this._disposed) {
258 return Promise.resolve(false);
259 }
260 if (this._disableTopologyUpdates) {
261 return Promise.resolve(false);
262 }
263 const acquiredSemContext = SemaphoreUtil_1.acquireSemaphore(this._updateDatabaseTopologySemaphore, { timeout });
264 const result = BluebirdPromise.resolve(acquiredSemContext.promise)
265 .then(() => {
266 if (this._disposed) {
267 return false;
268 }
269 this._log.info(`Update topology from ${node.url}.`);
270 const getTopology = new GetDatabaseTopologyCommand_1.GetDatabaseTopologyCommand();
271 const getTopologyPromise = this.execute(getTopology, null, {
272 chosenNode: node,
273 nodeIndex: null,
274 shouldRetry: false,
275 });
276 return getTopologyPromise
277 .then(() => {
278 const topology = getTopology.result;
279 if (!this._nodeSelector) {
280 this._nodeSelector = new NodeSelector_1.NodeSelector(topology);
281 if (this._readBalanceBehavior === "FastestNode") {
282 this._nodeSelector.scheduleSpeedTest();
283 }
284 }
285 else if (this._nodeSelector.onUpdateTopology(topology, forceUpdate)) {
286 this._disposeAllFailedNodesTimers();
287 if (this._readBalanceBehavior === "FastestNode") {
288 this._nodeSelector.scheduleSpeedTest();
289 }
290 }
291 this._topologyEtag = this._nodeSelector.getTopology().etag;
292 return true;
293 });
294 }, (reason) => {
295 if (reason.name === "TimeoutError") {
296 return false;
297 }
298 throw reason;
299 })
300 .finally(() => {
301 acquiredSemContext.dispose();
302 });
303 return Promise.resolve(result);
304 }
305 static _validateUrls(initialUrls, authOptions) {
306 const cleanUrls = [...Array(initialUrls.length)];
307 let requireHttps = !!authOptions;
308 for (let index = 0; index < initialUrls.length; index++) {
309 const url = initialUrls[index];
310 UriUtil_1.validateUri(url);
311 cleanUrls[index] = url.replace(/\/$/, "");
312 requireHttps = requireHttps || url.startsWith("https://");
313 }
314 if (!requireHttps) {
315 return cleanUrls;
316 }
317 for (const url of initialUrls) {
318 if (!url.startsWith("http://")) {
319 continue;
320 }
321 if (authOptions && authOptions.certificate) {
322 Exceptions_1.throwError("InvalidOperationException", "The url " + url + " is using HTTP, but a certificate is specified, which require us to use HTTPS");
323 }
324 Exceptions_1.throwError("InvalidOperationException", "The url " + url
325 + " is using HTTP, but other urls are using HTTPS, and mixing of HTTP and HTTPS is not allowed.");
326 }
327 return cleanUrls;
328 }
329 _initializeUpdateTopologyTimer() {
330 if (this._updateTopologyTimer || this._disposed) {
331 return;
332 }
333 this._log.info("Initialize update topology timer.");
334 const minInMs = 60 * 1000;
335 const that = this;
336 this._updateTopologyTimer =
337 new Timer_1.Timer(function timerActionUpdateTopology() {
338 return that._updateTopologyCallback();
339 }, minInMs, minInMs);
340 }
341 _updateTopologyCallback() {
342 const time = new Date();
343 const minInMs = 60 * 1000;
344 if (time.valueOf() - this._lastReturnedResponse.valueOf() <= minInMs) {
345 return;
346 }
347 let serverNode;
348 try {
349 const selector = this._nodeSelector;
350 if (!selector) {
351 return;
352 }
353 const preferredNode = selector.getPreferredNode();
354 serverNode = preferredNode.currentNode;
355 }
356 catch (err) {
357 this._log.warn(err, "Couldn't get preferred node Topology from _updateTopologyTimer");
358 return;
359 }
360 return this.updateTopology(serverNode, 0)
361 .catch(err => {
362 this._log.error(err, "Couldn't update topology from _updateTopologyTimer");
363 return null;
364 });
365 }
366 _firstTopologyUpdate(inputUrls) {
367 return __awaiter(this, void 0, void 0, function* () {
368 const initialUrls = RequestExecutor._validateUrls(inputUrls, this._authOptions);
369 const topologyUpdateErrors = [];
370 const tryUpdateTopology = (url, database) => __awaiter(this, void 0, void 0, function* () {
371 const serverNode = new ServerNode_1.ServerNode({ url, database });
372 try {
373 yield this.updateTopology(serverNode, TypeUtil_1.TypeUtil.MAX_INT32);
374 this._initializeUpdateTopologyTimer();
375 this._topologyTakenFromNode = serverNode;
376 return true;
377 }
378 catch (error) {
379 if (error.name === "DatabaseDoesNotExistException") {
380 this._lastKnownUrls = initialUrls;
381 throw error;
382 }
383 if (initialUrls.length === 0) {
384 this._lastKnownUrls = initialUrls;
385 Exceptions_1.throwError("InvalidOperationException", `Cannot get topology from server: ${url}.`, error);
386 }
387 topologyUpdateErrors.push({ url, error });
388 return false;
389 }
390 });
391 const tryUpdateTopologyOnAllNodes = () => __awaiter(this, void 0, void 0, function* () {
392 for (const url of initialUrls) {
393 if (yield tryUpdateTopology(url, this._databaseName)) {
394 return;
395 }
396 }
397 return false;
398 });
399 yield tryUpdateTopologyOnAllNodes();
400 const topology = new Topology_1.Topology();
401 topology.etag = this._topologyEtag;
402 let topologyNodes = this.getTopologyNodes();
403 if (!topologyNodes) {
404 topologyNodes = initialUrls.map(url => {
405 const serverNode = new ServerNode_1.ServerNode({
406 url,
407 database: this._databaseName
408 });
409 serverNode.clusterTag = "!";
410 return serverNode;
411 });
412 }
413 topology.nodes = topologyNodes;
414 this._nodeSelector = new NodeSelector_1.NodeSelector(topology);
415 if (initialUrls && initialUrls.length > 0) {
416 this._initializeUpdateTopologyTimer();
417 return;
418 }
419 this._lastKnownUrls = initialUrls;
420 const details = topologyUpdateErrors
421 .map(x => `${x.url} -> ${x.error && x.error.stack ? x.error.stack : x.error}`)
422 .join(", ");
423 this._throwExceptions(details);
424 });
425 }
426 _throwExceptions(details) {
427 Exceptions_1.throwError("InvalidOperationException", "Failed to retrieve database topology from all known nodes"
428 + os.EOL + details);
429 }
430 _disposeAllFailedNodesTimers() {
431 for (const item of this._failedNodesTimers) {
432 item[1].dispose();
433 }
434 this._failedNodesTimers.clear();
435 }
436 chooseNodeForRequest(cmd, sessionInfo) {
437 if (!cmd.isReadRequest) {
438 return this._nodeSelector.getPreferredNode();
439 }
440 switch (this._readBalanceBehavior) {
441 case "None":
442 return this._nodeSelector.getPreferredNode();
443 case "RoundRobin":
444 return this._nodeSelector.getNodeBySessionId(sessionInfo ? sessionInfo.sessionId : 0);
445 case "FastestNode":
446 return this._nodeSelector.getFastestNode();
447 default:
448 Exceptions_1.throwError("NotSupportedException", `Invalid read balance behavior: ${this._readBalanceBehavior}`);
449 }
450 }
451 execute(command, sessionInfo, options) {
452 if (options) {
453 return this._executeOnSpecificNode(command, sessionInfo, options);
454 }
455 this._log.info(`Execute command ${command.constructor.name}`);
456 const topologyUpdate = this._firstTopologyUpdatePromise;
457 const topologyUpdateStatus = this._firstTopologyUpdateStatus;
458 if ((topologyUpdate && topologyUpdateStatus.isResolved()) || this._disableTopologyUpdates) {
459 const currentIndexAndNode = this.chooseNodeForRequest(command, sessionInfo);
460 return this._executeOnSpecificNode(command, sessionInfo, {
461 chosenNode: currentIndexAndNode.currentNode,
462 nodeIndex: currentIndexAndNode.currentIndex,
463 shouldRetry: true
464 });
465 }
466 else {
467 return this._unlikelyExecute(command, topologyUpdate, sessionInfo);
468 }
469 }
470 _unlikelyExecute(command, topologyUpdate, sessionInfo) {
471 return __awaiter(this, void 0, void 0, function* () {
472 try {
473 if (!this._firstTopologyUpdatePromise) {
474 if (!this._lastKnownUrls) {
475 Exceptions_1.throwError("InvalidOperationException", "No known topology and no previously known one, cannot proceed, likely a bug");
476 }
477 topologyUpdate = this._firstTopologyUpdate(this._lastKnownUrls);
478 }
479 yield topologyUpdate;
480 }
481 catch (reason) {
482 if (this._firstTopologyUpdatePromise === topologyUpdate) {
483 this._firstTopologyUpdatePromise = null;
484 }
485 this._log.warn(reason, "Error doing topology update.");
486 throw reason;
487 }
488 const currentIndexAndNode = this.chooseNodeForRequest(command, sessionInfo);
489 return this._executeOnSpecificNode(command, sessionInfo, {
490 chosenNode: currentIndexAndNode.currentNode,
491 nodeIndex: currentIndexAndNode.currentIndex,
492 shouldRetry: true
493 });
494 });
495 }
496 _getFromCache(command, useCache, url, cachedItemMetadataCallback) {
497 if (useCache
498 && command.canCache
499 && command.isReadRequest
500 && command.responseType === "Object") {
501 return this._cache.get(url, cachedItemMetadataCallback);
502 }
503 cachedItemMetadataCallback({
504 changeVector: null,
505 response: null
506 });
507 return new HttpCache_1.ReleaseCacheItem(null);
508 }
509 _executeOnSpecificNode(command, sessionInfo = null, options = null) {
510 return __awaiter(this, void 0, void 0, function* () {
511 const { chosenNode, nodeIndex, shouldRetry } = options;
512 this._log.info(`Actual execute ${command.constructor.name} on ${chosenNode.url}`
513 + ` ${shouldRetry ? "with" : "without"} retry.`);
514 const req = this._createRequest(chosenNode, command);
515 const noCaching = sessionInfo ? sessionInfo.noCaching : false;
516 let cachedChangeVector;
517 let cachedValue;
518 const cachedItem = this._getFromCache(command, !noCaching, req.uri.toString(), (cachedItemMetadata) => {
519 cachedChangeVector = cachedItemMetadata.changeVector;
520 cachedValue = cachedItemMetadata.response;
521 });
522 if (cachedChangeVector) {
523 const aggressiveCacheOptions = this.aggressiveCaching;
524 if (aggressiveCacheOptions
525 && cachedItem.age < aggressiveCacheOptions.duration
526 && !cachedItem.mightHaveBeenModified
527 && command.canCacheAggressively) {
528 return command.setResponseFromCache(cachedValue);
529 }
530 req.headers["If-None-Match"] = `"${cachedChangeVector}"`;
531 }
532 if (!this._disableClientConfigurationUpdates) {
533 req.headers[Constants_1.HEADERS.CLIENT_CONFIGURATION_ETAG] = `"${this._clientConfigurationEtag}"`;
534 }
535 if (sessionInfo && sessionInfo.lastClusterTransactionIndex) {
536 req.headers[Constants_1.HEADERS.LAST_KNOWN_CLUSTER_TRANSACTION_INDEX] =
537 sessionInfo.lastClusterTransactionIndex;
538 }
539 if (!this._disableTopologyUpdates) {
540 req.headers[Constants_1.HEADERS.TOPOLOGY_ETAG] = `"${this._topologyEtag}"`;
541 }
542 const sp = Stopwatch_1.Stopwatch.createStarted();
543 let response = null;
544 let responseDispose = "Automatic";
545 let bodyStream;
546 this.numberOfServerRequests++;
547 try {
548 if (this._shouldExecuteOnAll(chosenNode, command)) {
549 response = yield this._executeOnAllToFigureOutTheFastest(chosenNode, command);
550 }
551 else {
552 const responseAndBody = yield command.send(req);
553 bodyStream = responseAndBody.bodyStream;
554 response = responseAndBody.response;
555 }
556 if (sessionInfo && sessionInfo.lastClusterTransactionIndex) {
557 const version = response.caseless.get(Constants_1.HEADERS.SERVER_VERSION);
558 if (version && "4.1" === version) {
559 Exceptions_1.throwError("ClientVersionMismatchException", "The server on " + chosenNode.url + " has an old version and can't perform "
560 + "the command since this command dependent on a cluster transaction "
561 + " which this node doesn't support.");
562 }
563 }
564 sp.stop();
565 }
566 catch (error) {
567 this._log.warn(error, `Error executing '${command.constructor.name}' `
568 + `on specific node '${chosenNode.url}'`
569 + `${chosenNode.database ? "db " + chosenNode.database : ""}.`);
570 if (!shouldRetry) {
571 throw error;
572 }
573 sp.stop();
574 const serverDownHandledSuccessfully = yield this._handleServerDown(req.uri, chosenNode, nodeIndex, command, req, response, null, error, sessionInfo, shouldRetry);
575 if (!serverDownHandledSuccessfully) {
576 this._throwFailedToContactAllNodes(command, req, error, null);
577 }
578 return;
579 }
580 command.statusCode = response.statusCode;
581 const refreshTopology = response
582 && response.caseless
583 && response.caseless.get(Constants_1.HEADERS.REFRESH_TOPOLOGY);
584 const refreshClientConfiguration = response
585 && response.caseless
586 && response.caseless.get(Constants_1.HEADERS.REFRESH_CLIENT_CONFIGURATION);
587 try {
588 if (response.statusCode === StatusCode_1.StatusCodes.NotModified) {
589 cachedItem.notModified();
590 if (command.responseType === "Object") {
591 yield command.setResponseFromCache(cachedValue);
592 }
593 return;
594 }
595 if (response.statusCode >= 400) {
596 const unsuccessfulResponseHandled = yield this._handleUnsuccessfulResponse(chosenNode, nodeIndex, command, req, response, bodyStream, req.uri, sessionInfo, shouldRetry);
597 if (!unsuccessfulResponseHandled) {
598 const dbMissingHeader = response.caseless.get(Constants_1.HEADERS.DATABASE_MISSING);
599 if (dbMissingHeader) {
600 Exceptions_1.throwError("DatabaseDoesNotExistException", dbMissingHeader);
601 }
602 if (command.failedNodes.size === 0) {
603 Exceptions_1.throwError("InvalidOperationException", "Received unsuccessful response and couldn't recover from it. "
604 + "Also, no record of exceptions per failed nodes. "
605 + "This is weird and should not happen.");
606 }
607 if (command.failedNodes.size === 1) {
608 const values = [...command.failedNodes.values()];
609 if (values && values.some(x => !!x)) {
610 const err = values.filter(x => !!x).map(x => x)[0];
611 Exceptions_1.throwError(err.name, err.message, err);
612 }
613 }
614 Exceptions_1.throwError("AllTopologyNodesDownException", "Received unsuccessful response from all servers"
615 + " and couldn't recover from it.");
616 }
617 return;
618 }
619 responseDispose = yield command.processResponse(this._cache, response, bodyStream, req.uri);
620 this._lastReturnedResponse = new Date();
621 }
622 finally {
623 if (responseDispose === "Automatic") {
624 HttpUtil_1.closeHttpResponse(response);
625 }
626 if (refreshTopology || refreshClientConfiguration) {
627 const serverNode = new ServerNode_1.ServerNode({
628 url: chosenNode.url,
629 database: this._databaseName
630 });
631 const topologyTask = refreshTopology
632 ? BluebirdPromise.resolve(this.updateTopology(serverNode, 0))
633 .tapCatch(err => this._log.warn(err, "Error refreshing topology."))
634 : BluebirdPromise.resolve(false);
635 const clientConfigurationTask = refreshClientConfiguration
636 ? BluebirdPromise.resolve(this._updateClientConfiguration())
637 .tapCatch(err => this._log.warn(err, "Error refreshing client configuration."))
638 .then(() => true)
639 : BluebirdPromise.resolve(false);
640 yield Promise.all([topologyTask, clientConfigurationTask]);
641 }
642 }
643 });
644 }
645 _throwFailedToContactAllNodes(command, req, e, timeoutException) {
646 let message = "Tried to send "
647 + command.constructor.name
648 + " request via "
649 + (req.method || "GET") + " "
650 + req.uri + " to all configured nodes in the topology, "
651 + "all of them seem to be down or not responding. I've tried to access the following nodes: ";
652 if (this._nodeSelector) {
653 const topology = this._nodeSelector.getTopology();
654 if (topology) {
655 message += topology.nodes.map(x => x.url).join(", ");
656 }
657 }
658 const tplFromNode = this._topologyTakenFromNode;
659 if (tplFromNode && this._nodeSelector) {
660 const topology = this._nodeSelector.getTopology();
661 if (topology) {
662 const nodesText = topology.nodes
663 .map(x => `( url: ${x.url}, clusterTag: ${x.clusterTag}, serverRole: ${x.serverRole})`)
664 .join(", ");
665 message += os.EOL
666 + `I was able to fetch ${tplFromNode.database} topology from ${tplFromNode.url}.`
667 + os.EOL
668 + `Fetched topology: ${nodesText}`;
669 }
670 }
671 const innerErr = timeoutException || e;
672 Exceptions_1.throwError("AllTopologyNodesDownException", message, innerErr);
673 }
674 inSpeedTestPhase() {
675 return this._nodeSelector
676 && this._nodeSelector.inSpeedTestPhase();
677 }
678 _handleUnsuccessfulResponse(chosenNode, nodeIndex, command, req, response, responseBodyStream, url, sessionInfo, shouldRetry) {
679 return __awaiter(this, void 0, void 0, function* () {
680 responseBodyStream.resume();
681 const readBody = () => StreamUtil.readToEnd(responseBodyStream);
682 switch (response.statusCode) {
683 case StatusCode_1.StatusCodes.NotFound:
684 this._cache.setNotFound(url);
685 switch (command.responseType) {
686 case "Empty":
687 return Promise.resolve(true);
688 case "Object":
689 return command.setResponseAsync(null, false)
690 .then(() => true);
691 default:
692 command.setResponseRaw(response, null);
693 break;
694 }
695 return true;
696 case StatusCode_1.StatusCodes.Forbidden:
697 Exceptions_1.throwError("AuthorizationException", `Forbidden access to ${chosenNode.database}@${chosenNode.url}`
698 + `, ${req.method || "GET"} ${req.uri}`);
699 break;
700 case StatusCode_1.StatusCodes.Gone:
701 if (!shouldRetry) {
702 return false;
703 }
704 return this.updateTopology(chosenNode, Number.MAX_VALUE, true)
705 .then(() => {
706 const currentIndexAndNode = this.chooseNodeForRequest(command, sessionInfo);
707 return this._executeOnSpecificNode(command, sessionInfo, {
708 chosenNode: currentIndexAndNode.currentNode,
709 nodeIndex: currentIndexAndNode.currentIndex,
710 shouldRetry: false
711 });
712 })
713 .then(() => true);
714 case StatusCode_1.StatusCodes.GatewayTimeout:
715 case StatusCode_1.StatusCodes.RequestTimeout:
716 case StatusCode_1.StatusCodes.BadGateway:
717 case StatusCode_1.StatusCodes.ServiceUnavailable:
718 return this._handleServerDown(url, chosenNode, nodeIndex, command, req, response, yield readBody(), null, sessionInfo, shouldRetry);
719 case StatusCode_1.StatusCodes.Conflict:
720 RequestExecutor._handleConflict(response, yield readBody());
721 break;
722 default:
723 command.onResponseFailure(response);
724 Exceptions_1.ExceptionDispatcher.throwException(response, yield readBody());
725 }
726 });
727 }
728 _executeOnAllToFigureOutTheFastest(chosenNode, command) {
729 let preferredTask = null;
730 const nodes = this._nodeSelector.getTopology().nodes;
731 const tasks = nodes.map(x => null);
732 let task;
733 for (let i = 0; i < nodes.length; i++) {
734 const taskNumber = i;
735 this.numberOfServerRequests++;
736 task = BluebirdPromise.resolve()
737 .then(() => {
738 const req = this._createRequest(nodes[taskNumber], command);
739 return command.send(req)
740 .then(responseAndBodyStream => {
741 return responseAndBodyStream.response;
742 });
743 })
744 .then(commandResult => new IndexAndResponse(taskNumber, commandResult))
745 .catch(err => {
746 tasks[taskNumber] = null;
747 return BluebirdPromise.reject(err);
748 });
749 if (nodes[i].clusterTag === chosenNode.clusterTag) {
750 preferredTask = task;
751 }
752 tasks[i] = task;
753 }
754 const result = PromiseUtil.raceToResolution(tasks)
755 .then(fastest => {
756 this._nodeSelector.recordFastest(fastest.index, nodes[fastest.index]);
757 })
758 .catch((err) => {
759 this._log.warn(err, "Error executing on all to find fastest node.");
760 })
761 .then(() => preferredTask)
762 .then(taskResult => taskResult.response);
763 return Promise.resolve(result);
764 }
765 _shouldExecuteOnAll(chosenNode, command) {
766 return this._readBalanceBehavior === "FastestNode" &&
767 this._nodeSelector &&
768 this._nodeSelector.inSpeedTestPhase() &&
769 this._nodeSelectorHasMultipleNodes() &&
770 command.isReadRequest &&
771 command.responseType === "Object" &&
772 !!chosenNode;
773 }
774 _nodeSelectorHasMultipleNodes() {
775 const selector = this._nodeSelector;
776 if (!selector) {
777 return false;
778 }
779 const topology = selector.getTopology();
780 return topology && topology.nodes && topology.nodes.length > 1;
781 }
782 _handleServerDown(url, chosenNode, nodeIndex, command, req, response, body, error, sessionInfo, shouldRetry) {
783 return __awaiter(this, void 0, void 0, function* () {
784 if (!command.failedNodes) {
785 command.failedNodes = new Map();
786 }
787 RequestExecutor._addFailedResponseToCommand(chosenNode, command, req, response, body, error);
788 if (nodeIndex === null) {
789 return false;
790 }
791 this._spawnHealthChecks(chosenNode, nodeIndex);
792 if (!this._nodeSelector) {
793 return false;
794 }
795 this._nodeSelector.onFailedRequest(nodeIndex);
796 const currentIndexAndNode = this._nodeSelector.getPreferredNode();
797 if (command.failedNodes.has(currentIndexAndNode.currentNode)) {
798 return false;
799 }
800 yield this._executeOnSpecificNode(command, sessionInfo, {
801 chosenNode: currentIndexAndNode.currentNode,
802 nodeIndex: currentIndexAndNode.currentIndex,
803 shouldRetry: shouldRetry
804 });
805 return true;
806 });
807 }
808 static _addFailedResponseToCommand(chosenNode, command, req, response, body, e) {
809 if (response && body) {
810 const responseJson = body;
811 try {
812 const resExceptionSchema = Serializer_1.JsonSerializer
813 .getDefaultForCommandPayload()
814 .deserialize(responseJson);
815 const readException = Exceptions_1.ExceptionDispatcher.get(resExceptionSchema, response.statusCode);
816 command.failedNodes.set(chosenNode, readException);
817 }
818 catch (_) {
819 log.warn(_, "Error parsing server error.");
820 const unrecognizedErrSchema = {
821 url: req.uri,
822 message: "Unrecognized response from the server",
823 error: responseJson,
824 type: "Unparsable Server Response"
825 };
826 const exceptionToUse = Exceptions_1.ExceptionDispatcher.get(unrecognizedErrSchema, response.statusCode);
827 command.failedNodes.set(chosenNode, exceptionToUse);
828 }
829 return;
830 }
831 const exceptionSchema = {
832 url: req.uri.toString(),
833 message: e.message,
834 error: `An exception occurred while contacting ${req.uri} . ${os.EOL + e.stack}`,
835 type: e.name
836 };
837 command.failedNodes.set(chosenNode, Exceptions_1.ExceptionDispatcher.get(exceptionSchema, StatusCode_1.StatusCodes.ServiceUnavailable));
838 }
839 _createRequest(node, command) {
840 const req = Object.assign(command.createRequest(node), this._defaultRequestOptions);
841 req.headers = req.headers || {};
842 if (this._authOptions) {
843 const agentOptions = this._certificate.toAgentOptions();
844 req.agentOptions = Object.assign(req.agentOptions || {}, agentOptions);
845 }
846 if (!req.headers[Constants_1.HEADERS.CLIENT_VERSION]) {
847 req.headers[Constants_1.HEADERS.CLIENT_VERSION] = RequestExecutor.CLIENT_VERSION;
848 }
849 if (RequestExecutor.requestPostProcessor) {
850 RequestExecutor.requestPostProcessor(req);
851 }
852 return req;
853 }
854 static _handleConflict(response, body) {
855 Exceptions_1.ExceptionDispatcher.throwException(response, body);
856 }
857 _spawnHealthChecks(chosenNode, nodeIndex) {
858 if (this._disposed) {
859 return;
860 }
861 if (this._failedNodesTimers.has(chosenNode)) {
862 return;
863 }
864 this._log.info(`Spawn health checks for node ${chosenNode.url}.`);
865 const nodeStatus = new NodeStatus(nodeIndex, chosenNode, (nStatus) => this._checkNodeStatusCallback(nStatus));
866 this._failedNodesTimers.set(chosenNode, nodeStatus);
867 nodeStatus.startTimer();
868 }
869 _checkNodeStatusCallback(nodeStatus) {
870 const copy = this.getTopologyNodes();
871 if (nodeStatus.nodeIndex >= copy.length) {
872 return;
873 }
874 const serverNode = copy[nodeStatus.nodeIndex];
875 if (serverNode !== nodeStatus.node) {
876 return;
877 }
878 return Promise.resolve()
879 .then(() => {
880 let status;
881 return Promise.resolve(this._performHealthCheck(serverNode, nodeStatus.nodeIndex))
882 .then(() => {
883 status = this._failedNodesTimers[nodeStatus.nodeIndex];
884 if (status) {
885 this._failedNodesTimers.delete(nodeStatus.node);
886 status.dispose();
887 }
888 if (this._nodeSelector) {
889 this._nodeSelector.restoreNodeIndex(nodeStatus.nodeIndex);
890 }
891 }, err => {
892 this._log.error(err, `${serverNode.clusterTag} is still down`);
893 status = this._failedNodesTimers.get(nodeStatus.node);
894 if (status) {
895 nodeStatus.updateTimer();
896 }
897 });
898 })
899 .catch(err => {
900 this._log.error(err, "Failed to check node topology, will ignore this node until next topology update.");
901 });
902 }
903 _performHealthCheck(serverNode, nodeIndex) {
904 return this._executeOnSpecificNode(RequestExecutor._failureCheckOperation.getCommand(this._conventions), null, {
905 chosenNode: serverNode,
906 nodeIndex,
907 shouldRetry: false,
908 });
909 }
910 _setDefaultRequestOptions() {
911 this._defaultRequestOptions = Object.assign(DEFAULT_REQUEST_OPTIONS, {
912 gzip: !(this._conventions.hasExplicitlySetCompressionUsage && !this._conventions.useCompression)
913 }, this._customHttpRequestOptions);
914 }
915 dispose() {
916 this._log.info("Dispose.");
917 if (this._disposed) {
918 return;
919 }
920 this._disposed = true;
921 this._updateClientConfigurationSemaphore.take(TypeUtil_1.TypeUtil.NOOP);
922 this._updateDatabaseTopologySemaphore.take(TypeUtil_1.TypeUtil.NOOP);
923 this._cache.dispose();
924 if (this._updateTopologyTimer) {
925 this._updateTopologyTimer.dispose();
926 }
927 this._disposeAllFailedNodesTimers();
928 }
929}
930RequestExecutor.CLIENT_VERSION = "4.1.0";
931RequestExecutor._failureCheckOperation = new GetStatisticsOperation_1.GetStatisticsOperation("failure=check");
932RequestExecutor.requestPostProcessor = null;
933exports.RequestExecutor = RequestExecutor;