UNPKG

6.51 kBJavaScriptView Raw
1'use strict';
2
3var Queue = require('denque');
4var utils = require('./utils');
5var Command = require('./command');
6
7function Multi (client, args) {
8 this._client = client;
9 this.queue = new Queue();
10 var command, tmp_args;
11 if (args) { // Either undefined or an array. Fail hard if it's not an array
12 for (var i = 0; i < args.length; i++) {
13 command = args[i][0];
14 tmp_args = args[i].slice(1);
15 if (Array.isArray(command)) {
16 this[command[0]].apply(this, command.slice(1).concat(tmp_args));
17 } else {
18 this[command].apply(this, tmp_args);
19 }
20 }
21 }
22}
23
24function pipeline_transaction_command (self, command_obj, index) {
25 // Queueing is done first, then the commands are executed
26 var tmp = command_obj.callback;
27 command_obj.callback = function (err, reply) {
28 // Ignore the multi command. This is applied by node_redis and the user does not benefit by it
29 if (err && index !== -1) {
30 if (tmp) {
31 tmp(err);
32 }
33 err.position = index;
34 self.errors.push(err);
35 }
36 // Keep track of who wants buffer responses:
37 // By the time the callback is called the command_obj got the buffer_args attribute attached
38 self.wants_buffers[index] = command_obj.buffer_args;
39 command_obj.callback = tmp;
40 };
41 self._client.internal_send_command(command_obj);
42}
43
44Multi.prototype.exec_atomic = Multi.prototype.EXEC_ATOMIC = Multi.prototype.execAtomic = function exec_atomic (callback) {
45 if (this.queue.length < 2) {
46 return this.exec_batch(callback);
47 }
48 return this.exec(callback);
49};
50
51function multi_callback (self, err, replies) {
52 var i = 0, command_obj;
53
54 if (err) {
55 err.errors = self.errors;
56 if (self.callback) {
57 self.callback(err);
58 // Exclude connection errors so that those errors won't be emitted twice
59 } else if (err.code !== 'CONNECTION_BROKEN') {
60 self._client.emit('error', err);
61 }
62 return;
63 }
64
65 if (replies) {
66 while (command_obj = self.queue.shift()) {
67 if (replies[i] instanceof Error) {
68 var match = replies[i].message.match(utils.err_code);
69 // LUA script could return user errors that don't behave like all other errors!
70 if (match) {
71 replies[i].code = match[1];
72 }
73 replies[i].command = command_obj.command.toUpperCase();
74 if (typeof command_obj.callback === 'function') {
75 command_obj.callback(replies[i]);
76 }
77 } else {
78 // If we asked for strings, even in detect_buffers mode, then return strings:
79 replies[i] = self._client.handle_reply(replies[i], command_obj.command, self.wants_buffers[i]);
80 if (typeof command_obj.callback === 'function') {
81 command_obj.callback(null, replies[i]);
82 }
83 }
84 i++;
85 }
86 }
87
88 if (self.callback) {
89 self.callback(null, replies);
90 }
91}
92
93Multi.prototype.exec_transaction = function exec_transaction (callback) {
94 if (this.monitoring || this._client.monitoring) {
95 var err = new RangeError(
96 'Using transaction with a client that is in monitor mode does not work due to faulty return values of Redis.'
97 );
98 err.command = 'EXEC';
99 err.code = 'EXECABORT';
100 return utils.reply_in_order(this._client, callback, err);
101 }
102 var self = this;
103 var len = self.queue.length;
104 self.errors = [];
105 self.callback = callback;
106 self._client.cork();
107 self.wants_buffers = new Array(len);
108 pipeline_transaction_command(self, new Command('multi', []), -1);
109 // Drain queue, callback will catch 'QUEUED' or error
110 for (var index = 0; index < len; index++) {
111 // The commands may not be shifted off, since they are needed in the result handler
112 pipeline_transaction_command(self, self.queue.get(index), index);
113 }
114
115 self._client.internal_send_command(new Command('exec', [], function (err, replies) {
116 multi_callback(self, err, replies);
117 }));
118 self._client.uncork();
119 return !self._client.should_buffer;
120};
121
122function batch_callback (self, cb, i) {
123 return function batch_callback (err, res) {
124 if (err) {
125 self.results[i] = err;
126 // Add the position to the error
127 self.results[i].position = i;
128 } else {
129 self.results[i] = res;
130 }
131 cb(err, res);
132 };
133}
134
135Multi.prototype.exec = Multi.prototype.EXEC = Multi.prototype.exec_batch = function exec_batch (callback) {
136 var self = this;
137 var len = self.queue.length;
138 var index = 0;
139 var command_obj;
140 if (len === 0) {
141 utils.reply_in_order(self._client, callback, null, []);
142 return !self._client.should_buffer;
143 }
144 self._client.cork();
145 if (!callback) {
146 while (command_obj = self.queue.shift()) {
147 self._client.internal_send_command(command_obj);
148 }
149 self._client.uncork();
150 return !self._client.should_buffer;
151 }
152 var callback_without_own_cb = function (err, res) {
153 if (err) {
154 self.results.push(err);
155 // Add the position to the error
156 var i = self.results.length - 1;
157 self.results[i].position = i;
158 } else {
159 self.results.push(res);
160 }
161 // Do not emit an error here. Otherwise each error would result in one emit.
162 // The errors will be returned in the result anyway
163 };
164 var last_callback = function (cb) {
165 return function (err, res) {
166 cb(err, res);
167 callback(null, self.results);
168 };
169 };
170 self.results = [];
171 while (command_obj = self.queue.shift()) {
172 if (typeof command_obj.callback === 'function') {
173 command_obj.callback = batch_callback(self, command_obj.callback, index);
174 } else {
175 command_obj.callback = callback_without_own_cb;
176 }
177 if (typeof callback === 'function' && index === len - 1) {
178 command_obj.callback = last_callback(command_obj.callback);
179 }
180 this._client.internal_send_command(command_obj);
181 index++;
182 }
183 self._client.uncork();
184 return !self._client.should_buffer;
185};
186
187module.exports = Multi;