1 | 'use strict';
|
2 |
|
3 | var Queue = require('double-ended-queue');
|
4 | var utils = require('./utils');
|
5 | var Command = require('./command');
|
6 |
|
7 | function Multi (client, args) {
|
8 | this._client = client;
|
9 | this.queue = new Queue();
|
10 | var command, tmp_args;
|
11 | if (args) {
|
12 | for (var i = 0; i < args.length; i++) {
|
13 | command = args[i][0];
|
14 | tmp_args = args[i].slice(1);
|
15 | if (Array.isArray(command)) {
|
16 | this[command[0]].apply(this, command.slice(1).concat(tmp_args));
|
17 | } else {
|
18 | this[command].apply(this, tmp_args);
|
19 | }
|
20 | }
|
21 | }
|
22 | }
|
23 |
|
24 | function pipeline_transaction_command (self, command_obj, index) {
|
25 |
|
26 | var tmp = command_obj.callback;
|
27 | command_obj.callback = function (err, reply) {
|
28 |
|
29 | if (err && index !== -1) {
|
30 | if (tmp) {
|
31 | tmp(err);
|
32 | }
|
33 | err.position = index;
|
34 | self.errors.push(err);
|
35 | }
|
36 |
|
37 |
|
38 | self.wants_buffers[index] = command_obj.buffer_args;
|
39 | command_obj.callback = tmp;
|
40 | };
|
41 | self._client.internal_send_command(command_obj);
|
42 | }
|
43 |
|
44 | Multi.prototype.exec_atomic = Multi.prototype.EXEC_ATOMIC = Multi.prototype.execAtomic = function exec_atomic (callback) {
|
45 | if (this.queue.length < 2) {
|
46 | return this.exec_batch(callback);
|
47 | }
|
48 | return this.exec(callback);
|
49 | };
|
50 |
|
51 | function multi_callback (self, err, replies) {
|
52 | var i = 0, command_obj;
|
53 |
|
54 | if (err) {
|
55 | err.errors = self.errors;
|
56 | if (self.callback) {
|
57 | self.callback(err);
|
58 |
|
59 | } else if (err.code !== 'CONNECTION_BROKEN') {
|
60 | self._client.emit('error', err);
|
61 | }
|
62 | return;
|
63 | }
|
64 |
|
65 | if (replies) {
|
66 | while (command_obj = self.queue.shift()) {
|
67 | if (replies[i] instanceof Error) {
|
68 | var match = replies[i].message.match(utils.err_code);
|
69 |
|
70 | if (match) {
|
71 | replies[i].code = match[1];
|
72 | }
|
73 | replies[i].command = command_obj.command.toUpperCase();
|
74 | if (typeof command_obj.callback === 'function') {
|
75 | command_obj.callback(replies[i]);
|
76 | }
|
77 | } else {
|
78 |
|
79 | replies[i] = self._client.handle_reply(replies[i], command_obj.command, self.wants_buffers[i]);
|
80 | if (typeof command_obj.callback === 'function') {
|
81 | command_obj.callback(null, replies[i]);
|
82 | }
|
83 | }
|
84 | i++;
|
85 | }
|
86 | }
|
87 |
|
88 | if (self.callback) {
|
89 | self.callback(null, replies);
|
90 | }
|
91 | }
|
92 |
|
93 | Multi.prototype.exec_transaction = function exec_transaction (callback) {
|
94 | if (this.monitoring || this._client.monitoring) {
|
95 | var err = new RangeError(
|
96 | 'Using transaction with a client that is in monitor mode does not work due to faulty return values of Redis.'
|
97 | );
|
98 | err.command = 'EXEC';
|
99 | err.code = 'EXECABORT';
|
100 | return utils.reply_in_order(this._client, callback, err);
|
101 | }
|
102 | var self = this;
|
103 | var len = self.queue.length;
|
104 | self.errors = [];
|
105 | self.callback = callback;
|
106 | self._client.cork();
|
107 | self.wants_buffers = new Array(len);
|
108 | pipeline_transaction_command(self, new Command('multi', []), -1);
|
109 |
|
110 | for (var index = 0; index < len; index++) {
|
111 |
|
112 | pipeline_transaction_command(self, self.queue.get(index), index);
|
113 | }
|
114 |
|
115 | self._client.internal_send_command(new Command('exec', [], function (err, replies) {
|
116 | multi_callback(self, err, replies);
|
117 | }));
|
118 | self._client.uncork();
|
119 | return !self._client.should_buffer;
|
120 | };
|
121 |
|
122 | function batch_callback (self, cb, i) {
|
123 | return function batch_callback (err, res) {
|
124 | if (err) {
|
125 | self.results[i] = err;
|
126 |
|
127 | self.results[i].position = i;
|
128 | } else {
|
129 | self.results[i] = res;
|
130 | }
|
131 | cb(err, res);
|
132 | };
|
133 | }
|
134 |
|
135 | Multi.prototype.exec = Multi.prototype.EXEC = Multi.prototype.exec_batch = function exec_batch (callback) {
|
136 | var self = this;
|
137 | var len = self.queue.length;
|
138 | var index = 0;
|
139 | var command_obj;
|
140 | if (len === 0) {
|
141 | utils.reply_in_order(self._client, callback, null, []);
|
142 | return !self._client.should_buffer;
|
143 | }
|
144 | self._client.cork();
|
145 | if (!callback) {
|
146 | while (command_obj = self.queue.shift()) {
|
147 | self._client.internal_send_command(command_obj);
|
148 | }
|
149 | self._client.uncork();
|
150 | return !self._client.should_buffer;
|
151 | }
|
152 | var callback_without_own_cb = function (err, res) {
|
153 | if (err) {
|
154 | self.results.push(err);
|
155 |
|
156 | var i = self.results.length - 1;
|
157 | self.results[i].position = i;
|
158 | } else {
|
159 | self.results.push(res);
|
160 | }
|
161 |
|
162 |
|
163 | };
|
164 | var last_callback = function (cb) {
|
165 | return function (err, res) {
|
166 | cb(err, res);
|
167 | callback(null, self.results);
|
168 | };
|
169 | };
|
170 | self.results = [];
|
171 | while (command_obj = self.queue.shift()) {
|
172 | if (typeof command_obj.callback === 'function') {
|
173 | command_obj.callback = batch_callback(self, command_obj.callback, index);
|
174 | } else {
|
175 | command_obj.callback = callback_without_own_cb;
|
176 | }
|
177 | if (typeof callback === 'function' && index === len - 1) {
|
178 | command_obj.callback = last_callback(command_obj.callback);
|
179 | }
|
180 | this._client.internal_send_command(command_obj);
|
181 | index++;
|
182 | }
|
183 | self._client.uncork();
|
184 | return !self._client.should_buffer;
|
185 | };
|
186 |
|
187 | module.exports = Multi;
|