1 | "use strict";
|
2 |
|
3 |
|
4 | exports.Connection = function ( backend, minLatency, maxLatency )
|
5 | {
|
6 | var connection = this,
|
7 | db = 0,
|
8 |
|
9 | queue,
|
10 | watch,
|
11 | block,
|
12 |
|
13 | timeout = 0,
|
14 | state = NORMAL,
|
15 | subs = 0;
|
16 |
|
17 |
|
18 | this.push = function ( client, command, args, callback )
|
19 | {
|
20 | state ( client, prep ( command, args, callback ) );
|
21 | };
|
22 |
|
23 |
|
24 |
|
25 |
|
26 | function NORMAL ( client, entry )
|
27 | {
|
28 | var i, n, matches;
|
29 |
|
30 |
|
31 |
|
32 |
|
33 | if ( entry.command === "WATCH" )
|
34 | {
|
35 | entry.override = function ()
|
36 | {
|
37 | var i, n = entry.args.length;
|
38 | if ( !watch )
|
39 | watch = {};
|
40 | for ( i = 0; i < n; i ++ )
|
41 | if ( !( entry.args [ i ] in watch ) )
|
42 | watch [ entry.args [ i ] ] = backend.getRevision ( entry.args [ i ] );
|
43 |
|
44 | return "OK";
|
45 | };
|
46 | }
|
47 |
|
48 | else if ( entry.command === "UNWATCH" )
|
49 | {
|
50 | entry.override = function ()
|
51 | {
|
52 | watch = null;
|
53 | return "OK";
|
54 | };
|
55 | }
|
56 |
|
57 | else if ( entry.command === "DISCARD" )
|
58 | {
|
59 | if ( queue )
|
60 | {
|
61 | if ( !timeout )
|
62 | timeout = setTimeout ( exec, randLat () );
|
63 |
|
64 | for ( i = 0; i < queue.length; i ++ )
|
65 | if ( queue [ i ].command === "MULTI" )
|
66 | {
|
67 | queue.splice ( i, queue.length );
|
68 |
|
69 |
|
70 |
|
71 |
|
72 | return this.push ( [ "UNWATCH" ], entry.callback );
|
73 | }
|
74 | }
|
75 |
|
76 | entry.override = function () { return "OK"; };
|
77 | }
|
78 |
|
79 | else if ( entry.command === "MULTI" )
|
80 | {
|
81 | entry.override = function ( queue )
|
82 | {
|
83 | if ( !queue ) throw new Error ( 'WOOT! no queue.' );
|
84 | var w = watch, key, entry, x = 0;
|
85 | watch = null;
|
86 | if ( w ) for ( key in w )
|
87 | if ( backend.getRevision ( key ) !== w [ key ] )
|
88 | {
|
89 |
|
90 |
|
91 | n = 0;
|
92 | while (( entry = queue.shift () ))
|
93 | {
|
94 | if ( entry.command === "EXEC" )
|
95 | {
|
96 | entry.override = function ()
|
97 | {
|
98 | var i, out = [];
|
99 | for ( i = 0; i < n; i ++ )
|
100 | out [ i ] = null;
|
101 |
|
102 | return out;
|
103 | };
|
104 |
|
105 | queue.unshift ( entry );
|
106 | break;
|
107 | }
|
108 |
|
109 | n ++;
|
110 | }
|
111 |
|
112 | return "OK";
|
113 | }
|
114 |
|
115 | var replies = [];
|
116 | var i, n = queue.length, cb = pushReply.bind ( replies );
|
117 | for ( i = 0; i < n; i ++ )
|
118 | {
|
119 | entry = queue [ i ];
|
120 | if ( entry.command !== "EXEC" )
|
121 | {
|
122 |
|
123 |
|
124 | entry.callback = cb;
|
125 |
|
126 |
|
127 |
|
128 | delete entry.block;
|
129 | }
|
130 |
|
131 | else
|
132 | {
|
133 |
|
134 |
|
135 | entry.override = entry.override.bind ( replies );
|
136 | return "OK";
|
137 | }
|
138 | }
|
139 |
|
140 | throw new Error ( "WOOT! Can't find the EXEC command in the queue." );
|
141 | };
|
142 |
|
143 |
|
144 |
|
145 | if ( timeout )
|
146 | {
|
147 | clearTimeout ( timeout );
|
148 | timeout = 0;
|
149 | }
|
150 |
|
151 | if ( queue )
|
152 | queue.push ( entry );
|
153 | else
|
154 | queue = [ entry ];
|
155 |
|
156 | return;
|
157 | }
|
158 |
|
159 | else if ( entry.command === "EXEC" )
|
160 | {
|
161 | entry.override = function ()
|
162 | {
|
163 | return this.join ? this : null;
|
164 | };
|
165 |
|
166 | if ( queue && !timeout )
|
167 | timeout = setTimeout ( exec, randLat () );
|
168 | }
|
169 |
|
170 |
|
171 |
|
172 |
|
173 | if (( matches = /^(P)?(UN)?SUBSCRIBE$/.exec ( entry.command ) ))
|
174 | {
|
175 | if( !client.$PUSHDELAY )
|
176 | client.$PUSHDELAY = new Delay ( client, 'pushMessage', minLatency );
|
177 |
|
178 | entry.override = function ()
|
179 | {
|
180 | var i, n = entry.args.length;
|
181 |
|
182 | if ( n ) for ( i = 0; i < n; i ++ )
|
183 | {
|
184 |
|
185 |
|
186 | if ( matches [ 2 ] )
|
187 | subs = backend.unsub ( matches [ 1 ] ? true : false, entry.args [ i ], client.$PUSHDELAY );
|
188 |
|
189 |
|
190 |
|
191 | else
|
192 | subs = backend.sub ( matches [ 1 ] ? true : false, entry.args [ i ], client.$PUSHDELAY );
|
193 | }
|
194 |
|
195 | else if ( matches [ 2 ] )
|
196 | {
|
197 |
|
198 |
|
199 | subs = backend.unsub ( matches [ 1 ] ? true : false, null, client.$PUSHDELAY );
|
200 | }
|
201 |
|
202 | else
|
203 | return new Error ( 'Wrong number of arguments for \'' + matches [ 0 ] + '\' command' );
|
204 |
|
205 | if ( !subs )
|
206 | state = NORMAL;
|
207 |
|
208 | return "OK";
|
209 | };
|
210 |
|
211 | if ( !matches [ 2 ] )
|
212 | state = SUBSCRIBED;
|
213 | }
|
214 |
|
215 |
|
216 |
|
217 |
|
218 | if ( entry.command === 'QUIT' )
|
219 | {
|
220 | entry.override = function ()
|
221 | {
|
222 | if ( client.$PUSHDELAY )
|
223 | {
|
224 |
|
225 |
|
226 | backend.unsub ( true, null, client.$PUSHDELAY );
|
227 | backend.unsub ( false, null, client.$PUSHDELAY );
|
228 | }
|
229 |
|
230 | return "OK";
|
231 | };
|
232 |
|
233 | state = CLOSED;
|
234 | }
|
235 |
|
236 | else if (entry.command === 'SELECT')
|
237 | {
|
238 | entry.override = function()
|
239 | {
|
240 | var n = entry.args.length;
|
241 | if (n !== 1)
|
242 | return new Error("Wrong number of arguments for 'SELECT' command.");
|
243 | var id = Number(entry.args[0]);
|
244 | if ((!id && id !== 0) || id % 1 !== 0 || id < 0)
|
245 | return new Error("invalid DB index");
|
246 |
|
247 | db = id;
|
248 | backend.selectDB(db);
|
249 | return "OK";
|
250 | }
|
251 | }
|
252 |
|
253 |
|
254 |
|
255 |
|
256 | if ( queue )
|
257 | queue.push ( entry );
|
258 |
|
259 | else
|
260 | {
|
261 | queue = [ entry ];
|
262 | timeout = setTimeout ( exec, randLat () );
|
263 | }
|
264 | };
|
265 |
|
266 |
|
267 |
|
268 |
|
269 | function SUBSCRIBED ( client, entry )
|
270 | {
|
271 |
|
272 |
|
273 |
|
274 | if ( /SUBSCRIBE|^QUIT/.test ( entry.command ) )
|
275 | NORMAL ( client, entry );
|
276 | else
|
277 | throw new Error ( "fakeredis: Connection is in pub/sub mode (" + subs + " subscriptions)." );
|
278 | }
|
279 |
|
280 |
|
281 |
|
282 |
|
283 | function CLOSED ( client, entry )
|
284 | {
|
285 | throw new Error ( "fakeredis: You've closed this connection with QUIT, cannot " + entry.command );
|
286 | }
|
287 |
|
288 |
|
289 |
|
290 |
|
291 | function BLOCKED ( client, entry )
|
292 | {
|
293 | if ( !block )
|
294 | block = [ client, entry ];
|
295 | else
|
296 | block.push ( client, entry );
|
297 | }
|
298 |
|
299 |
|
300 |
|
301 |
|
302 | function exec ()
|
303 | {
|
304 | timeout = 0;
|
305 | var q = queue, entry, func, out, err, data, resp = [];
|
306 | queue = null;
|
307 |
|
308 | if ( connection.verbose )
|
309 | console.log ( '\n' );
|
310 |
|
311 | backend.selectDB(db);
|
312 |
|
313 | if ( q ) while (( entry = q.shift () ))
|
314 | {
|
315 | if ( entry === 'SKIP' )
|
316 | continue;
|
317 |
|
318 | func = backend [ entry.command ];
|
319 | out = null;
|
320 |
|
321 | if ( connection.verbose )
|
322 | console.log ( "fakeredis>", entry.command, entry.args.join ( ' ' ) );
|
323 |
|
324 | if ( entry.override )
|
325 | {
|
326 | out = entry.override ( q );
|
327 | err = out instanceof Error ? out : null;
|
328 | data = out instanceof Error ? null : out;
|
329 | }
|
330 |
|
331 | else if ( !func || typeof func !== 'function' )
|
332 | throw new Error ( 'WOOT! Wierd queue entry : ' + JSON.stringify ( entry ) + ' / ' + JSON.stringify ( q ) );
|
333 |
|
334 | else if ( func.length && func.length !== entry.args.length )
|
335 | {
|
336 | err = new Error ( 'Wrong number of arguments for \'' + entry.command.toLowerCase () + '\' command' );
|
337 | data = null;
|
338 | }
|
339 |
|
340 | else
|
341 | {
|
342 | out = func.apply ( backend, entry.args );
|
343 | err = ( ( out && out.getError ) || null ) && new Error ( out.getError () );
|
344 | data = err ? null : ( out && out.getStatus && out.getStatus () ) || out;
|
345 |
|
346 |
|
347 |
|
348 | if ( entry.block && err === null && data === null )
|
349 | {
|
350 | if ( resp.length )
|
351 | flush ( resp );
|
352 |
|
353 | q.unshift ( entry );
|
354 | queue = q;
|
355 | state = BLOCKED;
|
356 | backend.sub ( false, backend.UPDATE, connection );
|
357 |
|
358 | if ( entry.block && typeof entry.block === 'number' )
|
359 | setTimeout ( unblock.bind ( null, entry ), entry.block * 1000 );
|
360 |
|
361 | return;
|
362 | }
|
363 | }
|
364 |
|
365 | if ( !err && !data && typeof out === "undefined" )
|
366 | throw new Error ( "WOOT! Backend returned undefined." );
|
367 | if ( out && out.rev )
|
368 | throw new Error ( "WOOT! Returning the whole keyspace entry." );
|
369 |
|
370 | if ( data === true )
|
371 | throw new Error ( "TRUE THAT! " + JSON.stringify ( entry ) );
|
372 |
|
373 | data = fdata ( data );
|
374 | if ( entry.callback )
|
375 | resp.push ( entry.callback.bind ( null, err, data ) );
|
376 | }
|
377 |
|
378 | if ( connection.verbose )
|
379 | console.log ( '\n' );
|
380 |
|
381 | if ( resp.length )
|
382 | flush ( resp );
|
383 | }
|
384 |
|
385 | function flush ( resp )
|
386 | {
|
387 | setTimeout
|
388 | (
|
389 | function ()
|
390 | {
|
391 | var i, n;
|
392 |
|
393 | n = resp.length;
|
394 | for ( i = 0; i < n; i ++ )
|
395 | resp [ i ] ();
|
396 | },
|
397 | minLatency
|
398 | );
|
399 | }
|
400 |
|
401 | function unblock ( entry )
|
402 | {
|
403 | if ( entry )
|
404 | delete entry.block;
|
405 |
|
406 | state = NORMAL;
|
407 | exec ();
|
408 |
|
409 | if ( state === NORMAL )
|
410 | {
|
411 | backend.unsub ( false, backend.UPDATE, connection );
|
412 |
|
413 | var a = block, i, n = a && a.length;
|
414 | block = null;
|
415 | for ( i = 0; i < n; i += 2 )
|
416 | NORMAL ( a [ i ], a [ i + 1 ] );
|
417 | }
|
418 | }
|
419 |
|
420 | this.pushMessage = function ( type, channel, message )
|
421 | {
|
422 |
|
423 |
|
424 | unblock ();
|
425 | }
|
426 |
|
427 |
|
428 |
|
429 |
|
430 | function fdata ( data )
|
431 | {
|
432 | if ( typeof data !== 'object' && typeof data !== 'number' && typeof data !== 'string' )
|
433 | throw new Error ( 'WOOT! Data is not an object/string/number : ' + data );
|
434 |
|
435 | if ( data )
|
436 | {
|
437 | if ( typeof data === 'string' && !isNaN ( data ) )
|
438 | data = Number ( data );
|
439 |
|
440 | else if ( data.length && data.map )
|
441 | data = data.map ( finnerdata );
|
442 |
|
443 | else if ( typeof data === 'object' && !data.map )
|
444 | throw new Error ( 'WOOT! Illegal object in data : ' + data );
|
445 | }
|
446 |
|
447 | return data;
|
448 | }
|
449 |
|
450 | function finnerdata ( data )
|
451 | {
|
452 | if ( typeof data !== 'object' && typeof data !== 'number' && typeof data !== 'string' )
|
453 | throw new Error ( 'WOOT! Data is not an object/string/number : ' + data );
|
454 |
|
455 | if ( data )
|
456 | {
|
457 | if ( typeof data === 'number' )
|
458 | data = String ( data );
|
459 |
|
460 | else if ( data.length && data.map )
|
461 | data = data.map ( finnerdata );
|
462 |
|
463 | else if ( typeof data === 'object' && !data.map )
|
464 | throw new Error ( 'WOOT! Illegal object in data : ' + data );
|
465 | }
|
466 |
|
467 | return data;
|
468 | }
|
469 |
|
470 |
|
471 |
|
472 |
|
473 | function prep ( command, args, callback )
|
474 | {
|
475 | var command = command.toUpperCase (),
|
476 | args = args.map ( function ( arg ) { return String ( arg ); } ),
|
477 | block = false;
|
478 |
|
479 | if ( /^B[LR]POP/.test ( command ) && args.length )
|
480 | block = parseInt ( args [ args.length - 1 ] ) || true;
|
481 |
|
482 | if ( !backend [ command ] )
|
483 | throw new Error ( "fakeredis: " + command + " is not implemented in fakeredis. Let me know if you need it." );
|
484 |
|
485 | return { command : command, args : args, callback : callback, block : block };
|
486 | }
|
487 |
|
488 |
|
489 |
|
490 |
|
491 | function pushReply ( err, data )
|
492 | {
|
493 | this.push ( fdata ( data ) );
|
494 | }
|
495 |
|
496 |
|
497 |
|
498 |
|
499 | minLatency = Math.ceil ( minLatency || 15 );
|
500 | maxLatency = Math.ceil ( maxLatency || minLatency * 3 );
|
501 |
|
502 | if ( maxLatency < minLatency || minLatency < 0 )
|
503 | throw new Error ( "Bad min/max latency settings." );
|
504 |
|
505 | function randLat ()
|
506 | {
|
507 | return Math.ceil ( ( maxLatency - minLatency ) * Math.random () + minLatency );
|
508 | }
|
509 |
|
510 | };
|
511 |
|
512 |
|
513 |
|
514 | function Delay ( object, method, delay )
|
515 | {
|
516 | var queue,
|
517 | flush;
|
518 |
|
519 | this [ method ] = function ()
|
520 | {
|
521 | if ( !queue )
|
522 | {
|
523 | queue = [ arguments ];
|
524 | setTimeout ( flush, delay );
|
525 | }
|
526 | else
|
527 | queue.push ( arguments );
|
528 | };
|
529 |
|
530 | flush = function ()
|
531 | {
|
532 | var q = queue, i, n = q.length;
|
533 | queue = null;
|
534 |
|
535 | for ( i = 0; i < n; i ++ )
|
536 | object [ method ].apply ( object, q [ i ] );
|
537 | };
|
538 |
|
539 | }
|
540 |
|
541 |
|
542 |
|