1 | "use strict";
|
/**
 * TypeScript down-level helper that drives a generator function as if it were
 * an `async` function.
 *
 * The generator is started immediately (inside the promise executor); every
 * yielded value is turned into a promise and its outcome is fed back into the
 * generator via `next` (fulfilment) or `throw` (rejection).
 *
 * @param thisArg     value bound to `this` inside the generator function
 * @param _arguments  argument list passed to the generator function (optional)
 * @param P           promise constructor to use; falls back to the global `Promise`
 * @param generator   generator function to run
 * @returns a promise that resolves with the generator's return value and
 *          rejects with any error that escapes the generator
 */
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
    // Wrap anything that is not already a P instance so `.then` is always available.
    function toPromise(value) {
        if (value instanceof P) {
            return value;
        }
        return new P(function (settle) {
            settle(value);
        });
    }
    if (!P) {
        P = Promise;
    }
    return new P(function (resolve, reject) {
        // Consume one iterator result: either finish or wait for the yielded value.
        function advance(result) {
            if (result.done) {
                resolve(result.value);
            }
            else {
                toPromise(result.value).then(onFulfilled, onRejected);
            }
        }
        // Resume the generator with the awaited value.
        function onFulfilled(value) {
            try {
                advance(generator.next(value));
            }
            catch (e) {
                reject(e);
            }
        }
        // Resume the generator by throwing the rejection reason into it.
        function onRejected(value) {
            try {
                advance(generator["throw"](value));
            }
            catch (e) {
                reject(e);
            }
        }
        // From here on `generator` refers to the iterator, not the generator function.
        generator = generator.apply(thisArg, _arguments || []);
        advance(generator.next());
    });
};
|
/**
 * TypeScript down-level helper that reads a private class member.
 *
 * @param receiver object the member is read from
 * @param state    either a store offering `has`/`get` keyed by receiver, or a
 *                 function that must be identical to `receiver`
 * @param kind     "m" for a method, "a" for an accessor, anything else for a field
 * @param f        the method (kind "m"), the getter (kind "a"), or a holder
 *                 object whose `value` property is the field value
 * @returns `f` for methods, the getter's result for accessors, otherwise
 *          `f.value` when `f` is given or the value stored for `receiver`
 * @throws {TypeError} when an accessor has no getter, or when `receiver` is
 *                     not known to `state`
 */
var __classPrivateFieldGet = (this && this.__classPrivateFieldGet) || function (receiver, state, kind, f) {
    if (kind === "a" && !f) {
        throw new TypeError("Private accessor was defined without a getter");
    }
    // Function-typed state requires an identical receiver plus `f`;
    // any other state must contain the receiver.
    const isDeclared = typeof state === "function"
        ? receiver === state && Boolean(f)
        : state.has(receiver);
    if (!isDeclared) {
        throw new TypeError("Cannot read private member from an object whose class did not declare it");
    }
    if (kind === "m") {
        return f;
    }
    if (kind === "a") {
        return f.call(receiver);
    }
    return f ? f.value : state.get(receiver);
};
|
/**
 * TypeScript down-level helper that writes a private class member.
 *
 * @param receiver object the member is written to
 * @param state    either a store offering `has`/`set` keyed by receiver, or a
 *                 function that must be identical to `receiver`
 * @param value    value to store
 * @param kind     "m" for a method (never writable), "a" for an accessor,
 *                 anything else for a field
 * @param f        the setter (kind "a"), or a holder object whose `value`
 *                 property receives the value
 * @returns the value that was written
 * @throws {TypeError} when writing a method, when an accessor has no setter,
 *                     or when `receiver` is not known to `state`
 */
var __classPrivateFieldSet = (this && this.__classPrivateFieldSet) || function (receiver, state, value, kind, f) {
    if (kind === "m") {
        throw new TypeError("Private method is not writable");
    }
    if (kind === "a" && !f) {
        throw new TypeError("Private accessor was defined without a setter");
    }
    // Function-typed state requires an identical receiver plus `f`;
    // any other state must contain the receiver.
    const isDeclared = typeof state === "function"
        ? receiver === state && Boolean(f)
        : state.has(receiver);
    if (!isDeclared) {
        throw new TypeError("Cannot write private member to an object whose class did not declare it");
    }
    if (kind === "a") {
        f.call(receiver, value);
    }
    else if (f) {
        f.value = value;
    }
    else {
        state.set(receiver, value);
    }
    return value;
};
|
22 | var _ListWatchState_resourceVersion;
|
23 | Object.defineProperty(exports, "__esModule", { value: true });
|
24 | exports.ListWatch = void 0;
|
25 | const resource_listwatch_error_1 = require("./resource_listwatch_error");
|
26 | const debug = require("debug")("kubernetes:resource:listwatch");
|
/**
 * Returns a promise that is resolved (with no value) by a timer after `ms`
 * milliseconds.
 */
const sleep = (ms) => new Promise((wake) => {
    setTimeout(wake, ms);
});
|
/** No-op callback used as a default when the caller supplies no handler. */
const doNothing = () => undefined;
|
/**
 * Mutable bookkeeping for one list-watch loop: whether it is running, how many
 * iterations succeeded or failed, and the highest resource version seen.
 */
class ListWatchState {
    constructor() {
        // The resource version lives in a module-level store keyed by instance.
        _ListWatchState_resourceVersion.set(this, 0);
        Object.assign(this, {
            running: false,
            errorCount: 0,
            successCount: 0,
        });
    }
    /**
     * Records a resource version. Strings are parsed as base-10 integers.
     * The stored value only ever grows: anything that is not greater than the
     * current value (including an unparsable string, which yields NaN) is ignored.
     */
    set resourceVersion(value) {
        const candidate = typeof value === "string" ? parseInt(value, 10) : value;
        const current = __classPrivateFieldGet(this, _ListWatchState_resourceVersion, "f");
        if (candidate > current) {
            __classPrivateFieldSet(this, _ListWatchState_resourceVersion, candidate, "f");
        }
    }
    /** Highest resource version recorded so far (0 initially). */
    get resourceVersion() {
        return __classPrivateFieldGet(this, _ListWatchState_resourceVersion, "f");
    }
    /** Marks the loop as running and resets both counters. */
    start() {
        this.errorCount = 0;
        this.successCount = 0;
        this.running = true;
    }
    /** Counts one successful iteration and clears the error counter. */
    markSuccess() {
        this.errorCount = 0;
        this.successCount++;
    }
}
|
58 | _ListWatchState_resourceVersion = new WeakMap();
|
/**
 * Keeps a consumer informed about a collection of resources by first listing
 * it and then repeatedly watching it from the last seen resource version.
 *
 * Errors during a watch iteration are passed to the configured error strategy,
 * which decides whether to back off and/or re-list before the next iteration.
 */
class ListWatch {
    /**
     * @param onWatchEvent    awaited for every watch event and, unless
     *                        `opts.skipAddEventsOnResync` is set, once per
     *                        listed item (as a synthetic "ADDED" event)
     * @param onWatchError    forwarded to `client.watch` as its error callback;
     *                        defaults to a no-op when null/undefined
     * @param client          object providing `get(url, opts)` and
     *                        `watch(url, onEvent, onError, opts)`
     * @param baseURL         URL that is listed and watched; also used as metric label
     * @param resourceBaseURL only used in debug output
     * @param opts            options; this class reads `errorStrategy`, `onResync`,
     *                        `skipAddEventsOnResync`, `resyncAfterIterations`,
     *                        `onError` and `abortAfterErrorCount`, and passes the
     *                        whole object on to the client
     * @param metrics         object providing `watchOpenCount` (`inc`/`dec`) and
     *                        `watchResyncErrorCount` (`inc`)
     */
    constructor(onWatchEvent, onWatchError, client, baseURL, resourceBaseURL, opts, metrics) {
        var _a;
        this.state = new ListWatchState();
        this.onWatchEvent = onWatchEvent;
        this.onWatchError = onWatchError !== null && onWatchError !== void 0 ? onWatchError : doNothing;
        this.errorStrategy = (_a = opts.errorStrategy) !== null && _a !== void 0 ? _a : resource_listwatch_error_1.DefaultListWatchErrorStrategy;
        this.client = client;
        this.baseURL = baseURL;
        this.resourceBaseURL = resourceBaseURL;
        this.opts = opts;
        this.metrics = metrics;
    }
    /**
     * Handles one watch event: remembers its resource version (if any) and
     * then awaits the consumer's event callback.
     */
    handler(event) {
        return __awaiter(this, void 0, void 0, function* () {
            if (event.object.metadata.resourceVersion) {
                this.state.resourceVersion = event.object.metadata.resourceVersion;
            }
            yield this.onWatchEvent(event);
        });
    }
    /**
     * Lists the collection, remembers the list's resource version, awaits
     * `opts.onResync` (if set) with all items, and — unless
     * `opts.skipAddEventsOnResync` is set — replays every item to the consumer
     * as an "ADDED" event, one after another.
     */
    resync() {
        return __awaiter(this, void 0, void 0, function* () {
            const list = yield this.client.get(this.baseURL, this.opts);
            if (list.metadata.resourceVersion) {
                this.state.resourceVersion = list.metadata.resourceVersion;
            }
            if (this.opts.onResync) {
                yield this.opts.onResync(list.items || []);
            }
            if (!this.opts.skipAddEventsOnResync) {
                for (const i of list.items || []) {
                    const event = { type: "ADDED", object: i };
                    yield this.onWatchEvent(event);
                }
            }
        });
    }
    /**
     * Starts the list-watch loop.
     *
     * @returns a handle with
     *  - `initialized`: promise of the initial list (rejects if it fails),
     *  - `done`: promise that resolves once the loop has ended after `stop()`,
     *    and rejects if the initial list fails or error handling throws
     *    (e.g. too many consecutive errors),
     *  - `stop()`: asks the loop to end. The flag is only checked between
     *    iterations, so a watch call that is already in flight finishes first.
     */
    run() {
        this.state.start();
        this.metrics.watchOpenCount.inc({ baseURL: this.baseURL });
        debug("starting list-watch on %o", this.resourceBaseURL);
        const initialized = this.resync();
        const { resyncAfterIterations = 10 } = this.opts;
        const done = __awaiter(this, void 0, void 0, function* () {
            try {
                yield initialized;
                this.state.markSuccess();
                const onEstablished = () => this.state.markSuccess();
                debug("initial list for list-watch on %o completed", this.resourceBaseURL);
                while (this.state.running) {
                    try {
                        // Periodically re-list to recover from anything the watch may have missed.
                        if (this.state.successCount % resyncAfterIterations === 0) {
                            debug(`resyncing after ${resyncAfterIterations} successful WATCH iterations`);
                            yield this.resync();
                        }
                        debug("resuming watch after %o successful iterations and %o errors", this.state.successCount, this.state.errorCount);
                        const watchOpts = Object.assign(Object.assign({}, this.opts), { resourceVersion: this.state.resourceVersion, onEstablished });
                        const result = yield this.client.watch(this.baseURL, e => this.handler(e), e => this.onWatchError(e), watchOpts);
                        if (result.resyncRequired) {
                            debug(`resyncing listwatch`);
                            yield this.resync();
                            continue;
                        }
                        this.state.resourceVersion = result.resourceVersion;
                    }
                    catch (err) {
                        yield this.handleWatchIterationError(err);
                    }
                }
            }
            finally {
                // Single exit point for the gauge: runs on a clean stop, on a failed
                // initial list, and when error handling aborts the loop.
                this.metrics.watchOpenCount.dec({ baseURL: this.baseURL });
            }
        });
        return {
            initialized,
            done,
            // Arrow function on purpose: `this` must be the ListWatch instance,
            // not the returned handle, so that the loop's flag is actually cleared.
            stop: () => {
                this.state.running = false;
            },
        };
    }
    /**
     * Reacts to an error thrown by a watch iteration: counts it, asks the
     * error strategy for a reaction, notifies `opts.onError` (if set), and
     * then backs off and/or re-lists as the reaction demands.
     *
     * @throws {Error} when `opts.abortAfterErrorCount` is set and exceeded by
     *                 the number of consecutive errors; also propagates errors
     *                 from `opts.onError` and from the re-list
     */
    handleWatchIterationError(err) {
        return __awaiter(this, void 0, void 0, function* () {
            this.state.errorCount++;
            const reaction = this.errorStrategy(err, this.state.errorCount);
            this.metrics.watchResyncErrorCount.inc({ baseURL: this.baseURL });
            debug("encountered error while watching: %o; determined reaction: %o", err, reaction);
            if (this.opts.onError) {
                yield this.opts.onError(err);
            }
            if (this.opts.abortAfterErrorCount && this.state.errorCount > this.opts.abortAfterErrorCount) {
                // The open-watch gauge is released by run() when this error ends the loop.
                throw new Error(`more than ${this.opts.abortAfterErrorCount} consecutive errors when watching ${this.baseURL}`);
            }
            if (reaction.backoff) {
                debug("resuming watch after back-off of %o ms", reaction.backoff);
                yield sleep(reaction.backoff);
            }
            if (reaction.resync) {
                debug("resuming watch with resync after error: %o", err);
                yield this.resync();
            }
        });
    }
}
|
161 | exports.ListWatch = ListWatch;
|
162 |
|
\ | No newline at end of file |