1 | import { EventEmitter } from "events";
|
2 |
|
3 | import IGCExtentProvider from "../../common/IGCExtentProvider";
|
4 | import IGCManager from "../../common/IGCManager";
|
5 | import IExtentStore from "../../common/persistence/IExtentStore";
|
6 | import ILogger from "../generated/utils/ILogger";
|
7 | import { DEFAULT_GC_INTERVAL_MS } from "../utils/constants";
|
8 |
|
/**
 * Life-cycle states of the GC manager.
 *
 * Kept as a numeric enum on purpose: the reverse mapping (`Status[value]`)
 * is used to print the state name in error messages.
 */
enum Status {
  /** start() has been accepted and dependencies are being initialized. */
  Initializing = 0,
  /** The mark and sweep loop is active. */
  Running = 1,
  /** close() has been requested and the loop is winding down. */
  Closing = 2,
  /** Not running; this is the initial state. */
  Closed = 3
}
|
15 |
|
16 |
|
17 |
|
18 |
|
19 |
|
20 |
|
21 |
|
22 |
|
23 |
|
24 |
|
25 |
|
26 |
|
/**
 * Background garbage collector for extents.
 *
 * Each cycle ("mark and sweep") collects every extent ID yielded by
 * `allExtentsProvider`, removes the IDs yielded by `referredExtentsProvider`,
 * and hands whatever is left to `extentStore.deleteExtents()`.
 *
 * State transitions driven by this class:
 * Closed -> (start) Initializing -> Running -> (close) Closing -> Closed.
 *
 * Internal events on `emitter`:
 * - "abort":  emitted by close() to wake a sleeping loop early.
 * - "closed": emitted when the loop has ended; resolves a pending close().
 * - "error":  emitted when the loop rejects; forwarded to `errorHandler`.
 */
export default class BlobGCManager implements IGCManager {
  private _status: Status = Status.Closed;
  private emitter: EventEmitter = new EventEmitter();

  /**
   * @param referredExtentsProvider Yields the extent IDs that are still referred to.
   * @param allExtentsProvider Yields every known extent ID.
   * @param extentStore Store that is asked to delete the unreferenced extents.
   * @param errorHandler Invoked with the error when the mark and sweep loop fails.
   *   It is registered with `once`, so it handles at most one "error" event.
   * @param logger Logger used for progress and error messages.
   * @param gcIntervalInMS Pause between two cycles in milliseconds; values
   *   less than or equal to 0 are replaced by 1.
   */
  constructor(
    private readonly referredExtentsProvider: IGCExtentProvider,
    private readonly allExtentsProvider: IGCExtentProvider,
    private readonly extentStore: IExtentStore,
    private readonly errorHandler: (err: Error) => void,
    private readonly logger: ILogger,
    public readonly gcIntervalInMS: number = DEFAULT_GC_INTERVAL_MS
  ) {
    this.emitter.once("error", this.errorHandler);

    // Never allow a non-positive interval; fall back to the minimum of 1ms.
    if (gcIntervalInMS <= 0) {
      this.gcIntervalInMS = 1;
    }
  }

  /** Current life-cycle state of the manager. */
  public get status(): Status {
    return this._status;
  }

  /**
   * Initializes any dependency that is not initialized yet and kicks off the
   * mark and sweep loop in the background (the loop is not awaited).
   *
   * Calling start() while already Running is a no-op.
   *
   * @throws Error when the manager is Initializing or Closing.
   */
  public async start(): Promise<void> {
    if (this._status === Status.Running) {
      this.logger.info(
        `BlobGCManager:start() BlobGCManager successfully started. BlobGCManager is already in Running status.`
      );
      return;
    }

    if (this._status !== Status.Closed) {
      const error = new Error(
        `BlobGCManager:start() BlobGCManager cannot start, current manager is under ${
          Status[this._status]
        }`
      );
      this.logger.error(error.message);
      throw error;
    }

    this.logger.info(
      `BlobGCManager:start() Starting BlobGCManager. Set status to Initializing.`
    );
    this._status = Status.Initializing;

    if (!this.referredExtentsProvider.isInitialized()) {
      this.logger.info(
        `BlobGCManager:start() blobDataStore doesn't boot up. Starting blobDataStore.`
      );
      await this.referredExtentsProvider.init();
      this.logger.info(
        `BlobGCManager:start() blobDataStore successfully started.`
      );
    }

    if (!this.allExtentsProvider.isInitialized()) {
      this.logger.info(
        `BlobGCManager:start() extentMetadata doesn't boot up. Starting extentMetadata.`
      );
      await this.allExtentsProvider.init();
      this.logger.info(
        `BlobGCManager:start() extentMetadata successfully started.`
      );
    }

    if (!this.extentStore.isInitialized()) {
      this.logger.info(
        `BlobGCManager:start() extentStore doesn't boot up. Starting extentStore.`
      );
      await this.extentStore.init();
      this.logger.info(
        `BlobGCManager:start() extentStore successfully started.`
      );
    }

    this.logger.info(
      `BlobGCManager:start() Trigger mark and sweep loop. Set status to Running.`
    );
    this._status = Status.Running;

    // Fire and forget: the loop reports its end through emitter events.
    this.markSweepLoop()
      .then(() => {
        this.logger.info(
          `BlobGCManager:start() Mark and sweep loop is closed.`
        );
        this.emitter.emit("closed");
      })
      .catch(err => {
        this.logger.info(
          `BlobGCManager:start() Mark and sweep loop emits error: ${err.name} ${err.message}`
        );
        this.logger.info(`BlobGCManager:start() Set status to Closed.`);
        this._status = Status.Closed;
        // A close() call may be waiting for "closed"; release it first so it
        // does not hang forever when the loop dies with an error. Emitting
        // without listeners is a no-op.
        this.emitter.emit("closed");
        this.emitter.emit("error", err);
      });

    this.logger.info(
      `BlobGCManager:start() BlobGCManager successfully started.`
    );
  }

  /**
   * Requests the loop to stop and resolves once it has ended.
   *
   * Calling close() while already Closed is a no-op.
   *
   * @throws Error when the manager is Initializing or Closing.
   */
  public async close(): Promise<void> {
    if (this._status === Status.Closed) {
      this.logger.info(
        `BlobGCManager:close() BlobGCManager successfully closed. BlobGCManager is already in Closed status.`
      );
      return;
    }

    if (this._status !== Status.Running) {
      const error = new Error(
        `BlobGCManager:close() BlobGCManager cannot close, current manager is under ${
          Status[this._status]
        }`
      );
      this.logger.error(error.message);
      throw error;
    }

    this.logger.info(
      `BlobGCManager:close() Start closing BlobGCManager. Set status to Closing.`
    );
    this._status = Status.Closing;

    // Wake up a sleeping loop so it can observe the new status immediately.
    this.emitter.emit("abort");

    return new Promise<void>(resolve => {
      this.emitter.once("closed", () => {
        this.logger.info(
          `BlobGCManager:close() BlobGCManager successfully closed. Set status to Closed.`
        );
        this._status = Status.Closed;
        resolve();
      });
    });
  }

  /**
   * Runs mark and sweep cycles back to back, sleeping `gcIntervalInMS`
   * between them, for as long as the status stays Running.
   */
  private async markSweepLoop(): Promise<void> {
    while (this._status === Status.Running) {
      this.logger.info(
        `BlobGCManager:markSweepLoop() Start next mark and sweep.`
      );
      const start = Date.now();
      await this.markSweep();
      const period = Date.now() - start;
      this.logger.info(
        `BlobGCManager:markSweepLoop() Mark and sweep finished, taken ${period}ms.`
      );

      // Skip the pause when a close was requested during the cycle.
      if (this._status === Status.Running) {
        this.logger.info(
          `BlobGCManager:markSweepLoop() Sleep for ${this.gcIntervalInMS}ms.`
        );
        await this.sleep(this.gcIntervalInMS);
      }
    }
  }

  /**
   * Executes one cycle: collect all extent IDs, drop the referred ones
   * (mark), then ask the extent store to delete the remainder (sweep).
   *
   * The cycle is abandoned without deleting anything as soon as the status
   * is no longer Running.
   */
  private async markSweep(): Promise<void> {
    // mark
    this.logger.info(`BlobGCManager:markSweep() Get all extents.`);
    const allExtents = await this.getAllExtents();
    this.logger.info(
      `BlobGCManager:markSweep() Got ${allExtents.size} extents.`
    );

    if (this._status !== Status.Running) {
      return;
    }

    this.logger.info(`BlobGCManager:markSweep() Get referred extents.`);
    const iter = this.referredExtentsProvider.iteratorExtents();
    for (
      let res = await iter.next();
      (res.done === false || res.value.length > 0) &&
      this._status === Status.Running;
      res = await iter.next()
    ) {
      const chunks = res.value;
      for (const chunk of chunks) {
        allExtents.delete(chunk);
      }
    }

    // The loop above also stops when the status leaves Running. In that case
    // referred extents that were not visited yet are still in the set, so
    // sweeping now could delete extents that are in use.
    if (this._status !== Status.Running) {
      this.logger.info(
        `BlobGCManager:markSweep() Mark phase interrupted, skip sweep.`
      );
      return;
    }

    this.logger.info(
      `BlobGCManager:markSweep() Got referred extents, unreferenced extents count is ${allExtents.size}.`
    );

    // sweep
    if (allExtents.size > 0) {
      this.logger.info(
        `BlobGCManager:markSweep() Try to delete ${allExtents.size} unreferenced extents.`
      );
      const deletedCount = await this.extentStore.deleteExtents(allExtents);
      this.logger.info(
        `BlobGCManager:markSweep() Deleted unreferenced ${deletedCount} extents, after excluding active write extents.`
      );
    }
  }

  /**
   * Drains `allExtentsProvider` into a set of extent IDs.
   *
   * Iteration stops early when the status is no longer Running, so the
   * returned set may be incomplete in that case.
   *
   * @returns The collected extent IDs.
   */
  private async getAllExtents(): Promise<Set<string>> {
    const ids: Set<string> = new Set<string>();

    const iter = this.allExtentsProvider.iteratorExtents();
    for (
      let res = await iter.next();
      (res.done === false || res.value.length > 0) &&
      this._status === Status.Running;
      res = await iter.next()
    ) {
      for (const chunk of res.value) {
        ids.add(chunk);
      }
    }

    return ids;
  }

  /**
   * Resolves after `timeInMS` milliseconds, or earlier when an "abort"
   * event is emitted. Returns immediately for 0.
   *
   * @param timeInMS Time to wait in milliseconds.
   */
  private async sleep(timeInMS: number): Promise<void> {
    if (timeInMS === 0) {
      return;
    }

    return new Promise<void>(resolve => {
      let timer: NodeJS.Timeout;
      const abortListener = () => {
        if (timer) {
          clearTimeout(timer);
        }
        this.emitter.removeListener("abort", abortListener);
        resolve();
      };

      // Whichever fires first (timeout or abort) removes the abort listener
      // so listeners do not pile up across cycles.
      timer = (setTimeout(() => {
        this.emitter.removeListener("abort", abortListener);
        resolve();
      }, timeInMS) as any) as NodeJS.Timeout;
      // Do not let the pending GC timer keep the process alive.
      timer.unref();
      this.emitter.on("abort", abortListener);
    });
  }
}
|
294 |
|
\ | No newline at end of file |