1 | /* --------------------
|
2 | * lock-queue module
|
3 | *
|
4 | * A `Locker` is a queue where items in the queue can either require an exclusive
|
5 | * or non-exclusive lock.
|
6 | *
|
7 | * Items in the queue are executed in the order that they are added.
|
8 | * Items requiring a non-exclusive lock run concurrently.
|
9 | *
|
10 | * When the next item in the queue requires an exclusive lock, all currently running
|
11 | * items are awaited before the exclusive item begins execution.
|
12 | * All other items are then held in the queue until the exclusive-lock function has finished.
|
13 | * When the exclusive-lock function has finished, the rest of the queue begins processing again.
|
14 | * -------------------- */
|
15 |
|
16 | // Modules
|
17 | var promisify = require('promisify-any');
|
18 |
|
19 | // Imports
|
20 | var defer = require('./defer');
|
21 |
|
22 | // Exports
|
23 |
|
/**
 * Locker constructor.
 *
 * Creates a lock queue with no jobs running or waiting.
 * Can be invoked with or without `new` - both forms return a fresh `Locker`.
 *
 * @constructor
 */
function Locker() {
	// Called as a plain function - delegate to `new`
	if (!(this instanceof Locker)) {
		return new Locker();
	}

	// Whether any process has an exclusive lock at present
	this.locked = false;

	// Whether any processes are currently running or waiting
	this.busy = false;

	// Number of processes currently running
	this.running = 0;

	// Queue of jobs awaiting lock release
	this.queue = [];
}
|
35 |
|
36 | module.exports = Locker;
|
37 |
|
/**
 * Queue `fn` to run under a non-exclusive lock.
 *
 * `fn` starts immediately if nothing is blocking it, otherwise once the jobs
 * ahead of it in the queue allow. It is invoked as `fn.call(ctx)`.
 *
 * @param {Function} fn - Function to queue
 * @param {*} [ctx] - `this` context to run function with
 * @returns {Promise} - Promise which resolves/rejects with eventual outcome of `fn()`
 */
Locker.prototype.run = function(fn, ctx) {
	// Non-exclusive jobs may run alongside each other
	var exclusive = false;
	return joinQueue.call(this, fn, ctx, exclusive);
};
|
51 |
|
/**
 * Queue `fn` to run under an exclusive lock.
 *
 * `fn` only starts once no other job is running, and no other queued job
 * starts until `fn` has resolved or rejected. It is invoked as `fn.call(ctx)`.
 *
 * @param {Function} fn - Function to queue
 * @param {*} [ctx] - `this` context to run function with
 * @returns {Promise} - Promise which resolves/rejects with eventual outcome of `fn()`
 */
Locker.prototype.lock = function(fn, ctx) {
	// Exclusive jobs run on their own
	var exclusive = true;
	return joinQueue.call(this, fn, ctx, exclusive);
};
|
65 |
|
/**
 * Append a job to the queue, then attempt to process the queue.
 *
 * Must be called with the `Locker` instance as `this`.
 *
 * @param {Function} fn - Function to queue
 * @param {*} [ctx] - `this` context to run function with
 * @param {boolean} exclusive - `true` if function requires an exclusive lock
 * @returns {Promise} - Promise which resolves/rejects with eventual outcome of `fn()`
 */
function joinQueue(fn, ctx, exclusive) {
	// Wrap `fn` so that `runNext` can call `.then()` on whatever it returns.
	// NOTE(review): the `0` is presumably the number of arguments `fn` takes -
	// confirm against the `promisify-any` docs.
	var task = promisify(fn, 0);

	// The deferred's promise is what the caller receives; it is settled in `runDone`
	var deferred = defer();

	var job = {
		fn: task,
		ctx: ctx,
		deferred: deferred,
		exclusive: exclusive
	};
	this.queue.push(job);

	// Start the job straight away if nothing is blocking it
	runQueue.call(this);

	return deferred.promise;
}
|
88 |
|
/**
 * Run queue.
 *
 * Refreshes the `busy` flag, then starts queued jobs one after another until
 * `runNext` reports that it could not start one.
 *
 * Must be called with the `Locker` instance as `this`.
 *
 * @returns {undefined}
 */
function runQueue() {
	// Flag whether locker is busy (i.e. has current or pending processes).
	// Stored as a real boolean so it matches the `false` set in the constructor
	// (`running || queue.length` would store a number such as `0` instead).
	this.busy = this.running > 0 || this.queue.length > 0;

	// Start jobs from the front of the queue until one cannot be started
	var started;
	do {
		started = runNext.call(this);
	} while (started);
}
|
103 |
|
/**
 * Try to start the job at the front of the queue.
 *
 * Nothing is started if an exclusive lock is currently held, if the queue is
 * empty, or if the front job needs an exclusive lock while other jobs are
 * still running.
 *
 * Must be called with the `Locker` instance as `this`.
 *
 * @returns {boolean} - `true` if was able to run an item, `false` if not
 */
function runNext() {
	var locker = this;

	// An exclusive job is in progress - everything else has to wait
	if (locker.locked) return false;

	var pending = locker.queue;
	var job = pending[0];
	if (!job) return false;

	// An exclusive job cannot start until all running jobs have finished
	var wantsLock = job.exclusive;
	if (wantsLock && locker.running) return false;

	// Job can start - take it off the queue and update state before invoking it
	pending.shift();
	locker.running++;
	if (wantsLock) locker.locked = true;

	// Create a handler which reports the job's outcome to `runDone`
	function report(resolved) {
		return function(outcome) {
			runDone.call(locker, job, resolved, outcome);
		};
	}

	job.fn.call(job.ctx).then(report(true), report(false));

	return true;
}
|
132 |
|
/**
 * Handle completion of a job.
 *
 * Updates the lock state, settles the job's deferred promise, then tries to
 * start further jobs from the queue.
 *
 * Must be called with the `Locker` instance as `this`.
 *
 * @param {Object} item - Queue item
 * @param {boolean} resolved - `true` if function resolved, `false` if rejected
 * @param {*} res - Result of function (resolve value or reject reason)
 * @returns {undefined}
 */
function runDone(item, resolved, res) {
	// One fewer job running
	this.running -= 1;

	// Release the exclusive lock if it was held. `runNext` starts nothing
	// while the lock is held, so the job finishing here is the one holding it.
	if (this.locked) {
		this.locked = false;
	}

	// Pass the outcome on to the promise returned to the caller
	var deferred = item.deferred;
	if (resolved) {
		deferred.resolve(res);
	} else {
		deferred.reject(res);
	}

	// State has changed, so waiting jobs may now be able to start
	runQueue.call(this);
}
|