UNPKG

13.1 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3const tslib_1 = require("tslib");
4const decorators_1 = require("./decorators");
5const path = require("path");
6const lockfile_1 = require("./lockfile");
7const errors_1 = require("./errors");
8const isProcessActive = require("is-process-active");
9const version = require('../package.json').version;
10class RWLockfile {
11 /**
12 * creates a new read/write lockfile
13 * @param base {string} - base filepath to create lock from
14 */
15 constructor(base, options = {}) {
16 this._count = { read: 0, write: 0 };
17 this.base = base;
18 this._debug = options.debug || (debugEnvVar() && require('debug')('rwlockfile'));
19 this.uuid = require('uuid/v4')();
20 this.fs = require('fs-extra');
21 this.timeout = options.timeout || 30000;
22 this.retryInterval = options.retryInterval || 10;
23 this.ifLocked = options.ifLocked || (() => { });
24 instances.push(this);
25 this.internal = new lockfile_1.default(this.file, {
26 debug: debugEnvVar() === 2 && this._debug,
27 });
28 }
29 get count() {
30 return { read: this._count.read, write: this._count.write };
31 }
32 get file() {
33 return path.resolve(this.base + '.lock');
34 }
35 async add(type, opts = {}) {
36 this._debugReport('add', type, opts);
37 if (!this._count[type])
38 await this._lock(type, opts);
39 this._count[type]++;
40 }
41 addSync(type, opts = {}) {
42 this._debugReport('addSync', type, opts);
43 this._lockSync(type, opts.reason);
44 }
45 async remove(type) {
46 this._debugReport('remove', type);
47 switch (this.count[type]) {
48 case 0:
49 break;
50 case 1:
51 await this.unlock(type);
52 break;
53 default:
54 this._count[type]--;
55 break;
56 }
57 }
58 removeSync(type) {
59 this._debugReport('removeSync', type);
60 switch (this.count[type]) {
61 case 0:
62 break;
63 case 1:
64 this.unlockSync(type);
65 break;
66 default:
67 this._count[type]--;
68 break;
69 }
70 }
71 async unlock(type) {
72 if (!type) {
73 await this.unlock('read');
74 await this.unlock('write');
75 return;
76 }
77 if (!this.count[type])
78 return;
79 await this._removeJob(type);
80 this._count[type] = 0;
81 }
82 unlockSync(type) {
83 if (!type) {
84 this.unlockSync('write');
85 this.unlockSync('read');
86 return;
87 }
88 if (!this.count[type])
89 return;
90 this._debugReport('unlockSync', type);
91 this._removeJobSync(type);
92 this._count[type] = 0;
93 }
94 async check(type) {
95 const f = await this._fetchFile();
96 const status = this._statusFromFile(type, f);
97 if (status.status === 'open')
98 return status;
99 else if (status.status === 'write_lock') {
100 if (!await isActive(status.job.pid)) {
101 this.debug(`removing inactive write pid: ${status.job.pid}`);
102 delete f.writer;
103 await this.writeFile(f);
104 return this.check(type);
105 }
106 return status;
107 }
108 else if (status.status === 'read_lock') {
109 const pids = await Promise.all(status.jobs.map(async (j) => {
110 if (!await isActive(j.pid))
111 return j.pid;
112 }));
113 const inactive = pids.filter(p => !!p);
114 if (inactive.length) {
115 this.debug(`removing inactive read pids: ${inactive}`);
116 f.readers = f.readers.filter(j => !inactive.includes(j.pid));
117 await this.writeFile(f);
118 return this.check(type);
119 }
120 if (!status.jobs.find(j => j.uuid !== this.uuid))
121 return { status: 'open', file: this.file };
122 return status;
123 }
124 else
125 throw new Error(`Unexpected status: ${status.status}`);
126 }
127 checkSync(type) {
128 const f = this._fetchFileSync();
129 const status = this._statusFromFile(type, f);
130 if (status.status === 'open')
131 return status;
132 else if (status.status === 'write_lock') {
133 if (!isActiveSync(status.job.pid)) {
134 this.debug(`removing inactive writer pid: ${status.job.pid}`);
135 delete f.writer;
136 this.writeFileSync(f);
137 return this.checkSync(type);
138 }
139 return status;
140 }
141 else if (status.status === 'read_lock') {
142 const inactive = status.jobs.map(j => j.pid).filter(pid => !isActiveSync(pid));
143 if (inactive.length) {
144 this.debug(`removing inactive reader pids: ${inactive}`);
145 f.readers = f.readers.filter(j => !inactive.includes(j.pid));
146 this.writeFileSync(f);
147 return this.checkSync(type);
148 }
149 if (!status.jobs.find(j => j.uuid !== this.uuid))
150 return { status: 'open', file: this.file };
151 return status;
152 }
153 else
154 throw new Error(`Unexpected status: ${status.status}`);
155 }
156 _statusFromFile(type, f) {
157 if (type === 'write' && this.count.write)
158 return { status: 'open', file: this.file };
159 if (type === 'read' && this.count.write)
160 return { status: 'open', file: this.file };
161 if (f.writer)
162 return { status: 'write_lock', job: f.writer, file: this.file };
163 if (type === 'write') {
164 if (f.readers.length)
165 return { status: 'read_lock', jobs: f.readers, file: this.file };
166 }
167 return { status: 'open', file: this.file };
168 }
169 _parseFile(input) {
170 function addDate(job) {
171 if (!job)
172 return;
173 return Object.assign({}, job, { created: new Date(job.created || 0) });
174 }
175 return Object.assign({}, input, { writer: addDate(input.writer), readers: input.readers.map(addDate) });
176 }
177 _stringifyFile(input) {
178 function addDate(job) {
179 if (!job)
180 return;
181 return Object.assign({}, job, { created: (job.created || new Date(0)).toISOString() });
182 }
183 return Object.assign({}, input, { writer: addDate(input.writer), readers: (input.readers || []).map(addDate) });
184 }
185 async _fetchFile() {
186 try {
187 let f = await this.fs.readJSON(this.file);
188 return this._parseFile(f);
189 }
190 catch (err) {
191 if (err.code !== 'ENOENT')
192 this.debug(err);
193 return {
194 version,
195 readers: [],
196 };
197 }
198 }
199 _fetchFileSync() {
200 try {
201 let f = this.fs.readJSONSync(this.file);
202 return this._parseFile(f);
203 }
204 catch (err) {
205 if (err.code !== 'ENOENT')
206 this.debug(err);
207 return {
208 version,
209 readers: [],
210 };
211 }
212 }
213 addJob(type, reason, f) {
214 let job = {
215 reason,
216 pid: process.pid,
217 created: new Date(),
218 uuid: this.uuid,
219 };
220 if (type === 'read')
221 f.readers.push(job);
222 else
223 f.writer = job;
224 }
225 async _removeJob(type) {
226 let f = await this._fetchFile();
227 this._removeJobFromFile(type, f);
228 await this.writeFile(f);
229 }
230 _removeJobSync(type) {
231 let f = this._fetchFileSync();
232 this._removeJobFromFile(type, f);
233 this.writeFileSync(f);
234 }
235 _removeJobFromFile(type, f) {
236 if (type === 'read')
237 f.readers = f.readers.filter(r => r.uuid !== this.uuid);
238 else if (f.writer && f.writer.uuid === this.uuid)
239 delete f.writer;
240 }
241 async _lock(type, opts) {
242 opts.timeout = opts.timeout || this.timeout;
243 opts.retryInterval = opts.retryInterval || this.retryInterval;
244 let ifLockedCb = once(opts.ifLocked || this.ifLocked);
245 while (true) {
246 try {
247 await this.tryLock(type, opts.reason, false);
248 return;
249 }
250 catch (err) {
251 if (err.code !== 'ELOCK')
252 throw err;
253 await ifLockedCb(err.status);
254 if (opts.timeout < 0)
255 throw err;
256 // try again
257 const interval = random(opts.retryInterval / 2, opts.retryInterval * 2);
258 await wait(interval);
259 opts.timeout -= interval;
260 opts.retryInterval *= 2;
261 }
262 }
263 }
264 async tryLock(type, reason, inc = true) {
265 if (this.count[type]) {
266 if (inc)
267 this._count[type]++;
268 return;
269 }
270 this.debug('tryLock', type, reason);
271 const status = await this.check(type);
272 if (status.status !== 'open') {
273 this.debug('status: %o', status);
274 throw new errors_1.RWLockfileError(status);
275 }
276 let f = await this._fetchFile();
277 this.addJob(type, reason, f);
278 await this.writeFile(f);
279 if (inc)
280 this._count[type]++;
281 this.debug('got %s lock for %s', type, reason);
282 }
283 _lockSync(type, reason) {
284 if (this._count[type]) {
285 this._count[type]++;
286 return;
287 }
288 const status = this.checkSync(type);
289 if (status.status !== 'open') {
290 this.debug('status: %o', status);
291 throw new errors_1.RWLockfileError(status);
292 }
293 let f = this._fetchFileSync();
294 this.addJob(type, reason, f);
295 this.writeFileSync(f);
296 this._count[type]++;
297 this.debug('got %s lock for %s', type, reason);
298 }
299 async writeFile(f) {
300 if (!f.writer && !f.readers.length) {
301 await this.fs.remove(this.file);
302 }
303 else {
304 await this.fs.outputJSON(this.file, this._stringifyFile(f));
305 }
306 }
307 writeFileSync(f) {
308 if (!f.writer && !f.readers.length) {
309 try {
310 this.fs.unlinkSync(this.file);
311 }
312 catch (err) {
313 if (err.code !== 'ENOENT')
314 throw err;
315 }
316 }
317 else {
318 this.fs.outputJSONSync(this.file, this._stringifyFile(f));
319 }
320 }
321 get debug() {
322 return this._debug || ((..._) => { });
323 }
324 _debugReport(action, type, { reason } = {}) {
325 const operator = (action.startsWith('unlock') && `-${this.count[type]}`) || (action.startsWith('remove') && '-1') || '+1';
326 const read = this.count['read'] + (type === 'read' ? operator : '');
327 const write = this.count['write'] + (type === 'write' ? operator : '');
328 reason = reason ? ` reason:${reason}` : '';
329 this.debug(`read:${read} write:${write}${reason} ${this.file}`);
330 }
331}
332tslib_1.__decorate([
333 decorators_1.lockfile('internal')
334], RWLockfile.prototype, "check", null);
335tslib_1.__decorate([
336 decorators_1.lockfile('internal', { sync: true })
337], RWLockfile.prototype, "checkSync", null);
338tslib_1.__decorate([
339 decorators_1.onceAtATime(0),
340 decorators_1.lockfile('internal')
341], RWLockfile.prototype, "_removeJob", null);
342tslib_1.__decorate([
343 decorators_1.lockfile('internal', { sync: true })
344], RWLockfile.prototype, "_removeJobSync", null);
345tslib_1.__decorate([
346 decorators_1.onceAtATime(0)
347], RWLockfile.prototype, "_lock", null);
348tslib_1.__decorate([
349 decorators_1.lockfile('internal')
350], RWLockfile.prototype, "tryLock", null);
351tslib_1.__decorate([
352 decorators_1.lockfile('internal', { sync: true })
353], RWLockfile.prototype, "_lockSync", null);
354exports.RWLockfile = RWLockfile;
355const instances = [];
356process.once('exit', () => {
357 for (let i of instances) {
358 try {
359 i.unlockSync();
360 }
361 catch (err) { }
362 }
363});
364function debugEnvVar() {
365 return (((process.env.RWLOCKFILE_DEBUG === '1' || process.env.HEROKU_DEBUG_ALL) && 1) ||
366 (process.env.RWLOCKFILE_DEBUG === '2' && 2) ||
367 0);
368}
369function wait(ms) {
370 return new Promise(resolve => setTimeout(resolve, ms));
371}
372function random(min, max) {
373 return Math.floor(Math.random() * (max - min) + min);
374}
375function once(fn) {
376 let ran = false;
377 return ((...args) => {
378 if (ran)
379 return;
380 ran = true;
381 return fn(...args);
382 });
383}
384async function isActive(pid) {
385 try {
386 return await isProcessActive.isActive(pid);
387 }
388 catch (err) {
389 console.error(err);
390 return false;
391 }
392}
393function isActiveSync(pid) {
394 try {
395 return isProcessActive.isActiveSync(pid);
396 }
397 catch (err) {
398 console.error(err);
399 return false;
400 }
401}
402exports.default = RWLockfile;