//// Retrieved from the UNPKG file viewer (15 kB, JavaScript).

1"use strict";
2
3
4exports.Connection = function ( backend, minLatency, maxLatency )
5{
//// Per-connection state shared by every closure in this constructor.

//// Stable reference to the connection for nested callbacks.
var connection = this;

//// Database index chosen by SELECT; re-applied by exec before each batch.
var db = 0;

//// Command entries waiting to be executed, or null/undefined when idle.
var queue;

//// Map of watched key -> revision captured by WATCH, or null/undefined.
var watch;

//// Flat [ client, entry, client, entry, ... ] list of commands pushed
//// while the connection is BLOCKED.
var block;

//// Handle of the pending exec timer, 0 when none is scheduled.
var timeout = 0;

//// Handler that receives pushed commands: NORMAL, SUBSCRIBED, CLOSED or BLOCKED.
var state = NORMAL;

//// Value last returned by backend.sub / backend.unsub.
var subs = 0;
16
17
//// Push a command onto this connection.
////
////   client   - the client issuing the command.
////   command  - command name, any letter case.
////   args     - array of command arguments.
////   callback - optional function ( err, data ) invoked with the reply.
////
//// The command is normalised by `prep` and handed to the handler for the
//// current connection state; errors thrown by either propagate to the caller.

this.push = function ( client, command, args, callback )
{
    state ( client, prep ( command, args, callback ) );
};
22
23
//// Push a command to a normal connection.
////
//// Commands handled by the connection itself (transactions, pub/sub, QUIT,
//// SELECT) get an `override` function that `exec` calls instead of the
//// backend method. The entry is then appended to the queue, and `exec` is
//// scheduled after a random latency unless a transaction is being collected.
////
////   client - the client issuing the command; also carries the pub/sub
////            delay proxy in `client.$PUSHDELAY`.
////   entry  - a command entry as built by `prep`.

function NORMAL ( client, entry )
{
    var i, matches;


    //// Transactions.

    if ( entry.command === "WATCH" )
    {
        entry.override = function ()
        {
            var i, n = entry.args.length, key;

            //// A prototype-less map, so that keys such as "constructor" or
            //// "__proto__" are watched like any other key.

            if ( !watch )
                watch = Object.create ( null );

            //// Keep the revision of the first WATCH for keys watched twice.

            for ( i = 0; i < n; i ++ )
            {
                key = entry.args [ i ];
                if ( !( key in watch ) )
                    watch [ key ] = backend.getRevision ( key );
            }

            return "OK";
        };
    }

    else if ( entry.command === "UNWATCH" )
    {
        entry.override = function ()
        {
            watch = null;
            return "OK";
        };
    }

    else if ( entry.command === "DISCARD" )
    {
        if ( queue )
        {
            //// MULTI cancelled the flush timer, so start it again.

            if ( !timeout )
                timeout = setTimeout ( exec, randLat () );

            for ( i = 0; i < queue.length; i ++ )
                if ( queue [ i ].command === "MULTI" )
                {
                    //// Drop the MULTI and everything queued after it.

                    queue.splice ( i, queue.length );

                    //// Substitute the DISCARD command with an UNWATCH, keeping
                    //// the caller's callback. NORMAL is called directly because
                    //// it is not invoked as a method and has no usable `this`.

                    return NORMAL ( client, prep ( "UNWATCH", [], entry.callback ) );
                }
        }

        //// DISCARD without a pending MULTI simply replies OK.

        entry.override = function () { return "OK"; };
    }

    else if ( entry.command === "MULTI" )
    {
        //// `pending` is what remains of the batch when exec reaches the MULTI.

        entry.override = function ( pending )
        {
            if ( !pending ) throw new Error ( 'WOOT! no queue.' );

            var watched = watch, key, queued, skipped, replies, collect, i, n;
            watch = null;

            if ( watched ) for ( key in watched )
                if ( backend.getRevision ( key ) !== watched [ key ] )
                {
                    //// Abort because of a change in the watched keyspace :
                    //// drop the queued commands and make the EXEC reply
                    //// with one null per dropped command.

                    skipped = 0;
                    while (( queued = pending.shift () ))
                    {
                        if ( queued.command === "EXEC" )
                        {
                            queued.override = function ()
                            {
                                var i, out = [];
                                for ( i = 0; i < skipped; i ++ )
                                    out [ i ] = null;

                                return out;
                            };

                            pending.unshift ( queued );
                            break;
                        }

                        skipped ++;
                    }

                    return "OK";
                }

            replies = [];
            collect = pushReply.bind ( replies );
            n = pending.length;

            for ( i = 0; i < n; i ++ )
            {
                queued = pending [ i ];
                if ( queued.command !== "EXEC" )
                {
                    //// Collect replies for the EXEC output.

                    queued.callback = collect;

                    //// Prevent blocking within a transaction.

                    delete queued.block;
                }

                else
                {
                    //// Exec calls back with the entire reply list.

                    queued.override = queued.override.bind ( replies );
                    return "OK";
                }
            }

            throw new Error ( "WOOT! Can't find the EXEC command in the queue." );
        };

        //// Prevent flushing before the exec.

        if ( timeout )
        {
            clearTimeout ( timeout );
            timeout = 0;
        }

        if ( queue )
            queue.push ( entry );
        else
            queue = [ entry ];

        return;
    }

    else if ( entry.command === "EXEC" )
    {
        //// Once MULTI has bound this override to the reply list, `this` is
        //// an array and is returned; otherwise the reply is null.

        entry.override = function ()
        {
            return this.join ? this : null;
        };

        if ( queue && !timeout )
            timeout = setTimeout ( exec, randLat () );
    }


    //// Pubsub.

    if (( matches = /^(P)?(UN)?SUBSCRIBE$/.exec ( entry.command ) ))
    {
        //// Messages reach the client through a proxy that delays them.

        if ( !client.$PUSHDELAY )
            client.$PUSHDELAY = new Delay ( client, 'pushMessage', minLatency );

        entry.override = function ()
        {
            var i, n = entry.args.length,
                pattern = matches [ 1 ] ? true : false;

            if ( n ) for ( i = 0; i < n; i ++ )
            {
                //// Unsubscribe.

                if ( matches [ 2 ] )
                    subs = backend.unsub ( pattern, entry.args [ i ], client.$PUSHDELAY );

                //// Subscribe.

                else
                    subs = backend.sub ( pattern, entry.args [ i ], client.$PUSHDELAY );
            }

            else if ( matches [ 2 ] )
            {
                //// Unsubscribe from all.

                subs = backend.unsub ( pattern, null, client.$PUSHDELAY );
            }

            else
                return new Error ( 'Wrong number of arguments for \'' + matches [ 0 ] + '\' command' );

            //// Leave pub/sub mode once nothing is subscribed. Only a
            //// SUBSCRIBED connection is reset, so a QUIT pushed in the
            //// meantime keeps the connection CLOSED.

            if ( !subs && state === SUBSCRIBED )
                state = NORMAL;

            return "OK";
        };

        //// Subscribing switches mode as soon as the command is pushed.

        if ( !matches [ 2 ] )
            state = SUBSCRIBED;
    }


    //// Connection.

    if ( entry.command === 'QUIT' )
    {
        entry.override = function ()
        {
            if ( client.$PUSHDELAY )
            {
                //// Unsubscribe.

                backend.unsub ( true, null, client.$PUSHDELAY );
                backend.unsub ( false, null, client.$PUSHDELAY );
            }

            return "OK";
        };

        state = CLOSED;
    }

    else if ( entry.command === 'SELECT' )
    {
        entry.override = function ()
        {
            var n = entry.args.length;
            if ( n !== 1 )
                return new Error ( "Wrong number of arguments for 'SELECT' command." );

            //// Only non-negative integers are valid database indexes.

            var id = Number ( entry.args [ 0 ] );
            if ( ( !id && id !== 0 ) || id % 1 !== 0 || id < 0 )
                return new Error ( "invalid DB index" );

            db = id;
            backend.selectDB ( db );
            return "OK";
        };
    }


    //// Regular commands.

    if ( queue )
        queue.push ( entry );

    else
    {
        queue = [ entry ];
        timeout = setTimeout ( exec, randLat () );
    }
}
265
266
//// Push a command to a subscribed connection.
////
//// Only commands that modify the subscription set, and QUIT, are accepted;
//// anything else throws.

function SUBSCRIBED ( client, entry )
{
    if ( !/SUBSCRIBE|^QUIT/.test ( entry.command ) )
        throw new Error ( "fakeredis: Connection is in pub/sub mode (" + subs + " subscriptions)." );

    NORMAL ( client, entry );
}
279
280
//// Closed connection.
////
//// Every command pushed after QUIT is rejected with an error that names
//// the offending command.

function CLOSED ( client, entry )
{
    throw new Error ( "fakeredis: You've closed this connection with QUIT, cannot " + entry.command );
}
287
288
//// Blocked connection.
////
//// Commands pushed while a blocking command is pending are remembered as
//// consecutive ( client, entry ) pairs; `unblock` replays them later.

function BLOCKED ( client, entry )
{
    if ( !block )
        block = [];

    block.push ( client, entry );
}
298
299
//// Execute everything in the queue sequentially.
////
//// Each entry is answered either by its `override` or by the backend
//// method named after the command. Replies are gathered and handed to
//// `flush`, which delivers them to the callbacks after the minimum latency.
////
//// A blocking entry that yields neither data nor an error suspends the
//// batch : the entry goes back to the head of the queue, the connection
//// becomes BLOCKED and subscribes to backend updates, and exec returns.
////
//// Throws on internal inconsistencies (the 'WOOT!' errors).

function exec ()
{
    timeout = 0;
    var q = queue, entry, func, out, err, data, resp = [];
    queue = null;

    if ( connection.verbose )
        console.log ( '\n' );

    backend.selectDB ( db );

    if ( q ) while (( entry = q.shift () ))
    {
        if ( entry === 'SKIP' )
            continue;

        func = backend [ entry.command ];
        out = null;

        if ( connection.verbose )
            console.log ( "fakeredis>", entry.command, entry.args.join ( ' ' ) );

        if ( entry.override )
        {
            //// Overrides signal failure by returning an Error.

            out = entry.override ( q );
            err = out instanceof Error ? out : null;
            data = out instanceof Error ? null : out;
        }

        else if ( !func || typeof func !== 'function' )
            throw new Error ( 'WOOT! Wierd queue entry : ' + JSON.stringify ( entry ) + ' / ' + JSON.stringify ( q ) );

        //// Backend methods that declare parameters take exactly that many.

        else if ( func.length && func.length !== entry.args.length )
        {
            err = new Error ( 'Wrong number of arguments for \'' + entry.command.toLowerCase () + '\' command' );
            data = null;
        }

        else
        {
            //// Results exposing getError / getStatus are unwrapped.

            out = func.apply ( backend, entry.args );
            err = ( ( out && out.getError ) || null ) && new Error ( out.getError () );
            data = err ? null : ( out && out.getStatus && out.getStatus () ) || out;

            //// Block if necessary.

            if ( entry.block && err === null && data === null )
            {
                //// Replies collected so far must not wait for the unblock.

                if ( resp.length )
                    flush ( resp );

                q.unshift ( entry );
                queue = q;
                state = BLOCKED;
                backend.sub ( false, backend.UPDATE, connection );

                //// Arm the timeout only the first time the entry blocks.
                //// Afterwards the entry is marked as blocking without a
                //// timeout, so a later re-block does not arm a second timer.

                if ( typeof entry.block === 'number' )
                {
                    setTimeout ( expire.bind ( null, entry ), entry.block * 1000 );
                    entry.block = true;
                }

                return;
            }
        }

        if ( !err && !data && typeof out === "undefined" )
            throw new Error ( "WOOT! Backend returned undefined." );
        if ( out && out.rev )
            throw new Error ( "WOOT! Returning the whole keyspace entry." );

        if ( data === true )
            throw new Error ( "TRUE THAT! " + JSON.stringify ( entry ) );

        data = fdata ( data );
        if ( entry.callback )
            resp.push ( entry.callback.bind ( null, err, data ) );
    }

    if ( connection.verbose )
        console.log ( '\n' );

    if ( resp.length )
        flush ( resp );

    //// Timeout handler of a blocking entry. It is ignored unless the
    //// connection is still blocked on that very entry, so a timer that
    //// outlives its entry cannot disturb the connection's current state.

    function expire ( blocked )
    {
        if ( state === BLOCKED && queue && queue [ 0 ] === blocked )
            unblock ( blocked );
    }
}
384
//// Deliver collected replies after the minimum latency.
////
////   resp - array of zero-argument functions, invoked in order.

function flush ( resp )
{
    setTimeout
    (
        function ()
        {
            var i, n = resp.length;

            for ( i = 0; i < n; i ++ )
                resp [ i ] ();
        },
        minLatency
    );
}
400
//// Try to resume a blocked connection.
////
////   entry - optional ; when given, the entry loses its `block` flag so
////           that it completes on this run even without data (timeout).
////
//// The queue is executed again. If nothing blocked this time, the
//// connection stops listening for backend updates and the commands pushed
//// while blocked are replayed in order.

function unblock ( entry )
{
    if ( entry )
        delete entry.block;

    //// A wake-up that arrives when the connection is not blocked must not
    //// overwrite its state, e.g. reopen a connection closed with QUIT.

    if ( state !== BLOCKED )
        return;

    state = NORMAL;
    exec ();

    //// exec sets the state back to BLOCKED when the entry blocks again.

    if ( state === NORMAL )
    {
        backend.unsub ( false, backend.UPDATE, connection );

        var deferred = block, i, n = deferred && deferred.length;
        block = null;
        for ( i = 0; i < n; i += 2 )
            NORMAL ( deferred [ i ], deferred [ i + 1 ] );
    }
}
419
//// Receives backend messages while the connection is subscribed to
//// backend.UPDATE. The arguments are not inspected : any message is
//// treated as a reason to retry the blocked command.

this.pushMessage = function ( type, channel, message )
{
    //// Attempt to unblock on backend keyspace change.

    unblock ();
};
426
427
//// Format data the way it comes out of node_redis.
////
//// Top-level values : numeric strings become numbers, arrays have their
//// elements formatted by `finnerdata`, everything else passes through.
//// Throws for values that are not an object, a number or a string, and
//// for objects that are not arrays.

function fdata ( data )
{
    if ( typeof data !== 'object' && typeof data !== 'number' && typeof data !== 'string' )
        throw new Error ( 'WOOT! Data is not an object/string/number : ' + data );

    if ( data )
    {
        //// Blank or whitespace-padded strings satisfy !isNaN ( " " is
        //// coerced to 0 ) but are not numbers ; they must stay strings.

        if ( typeof data === 'string' && data.trim () === data && !isNaN ( data ) )
            data = Number ( data );

        else if ( data.length && data.map )
            data = data.map ( finnerdata );

        else if ( typeof data === 'object' && !data.map )
            throw new Error ( 'WOOT! Illegal object in data : ' + data );
    }

    return data;
}

//// Format a value nested inside an array reply : truthy numbers become
//// strings, nested arrays are formatted recursively, and falsy values
//// pass through untouched. Throws under the same conditions as `fdata`.

function finnerdata ( data )
{
    if ( typeof data !== 'object' && typeof data !== 'number' && typeof data !== 'string' )
        throw new Error ( 'WOOT! Data is not an object/string/number : ' + data );

    if ( data )
    {
        if ( typeof data === 'number' )
            data = String ( data );

        else if ( data.length && data.map )
            data = data.map ( finnerdata );

        else if ( typeof data === 'object' && !data.map )
            throw new Error ( 'WOOT! Illegal object in data : ' + data );
    }

    return data;
}
469
470
//// Prepare command.
////
////   command  - command name, any letter case ; stored upper-cased.
////   args     - array of arguments ; each is converted with String. A
////              missing array is treated as empty.
////   callback - optional reply callback, stored untouched.
////
//// Returns the entry { command, args, callback, block }. `block` is false
//// for ordinary commands. For commands starting with BLPOP / BRPOP that
//// have arguments, it is the last argument parsed as a base-10 integer,
//// or true when that is 0 or not a number.
////
//// Throws when the backend has no member named after the command.

function prep ( command, args, callback )
{
    var name = command.toUpperCase (),
        list = ( args || [] ).map ( function ( arg ) { return String ( arg ); } ),
        blocking = false;

    if ( /^B[LR]POP/.test ( name ) && list.length ) // Backend will validate the timeout param more robustly.
        blocking = parseInt ( list [ list.length - 1 ], 10 ) || true;

    if ( !backend [ name ] )
        throw new Error ( "fakeredis: " + name + " is not implemented in fakeredis. Let me know if you need it." );

    return { command : name, args : list, callback : callback, block : blocking };
}
487
488
//// Helper to push replies onto the replies list.
////
//// Meant to be bound to an array : the formatted `data` is appended to
//// `this`. The `err` argument is ignored.

function pushReply ( err, data )
{
    this.push ( fdata ( data ) );
}
495
496
//// Imitate latency.
////
//// A falsy minimum defaults to 15 ms, a falsy maximum to three times the
//// minimum ; both are rounded up to whole milliseconds.

minLatency = Math.ceil ( minLatency || 15 );
maxLatency = Math.ceil ( maxLatency || minLatency * 3 );

//// NaN compares false against everything, so it is rejected explicitly.

if ( isNaN ( minLatency ) || isNaN ( maxLatency ) || maxLatency < minLatency || minLatency < 0 )
    throw new Error ( "Bad min/max latency settings." );

//// Random whole number of milliseconds between minLatency and maxLatency.

function randLat ()
{
    return Math.ceil ( ( maxLatency - minLatency ) * Math.random () + minLatency );
}
509
510};
511
512
513
//// Proxy that forwards calls of one method after a delay.
////
////   object - the receiver of the forwarded calls.
////   method - name of the method ; defined on the proxy and called on `object`.
////   delay  - milliseconds to wait before forwarding.
////
//// The first call after an idle period starts the timer. Calls made before
//// it fires are batched and forwarded together, in order, with their
//// original arguments and with `object` as `this`.

function Delay ( object, method, delay )
{
    var queue,
        flush;

    this [ method ] = function ()
    {
        if ( !queue )
        {
            queue = [ arguments ];
            setTimeout ( flush, delay );
        }
        else
            queue.push ( arguments );
    };

    flush = function ()
    {
        //// Detach the batch first, so calls made while forwarding start a
        //// new batch with its own timer.

        var q = queue, i, n = q.length;
        queue = null;

        for ( i = 0; i < n; i ++ )
            object [ method ].apply ( object, q [ i ] );
    };
}
540
541
542