UNPKG

9.04 kBJavaScriptView Raw
1'use strict';
2
var util = require('util');
var crypto = require('crypto');
var Promise = require('bluebird');
var EventEmitter = require('events');
6
// Support the `events` module shipped with node < 0.11.0: when the module object carries
// an `EventEmitter` function, use that function as the constructor.
if(typeof EventEmitter.EventEmitter === 'function') {
  EventEmitter = EventEmitter.EventEmitter;
}
10
11
// constants
//
// Lua scripts evaluated on every redis server. Each one first compares the value stored
// under KEYS[1] with the caller's lock value (ARGV[1]) and returns 0 when they differ.
// The fragments are joined with single spaces into the exact one-line script text.

// deletes the key when it still holds the caller's value
var unlockScript = [
  'if redis.call("get", KEYS[1]) == ARGV[1] then',
  'return redis.call("del", KEYS[1])',
  'else return 0 end'
].join(' ');

// resets the key's time-to-live to ARGV[2] milliseconds when it still holds the caller's value
var extendScript = [
  'if redis.call("get", KEYS[1]) == ARGV[1] then',
  'return redis.call("pexpire", KEYS[1], ARGV[2])',
  'else return 0 end'
].join(' ');

// defaults
//
// Values the Redlock constructor falls back to when an option is not given as a number.
var defaults = {
  // multiplied by the ttl to compute the clock drift allowance
  driftFactor: 0.01,

  // compared against the number of attempts made so far to decide whether to retry
  retryCount: 3,

  // delay handed to setTimeout between attempts
  retryDelay: 200
};
22
23
24
25
26
// LockError
// ---------
// Error type used when a resource cannot be locked, extended or released.
// `message` is optional; a generic description is used when it is missing or empty.
function LockError(message) {
  Error.call(this);

  // capture the stack first, passing this constructor as the frame to omit from the trace
  Error.captureStackTrace(this, LockError);

  this.name = 'LockError';
  this.message = message ? message : 'Failed to lock the resource.';
}

// put Error.prototype on the chain so `instanceof Error` holds for LockError instances
util.inherits(LockError, Error);
38
39
40
41
42
43
// Lock
// ----
// Handle describing one held lock. It records the Redlock instance that created it, the
// locked resource, the value stored for that resource and the expiration timestamp, and
// offers `unlock` and `extend` shortcuts that call the matching Redlock method with this
// lock as the first argument.
function Lock(redlock, resource, value, expiration) {
  this.redlock = redlock;       // Redlock instance the shortcuts delegate to
  this.resource = resource;     // name of the locked resource
  this.value = value;           // value stored under the resource for this lock
  this.expiration = expiration; // timestamp compared against Date.now() by Redlock
}

// Release this lock; returns whatever `redlock.unlock` returns.
Lock.prototype.unlock = function unlock(callback) {
  var owner = this.redlock;
  return owner.unlock(this, callback);
};

// Extend this lock by `ttl`; returns whatever `redlock.extend` returns.
Lock.prototype.extend = function extend(ttl, callback) {
  var owner = this.redlock;
  return owner.extend(this, ttl, callback);
};
63
64
65
66
67
68
// Redlock
// -------
// A redlock object is instantiated with an array of at least one redis client and an optional
// `options` object. Properties of the Redlock object should NOT be changed after it is first
// used, as doing so could have unintended consequences for live locks.
//
// Options (each is used only when supplied as a number, otherwise the default applies):
//   - `driftFactor`: multiplied by a lock's ttl to compute the clock drift allowance
//   - `retryCount`:  compared against the attempts made so far to decide whether to retry
//   - `retryDelay`:  delay handed to setTimeout between attempts
//
// Throws an Error when `clients` is missing or empty.
function Redlock(clients, options) {
  // reject a missing client list with the same descriptive error as an empty one, rather
  // than failing on the `.length` lookup with an unrelated TypeError
  if(!clients || clients.length === 0)
    throw new Error('Redlock must be instantiated with at least one redis server.');

  // set default options
  options = options || {};
  this.driftFactor = typeof options.driftFactor === 'number' ? options.driftFactor : defaults.driftFactor;
  this.retryCount = typeof options.retryCount === 'number' ? options.retryCount : defaults.retryCount;
  this.retryDelay = typeof options.retryDelay === 'number' ? options.retryDelay : defaults.retryDelay;

  // the redis clients every lock operation is sent to
  this.servers = clients;
}
86
87// Inherit all the EventEmitter methods, like `on`, and `off`
88util.inherits(Redlock, EventEmitter);
89
90
// Attach a reference to LockError per issue #7, which allows the application to use instanceof
// to distinguish between error types.
93Redlock.LockError = LockError;
94
95
// lock (alias: acquire)
// ---------------------
// Locks a resource using the redlock algorithm by delegating to `_lock` with no existing
// lock value, and returns whatever `_lock` returns.
//
// ```js
// redlock.lock(
//   'some-resource',       // the resource to lock
//   2000,                  // ttl in ms
//   function(err, lock) {  // callback function (optional)
//     ...
//   }
// )
// ```
Redlock.prototype.lock = function lock(resource, ttl, callback) {
  // `null` in the value position tells `_lock` to create a new lock
  return this._lock(resource, null, ttl, callback);
};

// `acquire` is the very same function under a second name
Redlock.prototype.acquire = Redlock.prototype.lock;
113
// disposer
// --------
// Locks a resource using the redlock algorithm and returns a bluebird disposer whose
// cleanup step unlocks the lock. An error raised by that unlock is passed to the optional
// `errorHandler`; when none is given the error is ignored.
//
// ```js
// using(
//   redlock.disposer(
//     'some-resource',     // the resource to lock
//     2000                 // ttl in ms
//   ),
//   function(lock) {
//     ...
//   }
// );
// ```
Redlock.prototype.disposer = function disposer(resource, ttl, errorHandler) {
  // default to a handler that does nothing with the unlock error
  var onUnlockError = errorHandler || function(err) {};

  var acquired = this._lock(resource, null, ttl);

  return acquired.disposer(function(lock){
    var released = lock.unlock();
    return released.catch(onUnlockError);
  });
};
136
137
// unlock (alias: release)
// -----------------------
// This method unlocks the provided lock from all servers still persisting it. It will fail
// with a LockError if it is unable to release the lock on a quorum of nodes, but will make no
// attempt to restore the lock on nodes that failed to release. It is safe to re-attempt an
// unlock or to ignore the error, as the lock will automatically expire after its timeout.
//
// Returns a promise that resolves with no value; `callback` (optional) is attached with
// `nodeify`. Errors reported by individual clients are emitted as `clientError` events.
Redlock.prototype.unlock = function unlock(lock, callback) {
  var redlock = this;

  var released = new Promise(function(resolve, reject) {

    // invalidate the lock
    lock.expiration = 0;

    // the number of servers which have agreed to release this lock
    var acks = 0;

    // the number of acknowledgements needed for consensus
    var needed = Math.floor(redlock.servers.length / 2) + 1;

    // the number of async redis calls still waiting to finish
    var outstanding = redlock.servers.length;

    // collects one server's reply and settles the promise once every server has answered
    function onReply(err, response) {
      if(err) redlock.emit('clientError', err);

      // - if the lock was released by this call, the script returns 1
      // - if the lock has already been released, the script returns 0
      //   - it may have been re-acquired by another process
      //   - it may have already been manually released
      //   - it may have expired
      // either way the server no longer holds this lock, so both count as agreement
      if(response === 0 || response === 1)
        acks += 1;

      outstanding -= 1;
      if(outstanding > 0) return;

      // SUCCESS: there is consensus and the lock is released
      if(acks >= needed)
        return resolve();

      // FAILURE: the lock could not be released
      return reject(new LockError('Unable to fully release the lock on resource "' + lock.resource + '".'));
    }

    // release the lock on each server
    redlock.servers.forEach(function(server){
      server.eval(unlockScript, 1, lock.resource, lock.value, onReply);
    });
  });

  // optionally run callback
  return released.nodeify(callback);
};

// `release` is the very same function under a second name
Redlock.prototype.release = Redlock.prototype.unlock;
191
192
// extend
// ------
// This method extends a valid lock by the provided `ttl`. The promise it returns resolves
// with the ORIGINAL lock object, updated in place with the new value and expiration, and
// is rejected with a LockError when the lock's expiration is already in the past.
// `callback` (optional) is attached with `nodeify`.
Redlock.prototype.extend = function extend(lock, ttl, callback) {

  // the lock has expired
  if(lock.expiration < Date.now()) {
    var expired = new LockError('Cannot extend lock on resource "' + lock.resource + '" because the lock has already expired.');
    return Promise.reject(expired).nodeify(callback);
  }

  // copy the result of the extension onto the caller's lock and hand that same object back
  function adopt(extension) {
    lock.value = extension.value;
    lock.expiration = extension.expiration;
    return lock;
  }

  // extend the lock: passing the current value makes `_lock` extend rather than create
  var extended = this._lock(lock.resource, lock.value, ttl);

  // optionally run callback
  return extended.then(adopt).nodeify(callback);
};
216
217
// _lock
// -----
// This method locks a resource using the redlock algorithm. It serves both public entry
// points: a `value` of `null` creates a new lock, any other `value` extends the lock that
// currently holds that value.
//
// Every server is asked in parallel. Once all of them have answered, the attempt succeeds
// when a quorum agreed and the computed expiration is still in the future. Otherwise the
// lock is released again and, depending on `retryCount`, another attempt is scheduled
// after `retryDelay`, or the promise is rejected with a LockError.
//
// ###Creating New Locks:
//
// ```js
// redlock._lock(
//   'some-resource',       // the resource to lock
//   null,                  // no original lock value
//   2000,                  // ttl in ms
//   function(err, lock) {  // callback function (optional)
//     ...
//   }
// )
// ```
//
// ###Extending Existing Locks:
//
// ```js
// redlock._lock(
//   'some-resource',       // the resource to lock
//   'dkkk18g4gy39dx6r',    // the value of the original lock
//   2000,                  // ttl in ms
//   function(err, lock) {  // callback function (optional)
//     ...
//   }
// )
// ```
Redlock.prototype._lock = function _lock(resource, value, ttl, callback) {
  var redlock = this;

  var outcome = new Promise(function(resolve, reject) {

    // the number of times we have attempted this lock
    var rounds = 0;

    // issues the redis command for this operation against one server; chosen once, here
    var send;

    if(value !== null) {
      // extend an existing lock: the script only touches keys still holding `value`
      send = function(server, done){
        return server.eval(extendScript, 1, resource, value, ttl, done);
      };
    } else {
      // create a new lock: pick a fresh value and set it only where the key is absent
      value = redlock._random();
      send = function(server, done){
        return server.set(resource, value, 'NX', 'PX', ttl, done);
      };
    }

    function round(){
      rounds += 1;

      // the time when this attempt started
      var startedAt = Date.now();

      // the number of servers which have agreed to this lock
      var granted = 0;

      // the number of votes needed for consensus
      var needed = Math.floor(redlock.servers.length / 2) + 1;

      // the number of async redis calls still waiting to finish
      var outstanding = redlock.servers.length;

      // collects one server's reply and decides the attempt once every server has answered
      function onReply(err, response) {
        if(err) redlock.emit('clientError', err);
        if(response) granted += 1;

        outstanding -= 1;
        if(outstanding > 0) return;

        // Add 2 milliseconds to the drift to account for Redis expires precision, which is
        // 1 ms, plus the configured allowable drift factor
        var drift = Math.round(redlock.driftFactor * ttl) + 2;
        var lock = new Lock(redlock, resource, value, startedAt + ttl - drift);

        // SUCCESS: there is consensus and the lock is not expired
        if(granted >= needed && lock.expiration > Date.now())
          return resolve(lock);

        // remove this lock from servers that voted for it; the result of the unlock is
        // ignored because the keys expire on their own
        return lock.unlock(function(){

          // RETRY
          if(rounds <= redlock.retryCount)
            return setTimeout(round, redlock.retryDelay);

          // FAILED
          return reject(new LockError('Exceeded ' + redlock.retryCount + ' attempts to lock the resource "' + resource + '".'));
        });
      }

      redlock.servers.forEach(function(server){
        send(server, onReply);
      });
    }

    round();
  });

  // optionally run callback
  return outcome.nodeify(callback);
};
324
325
// _random
// -------
// Generates the value stored under a resource key for a new lock. The unlock and extend
// scripts only act when the stored value equals the caller's value, so this token is what
// keeps one client from releasing or extending a lock held by another. It is therefore
// drawn from the cryptographic random source: 16 random bytes, returned as a
// 32-character lowercase hex string.
Redlock.prototype._random = function _random(){
  return crypto.randomBytes(16).toString('hex');
};
329
330
331module.exports = Redlock;