UNPKG

4.32 kBJavaScriptView Raw
1/* --------------------
2 * lock-queue module
3 *
4 * A `Locker` is a queue where items in the queue can either require an exclusive
5 * or non-exclusive lock.
6 *
7 * Items in the queue are executed in the order that they are added.
8 * Items requiring a non-exclusive lock run concurrently.
9 *
10 * When the next item in the queue requires an exclusive lock, all currently running
11 * items are awaited before the exclusive item begins execution.
12 * All other items are then held in the queue until the exclusive-lock function has finished.
13 * When the exclusive-lock function has finished, the rest of the queue begins processing again.
14 * -------------------- */
15
16// Modules
17var promisify = require('promisify-any');
18
19// Imports
20var defer = require('./defer');
21
22// Exports
23
/**
 * Create a new `Locker` with an empty queue and no lock held.
 *
 * Can be invoked either as `new Locker()` or as plain `Locker()`;
 * both forms return a fresh `Locker` instance.
 *
 * @constructor
 */
function Locker() {
	// Called without `new` - redirect to a proper construction
	if (this instanceof Locker === false) {
		return new Locker();
	}

	// `true` while a job holding an exclusive lock is executing
	this.locked = false;

	// Whether any jobs are currently running or waiting (maintained by `runQueue`)
	this.busy = false;

	// Count of jobs currently executing
	this.running = 0;

	// Jobs waiting for their turn, oldest first
	this.queue = [];
}
35
// The `Locker` constructor is the module's only export
module.exports = Locker;
37
/**
 * Queue `fn` to run under a non-exclusive lock.
 *
 * The job is appended to the queue via `joinQueue` with the exclusive flag off.
 * `fn` is invoked with `ctx` as its `this` value, i.e. `fn.call(ctx)`.
 *
 * @param {Function} fn - Function to queue
 * @param {*} [ctx] - `this` context to run function with
 * @returns {Promise} - Promise which resolves/rejects with eventual outcome of `fn()`
 */
Locker.prototype.run = function(fn, ctx) {
	// This job does not need the locker to itself
	var exclusive = false;

	return joinQueue.call(this, fn, ctx, exclusive);
};
51
/**
 * Queue `fn` to run under an exclusive lock.
 *
 * The job is appended to the queue via `joinQueue` with the exclusive flag on,
 * so it only starts once nothing else is running, and nothing else starts
 * until it has resolved or rejected.
 * `fn` is invoked with `ctx` as its `this` value, i.e. `fn.call(ctx)`.
 *
 * @param {Function} fn - Function to queue
 * @param {*} [ctx] - `this` context to run function with
 * @returns {Promise} - Promise which resolves/rejects with eventual outcome of `fn()`
 */
Locker.prototype.lock = function(fn, ctx) {
	// This job needs the locker to itself
	var exclusive = true;

	return joinQueue.call(this, fn, ctx, exclusive);
};
65
/**
 * Append a job to the locker's queue, then try to make progress on the queue.
 *
 * Must be called with a `Locker` as `this`.
 *
 * @param {Function} fn - Function to queue
 * @param {*} [ctx] - `this` context to run function with
 * @param {boolean} exclusive - `true` if function requires an exclusive lock
 * @returns {Promise} - Promise which resolves/rejects with eventual outcome of `fn()`
 */
function joinQueue(fn, ctx, exclusive) {
	// Wrap `fn` with `promisify` (second argument `0`) before it is queued
	var task = promisify(fn, 0);

	// The caller gets this deferred's promise; `runDone` settles it later
	var pending = defer();

	// Queue the job at the back
	var job = {
		fn: task,
		ctx: ctx,
		deferred: pending,
		exclusive: exclusive
	};
	this.queue.push(job);

	// Start whatever is now allowed to start (possibly this job)
	runQueue.call(this);

	return pending.promise;
}
88
/**
 * Refresh the `busy` flag, then start as many queued jobs as are currently allowed.
 *
 * Must be called with a `Locker` as `this`.
 *
 * `busy` is stored as a strict boolean, matching the `false` it is initialised
 * with in the constructor: `true` when at least one job is running or queued
 * at the moment of the call, `false` otherwise.
 *
 * @returns {undefined}
 */
function runQueue() {
	// Coerce so `busy` is always `true`/`false`, never a count
	this.busy = Boolean(this.running || this.queue.length);

	// Keep starting jobs until `runNext` reports that it could not start one
	// (queue empty, exclusive lock held, or exclusive job waiting on running jobs)
	while (runNext.call(this)) {
		// `runNext` does all the work
	}
}
103
/**
 * Try to start the job at the head of the queue.
 *
 * Must be called with a `Locker` as `this`. Nothing is started when:
 *   - an exclusive lock is currently held, or
 *   - the queue is empty, or
 *   - the head job is exclusive and other jobs are still running.
 *
 * Otherwise the head job is removed from the queue, counted as running
 * (and the locker marked locked if the job is exclusive), and its function
 * is invoked with the job's context. `runDone` is called once the returned
 * promise settles.
 *
 * @returns {boolean} - `true` if was able to run an item, `false` if not
 */
function runNext() {
	var locker = this;

	// An exclusive job is in progress - everything else waits
	if (locker.locked) return false;

	// Peek at the oldest waiting job; bail out if there is none
	var job = locker.queue[0];
	if (!job) return false;

	if (job.exclusive) {
		// Exclusive job must wait for all running jobs to finish
		if (locker.running) return false;
		locker.locked = true;
	}

	// Commit: take the job off the queue and count it as running
	// before invoking it, so state is up to date if the job re-enters the locker
	locker.queue.shift();
	locker.running += 1;

	function onResolved(value) {
		runDone.call(locker, job, true, value);
	}

	function onRejected(reason) {
		runDone.call(locker, job, false, reason);
	}

	job.fn.call(job.ctx).then(onResolved, onRejected);

	return true;
}
132
/**
 * Handle completion of a job.
 *
 * Must be called with a `Locker` as `this`. Decrements the running count,
 * releases the exclusive lock if one was held, settles the job's deferred
 * with the outcome, and then runs the queue again.
 *
 * @param {Object} item - Queue item that has finished
 * @param {boolean} resolved - `true` if function resolved, `false` if rejected
 * @param {*} res - Result of function (resolve value or reject reason)
 * @returns {undefined}
 */
function runDone(item, resolved, res) {
	// One fewer job executing
	this.running -= 1;

	// If a lock was held, this job was the holder - release it
	if (this.locked) {
		this.locked = false;
	}

	// Pass the outcome on to whoever queued the job
	if (resolved) {
		item.deferred.resolve(res);
	} else {
		item.deferred.reject(res);
	}

	// State has changed, so more jobs may now be able to start
	runQueue.call(this);
}