UNPKG

8.69 kBPlain TextView Raw
1import { EventEmitter } from "events";
2
3import IGCExtentProvider from "../../common/IGCExtentProvider";
4import IGCManager from "../../common/IGCManager";
5import IExtentStore from "../../common/persistence/IExtentStore";
6import ILogger from "../generated/utils/ILogger";
7import { DEFAULT_GC_INTERVAL_MS } from "../utils/constants";
8
/**
 * Lifecycle states of the GC manager.
 *
 * Normal lifecycle: Closed -> Initializing -> Running -> Closing -> Closed.
 * Kept as a numeric enum on purpose: log/error messages use the reverse
 * mapping (`Status[value]`) to print the state name.
 */
enum Status {
  Initializing = 0,
  Running = 1,
  Closing = 2,
  Closed = 3
}
15
/**
 * GC manager that cleans up unused extent-mapped local files with a
 * mark-and-sweep algorithm.
 *
 * Each GC round:
 *  1. collects every extent id reported by `allExtentsProvider`;
 *  2. removes ("marks") every id still reported by `referredExtentsProvider`;
 *  3. asks `extentStore` to delete the remaining, unreferenced ids.
 * Rounds repeat every `gcIntervalInMS` milliseconds until `close()` is called
 * or a round throws (the error is then forwarded to `errorHandler`).
 *
 * In the future, the GC manager could also merge small extent-mapped files
 * into one big file to improve performance.
 *
 * @export
 * @class BlobGCManager
 * @implements {IGCManager}
 */
export default class BlobGCManager implements IGCManager {
  private _status: Status = Status.Closed;
  private emitter: EventEmitter = new EventEmitter();

  /**
   * Creates an instance of BlobGCManager.
   *
   * @param {IGCExtentProvider} referredExtentsProvider Provides the extent ids that are still referenced (mark source)
   * @param {IGCExtentProvider} allExtentsProvider Provides every known extent id (GC candidates)
   * @param {IExtentStore} extentStore Store used to delete unreferenced extents
   * @param {(err: Error) => void} errorHandler Error handler callback to handle critical errors during GC loop.
   *                                            When an error happens, GC loop will close automatically.
   * @param {ILogger} logger
   * @param {number} [gcIntervalInMS=DEFAULT_GC_INTERVAL_MS] Sleep between GC rounds; values <= 0 are clamped to 1ms
   * @memberof BlobGCManager
   */
  constructor(
    private readonly referredExtentsProvider: IGCExtentProvider,
    private readonly allExtentsProvider: IGCExtentProvider,
    private readonly extentStore: IExtentStore,
    private readonly errorHandler: (err: Error) => void,
    private readonly logger: ILogger,
    public readonly gcIntervalInMS: number = DEFAULT_GC_INTERVAL_MS
  ) {
    // `on` rather than `once`: the manager can be restarted after a loop failure,
    // and an EventEmitter throws when "error" is emitted with no listener attached.
    this.emitter.on("error", this.errorHandler);

    // Avoid infinite GC loop
    if (gcIntervalInMS <= 0) {
      this.gcIntervalInMS = 1;
    }
  }

  /**
   * Current lifecycle status of the manager.
   */
  public get status(): Status {
    return this._status;
  }

  /**
   * Initialize dependencies and start the background mark-and-sweep loop.
   *
   * A no-op when already Running. If a dependency fails to initialize, the
   * status is rolled back to Closed (so start() can be retried) and the error
   * is rethrown.
   *
   * @returns {Promise<void>} Resolves once the loop has been launched (not when it ends)
   * @throws {Error} When the manager is in Initializing or Closing status
   * @memberof BlobGCManager
   */
  public async start(): Promise<void> {
    if (this._status === Status.Running) {
      this.logger.info(
        `BlobGCManager:start() BlobGCManager successfully started. BlobGCManager is already in Running status.`
      );
      return;
    }

    if (this._status !== Status.Closed) {
      const error = new Error(
        `BlobGCManager:start() BlobGCManager cannot start, current manager is under ${
          Status[this._status]
        }`
      );
      this.logger.error(error.message);
      throw error;
    }

    this.logger.info(
      `BlobGCManager:start() Starting BlobGCManager. Set status to Initializing.`
    );
    this._status = Status.Initializing;

    try {
      await this.initDependencies();
    } catch (err) {
      // Without this rollback the manager would stay in Initializing forever,
      // and both start() and close() would refuse to run again.
      this.logger.error(
        `BlobGCManager:start() Failed to initialize dependencies. Set status to Closed.`
      );
      this._status = Status.Closed;
      throw err;
    }

    this.logger.info(
      `BlobGCManager:start() Trigger mark and sweep loop. Set status to Running.`
    );
    this._status = Status.Running;

    // Fire-and-forget: the loop runs in the background; its end is reported via
    // the "closed" event and failures via the "error" event.
    this.markSweepLoop()
      .then(() => {
        this.logger.info(
          `BlobGCManager:start() Mark and sweep loop is closed.`
        );
        this.emitter.emit("closed");
      })
      .catch(err => {
        this.logger.error(
          `BlobGCManager:start() Mark and sweep loop emits error: ${err.name} ${err.message}`
        );
        this.logger.info(`BlobGCManager:start() Set status to Closed.`);
        this._status = Status.Closed;
        // Release any close() call waiting for the loop to end; without this,
        // a failure during Closing would leave close() pending forever.
        this.emitter.emit("closed");
        this.emitter.emit("error", err);
      });

    this.logger.info(
      `BlobGCManager:start() BlobGCManager successfully started.`
    );
  }

  /**
   * Stop the mark-and-sweep loop and wait until it has exited.
   *
   * A no-op when already Closed.
   *
   * @returns {Promise<void>} Resolves once the loop has stopped and status is Closed
   * @throws {Error} When the manager is in Initializing or Closing status
   * @memberof BlobGCManager
   */
  public async close(): Promise<void> {
    if (this._status === Status.Closed) {
      this.logger.info(
        `BlobGCManager:close() BlobGCManager successfully closed. BlobGCManager is already in Closed status.`
      );
      return;
    }

    if (this._status !== Status.Running) {
      const error = new Error(
        `BlobGCManager:close() BlobGCManager cannot close, current manager is under ${
          Status[this._status]
        }`
      );
      this.logger.error(error.message);
      throw error;
    }

    this.logger.info(
      `BlobGCManager:close() Start closing BlobGCManager. Set status to Closing.`
    );
    this._status = Status.Closing;

    // Subscribe before aborting so the loop's "closed" notification cannot be missed.
    const closed = new Promise<void>(resolve => {
      this.emitter.once("closed", () => {
        this.logger.info(
          `BlobGCManager:close() BlobGCManager successfully closed. Set status to Closed.`
        );
        this._status = Status.Closed;
        resolve();
      });
    });

    // Wake the loop up if it is currently sleeping between rounds.
    this.emitter.emit("abort");

    return closed;
  }

  /**
   * Initialize every dependency that reports it is not yet initialized.
   */
  private async initDependencies(): Promise<void> {
    if (!this.referredExtentsProvider.isInitialized()) {
      this.logger.info(
        `BlobGCManager:start() blobDataStore doesn't boot up. Starting blobDataStore.`
      );
      await this.referredExtentsProvider.init();
      this.logger.info(
        `BlobGCManager:start() blobDataStore successfully started.`
      );
    }

    if (!this.allExtentsProvider.isInitialized()) {
      this.logger.info(
        `BlobGCManager:start() extentMetadata doesn't boot up. Starting extentMetadata.`
      );
      await this.allExtentsProvider.init();
      this.logger.info(
        `BlobGCManager:start() extentMetadata successfully started.`
      );
    }

    if (!this.extentStore.isInitialized()) {
      this.logger.info(
        `BlobGCManager:start() extentStore doesn't boot up. Starting extentStore.`
      );
      await this.extentStore.init();
      this.logger.info(
        `BlobGCManager:start() extentStore successfully started.`
      );
    }
  }

  /**
   * Run mark-and-sweep rounds, sleeping `gcIntervalInMS` between them, until
   * the status leaves Running. Errors from a round propagate to the caller.
   */
  private async markSweepLoop(): Promise<void> {
    while (this._status === Status.Running) {
      this.logger.info(
        `BlobGCManager:markSweepLoop() Start next mark and sweep.`
      );
      const start = Date.now();
      await this.markSweep();
      const period = Date.now() - start;
      this.logger.info(
        `BlobGCManager:markSweepLoop() Mark and sweep finished, taken ${period}ms.`
      );

      if (this._status === Status.Running) {
        this.logger.info(
          `BlobGCManager:markSweepLoop() Sleep for ${this.gcIntervalInMS}ms.`
        );
        await this.sleep(this.gcIntervalInMS);
      }
    }
  }

  /**
   * Typical mark-sweep GC algorithm.
   *
   * Deletion only happens after the mark phase has fully completed: if the
   * manager leaves Running while collecting or marking, the round is abandoned
   * without deleting anything.
   *
   * @private
   * @returns {Promise<void>}
   * @memberof BlobGCManager
   */
  private async markSweep(): Promise<void> {
    // mark
    this.logger.info(`BlobGCManager:markSweep() Get all extents.`);
    const allExtents = await this.getAllExtents();
    this.logger.info(
      `BlobGCManager:markSweep() Got ${allExtents.size} extents.`
    );

    if (this._status !== Status.Running) {
      return;
    }

    this.logger.info(`BlobGCManager:markSweep() Get referred extents.`);
    const iter = this.referredExtentsProvider.iteratorExtents();
    for (
      let res = await iter.next();
      (res.done === false || res.value.length > 0) &&
      this._status === Status.Running;
      res = await iter.next()
    ) {
      const chunks = res.value;
      for (const chunk of chunks) {
        allExtents.delete(chunk); // TODO: Mark instead of removing from Set to improve performance
      }
    }

    // The mark loop above stops early once the manager leaves Running. Then
    // `allExtents` may still contain referenced extents, and sweeping it would
    // delete live data, so abandon this round.
    if (this._status !== Status.Running) {
      return;
    }

    this.logger.info(
      `BlobGCManager:markSweep() Got referred extents, unreferenced extents count is ${allExtents.size}.`
    );

    // sweep
    if (allExtents.size > 0) {
      this.logger.info(
        `BlobGCManager:markSweep() Try to delete ${allExtents.size} unreferenced extents.`
      );
      const deletedCount = await this.extentStore.deleteExtents(allExtents);
      this.logger.info(
        `BlobGCManager:markSweep() Deleted unreferenced ${deletedCount} extents, after excluding active write extents.`
      );
    }
  }

  /**
   * Collect every extent id from `allExtentsProvider` into a Set.
   *
   * Stops early (returning a partial set) as soon as the status leaves Running;
   * callers must re-check the status before relying on the result.
   */
  private async getAllExtents(): Promise<Set<string>> {
    const ids: Set<string> = new Set<string>();

    const iter = this.allExtentsProvider.iteratorExtents();
    for (
      let res = await iter.next();
      (res.done === false || res.value.length > 0) &&
      this._status === Status.Running;
      res = await iter.next()
    ) {
      for (const chunk of res.value) {
        ids.add(chunk);
      }
    }

    return ids;
  }

  /**
   * Sleep for `timeInMS` milliseconds, resolving early when an "abort" event is
   * emitted (close() emits it). The timer is unref'ed so it does not keep the
   * Node.js process alive on its own.
   */
  private async sleep(timeInMS: number): Promise<void> {
    if (timeInMS === 0) {
      return;
    }

    return new Promise<void>(resolve => {
      let timer: NodeJS.Timeout;
      const abortListener = () => {
        if (timer) {
          clearTimeout(timer);
        }
        this.emitter.removeListener("abort", abortListener);
        resolve();
      };

      // https://stackoverflow.com/questions/45802988/typescript-use-correct-version-of-settimeout-node-vs-window
      timer = (setTimeout(() => {
        this.emitter.removeListener("abort", abortListener);
        resolve();
      }, timeInMS) as any) as NodeJS.Timeout;
      timer.unref();
      this.emitter.on("abort", abortListener);
    });
  }
}
294
\No newline at end of file