1 | "use strict";
|
2 | var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
|
3 | return new (P || (P = Promise))(function (resolve, reject) {
|
4 | function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
|
5 | function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
|
6 | function step(result) { result.done ? resolve(result.value) : new P(function (resolve) { resolve(result.value); }).then(fulfilled, rejected); }
|
7 | step((generator = generator.apply(thisArg, _arguments || [])).next());
|
8 | });
|
9 | };
|
10 | Object.defineProperty(exports, "__esModule", { value: true });
|
11 | const os = require("os");
|
12 | const BluebirdPromise = require("bluebird");
|
13 | const semaphore = require("semaphore");
|
14 | const SemaphoreUtil_1 = require("../Utility/SemaphoreUtil");
|
15 | const LogUtil_1 = require("../Utility/LogUtil");
|
16 | const Timer_1 = require("../Primitives/Timer");
|
17 | const ServerNode_1 = require("./ServerNode");
|
18 | const Topology_1 = require("./Topology");
|
19 | const GetDatabaseTopologyCommand_1 = require("../ServerWide/Commands/GetDatabaseTopologyCommand");
|
20 | const StatusCode_1 = require("./StatusCode");
|
21 | const NodeSelector_1 = require("./NodeSelector");
|
22 | const Certificate_1 = require("../Auth/Certificate");
|
23 | const HttpCache_1 = require("./HttpCache");
|
24 | const Exceptions_1 = require("../Exceptions");
|
25 | const GetClientConfigurationOperation_1 = require("../Documents/Operations/Configuration/GetClientConfigurationOperation");
|
26 | const Constants_1 = require("../Constants");
|
27 | const Stopwatch_1 = require("../Utility/Stopwatch");
|
28 | const PromiseUtil = require("../Utility/PromiseUtil");
|
29 | const GetStatisticsOperation_1 = require("../Documents/Operations/GetStatisticsOperation");
|
30 | const TypeUtil_1 = require("../Utility/TypeUtil");
|
31 | const Serializer_1 = require("../Mapping/Json/Serializer");
|
32 | const UriUtil_1 = require("../Utility/UriUtil");
|
33 | const StreamUtil = require("../Utility/StreamUtil");
|
34 | const HttpUtil_1 = require("../Utility/HttpUtil");
|
35 | const PromiseUtil_1 = require("../Utility/PromiseUtil");
|
36 | const DEFAULT_REQUEST_OPTIONS = {};
|
37 | const log = LogUtil_1.getLogger({ module: "RequestExecutor" });
|
38 | class IndexAndResponse {
|
39 | constructor(index, response) {
|
40 | this.index = index;
|
41 | this.response = response;
|
42 | }
|
43 | }
|
44 | class NodeStatus {
|
45 | constructor(nodeIndex, node, nodeStatusCallback) {
|
46 | this.nodeIndex = nodeIndex;
|
47 | this.node = node;
|
48 | this._timerPeriodInMs = 100;
|
49 | this._nodeStatusCallback = nodeStatusCallback;
|
50 | }
|
51 | _nextTimerPeriod() {
|
52 | if (this._timerPeriodInMs <= 5000) {
|
53 | return 5000;
|
54 | }
|
55 | this._timerPeriodInMs = this._timerPeriodInMs + 100;
|
56 | return this._timerPeriodInMs;
|
57 | }
|
58 | startTimer() {
|
59 | const that = this;
|
60 | this._timer = new Timer_1.Timer(function timerActionNodeStatusCallback() {
|
61 | return that._nodeStatusCallback(that);
|
62 | }, this._timerPeriodInMs);
|
63 | }
|
64 | updateTimer() {
|
65 | this._timer.change(this._nextTimerPeriod());
|
66 | }
|
67 | dispose() {
|
68 | this._timer.dispose();
|
69 | }
|
70 | }
|
71 | exports.NodeStatus = NodeStatus;
|
72 | class RequestExecutor {
|
73 | constructor(database, authOptions, conventions) {
|
74 | this._updateDatabaseTopologySemaphore = semaphore();
|
75 | this._updateClientConfigurationSemaphore = semaphore();
|
76 | this._failedNodesTimers = new Map();
|
77 | this._certificate = null;
|
78 | this.aggressiveCaching = null;
|
79 | this.numberOfServerRequests = 0;
|
80 | this._clientConfigurationEtag = 0;
|
81 | this._topologyEtag = 0;
|
82 | this._log = LogUtil_1.getLogger({
|
83 | module: `${this.constructor.name}-${Math.floor(Math.random() * 10000)}`
|
84 | });
|
85 | this._cache = new HttpCache_1.HttpCache(conventions.maxHttpCacheSize);
|
86 | this._readBalanceBehavior = conventions.readBalanceBehavior;
|
87 | this._databaseName = database;
|
88 | this._lastReturnedResponse = new Date();
|
89 | this._conventions = conventions.clone();
|
90 | this._authOptions = authOptions;
|
91 | this._certificate = Certificate_1.Certificate.createFromOptions(this._authOptions);
|
92 | this._setDefaultRequestOptions();
|
93 | }
|
94 | get _firstTopologyUpdatePromise() {
|
95 | return this._firstTopologyUpdatePromiseInternal;
|
96 | }
|
97 | set _firstTopologyUpdatePromise(value) {
|
98 | this._firstTopologyUpdatePromiseInternal = value;
|
99 | if (value) {
|
100 | this._firstTopologyUpdateStatus = PromiseUtil_1.PromiseStatusTracker.track(value);
|
101 | }
|
102 | }
|
103 | get customHttpRequestOptions() {
|
104 | return this._customHttpRequestOptions;
|
105 | }
|
106 | set customHttpRequestOptions(value) {
|
107 | this._customHttpRequestOptions = value;
|
108 | this._setDefaultRequestOptions();
|
109 | }
|
110 | getAuthOptions() {
|
111 | return this._authOptions;
|
112 | }
|
113 | getTopologyEtag() {
|
114 | return this._topologyEtag;
|
115 | }
|
116 | get conventions() {
|
117 | return this._conventions;
|
118 | }
|
119 | getClientConfigurationEtag() {
|
120 | return this._clientConfigurationEtag;
|
121 | }
|
122 | get cache() {
|
123 | return this._cache;
|
124 | }
|
125 | get disposed() {
|
126 | return this._disposed;
|
127 | }
|
128 | getUrl() {
|
129 | if (!this._nodeSelector) {
|
130 | return null;
|
131 | }
|
132 | const preferredNode = this._nodeSelector.getPreferredNode();
|
133 | return preferredNode
|
134 | ? preferredNode.currentNode.url
|
135 | : null;
|
136 | }
|
137 | getTopology() {
|
138 | return this._nodeSelector
|
139 | ? this._nodeSelector.getTopology()
|
140 | : null;
|
141 | }
|
142 | getTopologyNodes() {
|
143 | const topology = this.getTopology();
|
144 | return topology
|
145 | ? [...topology.nodes]
|
146 | : null;
|
147 | }
|
148 | static create(initialUrls, database, opts) {
|
149 | const { authOptions, documentConventions } = opts || {};
|
150 | const executor = new RequestExecutor(database, authOptions, documentConventions);
|
151 | executor._firstTopologyUpdatePromise = executor._firstTopologyUpdate(initialUrls);
|
152 | executor._firstTopologyUpdatePromise.catch(TypeUtil_1.TypeUtil.NOOP);
|
153 | return executor;
|
154 | }
|
155 | static createForSingleNodeWithConfigurationUpdates(url, database, opts) {
|
156 | const executor = this.createForSingleNodeWithoutConfigurationUpdates(url, database, opts);
|
157 | executor._disableClientConfigurationUpdates = false;
|
158 | return executor;
|
159 | }
|
160 | static createForSingleNodeWithoutConfigurationUpdates(url, database, opts) {
|
161 | const { authOptions, documentConventions } = opts;
|
162 | const initialUrls = RequestExecutor._validateUrls([url], authOptions);
|
163 | const executor = new RequestExecutor(database, authOptions, documentConventions);
|
164 | const topology = new Topology_1.Topology();
|
165 | topology.etag = -1;
|
166 | const serverNode = new ServerNode_1.ServerNode({
|
167 | url: initialUrls[0],
|
168 | database
|
169 | });
|
170 | topology.nodes = [serverNode];
|
171 | executor._nodeSelector = new NodeSelector_1.NodeSelector(topology);
|
172 | executor._topologyEtag = -2;
|
173 | executor._disableTopologyUpdates = true;
|
174 | executor._disableClientConfigurationUpdates = true;
|
175 | return executor;
|
176 | }
|
177 | _ensureNodeSelector() {
|
178 | return __awaiter(this, void 0, void 0, function* () {
|
179 | if (this._firstTopologyUpdatePromise
|
180 | && (!this._firstTopologyUpdateStatus.isFullfilled()
|
181 | || this._firstTopologyUpdateStatus.isRejected())) {
|
182 | yield this._firstTopologyUpdatePromise;
|
183 | }
|
184 | if (!this._nodeSelector) {
|
185 | const topology = new Topology_1.Topology(this._topologyEtag, this.getTopologyNodes().slice());
|
186 | this._nodeSelector = new NodeSelector_1.NodeSelector(topology);
|
187 | }
|
188 | });
|
189 | }
|
190 | getPreferredNode() {
|
191 | return __awaiter(this, void 0, void 0, function* () {
|
192 | yield this._ensureNodeSelector();
|
193 | return this._nodeSelector.getPreferredNode();
|
194 | });
|
195 | }
|
196 | getNodeBySessionId(sessionId) {
|
197 | return __awaiter(this, void 0, void 0, function* () {
|
198 | yield this._ensureNodeSelector();
|
199 | return this._nodeSelector.getNodeBySessionId(sessionId);
|
200 | });
|
201 | }
|
202 | getFastestNode() {
|
203 | return __awaiter(this, void 0, void 0, function* () {
|
204 | yield this._ensureNodeSelector();
|
205 | return this._nodeSelector.getFastestNode();
|
206 | });
|
207 | }
|
208 | _updateClientConfigurationInternal() {
|
209 | return __awaiter(this, void 0, void 0, function* () {
|
210 | const oldDisableClientConfigurationUpdates = this._disableClientConfigurationUpdates;
|
211 | this._disableClientConfigurationUpdates = true;
|
212 | try {
|
213 | if (this._disposed) {
|
214 | return;
|
215 | }
|
216 | const command = new GetClientConfigurationOperation_1.GetClientConfigurationCommand();
|
217 | const { currentNode, currentIndex } = this.chooseNodeForRequest(command, null);
|
218 | yield this.execute(command, null, {
|
219 | chosenNode: currentNode,
|
220 | nodeIndex: currentIndex,
|
221 | shouldRetry: false
|
222 | });
|
223 | const clientConfigOpResult = command.result;
|
224 | if (!clientConfigOpResult) {
|
225 | return;
|
226 | }
|
227 | this._conventions.updateFrom(clientConfigOpResult.configuration);
|
228 | this._clientConfigurationEtag = clientConfigOpResult.etag;
|
229 | }
|
230 | catch (err) {
|
231 | this._log.error(err, "Error getting client configuration.");
|
232 | }
|
233 | finally {
|
234 | this._disableClientConfigurationUpdates = oldDisableClientConfigurationUpdates;
|
235 | }
|
236 | });
|
237 | }
|
238 | _updateClientConfiguration() {
|
239 | return __awaiter(this, void 0, void 0, function* () {
|
240 | if (this._disposed) {
|
241 | return;
|
242 | }
|
243 | let semAcquiredContext;
|
244 | try {
|
245 | semAcquiredContext = SemaphoreUtil_1.acquireSemaphore(this._updateClientConfigurationSemaphore);
|
246 | yield semAcquiredContext.promise;
|
247 | yield this._updateClientConfigurationInternal();
|
248 | }
|
249 | finally {
|
250 | if (semAcquiredContext) {
|
251 | semAcquiredContext.dispose();
|
252 | }
|
253 | }
|
254 | });
|
255 | }
|
256 | updateTopology(node, timeout, forceUpdate = false) {
|
257 | if (this._disposed) {
|
258 | return Promise.resolve(false);
|
259 | }
|
260 | if (this._disableTopologyUpdates) {
|
261 | return Promise.resolve(false);
|
262 | }
|
263 | const acquiredSemContext = SemaphoreUtil_1.acquireSemaphore(this._updateDatabaseTopologySemaphore, { timeout });
|
264 | const result = BluebirdPromise.resolve(acquiredSemContext.promise)
|
265 | .then(() => {
|
266 | if (this._disposed) {
|
267 | return false;
|
268 | }
|
269 | this._log.info(`Update topology from ${node.url}.`);
|
270 | const getTopology = new GetDatabaseTopologyCommand_1.GetDatabaseTopologyCommand();
|
271 | const getTopologyPromise = this.execute(getTopology, null, {
|
272 | chosenNode: node,
|
273 | nodeIndex: null,
|
274 | shouldRetry: false,
|
275 | });
|
276 | return getTopologyPromise
|
277 | .then(() => {
|
278 | const topology = getTopology.result;
|
279 | if (!this._nodeSelector) {
|
280 | this._nodeSelector = new NodeSelector_1.NodeSelector(topology);
|
281 | if (this._readBalanceBehavior === "FastestNode") {
|
282 | this._nodeSelector.scheduleSpeedTest();
|
283 | }
|
284 | }
|
285 | else if (this._nodeSelector.onUpdateTopology(topology, forceUpdate)) {
|
286 | this._disposeAllFailedNodesTimers();
|
287 | if (this._readBalanceBehavior === "FastestNode") {
|
288 | this._nodeSelector.scheduleSpeedTest();
|
289 | }
|
290 | }
|
291 | this._topologyEtag = this._nodeSelector.getTopology().etag;
|
292 | return true;
|
293 | });
|
294 | }, (reason) => {
|
295 | if (reason.name === "TimeoutError") {
|
296 | return false;
|
297 | }
|
298 | throw reason;
|
299 | })
|
300 | .finally(() => {
|
301 | acquiredSemContext.dispose();
|
302 | });
|
303 | return Promise.resolve(result);
|
304 | }
|
305 | static _validateUrls(initialUrls, authOptions) {
|
306 | const cleanUrls = [...Array(initialUrls.length)];
|
307 | let requireHttps = !!authOptions;
|
308 | for (let index = 0; index < initialUrls.length; index++) {
|
309 | const url = initialUrls[index];
|
310 | UriUtil_1.validateUri(url);
|
311 | cleanUrls[index] = url.replace(/\/$/, "");
|
312 | requireHttps = requireHttps || url.startsWith("https://");
|
313 | }
|
314 | if (!requireHttps) {
|
315 | return cleanUrls;
|
316 | }
|
317 | for (const url of initialUrls) {
|
318 | if (!url.startsWith("http://")) {
|
319 | continue;
|
320 | }
|
321 | if (authOptions && authOptions.certificate) {
|
322 | Exceptions_1.throwError("InvalidOperationException", "The url " + url + " is using HTTP, but a certificate is specified, which require us to use HTTPS");
|
323 | }
|
324 | Exceptions_1.throwError("InvalidOperationException", "The url " + url
|
325 | + " is using HTTP, but other urls are using HTTPS, and mixing of HTTP and HTTPS is not allowed.");
|
326 | }
|
327 | return cleanUrls;
|
328 | }
|
329 | _initializeUpdateTopologyTimer() {
|
330 | if (this._updateTopologyTimer || this._disposed) {
|
331 | return;
|
332 | }
|
333 | this._log.info("Initialize update topology timer.");
|
334 | const minInMs = 60 * 1000;
|
335 | const that = this;
|
336 | this._updateTopologyTimer =
|
337 | new Timer_1.Timer(function timerActionUpdateTopology() {
|
338 | return that._updateTopologyCallback();
|
339 | }, minInMs, minInMs);
|
340 | }
|
341 | _updateTopologyCallback() {
|
342 | const time = new Date();
|
343 | const minInMs = 60 * 1000;
|
344 | if (time.valueOf() - this._lastReturnedResponse.valueOf() <= minInMs) {
|
345 | return;
|
346 | }
|
347 | let serverNode;
|
348 | try {
|
349 | const selector = this._nodeSelector;
|
350 | if (!selector) {
|
351 | return;
|
352 | }
|
353 | const preferredNode = selector.getPreferredNode();
|
354 | serverNode = preferredNode.currentNode;
|
355 | }
|
356 | catch (err) {
|
357 | this._log.warn(err, "Couldn't get preferred node Topology from _updateTopologyTimer");
|
358 | return;
|
359 | }
|
360 | return this.updateTopology(serverNode, 0)
|
361 | .catch(err => {
|
362 | this._log.error(err, "Couldn't update topology from _updateTopologyTimer");
|
363 | return null;
|
364 | });
|
365 | }
|
366 | _firstTopologyUpdate(inputUrls) {
|
367 | return __awaiter(this, void 0, void 0, function* () {
|
368 | const initialUrls = RequestExecutor._validateUrls(inputUrls, this._authOptions);
|
369 | const topologyUpdateErrors = [];
|
370 | const tryUpdateTopology = (url, database) => __awaiter(this, void 0, void 0, function* () {
|
371 | const serverNode = new ServerNode_1.ServerNode({ url, database });
|
372 | try {
|
373 | yield this.updateTopology(serverNode, TypeUtil_1.TypeUtil.MAX_INT32);
|
374 | this._initializeUpdateTopologyTimer();
|
375 | this._topologyTakenFromNode = serverNode;
|
376 | return true;
|
377 | }
|
378 | catch (error) {
|
379 | if (error.name === "DatabaseDoesNotExistException") {
|
380 | this._lastKnownUrls = initialUrls;
|
381 | throw error;
|
382 | }
|
383 | if (initialUrls.length === 0) {
|
384 | this._lastKnownUrls = initialUrls;
|
385 | Exceptions_1.throwError("InvalidOperationException", `Cannot get topology from server: ${url}.`, error);
|
386 | }
|
387 | topologyUpdateErrors.push({ url, error });
|
388 | return false;
|
389 | }
|
390 | });
|
391 | const tryUpdateTopologyOnAllNodes = () => __awaiter(this, void 0, void 0, function* () {
|
392 | for (const url of initialUrls) {
|
393 | if (yield tryUpdateTopology(url, this._databaseName)) {
|
394 | return;
|
395 | }
|
396 | }
|
397 | return false;
|
398 | });
|
399 | yield tryUpdateTopologyOnAllNodes();
|
400 | const topology = new Topology_1.Topology();
|
401 | topology.etag = this._topologyEtag;
|
402 | let topologyNodes = this.getTopologyNodes();
|
403 | if (!topologyNodes) {
|
404 | topologyNodes = initialUrls.map(url => {
|
405 | const serverNode = new ServerNode_1.ServerNode({
|
406 | url,
|
407 | database: this._databaseName
|
408 | });
|
409 | serverNode.clusterTag = "!";
|
410 | return serverNode;
|
411 | });
|
412 | }
|
413 | topology.nodes = topologyNodes;
|
414 | this._nodeSelector = new NodeSelector_1.NodeSelector(topology);
|
415 | if (initialUrls && initialUrls.length > 0) {
|
416 | this._initializeUpdateTopologyTimer();
|
417 | return;
|
418 | }
|
419 | this._lastKnownUrls = initialUrls;
|
420 | const details = topologyUpdateErrors
|
421 | .map(x => `${x.url} -> ${x.error && x.error.stack ? x.error.stack : x.error}`)
|
422 | .join(", ");
|
423 | this._throwExceptions(details);
|
424 | });
|
425 | }
|
426 | _throwExceptions(details) {
|
427 | Exceptions_1.throwError("InvalidOperationException", "Failed to retrieve database topology from all known nodes"
|
428 | + os.EOL + details);
|
429 | }
|
430 | _disposeAllFailedNodesTimers() {
|
431 | for (const item of this._failedNodesTimers) {
|
432 | item[1].dispose();
|
433 | }
|
434 | this._failedNodesTimers.clear();
|
435 | }
|
436 | chooseNodeForRequest(cmd, sessionInfo) {
|
437 | if (!cmd.isReadRequest) {
|
438 | return this._nodeSelector.getPreferredNode();
|
439 | }
|
440 | switch (this._readBalanceBehavior) {
|
441 | case "None":
|
442 | return this._nodeSelector.getPreferredNode();
|
443 | case "RoundRobin":
|
444 | return this._nodeSelector.getNodeBySessionId(sessionInfo ? sessionInfo.sessionId : 0);
|
445 | case "FastestNode":
|
446 | return this._nodeSelector.getFastestNode();
|
447 | default:
|
448 | Exceptions_1.throwError("NotSupportedException", `Invalid read balance behavior: ${this._readBalanceBehavior}`);
|
449 | }
|
450 | }
|
451 | execute(command, sessionInfo, options) {
|
452 | if (options) {
|
453 | return this._executeOnSpecificNode(command, sessionInfo, options);
|
454 | }
|
455 | this._log.info(`Execute command ${command.constructor.name}`);
|
456 | const topologyUpdate = this._firstTopologyUpdatePromise;
|
457 | const topologyUpdateStatus = this._firstTopologyUpdateStatus;
|
458 | if ((topologyUpdate && topologyUpdateStatus.isResolved()) || this._disableTopologyUpdates) {
|
459 | const currentIndexAndNode = this.chooseNodeForRequest(command, sessionInfo);
|
460 | return this._executeOnSpecificNode(command, sessionInfo, {
|
461 | chosenNode: currentIndexAndNode.currentNode,
|
462 | nodeIndex: currentIndexAndNode.currentIndex,
|
463 | shouldRetry: true
|
464 | });
|
465 | }
|
466 | else {
|
467 | return this._unlikelyExecute(command, topologyUpdate, sessionInfo);
|
468 | }
|
469 | }
|
470 | _unlikelyExecute(command, topologyUpdate, sessionInfo) {
|
471 | return __awaiter(this, void 0, void 0, function* () {
|
472 | try {
|
473 | if (!this._firstTopologyUpdatePromise) {
|
474 | if (!this._lastKnownUrls) {
|
475 | Exceptions_1.throwError("InvalidOperationException", "No known topology and no previously known one, cannot proceed, likely a bug");
|
476 | }
|
477 | topologyUpdate = this._firstTopologyUpdate(this._lastKnownUrls);
|
478 | }
|
479 | yield topologyUpdate;
|
480 | }
|
481 | catch (reason) {
|
482 | if (this._firstTopologyUpdatePromise === topologyUpdate) {
|
483 | this._firstTopologyUpdatePromise = null;
|
484 | }
|
485 | this._log.warn(reason, "Error doing topology update.");
|
486 | throw reason;
|
487 | }
|
488 | const currentIndexAndNode = this.chooseNodeForRequest(command, sessionInfo);
|
489 | return this._executeOnSpecificNode(command, sessionInfo, {
|
490 | chosenNode: currentIndexAndNode.currentNode,
|
491 | nodeIndex: currentIndexAndNode.currentIndex,
|
492 | shouldRetry: true
|
493 | });
|
494 | });
|
495 | }
|
496 | _getFromCache(command, useCache, url, cachedItemMetadataCallback) {
|
497 | if (useCache
|
498 | && command.canCache
|
499 | && command.isReadRequest
|
500 | && command.responseType === "Object") {
|
501 | return this._cache.get(url, cachedItemMetadataCallback);
|
502 | }
|
503 | cachedItemMetadataCallback({
|
504 | changeVector: null,
|
505 | response: null
|
506 | });
|
507 | return new HttpCache_1.ReleaseCacheItem(null);
|
508 | }
|
509 | _executeOnSpecificNode(command, sessionInfo = null, options = null) {
|
510 | return __awaiter(this, void 0, void 0, function* () {
|
511 | const { chosenNode, nodeIndex, shouldRetry } = options;
|
512 | this._log.info(`Actual execute ${command.constructor.name} on ${chosenNode.url}`
|
513 | + ` ${shouldRetry ? "with" : "without"} retry.`);
|
514 | const req = this._createRequest(chosenNode, command);
|
515 | const noCaching = sessionInfo ? sessionInfo.noCaching : false;
|
516 | let cachedChangeVector;
|
517 | let cachedValue;
|
518 | const cachedItem = this._getFromCache(command, !noCaching, req.uri.toString(), (cachedItemMetadata) => {
|
519 | cachedChangeVector = cachedItemMetadata.changeVector;
|
520 | cachedValue = cachedItemMetadata.response;
|
521 | });
|
522 | if (cachedChangeVector) {
|
523 | const aggressiveCacheOptions = this.aggressiveCaching;
|
524 | if (aggressiveCacheOptions
|
525 | && cachedItem.age < aggressiveCacheOptions.duration
|
526 | && !cachedItem.mightHaveBeenModified
|
527 | && command.canCacheAggressively) {
|
528 | return command.setResponseFromCache(cachedValue);
|
529 | }
|
530 | req.headers["If-None-Match"] = `"${cachedChangeVector}"`;
|
531 | }
|
532 | if (!this._disableClientConfigurationUpdates) {
|
533 | req.headers[Constants_1.HEADERS.CLIENT_CONFIGURATION_ETAG] = `"${this._clientConfigurationEtag}"`;
|
534 | }
|
535 | if (sessionInfo && sessionInfo.lastClusterTransactionIndex) {
|
536 | req.headers[Constants_1.HEADERS.LAST_KNOWN_CLUSTER_TRANSACTION_INDEX] =
|
537 | sessionInfo.lastClusterTransactionIndex;
|
538 | }
|
539 | if (!this._disableTopologyUpdates) {
|
540 | req.headers[Constants_1.HEADERS.TOPOLOGY_ETAG] = `"${this._topologyEtag}"`;
|
541 | }
|
542 | const sp = Stopwatch_1.Stopwatch.createStarted();
|
543 | let response = null;
|
544 | let responseDispose = "Automatic";
|
545 | let bodyStream;
|
546 | this.numberOfServerRequests++;
|
547 | try {
|
548 | if (this._shouldExecuteOnAll(chosenNode, command)) {
|
549 | response = yield this._executeOnAllToFigureOutTheFastest(chosenNode, command);
|
550 | }
|
551 | else {
|
552 | const responseAndBody = yield command.send(req);
|
553 | bodyStream = responseAndBody.bodyStream;
|
554 | response = responseAndBody.response;
|
555 | }
|
556 | if (sessionInfo && sessionInfo.lastClusterTransactionIndex) {
|
557 | const version = response.caseless.get(Constants_1.HEADERS.SERVER_VERSION);
|
558 | if (version && "4.1" === version) {
|
559 | Exceptions_1.throwError("ClientVersionMismatchException", "The server on " + chosenNode.url + " has an old version and can't perform "
|
560 | + "the command since this command dependent on a cluster transaction "
|
561 | + " which this node doesn't support.");
|
562 | }
|
563 | }
|
564 | sp.stop();
|
565 | }
|
566 | catch (error) {
|
567 | this._log.warn(error, `Error executing '${command.constructor.name}' `
|
568 | + `on specific node '${chosenNode.url}'`
|
569 | + `${chosenNode.database ? "db " + chosenNode.database : ""}.`);
|
570 | if (!shouldRetry) {
|
571 | throw error;
|
572 | }
|
573 | sp.stop();
|
574 | const serverDownHandledSuccessfully = yield this._handleServerDown(req.uri, chosenNode, nodeIndex, command, req, response, null, error, sessionInfo, shouldRetry);
|
575 | if (!serverDownHandledSuccessfully) {
|
576 | this._throwFailedToContactAllNodes(command, req, error, null);
|
577 | }
|
578 | return;
|
579 | }
|
580 | command.statusCode = response.statusCode;
|
581 | const refreshTopology = response
|
582 | && response.caseless
|
583 | && response.caseless.get(Constants_1.HEADERS.REFRESH_TOPOLOGY);
|
584 | const refreshClientConfiguration = response
|
585 | && response.caseless
|
586 | && response.caseless.get(Constants_1.HEADERS.REFRESH_CLIENT_CONFIGURATION);
|
587 | try {
|
588 | if (response.statusCode === StatusCode_1.StatusCodes.NotModified) {
|
589 | cachedItem.notModified();
|
590 | if (command.responseType === "Object") {
|
591 | yield command.setResponseFromCache(cachedValue);
|
592 | }
|
593 | return;
|
594 | }
|
595 | if (response.statusCode >= 400) {
|
596 | const unsuccessfulResponseHandled = yield this._handleUnsuccessfulResponse(chosenNode, nodeIndex, command, req, response, bodyStream, req.uri, sessionInfo, shouldRetry);
|
597 | if (!unsuccessfulResponseHandled) {
|
598 | const dbMissingHeader = response.caseless.get(Constants_1.HEADERS.DATABASE_MISSING);
|
599 | if (dbMissingHeader) {
|
600 | Exceptions_1.throwError("DatabaseDoesNotExistException", dbMissingHeader);
|
601 | }
|
602 | if (command.failedNodes.size === 0) {
|
603 | Exceptions_1.throwError("InvalidOperationException", "Received unsuccessful response and couldn't recover from it. "
|
604 | + "Also, no record of exceptions per failed nodes. "
|
605 | + "This is weird and should not happen.");
|
606 | }
|
607 | if (command.failedNodes.size === 1) {
|
608 | const values = [...command.failedNodes.values()];
|
609 | if (values && values.some(x => !!x)) {
|
610 | const err = values.filter(x => !!x).map(x => x)[0];
|
611 | Exceptions_1.throwError(err.name, err.message, err);
|
612 | }
|
613 | }
|
614 | Exceptions_1.throwError("AllTopologyNodesDownException", "Received unsuccessful response from all servers"
|
615 | + " and couldn't recover from it.");
|
616 | }
|
617 | return;
|
618 | }
|
619 | responseDispose = yield command.processResponse(this._cache, response, bodyStream, req.uri);
|
620 | this._lastReturnedResponse = new Date();
|
621 | }
|
622 | finally {
|
623 | if (responseDispose === "Automatic") {
|
624 | HttpUtil_1.closeHttpResponse(response);
|
625 | }
|
626 | if (refreshTopology || refreshClientConfiguration) {
|
627 | const serverNode = new ServerNode_1.ServerNode({
|
628 | url: chosenNode.url,
|
629 | database: this._databaseName
|
630 | });
|
631 | const topologyTask = refreshTopology
|
632 | ? BluebirdPromise.resolve(this.updateTopology(serverNode, 0))
|
633 | .tapCatch(err => this._log.warn(err, "Error refreshing topology."))
|
634 | : BluebirdPromise.resolve(false);
|
635 | const clientConfigurationTask = refreshClientConfiguration
|
636 | ? BluebirdPromise.resolve(this._updateClientConfiguration())
|
637 | .tapCatch(err => this._log.warn(err, "Error refreshing client configuration."))
|
638 | .then(() => true)
|
639 | : BluebirdPromise.resolve(false);
|
640 | yield Promise.all([topologyTask, clientConfigurationTask]);
|
641 | }
|
642 | }
|
643 | });
|
644 | }
|
645 | _throwFailedToContactAllNodes(command, req, e, timeoutException) {
|
646 | let message = "Tried to send "
|
647 | + command.constructor.name
|
648 | + " request via "
|
649 | + (req.method || "GET") + " "
|
650 | + req.uri + " to all configured nodes in the topology, "
|
651 | + "all of them seem to be down or not responding. I've tried to access the following nodes: ";
|
652 | if (this._nodeSelector) {
|
653 | const topology = this._nodeSelector.getTopology();
|
654 | if (topology) {
|
655 | message += topology.nodes.map(x => x.url).join(", ");
|
656 | }
|
657 | }
|
658 | const tplFromNode = this._topologyTakenFromNode;
|
659 | if (tplFromNode && this._nodeSelector) {
|
660 | const topology = this._nodeSelector.getTopology();
|
661 | if (topology) {
|
662 | const nodesText = topology.nodes
|
663 | .map(x => `( url: ${x.url}, clusterTag: ${x.clusterTag}, serverRole: ${x.serverRole})`)
|
664 | .join(", ");
|
665 | message += os.EOL
|
666 | + `I was able to fetch ${tplFromNode.database} topology from ${tplFromNode.url}.`
|
667 | + os.EOL
|
668 | + `Fetched topology: ${nodesText}`;
|
669 | }
|
670 | }
|
671 | const innerErr = timeoutException || e;
|
672 | Exceptions_1.throwError("AllTopologyNodesDownException", message, innerErr);
|
673 | }
|
674 | inSpeedTestPhase() {
|
675 | return this._nodeSelector
|
676 | && this._nodeSelector.inSpeedTestPhase();
|
677 | }
|
678 | _handleUnsuccessfulResponse(chosenNode, nodeIndex, command, req, response, responseBodyStream, url, sessionInfo, shouldRetry) {
|
679 | return __awaiter(this, void 0, void 0, function* () {
|
680 | responseBodyStream.resume();
|
681 | const readBody = () => StreamUtil.readToEnd(responseBodyStream);
|
682 | switch (response.statusCode) {
|
683 | case StatusCode_1.StatusCodes.NotFound:
|
684 | this._cache.setNotFound(url);
|
685 | switch (command.responseType) {
|
686 | case "Empty":
|
687 | return Promise.resolve(true);
|
688 | case "Object":
|
689 | return command.setResponseAsync(null, false)
|
690 | .then(() => true);
|
691 | default:
|
692 | command.setResponseRaw(response, null);
|
693 | break;
|
694 | }
|
695 | return true;
|
696 | case StatusCode_1.StatusCodes.Forbidden:
|
697 | Exceptions_1.throwError("AuthorizationException", `Forbidden access to ${chosenNode.database}@${chosenNode.url}`
|
698 | + `, ${req.method || "GET"} ${req.uri}`);
|
699 | break;
|
700 | case StatusCode_1.StatusCodes.Gone:
|
701 | if (!shouldRetry) {
|
702 | return false;
|
703 | }
|
704 | return this.updateTopology(chosenNode, Number.MAX_VALUE, true)
|
705 | .then(() => {
|
706 | const currentIndexAndNode = this.chooseNodeForRequest(command, sessionInfo);
|
707 | return this._executeOnSpecificNode(command, sessionInfo, {
|
708 | chosenNode: currentIndexAndNode.currentNode,
|
709 | nodeIndex: currentIndexAndNode.currentIndex,
|
710 | shouldRetry: false
|
711 | });
|
712 | })
|
713 | .then(() => true);
|
714 | case StatusCode_1.StatusCodes.GatewayTimeout:
|
715 | case StatusCode_1.StatusCodes.RequestTimeout:
|
716 | case StatusCode_1.StatusCodes.BadGateway:
|
717 | case StatusCode_1.StatusCodes.ServiceUnavailable:
|
718 | return this._handleServerDown(url, chosenNode, nodeIndex, command, req, response, yield readBody(), null, sessionInfo, shouldRetry);
|
719 | case StatusCode_1.StatusCodes.Conflict:
|
720 | RequestExecutor._handleConflict(response, yield readBody());
|
721 | break;
|
722 | default:
|
723 | command.onResponseFailure(response);
|
724 | Exceptions_1.ExceptionDispatcher.throwException(response, yield readBody());
|
725 | }
|
726 | });
|
727 | }
|
728 | _executeOnAllToFigureOutTheFastest(chosenNode, command) {
|
729 | let preferredTask = null;
|
730 | const nodes = this._nodeSelector.getTopology().nodes;
|
731 | const tasks = nodes.map(x => null);
|
732 | let task;
|
733 | for (let i = 0; i < nodes.length; i++) {
|
734 | const taskNumber = i;
|
735 | this.numberOfServerRequests++;
|
736 | task = BluebirdPromise.resolve()
|
737 | .then(() => {
|
738 | const req = this._createRequest(nodes[taskNumber], command);
|
739 | return command.send(req)
|
740 | .then(responseAndBodyStream => {
|
741 | return responseAndBodyStream.response;
|
742 | });
|
743 | })
|
744 | .then(commandResult => new IndexAndResponse(taskNumber, commandResult))
|
745 | .catch(err => {
|
746 | tasks[taskNumber] = null;
|
747 | return BluebirdPromise.reject(err);
|
748 | });
|
749 | if (nodes[i].clusterTag === chosenNode.clusterTag) {
|
750 | preferredTask = task;
|
751 | }
|
752 | tasks[i] = task;
|
753 | }
|
754 | const result = PromiseUtil.raceToResolution(tasks)
|
755 | .then(fastest => {
|
756 | this._nodeSelector.recordFastest(fastest.index, nodes[fastest.index]);
|
757 | })
|
758 | .catch((err) => {
|
759 | this._log.warn(err, "Error executing on all to find fastest node.");
|
760 | })
|
761 | .then(() => preferredTask)
|
762 | .then(taskResult => taskResult.response);
|
763 | return Promise.resolve(result);
|
764 | }
|
765 | _shouldExecuteOnAll(chosenNode, command) {
|
766 | return this._readBalanceBehavior === "FastestNode" &&
|
767 | this._nodeSelector &&
|
768 | this._nodeSelector.inSpeedTestPhase() &&
|
769 | this._nodeSelectorHasMultipleNodes() &&
|
770 | command.isReadRequest &&
|
771 | command.responseType === "Object" &&
|
772 | !!chosenNode;
|
773 | }
|
774 | _nodeSelectorHasMultipleNodes() {
|
775 | const selector = this._nodeSelector;
|
776 | if (!selector) {
|
777 | return false;
|
778 | }
|
779 | const topology = selector.getTopology();
|
780 | return topology && topology.nodes && topology.nodes.length > 1;
|
781 | }
|
782 | _handleServerDown(url, chosenNode, nodeIndex, command, req, response, body, error, sessionInfo, shouldRetry) {
|
783 | return __awaiter(this, void 0, void 0, function* () {
|
784 | if (!command.failedNodes) {
|
785 | command.failedNodes = new Map();
|
786 | }
|
787 | RequestExecutor._addFailedResponseToCommand(chosenNode, command, req, response, body, error);
|
788 | if (nodeIndex === null) {
|
789 | return false;
|
790 | }
|
791 | this._spawnHealthChecks(chosenNode, nodeIndex);
|
792 | if (!this._nodeSelector) {
|
793 | return false;
|
794 | }
|
795 | this._nodeSelector.onFailedRequest(nodeIndex);
|
796 | const currentIndexAndNode = this._nodeSelector.getPreferredNode();
|
797 | if (command.failedNodes.has(currentIndexAndNode.currentNode)) {
|
798 | return false;
|
799 | }
|
800 | yield this._executeOnSpecificNode(command, sessionInfo, {
|
801 | chosenNode: currentIndexAndNode.currentNode,
|
802 | nodeIndex: currentIndexAndNode.currentIndex,
|
803 | shouldRetry: shouldRetry
|
804 | });
|
805 | return true;
|
806 | });
|
807 | }
|
808 | static _addFailedResponseToCommand(chosenNode, command, req, response, body, e) {
|
809 | if (response && body) {
|
810 | const responseJson = body;
|
811 | try {
|
812 | const resExceptionSchema = Serializer_1.JsonSerializer
|
813 | .getDefaultForCommandPayload()
|
814 | .deserialize(responseJson);
|
815 | const readException = Exceptions_1.ExceptionDispatcher.get(resExceptionSchema, response.statusCode);
|
816 | command.failedNodes.set(chosenNode, readException);
|
817 | }
|
818 | catch (_) {
|
819 | log.warn(_, "Error parsing server error.");
|
820 | const unrecognizedErrSchema = {
|
821 | url: req.uri,
|
822 | message: "Unrecognized response from the server",
|
823 | error: responseJson,
|
824 | type: "Unparsable Server Response"
|
825 | };
|
826 | const exceptionToUse = Exceptions_1.ExceptionDispatcher.get(unrecognizedErrSchema, response.statusCode);
|
827 | command.failedNodes.set(chosenNode, exceptionToUse);
|
828 | }
|
829 | return;
|
830 | }
|
831 | const exceptionSchema = {
|
832 | url: req.uri.toString(),
|
833 | message: e.message,
|
834 | error: `An exception occurred while contacting ${req.uri} . ${os.EOL + e.stack}`,
|
835 | type: e.name
|
836 | };
|
837 | command.failedNodes.set(chosenNode, Exceptions_1.ExceptionDispatcher.get(exceptionSchema, StatusCode_1.StatusCodes.ServiceUnavailable));
|
838 | }
|
839 | _createRequest(node, command) {
|
840 | const req = Object.assign(command.createRequest(node), this._defaultRequestOptions);
|
841 | req.headers = req.headers || {};
|
842 | if (this._authOptions) {
|
843 | const agentOptions = this._certificate.toAgentOptions();
|
844 | req.agentOptions = Object.assign(req.agentOptions || {}, agentOptions);
|
845 | }
|
846 | if (!req.headers[Constants_1.HEADERS.CLIENT_VERSION]) {
|
847 | req.headers[Constants_1.HEADERS.CLIENT_VERSION] = RequestExecutor.CLIENT_VERSION;
|
848 | }
|
849 | if (RequestExecutor.requestPostProcessor) {
|
850 | RequestExecutor.requestPostProcessor(req);
|
851 | }
|
852 | return req;
|
853 | }
|
854 | static _handleConflict(response, body) {
|
855 | Exceptions_1.ExceptionDispatcher.throwException(response, body);
|
856 | }
|
857 | _spawnHealthChecks(chosenNode, nodeIndex) {
|
858 | if (this._disposed) {
|
859 | return;
|
860 | }
|
861 | if (this._failedNodesTimers.has(chosenNode)) {
|
862 | return;
|
863 | }
|
864 | this._log.info(`Spawn health checks for node ${chosenNode.url}.`);
|
865 | const nodeStatus = new NodeStatus(nodeIndex, chosenNode, (nStatus) => this._checkNodeStatusCallback(nStatus));
|
866 | this._failedNodesTimers.set(chosenNode, nodeStatus);
|
867 | nodeStatus.startTimer();
|
868 | }
|
869 | _checkNodeStatusCallback(nodeStatus) {
|
870 | const copy = this.getTopologyNodes();
|
871 | if (nodeStatus.nodeIndex >= copy.length) {
|
872 | return;
|
873 | }
|
874 | const serverNode = copy[nodeStatus.nodeIndex];
|
875 | if (serverNode !== nodeStatus.node) {
|
876 | return;
|
877 | }
|
878 | return Promise.resolve()
|
879 | .then(() => {
|
880 | let status;
|
881 | return Promise.resolve(this._performHealthCheck(serverNode, nodeStatus.nodeIndex))
|
882 | .then(() => {
|
883 | status = this._failedNodesTimers[nodeStatus.nodeIndex];
|
884 | if (status) {
|
885 | this._failedNodesTimers.delete(nodeStatus.node);
|
886 | status.dispose();
|
887 | }
|
888 | if (this._nodeSelector) {
|
889 | this._nodeSelector.restoreNodeIndex(nodeStatus.nodeIndex);
|
890 | }
|
891 | }, err => {
|
892 | this._log.error(err, `${serverNode.clusterTag} is still down`);
|
893 | status = this._failedNodesTimers.get(nodeStatus.node);
|
894 | if (status) {
|
895 | nodeStatus.updateTimer();
|
896 | }
|
897 | });
|
898 | })
|
899 | .catch(err => {
|
900 | this._log.error(err, "Failed to check node topology, will ignore this node until next topology update.");
|
901 | });
|
902 | }
|
903 | _performHealthCheck(serverNode, nodeIndex) {
|
904 | return this._executeOnSpecificNode(RequestExecutor._failureCheckOperation.getCommand(this._conventions), null, {
|
905 | chosenNode: serverNode,
|
906 | nodeIndex,
|
907 | shouldRetry: false,
|
908 | });
|
909 | }
|
910 | _setDefaultRequestOptions() {
|
911 | this._defaultRequestOptions = Object.assign(DEFAULT_REQUEST_OPTIONS, {
|
912 | gzip: !(this._conventions.hasExplicitlySetCompressionUsage && !this._conventions.useCompression)
|
913 | }, this._customHttpRequestOptions);
|
914 | }
|
915 | dispose() {
|
916 | this._log.info("Dispose.");
|
917 | if (this._disposed) {
|
918 | return;
|
919 | }
|
920 | this._disposed = true;
|
921 | this._updateClientConfigurationSemaphore.take(TypeUtil_1.TypeUtil.NOOP);
|
922 | this._updateDatabaseTopologySemaphore.take(TypeUtil_1.TypeUtil.NOOP);
|
923 | this._cache.dispose();
|
924 | if (this._updateTopologyTimer) {
|
925 | this._updateTopologyTimer.dispose();
|
926 | }
|
927 | this._disposeAllFailedNodesTimers();
|
928 | }
|
929 | }
|
930 | RequestExecutor.CLIENT_VERSION = "4.1.0";
|
931 | RequestExecutor._failureCheckOperation = new GetStatisticsOperation_1.GetStatisticsOperation("failure=check");
|
932 | RequestExecutor.requestPostProcessor = null;
|
933 | exports.RequestExecutor = RequestExecutor;
|