UNPKG

22.8 kBJavaScriptView Raw
1'use strict';
2
3var utils = require('./utils');
4var debug = require('./debug');
5var Multi = require('./multi');
6var Command = require('./command');
7var no_password_is_set = /no password is set/;
8var loading = /LOADING/;
9var RedisClient = require('../').RedisClient;
10
/********************************************************************************************
 Replace built-in redis functions

 The callback may be hooked as needed. The same does not apply to the rest of the function.
 State should not be set outside of the callback unless absolutely necessary.
 This is important to make sure it works the same as a single command or in a multi context.
 To make sure everything works with the offline queue, use the "call_on_write" function.
 It is executed while writing to the stream.

 TODO: Implement individual command generation as soon as possible to prevent divergent code
 on single and multi calls!
********************************************************************************************/
23
// Create a Multi bound to this client whose `exec` / `EXEC` both point at the
// instance's `exec_transaction`.
RedisClient.prototype.multi = RedisClient.prototype.MULTI = function multi (args) {
    var transaction = new Multi(this, args);
    var run_as_transaction = transaction.exec_transaction;
    transaction.EXEC = run_as_transaction;
    transaction.exec = run_as_transaction;
    return transaction;
};
29
// ATTENTION: batch is not a native function. It is still handled as an individual command,
// as it behaves just the same as multi. Unlike `multi`, the returned Multi keeps its own `exec`.
RedisClient.prototype.batch = RedisClient.prototype.BATCH = function batch (args) {
    var queued_commands = new Multi(this, args);
    return queued_commands;
};
34
// Build the reply handler for SELECT.
// self: the client whose `selected_db` is updated; db: the database that was requested;
// callback: optional user callback, handed to utils.callback_or_emit together with the reply.
function select_callback (self, db, callback) {
    return function (err, res) {
        var succeeded = err === null;
        if (succeeded) {
            // Remember the database in self.selected_db so it can be restored on reconnect
            self.selected_db = db;
        }
        utils.callback_or_emit(self, callback, err, res);
    };
}
44
// Send SELECT for `db`; the wrapped callback records the database on the client on success.
// Returns whatever internal_send_command returns.
RedisClient.prototype.select = RedisClient.prototype.SELECT = function select (db, callback) {
    var on_reply = select_callback(this, db, callback);
    var command = new Command('select', [db], on_reply);
    return this.internal_send_command(command);
};
48
// Queue SELECT for `db`; the wrapped callback records the database on the underlying
// client on success. Returns the Multi itself for chaining.
Multi.prototype.select = Multi.prototype.SELECT = function select (db, callback) {
    var on_reply = select_callback(this._client, db, callback);
    var command = new Command('select', [db], on_reply);
    this.queue.push(command);
    return this;
};
53
// Send MONITOR. Handled as an individual command, as this is a special case that does not
// have to be checked for any other command.
RedisClient.prototype.monitor = RedisClient.prototype.MONITOR = function monitor (callback) {
    var client = this;
    // Monitor mode has to be active before Redis answers, as the monitor result is returned first.
    // The command is therefore expected to be processed properly. If it is not, that is not an issue either.
    function enable_monitoring () {
        client.monitoring = true;
    }
    var command = new Command('monitor', [], callback, enable_monitoring);
    return client.internal_send_command(command);
};
64
// Only works with batch, not in a transaction.
// Handled as an individual command, as this is a special case that does not have to be
// checked for any other command.
Multi.prototype.monitor = Multi.prototype.MONITOR = function monitor (callback) {
    var batch = this;
    if (batch.exec === batch.exec_transaction) {
        // Transaction: flag the multi itself so that exec knows it should abort.
        // Remove this "hack" as soon as Redis might fix this
        batch.monitoring = true;
        return batch;
    }
    // Batch: queue the command and switch the client to monitoring when it is written
    batch.queue.push(new Command('monitor', [], callback, function () {
        batch._client.monitoring = true;
    }));
    return batch;
};
81
// Build the reply handler for QUIT.
// self: the client whose stream is torn down; callback: optional user callback, handed to
// utils.callback_or_emit together with the (possibly rewritten) reply.
function quit_callback (self, callback) {
    return function (err, res) {
        // Pretend the quit command worked properly if it failed with NR_CLOSED.
        // Either the quit landed in the offline queue and was flushed at the reconnect
        // or the offline queue is deactivated and the command was rejected right away
        // or the stream is not writable
        // or while sending the quit, the connection ended / closed
        var closed_anyway = Boolean(err) && err.code === 'NR_CLOSED';
        var reported_err = closed_anyway ? null : err;
        var reported_res = closed_anyway ? 'OK' : res;
        utils.callback_or_emit(self, callback, reported_err, reported_res);
        // If the socket is still alive, kill it. This could happen if quit got a NR_CLOSED error code
        if (self.stream.writable) {
            self.stream.destroy();
        }
    };
}
100
// Send QUIT and mark the client as closing.
// Returns the value of internal_send_command (named a backpressure indicator in the original code).
RedisClient.prototype.QUIT = RedisClient.prototype.quit = function quit (callback) {
    // TODO: Consider this for v.3
    // Allow the quit command to be fired as soon as possible to prevent it landing in the offline queue.
    // this.ready = this.offline_queue.length === 0;
    var command = new Command('quit', [], quit_callback(this, callback));
    var send_result = this.internal_send_command(command);
    // Calling quit should always end the connection, no matter if there's a connection or not.
    // These flags are set only after the command has been handed over.
    this.closing = true;
    this.ready = false;
    return send_result;
};
111
// Only works with batch, not in a transaction.
// Queue QUIT; the client is flagged as closing when the command is written.
Multi.prototype.QUIT = Multi.prototype.quit = function quit (callback) {
    var client = this._client;
    // If called in a multi context, we expect redis is available
    function mark_closing () {
        client.closing = true;
        client.ready = false;
    }
    var command = new Command('quit', [], quit_callback(client, callback), mark_closing);
    this.queue.push(command);
    return this;
};
123
// Build the reply handler for INFO.
//
// The handler parses the reply into a plain object and exposes it as `self.server_info`:
//   - every "key:value" line becomes a string property (lines without a value are skipped),
//   - keys starting with "db" are parsed into an object of numbers (e.g. { keys: 1, expires: 0 }),
//   - `versions` holds the numeric parts of `redis_version` (empty array if it is missing).
// Without a reply (e.g. on error) `self.server_info` is reset to an empty object.
// The untouched `err` / `res` are then handed to utils.callback_or_emit.
//
// self: the client that receives `server_info`; callback: optional user callback.
function info_callback (self, callback) {
    return function (err, res) {
        var info = {};
        if (res) {
            var lines = res.toString().split('\r\n');
            for (var i = 0; i < lines.length; i++) {
                // Split on the first colon only: the value may contain colons itself
                // (Windows paths, IPv6 addresses, ...) and must not be truncated.
                var separator = lines[i].indexOf(':');
                if (separator === -1) {
                    continue;
                }
                var key = lines[i].slice(0, separator);
                var value = lines[i].slice(separator + 1);
                if (!value) {
                    continue;
                }
                if (key.indexOf('db') !== 0) {
                    info[key] = value;
                    continue;
                }
                // Keyspace line, e.g. "db0:keys=1,expires=0,avg_ttl=0".
                // Walk from the back to keep the original property order; skip empty
                // segments instead of aborting on the first one.
                var stats = value.split(',');
                info[key] = {};
                for (var j = stats.length - 1; j >= 0; j--) {
                    if (!stats[j]) {
                        continue;
                    }
                    var pair = stats[j].split('=');
                    info[key][pair[0]] = +pair[1];
                }
            }
            info.versions = [];
            if (info.redis_version) {
                info.redis_version.split('.').forEach(function (num) {
                    info.versions.push(+num);
                });
            }
        }
        // Expose info key/vals to users
        self.server_info = info;
        utils.callback_or_emit(self, callback, err, res);
    };
}
160
// Send INFO and store the parsed reply in this.server_info after each call.
// `section` is optional: it may be omitted, replaced by the callback, a single value or an
// array that is used as the argument list as is.
RedisClient.prototype.info = RedisClient.prototype.INFO = function info (section, callback) {
    var args;
    if (typeof section === 'function') {
        // info(callback)
        callback = section;
        args = [];
    } else if (section === undefined) {
        args = [];
    } else if (Array.isArray(section)) {
        args = section;
    } else {
        args = [section];
    }
    var command = new Command('info', args, info_callback(this, callback));
    return this.internal_send_command(command);
};
171
// Queue INFO; the parsed reply is stored in the underlying client's server_info.
// `section` is optional: it may be omitted, replaced by the callback, a single value or an
// array that is used as the argument list as is. Returns the Multi itself for chaining.
Multi.prototype.info = Multi.prototype.INFO = function info (section, callback) {
    var args;
    if (typeof section === 'function') {
        // info(callback)
        callback = section;
        args = [];
    } else if (section === undefined) {
        args = [];
    } else if (Array.isArray(section)) {
        args = section;
    } else {
        args = [section];
    }
    var command = new Command('info', args, info_callback(this._client, callback));
    this.queue.push(command);
    return this;
};
182
// Build the reply handler for AUTH.
// self: the client; pass: the password that was sent (needed for the retry);
// callback: optional user callback, handed to utils.callback_or_emit together with the reply.
function auth_callback (self, pass, callback) {
    return function (err, res) {
        if (err && no_password_is_set.test(err.message)) {
            // The server has no password configured: warn, but report success
            self.warn('Warning: Redis server does not require a password, but a password was supplied.');
            err = null;
            res = 'OK';
        } else if (err && loading.test(err.message)) {
            // If redis is still loading the db, it will not authenticate and everything else will fail.
            // Retry in 100 ms; the callback is only notified by a later attempt.
            debug('Redis still loading, trying to authenticate later');
            setTimeout(function () {
                self.auth(pass, callback);
            }, 100);
            return;
        }
        utils.callback_or_emit(self, callback, err, res);
    };
}
202
// Send AUTH with `pass`. Returns whatever internal_send_command returns.
RedisClient.prototype.auth = RedisClient.prototype.AUTH = function auth (pass, callback) {
    debug('Sending auth to ' + this.address + ' id ' + this.connection_id);

    // Stash auth for connect and reconnect.
    this.auth_pass = pass;

    // Temporarily report the client as ready while the offline queue is empty, then restore
    // the previous state once the command has been handed over.
    var was_ready = this.ready;
    this.ready = was_ready || this.offline_queue.length === 0;
    var command = new Command('auth', [pass], auth_callback(this, pass, callback));
    var send_result = this.internal_send_command(command);
    this.ready = was_ready;
    return send_result;
};
214
// Only works with batch, not in a transaction.
// Queue AUTH with `pass`. Returns the Multi itself for chaining.
//
// pass: the password to authenticate with.
// callback: optional user callback, invoked through auth_callback.
Multi.prototype.auth = Multi.prototype.AUTH = function auth (pass, callback) {
    // Connection details and the stashed password live on the client, not on the Multi
    var client = this._client;
    debug('Sending auth to ' + client.address + ' id ' + client.connection_id);

    var call_on_write = function () {
        // Stash auth for connect and reconnect. Done while writing, so that no client
        // state is changed before the queued command is actually sent.
        client.auth_pass = pass;
    };
    // auth_callback expects (client, pass, callback): the password is required for the
    // retry while Redis is still loading and the callback must not take its place.
    this.queue.push(new Command('auth', [pass], auth_callback(client, pass, callback), call_on_write));
    return this;
};
224
// Send CLIENT. Supported call forms:
//   client([arg, ...], callback?)          - the array is used as the argument list as is
//   client(arg, [more, ...], callback?)    - the callback is only taken from a third argument
//   client(arg, ..., callback?)            - a trailing function / undefined is the callback
// For CLIENT REPLY ON|OFF|SKIP the mode is stored in this.reply when the command is written.
RedisClient.prototype.client = RedisClient.prototype.CLIENT = function client () {
    var connection = this;
    var total = arguments.length;
    var command_args;
    var callback;
    var position;
    if (Array.isArray(arguments[0])) {
        command_args = arguments[0];
        callback = arguments[1];
    } else if (Array.isArray(arguments[1])) {
        if (total === 3) {
            callback = arguments[2];
        }
        var rest = arguments[1];
        var rest_count = rest.length;
        command_args = [arguments[0]];
        for (position = 0; position < rest_count; position += 1) {
            command_args.push(rest[position]);
        }
    } else {
        // A trailing undefined should not be the average use case
        var last_type = total === 0 ? null : typeof arguments[total - 1];
        if (last_type === 'function' || last_type === 'undefined') {
            total -= 1;
            callback = arguments[total];
        }
        command_args = [];
        for (position = 0; position < total; position += 1) {
            command_args.push(arguments[position]);
        }
    }
    var on_write;
    // CLIENT REPLY ON|OFF|SKIP
    /* istanbul ignore next: TODO: Remove this as soon as Travis runs Redis 3.2 */
    if (command_args.length === 2 && command_args[0].toString().toUpperCase() === 'REPLY') {
        var reply_mode = command_args[1].toString().toUpperCase();
        if (['ON', 'OFF', 'SKIP'].indexOf(reply_mode) !== -1) {
            on_write = function () {
                connection.reply = reply_mode;
            };
        }
    }
    return connection.internal_send_command(new Command('client', command_args, callback, on_write));
};
269
// Queue CLIENT. Supported call forms:
//   client([arg, ...], callback?)          - the array is used as the argument list as is
//   client(arg, [more, ...], callback?)    - the callback is only taken from a third argument
//   client(arg, ..., callback?)            - a trailing function / undefined is the callback
// For CLIENT REPLY ON|OFF|SKIP the mode is stored in the underlying client's `reply` when the
// command is written. Returns the Multi itself for chaining.
Multi.prototype.client = Multi.prototype.CLIENT = function client () {
    var connection = this._client;
    var total = arguments.length;
    var command_args;
    var callback;
    var position;
    if (Array.isArray(arguments[0])) {
        command_args = arguments[0];
        callback = arguments[1];
    } else if (Array.isArray(arguments[1])) {
        if (total === 3) {
            callback = arguments[2];
        }
        var rest = arguments[1];
        var rest_count = rest.length;
        command_args = [arguments[0]];
        for (position = 0; position < rest_count; position += 1) {
            command_args.push(rest[position]);
        }
    } else {
        // A trailing undefined should not be the average use case
        var last_type = total === 0 ? null : typeof arguments[total - 1];
        if (last_type === 'function' || last_type === 'undefined') {
            total -= 1;
            callback = arguments[total];
        }
        command_args = [];
        for (position = 0; position < total; position += 1) {
            command_args.push(arguments[position]);
        }
    }
    var on_write;
    // CLIENT REPLY ON|OFF|SKIP
    /* istanbul ignore next: TODO: Remove this as soon as Travis runs Redis 3.2 */
    if (command_args.length === 2 && command_args[0].toString().toUpperCase() === 'REPLY') {
        var reply_mode = command_args[1].toString().toUpperCase();
        if (['ON', 'OFF', 'SKIP'].indexOf(reply_mode) !== -1) {
            on_write = function () {
                connection.reply = reply_mode;
            };
        }
    }
    this.queue.push(new Command('client', command_args, callback, on_write));
    return this;
};
315
// Send HMSET. Supported call forms:
//   hmset([key, field, value, ...], callback?)   - the array is used as the argument list as is
//   hmset(key, [field, value, ...], callback?)   - the callback is only taken from a third argument
//   hmset(key, object, callback?)                - every property visited by for...in is
//                                                  flattened into a field / value pair
//   hmset(key, field, value, ..., callback?)     - a trailing function / undefined is the callback
RedisClient.prototype.hmset = RedisClient.prototype.HMSET = function hmset () {
    var total = arguments.length;
    var third_type = typeof arguments[2];
    // Object form: exactly (key, object), optionally followed by a callback or undefined
    var is_object_form = typeof arguments[1] === 'object' &&
        (total === 2 || (total === 3 && (third_type === 'function' || third_type === 'undefined')));
    var command_args;
    var callback;
    var position;
    if (Array.isArray(arguments[0])) {
        command_args = arguments[0];
        callback = arguments[1];
    } else if (Array.isArray(arguments[1])) {
        if (total === 3) {
            callback = arguments[2];
        }
        var rest = arguments[1];
        var rest_count = rest.length;
        command_args = [arguments[0]];
        for (position = 0; position < rest_count; position += 1) {
            command_args.push(rest[position]);
        }
    } else if (is_object_form) {
        var fields = arguments[1];
        command_args = [arguments[0]];
        for (var field in fields) {
            command_args.push(field, fields[field]);
        }
        callback = arguments[2];
    } else {
        // A trailing undefined should not be the average use case
        var last_type = total === 0 ? null : typeof arguments[total - 1];
        if (last_type === 'function' || last_type === 'undefined') {
            total -= 1;
            callback = arguments[total];
        }
        command_args = [];
        for (position = 0; position < total; position += 1) {
            command_args.push(arguments[position]);
        }
    }
    return this.internal_send_command(new Command('hmset', command_args, callback));
};
354
// Queue HMSET. Supported call forms:
//   hmset([key, field, value, ...], callback?)   - the array is used as the argument list as is
//   hmset(key, [field, value, ...], callback?)   - the callback is only taken from a third argument
//   hmset(key, object, callback?)                - every property visited by for...in is
//                                                  flattened into a field / value pair
//   hmset(key, field, value, ..., callback?)     - a trailing function / undefined is the callback
// Returns the Multi itself for chaining.
Multi.prototype.hmset = Multi.prototype.HMSET = function hmset () {
    var total = arguments.length;
    var third_type = typeof arguments[2];
    // Object form: exactly (key, object), optionally followed by a callback or undefined
    var is_object_form = typeof arguments[1] === 'object' &&
        (total === 2 || (total === 3 && (third_type === 'function' || third_type === 'undefined')));
    var command_args;
    var callback;
    var position;
    if (Array.isArray(arguments[0])) {
        command_args = arguments[0];
        callback = arguments[1];
    } else if (Array.isArray(arguments[1])) {
        if (total === 3) {
            callback = arguments[2];
        }
        var rest = arguments[1];
        var rest_count = rest.length;
        command_args = [arguments[0]];
        for (position = 0; position < rest_count; position += 1) {
            command_args.push(rest[position]);
        }
    } else if (is_object_form) {
        var fields = arguments[1];
        command_args = [arguments[0]];
        for (var field in fields) {
            command_args.push(field, fields[field]);
        }
        callback = arguments[2];
    } else {
        // A trailing undefined should not be the average use case
        var last_type = total === 0 ? null : typeof arguments[total - 1];
        if (last_type === 'function' || last_type === 'undefined') {
            total -= 1;
            callback = arguments[total];
        }
        command_args = [];
        for (position = 0; position < total; position += 1) {
            command_args.push(arguments[position]);
        }
    }
    this.queue.push(new Command('hmset', command_args, callback));
    return this;
};
394
// Send SUBSCRIBE. The arguments are either a single array (used as is, optionally followed by
// a callback) or a flat list whose trailing function / undefined is taken as the callback.
RedisClient.prototype.subscribe = RedisClient.prototype.SUBSCRIBE = function subscribe () {
    var client = this;
    var command_args;
    var callback;
    if (Array.isArray(arguments[0])) {
        command_args = arguments[0];
        callback = arguments[1];
    } else {
        var total = arguments.length;
        // A trailing undefined should not be the average use case
        var last_type = total === 0 ? null : typeof arguments[total - 1];
        if (last_type === 'function' || last_type === 'undefined') {
            total -= 1;
            callback = arguments[total];
        }
        command_args = [];
        for (var position = 0; position < total; position += 1) {
            command_args.push(arguments[position]);
        }
    }
    // Keep an already set pub_sub_mode, otherwise set it to the command queue length plus one
    var enter_pub_sub_mode = function () {
        var current_mode = client.pub_sub_mode;
        client.pub_sub_mode = current_mode ? current_mode : client.command_queue.length + 1;
    };
    return client.internal_send_command(new Command('subscribe', command_args, callback, enter_pub_sub_mode));
};
421
// Queue SUBSCRIBE. The arguments are either a single array (used as is, optionally followed by
// a callback) or a flat list whose trailing function / undefined is taken as the callback.
// Returns the Multi itself for chaining.
Multi.prototype.subscribe = Multi.prototype.SUBSCRIBE = function subscribe () {
    var client = this._client;
    var command_args;
    var callback;
    if (Array.isArray(arguments[0])) {
        command_args = arguments[0];
        callback = arguments[1];
    } else {
        var total = arguments.length;
        // A trailing undefined should not be the average use case
        var last_type = total === 0 ? null : typeof arguments[total - 1];
        if (last_type === 'function' || last_type === 'undefined') {
            total -= 1;
            callback = arguments[total];
        }
        command_args = [];
        for (var position = 0; position < total; position += 1) {
            command_args.push(arguments[position]);
        }
    }
    // Keep an already set pub_sub_mode, otherwise set it to the command queue length plus one
    var enter_pub_sub_mode = function () {
        var current_mode = client.pub_sub_mode;
        client.pub_sub_mode = current_mode ? current_mode : client.command_queue.length + 1;
    };
    this.queue.push(new Command('subscribe', command_args, callback, enter_pub_sub_mode));
    return this;
};
449
// Send UNSUBSCRIBE. The arguments are either a single array (used as is, optionally followed
// by a callback) or a flat list whose trailing function / undefined is taken as the callback.
RedisClient.prototype.unsubscribe = RedisClient.prototype.UNSUBSCRIBE = function unsubscribe () {
    var client = this;
    var command_args;
    var callback;
    if (Array.isArray(arguments[0])) {
        command_args = arguments[0];
        callback = arguments[1];
    } else {
        var total = arguments.length;
        // A trailing undefined should not be the average use case
        var last_type = total === 0 ? null : typeof arguments[total - 1];
        if (last_type === 'function' || last_type === 'undefined') {
            total -= 1;
            callback = arguments[total];
        }
        command_args = [];
        for (var position = 0; position < total; position += 1) {
            command_args.push(arguments[position]);
        }
    }
    var enter_pub_sub_mode = function () {
        // Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback
        var current_mode = client.pub_sub_mode;
        client.pub_sub_mode = current_mode ? current_mode : client.command_queue.length + 1;
    };
    return client.internal_send_command(new Command('unsubscribe', command_args, callback, enter_pub_sub_mode));
};
477
// Queue UNSUBSCRIBE. The arguments are either a single array (used as is, optionally followed
// by a callback) or a flat list whose trailing function / undefined is taken as the callback.
// Returns the Multi itself for chaining.
Multi.prototype.unsubscribe = Multi.prototype.UNSUBSCRIBE = function unsubscribe () {
    var client = this._client;
    var command_args;
    var callback;
    if (Array.isArray(arguments[0])) {
        command_args = arguments[0];
        callback = arguments[1];
    } else {
        var total = arguments.length;
        // A trailing undefined should not be the average use case
        var last_type = total === 0 ? null : typeof arguments[total - 1];
        if (last_type === 'function' || last_type === 'undefined') {
            total -= 1;
            callback = arguments[total];
        }
        command_args = [];
        for (var position = 0; position < total; position += 1) {
            command_args.push(arguments[position]);
        }
    }
    var enter_pub_sub_mode = function () {
        // Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback
        var current_mode = client.pub_sub_mode;
        client.pub_sub_mode = current_mode ? current_mode : client.command_queue.length + 1;
    };
    this.queue.push(new Command('unsubscribe', command_args, callback, enter_pub_sub_mode));
    return this;
};
506
// Send PSUBSCRIBE. The arguments are either a single array (used as is, optionally followed by
// a callback) or a flat list whose trailing function / undefined is taken as the callback.
RedisClient.prototype.psubscribe = RedisClient.prototype.PSUBSCRIBE = function psubscribe () {
    var client = this;
    var command_args;
    var callback;
    if (Array.isArray(arguments[0])) {
        command_args = arguments[0];
        callback = arguments[1];
    } else {
        var total = arguments.length;
        // A trailing undefined should not be the average use case
        var last_type = total === 0 ? null : typeof arguments[total - 1];
        if (last_type === 'function' || last_type === 'undefined') {
            total -= 1;
            callback = arguments[total];
        }
        command_args = [];
        for (var position = 0; position < total; position += 1) {
            command_args.push(arguments[position]);
        }
    }
    // Keep an already set pub_sub_mode, otherwise set it to the command queue length plus one
    var enter_pub_sub_mode = function () {
        var current_mode = client.pub_sub_mode;
        client.pub_sub_mode = current_mode ? current_mode : client.command_queue.length + 1;
    };
    return client.internal_send_command(new Command('psubscribe', command_args, callback, enter_pub_sub_mode));
};
533
// Queue PSUBSCRIBE. The arguments are either a single array (used as is, optionally followed by
// a callback) or a flat list whose trailing function / undefined is taken as the callback.
// Returns the Multi itself for chaining.
Multi.prototype.psubscribe = Multi.prototype.PSUBSCRIBE = function psubscribe () {
    var client = this._client;
    var command_args;
    var callback;
    if (Array.isArray(arguments[0])) {
        command_args = arguments[0];
        callback = arguments[1];
    } else {
        var total = arguments.length;
        // A trailing undefined should not be the average use case
        var last_type = total === 0 ? null : typeof arguments[total - 1];
        if (last_type === 'function' || last_type === 'undefined') {
            total -= 1;
            callback = arguments[total];
        }
        command_args = [];
        for (var position = 0; position < total; position += 1) {
            command_args.push(arguments[position]);
        }
    }
    // Keep an already set pub_sub_mode, otherwise set it to the command queue length plus one
    var enter_pub_sub_mode = function () {
        var current_mode = client.pub_sub_mode;
        client.pub_sub_mode = current_mode ? current_mode : client.command_queue.length + 1;
    };
    this.queue.push(new Command('psubscribe', command_args, callback, enter_pub_sub_mode));
    return this;
};
561
// Send PUNSUBSCRIBE. The arguments are either a single array (used as is, optionally followed
// by a callback) or a flat list whose trailing function / undefined is taken as the callback.
RedisClient.prototype.punsubscribe = RedisClient.prototype.PUNSUBSCRIBE = function punsubscribe () {
    var client = this;
    var command_args;
    var callback;
    if (Array.isArray(arguments[0])) {
        command_args = arguments[0];
        callback = arguments[1];
    } else {
        var total = arguments.length;
        // A trailing undefined should not be the average use case
        var last_type = total === 0 ? null : typeof arguments[total - 1];
        if (last_type === 'function' || last_type === 'undefined') {
            total -= 1;
            callback = arguments[total];
        }
        command_args = [];
        for (var position = 0; position < total; position += 1) {
            command_args.push(arguments[position]);
        }
    }
    var enter_pub_sub_mode = function () {
        // Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback
        var current_mode = client.pub_sub_mode;
        client.pub_sub_mode = current_mode ? current_mode : client.command_queue.length + 1;
    };
    return client.internal_send_command(new Command('punsubscribe', command_args, callback, enter_pub_sub_mode));
};
589
// Queue PUNSUBSCRIBE. The arguments are either a single array (used as is, optionally followed
// by a callback) or a flat list whose trailing function / undefined is taken as the callback.
// Returns the Multi itself for chaining.
Multi.prototype.punsubscribe = Multi.prototype.PUNSUBSCRIBE = function punsubscribe () {
    var client = this._client;
    var command_args;
    var callback;
    if (Array.isArray(arguments[0])) {
        command_args = arguments[0];
        callback = arguments[1];
    } else {
        var total = arguments.length;
        // A trailing undefined should not be the average use case
        var last_type = total === 0 ? null : typeof arguments[total - 1];
        if (last_type === 'function' || last_type === 'undefined') {
            total -= 1;
            callback = arguments[total];
        }
        command_args = [];
        for (var position = 0; position < total; position += 1) {
            command_args.push(arguments[position]);
        }
    }
    var enter_pub_sub_mode = function () {
        // Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback
        var current_mode = client.pub_sub_mode;
        client.pub_sub_mode = current_mode ? current_mode : client.command_queue.length + 1;
    };
    this.queue.push(new Command('punsubscribe', command_args, callback, enter_pub_sub_mode));
    return this;
};