UNPKG

7.81 kBJavaScriptView Raw
1"use strict";
2var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
3 function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
4 return new (P || (P = Promise))(function (resolve, reject) {
5 function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
6 function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
7 function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
8 step((generator = generator.apply(thisArg, _arguments || [])).next());
9 });
10};
11var __classPrivateFieldGet = (this && this.__classPrivateFieldGet) || function (receiver, state, kind, f) {
12 if (kind === "a" && !f) throw new TypeError("Private accessor was defined without a getter");
13 if (typeof state === "function" ? receiver !== state || !f : !state.has(receiver)) throw new TypeError("Cannot read private member from an object whose class did not declare it");
14 return kind === "m" ? f : kind === "a" ? f.call(receiver) : f ? f.value : state.get(receiver);
15};
16var __classPrivateFieldSet = (this && this.__classPrivateFieldSet) || function (receiver, state, value, kind, f) {
17 if (kind === "m") throw new TypeError("Private method is not writable");
18 if (kind === "a" && !f) throw new TypeError("Private accessor was defined without a setter");
19 if (typeof state === "function" ? receiver !== state || !f : !state.has(receiver)) throw new TypeError("Cannot write private member to an object whose class did not declare it");
20 return (kind === "a" ? f.call(receiver, value) : f ? f.value = value : state.set(receiver, value)), value;
21};
22var _ListWatchState_resourceVersion;
23Object.defineProperty(exports, "__esModule", { value: true });
24exports.ListWatch = void 0;
25const resource_listwatch_error_1 = require("./resource_listwatch_error");
26const debug = require("debug")("kubernetes:resource:listwatch");
27const sleep = (ms) => new Promise(res => setTimeout(res, ms));
28const doNothing = () => {
29};
30class ListWatchState {
31 constructor() {
32 _ListWatchState_resourceVersion.set(this, 0);
33 this.running = false;
34 this.errorCount = 0;
35 this.successCount = 0;
36 }
37 set resourceVersion(value) {
38 if (typeof value === "string") {
39 value = parseInt(value, 10);
40 }
41 if (value > __classPrivateFieldGet(this, _ListWatchState_resourceVersion, "f")) {
42 __classPrivateFieldSet(this, _ListWatchState_resourceVersion, value, "f");
43 }
44 }
45 get resourceVersion() {
46 return __classPrivateFieldGet(this, _ListWatchState_resourceVersion, "f");
47 }
48 start() {
49 this.running = true;
50 this.successCount = 0;
51 this.errorCount = 0;
52 }
53 markSuccess() {
54 this.successCount++;
55 this.errorCount = 0;
56 }
57}
58_ListWatchState_resourceVersion = new WeakMap();
59class ListWatch {
60 constructor(onWatchEvent, onWatchError, client, baseURL, resourceBaseURL, opts, metrics) {
61 var _a;
62 this.state = new ListWatchState();
63 this.onWatchEvent = onWatchEvent;
64 this.onWatchError = onWatchError !== null && onWatchError !== void 0 ? onWatchError : doNothing;
65 this.errorStrategy = (_a = opts.errorStrategy) !== null && _a !== void 0 ? _a : resource_listwatch_error_1.DefaultListWatchErrorStrategy;
66 this.client = client;
67 this.baseURL = baseURL;
68 this.resourceBaseURL = resourceBaseURL;
69 this.opts = opts;
70 this.metrics = metrics;
71 }
72 handler(event) {
73 return __awaiter(this, void 0, void 0, function* () {
74 if (event.object.metadata.resourceVersion) {
75 this.state.resourceVersion = event.object.metadata.resourceVersion;
76 }
77 yield this.onWatchEvent(event);
78 });
79 }
80 resync() {
81 return __awaiter(this, void 0, void 0, function* () {
82 const list = yield this.client.get(this.baseURL, this.opts);
83 if (list.metadata.resourceVersion) {
84 this.state.resourceVersion = list.metadata.resourceVersion;
85 }
86 if (this.opts.onResync) {
87 yield this.opts.onResync(list.items || []);
88 }
89 if (!this.opts.skipAddEventsOnResync) {
90 for (const i of list.items || []) {
91 const event = { type: "ADDED", object: i };
92 yield this.onWatchEvent(event);
93 }
94 }
95 });
96 }
97 run() {
98 this.state.start();
99 this.metrics.watchOpenCount.inc({ baseURL: this.baseURL });
100 debug("starting list-watch on %o", this.resourceBaseURL);
101 const initialized = this.resync();
102 const { resyncAfterIterations = 10 } = this.opts;
103 const done = initialized.then(() => __awaiter(this, void 0, void 0, function* () {
104 this.state.markSuccess();
105 const onEstablished = () => this.state.markSuccess();
106 debug("initial list for list-watch on %o completed", this.resourceBaseURL);
107 while (this.state.running) {
108 try {
109 if (this.state.successCount % resyncAfterIterations === 0) {
110 debug(`resyncing after ${resyncAfterIterations} successful WATCH iterations`);
111 yield this.resync();
112 }
113 debug("resuming watch after %o successful iterations and %o errors", this.state.successCount, this.state.errorCount);
114 const watchOpts = Object.assign(Object.assign({}, this.opts), { resourceVersion: this.state.resourceVersion, onEstablished });
115 const result = yield this.client.watch(this.baseURL, e => this.handler(e), e => this.onWatchError(e), watchOpts);
116 if (result.resyncRequired) {
117 debug(`resyncing listwatch`);
118 yield this.resync();
119 continue;
120 }
121 this.state.resourceVersion = result.resourceVersion;
122 }
123 catch (err) {
124 yield this.handleWatchIterationError(err);
125 }
126 }
127 this.metrics.watchOpenCount.dec({ baseURL: this.baseURL });
128 }));
129 return {
130 initialized,
131 done,
132 stop() {
133 this.running = false;
134 },
135 };
136 }
137 handleWatchIterationError(err) {
138 return __awaiter(this, void 0, void 0, function* () {
139 this.state.errorCount++;
140 const reaction = this.errorStrategy(err, this.state.errorCount);
141 this.metrics.watchResyncErrorCount.inc({ baseURL: this.baseURL });
142 debug("encountered error while watching: %o; determined reaction: %o", err, reaction);
143 if (this.opts.onError) {
144 yield this.opts.onError(err);
145 }
146 if (this.opts.abortAfterErrorCount && this.state.errorCount > this.opts.abortAfterErrorCount) {
147 this.metrics.watchOpenCount.dec({ baseURL: this.baseURL });
148 throw new Error(`more than ${this.opts.abortAfterErrorCount} consecutive errors when watching ${this.baseURL}`);
149 }
150 if (reaction.backoff) {
151 debug("resuming watch after back-off of %o ms", reaction.backoff);
152 yield sleep(reaction.backoff);
153 }
154 if (reaction.resync) {
155 debug("resuming watch with resync after error: %o", err);
156 yield this.resync();
157 }
158 });
159 }
160}
161exports.ListWatch = ListWatch;
162//# sourceMappingURL=resource_listwatch.js.map
\No newline at end of file