1 | var dyno= require('./lib/dyn.js'),
|
2 | diff = require('deep-diff').diff,
|
3 | uuid= require('node-uuid').v4,
|
4 | _= require('underscore'),
|
5 | cclone= require('circularclone'),
|
6 | async= require('async');
|
7 |
|
8 | var _parser= require('./lib/parser'),
|
9 | _finder= require('./lib/finder'),
|
10 | _refiner= require('./lib/refiner'),
|
11 | _index= require('./lib/indexer'),
|
12 | _modify= require('./lib/capacity');
|
13 |
|
14 | const _nu= function (v)
|
15 | {
|
16 | return typeof v!='undefined';
|
17 | },
|
18 | _isobject= function (v)
|
19 | {
|
20 | return typeof v=='object'&&!Array.isArray(v);
|
21 | },
|
22 | _isobjectarr= function (v)
|
23 | {
|
24 | return typeof v=='object'&&Array.isArray(v)&&v.length>0&&typeof v[0]=='object';
|
25 | },
|
26 | _collect= function (consume)
|
27 | {
|
28 | return function (cons)
|
29 | {
|
30 | var c;
|
31 |
|
32 | if (cons.table)
|
33 | {
|
34 | if (!(c=consume[cons.table]))
|
35 | c= consume[cons.table]= { read: 0, write: 0 };
|
36 |
|
37 | c.read+= cons.read;
|
38 | c.write+= cons.write;
|
39 | }
|
40 | else
|
41 | _.keys(cons).forEach(function (table)
|
42 | {
|
43 | var c, tcons= cons[table];
|
44 |
|
45 | if (!(c=consume[table]))
|
46 | c= consume[table]= { read: 0, write: 0 };
|
47 |
|
48 | c.read+= tcons.read;
|
49 | c.write+= tcons.write;
|
50 | });
|
51 | };
|
52 | };
|
53 |
|
54 | var dyngo= module.exports= function (opts,cb)
|
55 | {
|
56 | var defaults= { hints: true };
|
57 |
|
58 | if (!cb)
|
59 | {
|
60 | cb= opts;
|
61 | opts= defaults;
|
62 | }
|
63 |
|
64 | opts= opts || defaults;
|
65 | opts= _.defaults(opts,defaults);
|
66 |
|
67 | var dyn= dyno(opts.dynamo,_.extend(opts.tx || {},{ txTable: opts.txTable })),
|
68 | finder= _finder(dyn),
|
69 | parser= _parser(dyn,opts),
|
70 | db= _.extend({ _dyn: dyn },opts.tx,{ txTable: opts.txTable }),
|
71 | _alias= function (table)
|
72 | {
|
73 | return (opts.tables || {} )[table] || table;
|
74 | };
|
75 |
|
76 | db.cleanup= function (obj)
|
77 | {
|
78 | var p= dyn.promise('clean');
|
79 |
|
80 | process.nextTick(function ()
|
81 | {
|
82 | p.trigger.clean(cclone(obj,function (key,value)
|
83 | {
|
84 | if (key.indexOf&&key.indexOf('_')==0&&key!='_id')
|
85 | return undefined;
|
86 | else
|
87 | return value;
|
88 | }));
|
89 | });
|
90 |
|
91 | return p;
|
92 | };
|
93 |
|
94 |
|
95 | var configureTable= function (table)
|
96 | {
|
97 | table.ensuredIndexes= [];
|
98 |
|
99 | table.find= function (cond,projection,identity)
|
100 | {
|
101 | var p, modifiers= {}, table= this;
|
102 |
|
103 | modifiers.$consistent= !!table.$consistent;
|
104 |
|
105 | p= dyn.promise(['results','count','end'],null,'consumed');
|
106 |
|
107 | process.nextTick(function ()
|
108 | {
|
109 | parser
|
110 | .parse(table,modifiers,cond,projection,identity)
|
111 | .parsed(function (query)
|
112 | {
|
113 | refiner= _refiner(dyn,query),
|
114 | cursor= finder.find(query);
|
115 | cursor.chain(refiner);
|
116 | refiner.chain(p);
|
117 | })
|
118 | .error(p.trigger.error);
|
119 | });
|
120 |
|
121 | p.sort= function (o)
|
122 | {
|
123 | modifiers.orderby= o;
|
124 | return p;
|
125 | };
|
126 |
|
127 | p.limit= function (n)
|
128 | {
|
129 | modifiers.limit= n;
|
130 | return p;
|
131 | };
|
132 |
|
133 | p.window= function (n)
|
134 | {
|
135 | modifiers.window= n;
|
136 | return p;
|
137 | };
|
138 |
|
139 | p.skip= function (n)
|
140 | {
|
141 | modifiers.skip= n;
|
142 | return p;
|
143 | };
|
144 |
|
145 | p.noderef= function ()
|
146 | {
|
147 | modifiers.noderef= true;
|
148 | return p;
|
149 | };
|
150 |
|
151 | var origCount= _.bind(p.count,p);
|
152 |
|
153 | p.count= function (fn)
|
154 | {
|
155 | if (fn)
|
156 | origCount(fn);
|
157 | else
|
158 | modifiers.count= true;
|
159 |
|
160 | return p;
|
161 | };
|
162 |
|
163 | return p;
|
164 | };
|
165 |
|
166 | table.findOne= function ()
|
167 | {
|
168 | var p, args= arguments, table= this;
|
169 |
|
170 | p= dyn.promise('result','notfound','consumed');
|
171 |
|
172 | table.find.apply(table,args).limit(1).results(function (items)
|
173 | {
|
174 | if (items.length==0)
|
175 | p.trigger.notfound();
|
176 | else
|
177 | p.trigger.result(items[0]);
|
178 | })
|
179 | .consumed(p.trigger.consumed)
|
180 | .error(p.trigger.error);
|
181 |
|
182 | return p;
|
183 | };
|
184 |
|
185 | table.consistent= function ()
|
186 | {
|
187 | return _.extend({ $consistent: true }, table);
|
188 | };
|
189 |
|
190 | table.save= function (_obj,isCreate)
|
191 | {
|
192 | var objs= _obj ? (Array.isArray(_obj) ? _obj : [_obj]) : [],
|
193 | consume= {},
|
194 | p= dyn.promise(null,'updatedsinceread','consumed'), found= false;
|
195 |
|
196 | process.nextTick(function ()
|
197 | {
|
198 |
|
199 | async.forEach(objs,
|
200 | function (obj,done)
|
201 | {
|
202 | var gops= {},
|
203 | ops= gops[table._dynamo.TableName]= [],
|
204 | _hashrange= function (obj)
|
205 | {
|
206 | obj._id= obj._id || uuid();
|
207 | obj._pos= obj._pos || 0;
|
208 | obj._rev= (obj._rev || 0)+1;
|
209 | obj._refs= [];
|
210 | },
|
211 | _index= function (obj)
|
212 | {
|
213 | table.indexes.forEach(function (index)
|
214 | {
|
215 | var iops= index.update(obj,'put') || {};
|
216 |
|
217 | _.keys(iops).forEach(function (table)
|
218 | {
|
219 | var tops= gops[table]= gops[table] || [];
|
220 | tops.push.apply(tops,_.collect(iops[table],function (op) { op.index= true; return op; }));
|
221 | tops.index= true;
|
222 | });
|
223 | });
|
224 | },
|
225 | _remove= function (item)
|
226 | {
|
227 | ops.push({ op: 'del', item: { _id: item._id, _pos: item._pos } });
|
228 |
|
229 | table.indexes.forEach(function (index)
|
230 | {
|
231 | var iops= index.update(item,'del') || {};
|
232 |
|
233 | _.keys(iops).forEach(function (table)
|
234 | {
|
235 | var tops= gops[table]= gops[table] || [];
|
236 | tops.push.apply(tops,_.collect(iops[table],function (op) { op.index= true; return op; }));
|
237 | });
|
238 | });
|
239 | },
|
240 | _save= function (obj,isCreate)
|
241 | {
|
242 | var _keys= _.keys(obj),
|
243 | _omit= ['_old'],
|
244 | diffs= diff(obj._old || {},
|
245 | obj,
|
246 | function (path,key) { return key=='_old'; });
|
247 |
|
248 | if (!diffs || diffs.length==0
|
249 | || (obj._old || {_rev: 0})._rev<obj._rev
|
250 | ) return;
|
251 |
|
252 | _hashrange(obj);
|
253 |
|
254 | _keys.forEach(function (key)
|
255 | {
|
256 | if (key.indexOf('___')==0)
|
257 | {
|
258 | if (!_isobjectarr(obj[key.substring(3)]))
|
259 | {
|
260 | _omit.push(key);
|
261 | return;
|
262 | }
|
263 | }
|
264 | else
|
265 | if (key.indexOf('__')==0)
|
266 | {
|
267 | if (!_isobject(obj[key.substring(2)]))
|
268 | {
|
269 | _omit.push(key);
|
270 | return;
|
271 | }
|
272 | }
|
273 |
|
274 | var type= typeof obj[key];
|
275 |
|
276 | if (type=='object'&&!_.contains(['_old','_refs'],key))
|
277 | {
|
278 | var desc= obj[key];
|
279 |
|
280 | if (desc==null)
|
281 | _omit.push(key);
|
282 | else
|
283 | if (desc instanceof Date)
|
284 | { /* let dyn convert */ }
|
285 | else
|
286 | if (_.keys(desc).length==0)
|
287 | _omit.push(key);
|
288 | else
|
289 | if (Array.isArray(desc))
|
290 | {
|
291 | if (desc.length)
|
292 | {
|
293 | if (typeof desc[0]=='object')
|
294 | {
|
295 | var _id= obj['___'+key]= obj['___'+key] || uuid();
|
296 |
|
297 | if (obj._old)
|
298 | {
|
299 | var old= obj._old[key];
|
300 |
|
301 | if (old&&old.length>desc.length)
|
302 | old.forEach(function (oitem,idx)
|
303 | {
|
304 | if (oitem._id==_id)
|
305 | {
|
306 | if (!_.findWhere(desc,{ _pos: oitem._pos }))
|
307 | _remove(oitem);
|
308 | }
|
309 | else
|
310 | {
|
311 | if (!_.findWhere(desc,{ _pos: idx }))
|
312 | _remove({ _id: _id, _pos: idx, _ref: oitem._id+'$:$'+oitem._pos });
|
313 | }
|
314 | });
|
315 | }
|
316 |
|
317 | desc.forEach(function (val, pos)
|
318 | {
|
319 | if (val._id&&val._id!=_id)
|
320 | {
|
321 | _save(val);
|
322 | _save({ _id: _id, _pos: pos, _ref: val._id+'$:$'+val._pos });
|
323 | obj._refs.push(val._id);
|
324 | }
|
325 | else
|
326 | {
|
327 | val._id= _id;
|
328 |
|
329 | if (!isNaN(val._pos)&&val._pos!=pos)
|
330 | {
|
331 | delete val['_old'];
|
332 | delete val['_rev'];
|
333 | _remove(val);
|
334 | }
|
335 |
|
336 | val._pos= pos;
|
337 | _save(val);
|
338 | }
|
339 | });
|
340 |
|
341 | _omit.push(key);
|
342 | }
|
343 | }
|
344 | else
|
345 | {
|
346 | var _id= obj['___'+key];
|
347 |
|
348 | if (_id&&obj._old[key].length)
|
349 | obj._old[key].forEach(_remove);
|
350 |
|
351 | _omit.push(key);
|
352 | _omit.push('___'+key);
|
353 | }
|
354 | }
|
355 | else
|
356 | {
|
357 | _save(desc);
|
358 | obj['__'+key]= desc._id+'$:$'+desc._pos;
|
359 | obj._refs.push(desc._id);
|
360 | _omit.push(key);
|
361 | }
|
362 | }
|
363 | else
|
364 | if (type=='string'&&!obj[key])
|
365 | _omit.push(key);
|
366 | else
|
367 | if (type=='number'&&isNaN(obj[key]))
|
368 | _omit.push(key);
|
369 | });
|
370 |
|
371 | if (!obj._refs.length)
|
372 | delete obj['_refs'];
|
373 | else
|
374 | obj._refs= _.uniq(obj._refs);
|
375 |
|
376 | _index(obj); // index after _ fields are set so they are indexable too
|
377 |
|
378 | ops.unshift({ op: 'put', item: obj, omit: _omit, isCreate: isCreate }); // let the aggregate op came first of "contained" objects, so that the aggrgate version protects the rest
|
379 | },
|
380 | _mput= function (gops,done)
|
381 | {
|
382 | async.forEach(_.keys(gops),
|
383 | function (_table,done)
|
384 | {
|
385 | var tops= gops[_table];
|
386 |
|
387 | async.forEachSeries(tops, // forEachSeries: when deleting elements from array i need deletes of old item _pos done before new item _pos put
|
388 | function (op,done)
|
389 | {
|
390 | var tab= dyn.table(_table),
|
391 | obj= op.item;
|
392 |
|
393 | if (op.index)
|
394 | tab.hash('_hash',obj._hash)
|
395 | .range('_range',obj._range);
|
396 | else
|
397 | tab.hash('_id',obj._id)
|
398 | .range('_pos',obj._pos);
|
399 |
|
400 | if (op.op=='put')
|
401 | tab.put(_.omit(obj,op.omit),
|
402 | function ()
|
403 | {
|
404 | obj._old= cclone(_.omit(obj,'_old'));
|
405 | done();
|
406 | },
|
407 | { expected: obj._old&&_nu(obj._old._rev) ? { _rev: obj._old._rev } : undefined,
|
408 | exists: isCreate ? false : undefined })
|
409 | .consumed(_collect(consume))
|
410 | .error(done);
|
411 | else
|
412 | if (op.op=='del')
|
413 | tab.delete(done)
|
414 | .error(done);
|
415 | else
|
416 | done(new Error('unknown update type:'+op.op));
|
417 | },
|
418 | done);
|
419 | },
|
420 | done);
|
421 | };
|
422 |
|
423 |
|
424 | _save(obj,isCreate);
|
425 |
|
426 | _.keys(gops).forEach(function (table)
|
427 | {
|
428 | if (gops[table].length==0)
|
429 | delete gops[table];
|
430 | else
|
431 | found= true;
|
432 | });
|
433 |
|
434 | if (found)
|
435 | _mput(gops,done);
|
436 | else
|
437 | process.nextTick(done);
|
438 |
|
439 | },
|
440 | function (err)
|
441 | {
|
442 | p.trigger.consumed(consume);
|
443 |
|
444 | if (err)
|
445 | {
|
446 | if (err.code=='notfound')
|
447 | p.trigger.updatedsinceread();
|
448 | else
|
449 | p.trigger.error(err);
|
450 | }
|
451 | else
|
452 | p.trigger.success();
|
453 | });
|
454 |
|
455 | });
|
456 |
|
457 | return p;
|
458 | };
|
459 |
|
460 | table.create= function (obj)
|
461 | {
|
462 | var p= dyn.promise(null,'exists','consumed');
|
463 |
|
464 | table.save(obj,true)
|
465 | .success(p.trigger.success)
|
466 | .consumed(p.trigger.consumed)
|
467 | .error(function (err)
|
468 | {
|
469 | if (err.code=='found')
|
470 | p.trigger.exists();
|
471 | else
|
472 | p.trigger.error(err);
|
473 | });
|
474 |
|
475 | return p;
|
476 | };
|
477 |
|
478 | table.enableIndex= function (fields)
|
479 | {
|
480 | var index= _index(dyn,table,fields,opts);
|
481 | table.indexes.push(index);
|
482 | table.ensuredIndexes.push(fields);
|
483 | };
|
484 |
|
485 | table.ensureIndex= function (fields)
|
486 | {
|
487 | var p= dyn.promise();
|
488 |
|
489 | process.nextTick(function ()
|
490 | {
|
491 | var index= _index(dyn,table,fields,opts);
|
492 |
|
493 | if (index)
|
494 | index.ensure(function (err)
|
495 | {
|
496 | if (err)
|
497 | p.trigger.error(err);
|
498 | else
|
499 | {
|
500 | table.indexes.push(index);
|
501 | table.ensuredIndexes.push(fields);
|
502 | p.trigger.success();
|
503 | }
|
504 | });
|
505 | else
|
506 | p.trigger.error(new Error('no known index type can index those fields'));
|
507 | });
|
508 |
|
509 | return p;
|
510 | };
|
511 |
|
512 | table.remove= function (filter)
|
513 | {
|
514 | var p= dyn.promise(null,null,'consumed'),
|
515 | consume= {},
|
516 | _consumed= function (cons)
|
517 | {
|
518 | consume.read+= cons.read;
|
519 | consume.write+= cons.write;
|
520 | },
|
521 | _error= function (err)
|
522 | {
|
523 | p.trigger.consumed(consume);
|
524 | p.trigger.error(err);
|
525 | },
|
526 | _success= function ()
|
527 | {
|
528 | p.trigger.consumed(consume);
|
529 | p.trigger.success();
|
530 | },
|
531 | sync= dyn.syncResults(function (err)
|
532 | {
|
533 | if (err)
|
534 | _error(err);
|
535 | else
|
536 | _success();
|
537 | }),
|
538 | cursor= table.find(filter,table.indexes.length ? undefined : { _id: 1, _pos: 1 }),
|
539 | _deleteItem= function (obj,done)
|
540 | {
|
541 | async.parallel([
|
542 | function (done)
|
543 | {
|
544 | async.forEach(table.indexes,
|
545 | function (index,done)
|
546 | {
|
547 | index.remove(obj).success(done).error(done).consumed(_collect(consume));
|
548 | },done);
|
549 | },
|
550 | function (done)
|
551 | {
|
552 | dyn.table(table._dynamo.TableName)
|
553 | .hash('_id',obj._id)
|
554 | .range('_pos',obj._pos)
|
555 | .delete(done)
|
556 | .consumed(_collect(consume))
|
557 | .error(done);
|
558 | }],
|
559 | done);
|
560 | };
|
561 |
|
562 | cursor.results(sync.results(function (items,done)
|
563 | {
|
564 | async.forEach(items,_deleteItem,done);
|
565 | }))
|
566 | .consumed(_consumed)
|
567 | .error(_error)
|
568 | .end(sync.end);
|
569 |
|
570 | return p;
|
571 | };
|
572 |
|
573 | table.update= function (query,update)
|
574 | {
|
575 | var p= dyn.promise(null,null,'consumed'),
|
576 | cursor= table.consistent().find(query),
|
577 | consume= {},
|
578 | _consumed= function (cons)
|
579 | {
|
580 | _.keys(cons).forEach(function (table)
|
581 | {
|
582 | var c, tcons= cons[table];
|
583 |
|
584 | if (!(c=consume[table]))
|
585 | c= consume[table]= { read: 0, write: 0 };
|
586 |
|
587 | c.read+= tcons.read;
|
588 | c.write+= tcons.write;
|
589 | });
|
590 | },
|
591 | _error= function (err)
|
592 | {
|
593 | p.trigger.consumed(consume);
|
594 | p.trigger.error(err);
|
595 | },
|
596 | _success= function ()
|
597 | {
|
598 | p.trigger.consumed(consume);
|
599 | p.trigger.success();
|
600 | },
|
601 | sync= dyn.syncResults(function (err)
|
602 | {
|
603 | if (err)
|
604 | _error(err);
|
605 | else
|
606 | _success();
|
607 | }),
|
608 | _updateItem= function (item,done)
|
609 | {
|
610 | if (update.$set)
|
611 | table.save(_.extend(item,update.$set))
|
612 | .success(done)
|
613 | .consumed(_consumed)
|
614 | .error(done);
|
615 | else
|
616 | if (update.$unset)
|
617 | table.save(_.omit(item,_.keys(update.$unset)))
|
618 | .success(done)
|
619 | .consumed(_consumed)
|
620 | .error(done);
|
621 | else
|
622 | done(new Error('unknown update type'));
|
623 | },
|
624 | _updateItems= function (items,done)
|
625 | {
|
626 | async.forEach(items,_updateItem,done);
|
627 | };
|
628 |
|
629 |
|
630 | cursor
|
631 | .results(sync.results(_updateItems))
|
632 | .consumed(_consumed)
|
633 | .error(_error)
|
634 | .end(sync.end);
|
635 |
|
636 | return p;
|
637 | };
|
638 |
|
639 | table.modify= function (read,write)
|
640 | {
|
641 | return _modify(dyn,table._dynamo.TableName,read,write);
|
642 | };
|
643 |
|
644 | table.drop= function ()
|
645 | {
|
646 | var p= dyn.promise(),
|
647 | _success= function ()
|
648 | {
|
649 | delete db[_alias(table._dynamo.TableName)];
|
650 | p.trigger.success();
|
651 | },
|
652 | _check= function ()
|
653 | {
|
654 | dyn.describeTable(table._dynamo.TableName,
|
655 | function (err,data)
|
656 | {
|
657 | if (err)
|
658 | {
|
659 | if (err.code=='ResourceNotFoundException')
|
660 | _success();
|
661 | else
|
662 | p.trigger.error(err);
|
663 | }
|
664 | else
|
665 | setTimeout(_check,5000);
|
666 | });
|
667 | };
|
668 |
|
669 | async.forEach(table.indexes,
|
670 | function (index,done)
|
671 | {
|
672 | index.drop(done);
|
673 | },
|
674 | function (err)
|
675 | {
|
676 | if (opts.hints) console.log('This may take a while...'.yellow);
|
677 |
|
678 | dyn.deleteTable(table._dynamo.TableName,function (err)
|
679 | {
|
680 | if (err)
|
681 | {
|
682 | if (err.code=='ResourceNotFoundException')
|
683 | _success();
|
684 | else
|
685 | p.trigger.error(err);
|
686 | }
|
687 | else
|
688 | setTimeout(_check,5000);
|
689 | });
|
690 | });
|
691 |
|
692 | return p;
|
693 | };
|
694 |
|
695 | return table;
|
696 | },
|
697 | configureTables= function (cb)
|
698 | {
|
699 | var configure= function (tables)
|
700 | {
|
701 | async.forEach(Object.keys(tables),
|
702 | function (table,done)
|
703 | {
|
704 | dyn.describeTable(table,function (err,data)
|
705 | {
|
706 | if (!err)
|
707 | {
|
708 | var hash= _.findWhere(data.Table.KeySchema,{ KeyType: 'HASH' }),
|
709 | range= _.findWhere(data.Table.KeySchema,{ KeyType: 'RANGE' });
|
710 |
|
711 | if (hash&&hash.AttributeName&&hash.AttributeName=='_id'&&range&&range.AttributeName=='_pos')
|
712 | db[tables[table]]= configureTable({ _dynamo: data.Table, indexes: [] });
|
713 | }
|
714 |
|
715 | done(err);
|
716 | });
|
717 | },
|
718 | function (err)
|
719 | {
|
720 | cb(err,err ? null : db);
|
721 | });
|
722 | };
|
723 |
|
724 | if (opts.tables)
|
725 | configure(opts.tables);
|
726 | else
|
727 | dyn.listTables(function (err,list)
|
728 | {
|
729 | if (err)
|
730 | cb(err);
|
731 | else
|
732 | {
|
733 | var tables= {};
|
734 | list.forEach(function (table) { tables[table]= table; });
|
735 | configure(tables);
|
736 | }
|
737 | });
|
738 | };
|
739 |
|
740 | db.createCollection= function (name)
|
741 | {
|
742 | var p= dyn.promise(),
|
743 | _success= function ()
|
744 | {
|
745 | dyn.describeTable(name,function (err,data)
|
746 | {
|
747 | if (!err)
|
748 | {
|
749 | db[name]= configureTable({ _dynamo: data.Table, indexes: [] });
|
750 | p.trigger.success();
|
751 | }
|
752 | else
|
753 | p.trigger.error(err);
|
754 |
|
755 | });
|
756 | };
|
757 |
|
758 | if (opts.hints) console.log('This may take a while...'.yellow);
|
759 |
|
760 | dyn.table(name)
|
761 | .hash('_id','S')
|
762 | .range('_pos','N')
|
763 | .create(function check()
|
764 | {
|
765 | dyn.table(name)
|
766 | .hash('_id','xx')
|
767 | .query(function ()
|
768 | {
|
769 | _success();
|
770 | })
|
771 | .error(function (err)
|
772 | {
|
773 | if (err.code=='ResourceNotFoundException')
|
774 | setTimeout(check,5000);
|
775 | else
|
776 | if (err.code=='notfound')
|
777 | _success();
|
778 | else
|
779 | p.trigger.error(err);
|
780 | });
|
781 | })
|
782 | .error(function (err)
|
783 | {
|
784 | if (err.code=='ResourceInUseException')
|
785 | p.trigger.error(new Error('the collection exists'));
|
786 | else
|
787 | p.trigger.error(err);
|
788 | });
|
789 |
|
790 | return p;
|
791 | };
|
792 |
|
793 | db.ensureTransactionTable= function (topts)
|
794 | {
|
795 | topts= _.defaults(topts || {},{ name: 'dyngo-transaction-table' });
|
796 |
|
797 | var p= dyn.promise(),
|
798 | _success= function ()
|
799 | {
|
800 | dyn.describeTable(topts.name,function (err,data)
|
801 | {
|
802 | if (!err)
|
803 | {
|
804 | db.txTable= { _dynamo: data.Table, indexes: [] };
|
805 |
|
806 | db.txTable.modify= function (read,write) { return _modify(dyn,data.Table.TableName,read,write) };
|
807 |
|
808 | db.txTable.drop= function ()
|
809 | {
|
810 | var p= dyn.promise(),
|
811 | _success= function ()
|
812 | {
|
813 | delete db.txTable;
|
814 | p.trigger.success();
|
815 | },
|
816 | _check= function ()
|
817 | {
|
818 | dyn.describeTable(data.Table.TableName,
|
819 | function (err,data)
|
820 | {
|
821 | if (err)
|
822 | {
|
823 | if (err.code=='ResourceNotFoundException')
|
824 | _success();
|
825 | else
|
826 | p.trigger.error(err);
|
827 | }
|
828 | else
|
829 | setTimeout(_check,5000);
|
830 | });
|
831 | };
|
832 |
|
833 | if (opts.hints) console.log('This may take a while...'.yellow);
|
834 |
|
835 | dyn.deleteTable(data.Table.TableName,function (err)
|
836 | {
|
837 | if (err)
|
838 | {
|
839 | if (err.code=='ResourceNotFoundException')
|
840 | _success();
|
841 | else
|
842 | p.trigger.error(err);
|
843 | }
|
844 | else
|
845 | setTimeout(_check,5000);
|
846 | });
|
847 |
|
848 | return p;
|
849 | };
|
850 |
|
851 | p.trigger.success();
|
852 | }
|
853 | else
|
854 | p.trigger.error(err);
|
855 | });
|
856 | };
|
857 |
|
858 | if (opts.hints) console.log('This may take a while...'.yellow);
|
859 |
|
860 | dyn.table(topts.name)
|
861 | .hash('_id','S')
|
862 | .range('_item','S')
|
863 | .create(function check()
|
864 | {
|
865 | dyn.table(topts.name)
|
866 | .hash('_id','xx')
|
867 | .query(function ()
|
868 | {
|
869 | _success();
|
870 | })
|
871 | .error(function (err)
|
872 | {
|
873 | if (err.code=='ResourceNotFoundException')
|
874 | setTimeout(check,5000);
|
875 | else
|
876 | if (err.code=='notfound')
|
877 | _success();
|
878 | else
|
879 | p.trigger.error(err);
|
880 | });
|
881 | })
|
882 | .error(function (err)
|
883 | {
|
884 | if (err.code=='ResourceInUseException')
|
885 | _success();
|
886 | else
|
887 | p.trigger.error(err);
|
888 | });
|
889 |
|
890 | return p;
|
891 | };
|
892 |
|
893 | db.transaction= function (txOpts)
|
894 | {
|
895 | var p= dyn.promise('transaction',null,'consumed'),
|
896 | consume= {};
|
897 |
|
898 | process.nextTick(function ()
|
899 | {
|
900 | if (!db.txTable)
|
901 | {
|
902 | p.trigger.error(new Error('no transaction table defined'));
|
903 | return;
|
904 | }
|
905 |
|
906 | var tab= dyn.table(db.txTable._dynamo.TableName),
|
907 | init= function (tx)
|
908 | {
|
909 | dyn.table(db.txTable._dynamo.TableName)
|
910 | .hash('_id',tx._id)
|
911 | .range('_item','_')
|
912 | .put(tx,function ()
|
913 | {
|
914 | var dopts= _.extend({ tx: tx, txTable: db.txTable },opts,txOpts);
|
915 |
|
916 | dyngo(dopts,
|
917 | function (err,tx)
|
918 | {
|
919 | if (err)
|
920 | {
|
921 | p.trigger.error(err);
|
922 | return;
|
923 | }
|
924 |
|
925 | _.filter(_.keys(tx),function (key) { return !!tx[key].find; })
|
926 | .forEach(function (tableName)
|
927 | {
|
928 | db[tableName].ensuredIndexes.forEach(tx[tableName].enableIndex); // use enableIndex (sync) do not ensure..
|
929 | });
|
930 |
|
931 | dopts.tx.transaction= _.bind(db.transaction,db);
|
932 |
|
933 | tx.commit= function ()
|
934 | {
|
935 | var p= dyn.promise('committed','rolledback','consumed'),
|
936 | consume= {},
|
937 | _commit= function (cb)
|
938 | {
|
939 | dyn.table(db.txTable._dynamo.TableName)
|
940 | .hash('_id',tx._id)
|
941 | .range('_item','_')
|
942 | .updateItem({ update: { state: { action: 'PUT', value: 'committed' } },
|
943 | expected: { state: 'pending' } },
|
944 | function ()
|
945 | {
|
946 | tx.state= 'committed';
|
947 | cb();
|
948 | })
|
949 | .consumed(_collect(consume))
|
950 | .error(function (err)
|
951 | {
|
952 | if (err.code=='notfound')
|
953 | p.trigger.rolledback(true);
|
954 | else
|
955 | p.trigger.error(err);
|
956 | });
|
957 | },
|
958 | _complete= function (cb)
|
959 | {
|
960 | var sync= dyn.syncResults(function (err)
|
961 | {
|
962 | if (err)
|
963 | p.trigger.error(err);
|
964 | else
|
965 | cb();
|
966 | });
|
967 |
|
968 | dyn.table(db.txTable._dynamo.TableName)
|
969 | .hash('_id',tx._id)
|
970 | .range('_item','target::','BEGINS_WITH')
|
971 | .query(sync.results(function (items,done)
|
972 | {
|
973 | async.forEach(items,
|
974 | function (item,done)
|
975 | {
|
976 | var _item= item._item.split('::'),
|
977 | table= _item[1],
|
978 | hash= { attr: _item[2], value: _item[3] },
|
979 | range= { attr: _item[4], value: _item[4]=='_pos' ? +_item[5] : _item[5] };
|
980 |
|
981 | if (item._txOp=='delete')
|
982 | {
|
983 | dyn.table(table)
|
984 | .hash(hash.attr,hash.value)
|
985 | .range(range.attr,range.value)
|
986 | .delete(function () { done(); },{ expected: { _tx: tx._id } })
|
987 | .consumed(_collect(consume))
|
988 | .error(done);
|
989 | }
|
990 | else
|
991 | dyn.table(table)
|
992 | .hash(hash.attr,hash.value)
|
993 | .range(range.attr,range.value)
|
994 | .updateItem({ update: { _txTransient: { action: 'DELETE' },
|
995 | _txApplied: { action: 'DELETE' },
|
996 | _txDeleted: { action: 'DELETE' },
|
997 | _txLocked: { action: 'DELETE' },
|
998 | _tx: { action: 'DELETE' } } },
|
999 | function () { done(); })
|
1000 | .consumed(_collect(consume))
|
1001 | .error(done);
|
1002 |
|
1003 | },
|
1004 | done);
|
1005 | }),
|
1006 | { attrs: ['_id','_item','_txOp'],
|
1007 | consistent: true })
|
1008 | .error(p.trigger.error)
|
1009 | .consumed(_collect(consume))
|
1010 | .end(sync.end);
|
1011 | },
|
1012 | _clean= function (cb)
|
1013 | {
|
1014 | dyn.table(db.txTable._dynamo.TableName)
|
1015 | .hash('_id',tx._id)
|
1016 | .range('_item','_')
|
1017 | .updateItem({ update: { state: { action: 'PUT', value: 'completed' } },
|
1018 | expected: { state: 'committed' } },
|
1019 | function ()
|
1020 | {
|
1021 | tx.state= 'completed';
|
1022 | cb();
|
1023 | })
|
1024 | .consumed(p.trigger.consumed)
|
1025 | .error(p.trigger.error);
|
1026 | },
|
1027 | _committed= function ()
|
1028 | {
|
1029 | p.trigger.consumed(consume);
|
1030 | p.trigger.committed();
|
1031 | };
|
1032 |
|
1033 | if (tx.state=='pending')
|
1034 | _commit(function ()
|
1035 | {
|
1036 | _complete(function ()
|
1037 | {
|
1038 | _clean(_committed);
|
1039 | });
|
1040 | });
|
1041 | else
|
1042 | if (tx.state=='committed')
|
1043 | _complete(function ()
|
1044 | {
|
1045 | _clean(_committed);
|
1046 | });
|
1047 | else
|
1048 | p.trigger.error(new Error("Invalid transaction state: "+tx.state));
|
1049 |
|
1050 | return p;
|
1051 | };
|
1052 |
|
1053 | tx.rollback= function ()
|
1054 | {
|
1055 | var p= dyn.promise('rolledback',null,'consumed'),
|
1056 | consume= {},
|
1057 | _rollback= function (cb)
|
1058 | {
|
1059 | var sync= dyn.syncResults(function (err)
|
1060 | {
|
1061 | if (err)
|
1062 | p.trigger.error(err);
|
1063 | else
|
1064 | dyn.table(db.txTable._dynamo.TableName)
|
1065 | .hash('_id',tx._id)
|
1066 | .range('_item','_')
|
1067 | .updateItem({ update: { state: { action: 'PUT', value: 'rolledback' } },
|
1068 | expected: { state: 'pending' } },
|
1069 | function ()
|
1070 | {
|
1071 | tx.state= 'rolledback';
|
1072 | cb();
|
1073 | })
|
1074 | .consumed(_collect(consume))
|
1075 | .error(p.trigger.error);
|
1076 | });
|
1077 |
|
1078 | dyn.table(db.txTable._dynamo.TableName)
|
1079 | .hash('_id',tx._id)
|
1080 | .range('_item','target::','BEGINS_WITH')
|
1081 | .query(sync.results(function (items,done)
|
1082 | {
|
1083 | async.forEach(items,
|
1084 | function (item,done)
|
1085 | {
|
1086 | var _item= item._item.split('::'),
|
1087 | table= _item[1],
|
1088 | hash= { attr: _item[2], value: _item[3] },
|
1089 | range= { attr: _item[4], value: _item[4]=='_pos' ? +_item[5] : _item[5] },
|
1090 | clean= function ()
|
1091 | {
|
1092 | dyn.table(table)
|
1093 | .hash(hash.attr,hash.value)
|
1094 | .range(range.attr,range.value)
|
1095 | .updateItem({ update: { _txTransient: { action: 'DELETE' },
|
1096 | _txApplied: { action: 'DELETE' },
|
1097 | _txDeleted: { action: 'DELETE' },
|
1098 | _txLocked: { action: 'DELETE' },
|
1099 | _tx: { action: 'DELETE' } } },
|
1100 | function () { done(); })
|
1101 | .consumed(_collect(consume))
|
1102 | .error(done);
|
1103 | };
|
1104 |
|
1105 | if (item._txOp=='put')
|
1106 | dyn.table(table)
|
1107 | .hash(hash.attr,hash.value)
|
1108 | .range(range.attr,range.value)
|
1109 | .get(function (item)
|
1110 | {
|
1111 | if (item._txTransient)
|
1112 | dyn.table(table)
|
1113 | .hash(hash.attr,hash.value)
|
1114 | .range(range.attr,range.value)
|
1115 | .delete(function () { done(); })
|
1116 | .consumed(_collect(consume))
|
1117 | .error(done);
|
1118 | else
|
1119 | dyn.table(db.txTable._dynamo.TableName)
|
1120 | .hash('_id',tx._id)
|
1121 | .range('_item',['copy',
|
1122 | table,
|
1123 | hash.attr,
|
1124 | hash.value,
|
1125 | range.attr,
|
1126 | range.value].join('::'))
|
1127 | .get(function (copy)
|
1128 | {
|
1129 | copy= _.omit(copy,['_id',
|
1130 | '_item',
|
1131 | '_txLocked',
|
1132 | '_txApplied',
|
1133 | '_txDeleted',
|
1134 | '_txTransient',
|
1135 | '_tx']);
|
1136 |
|
1137 | copy[hash.attr]= hash.value;
|
1138 | copy[range.attr]= range.value;
|
1139 |
|
1140 | dyn.table(table)
|
1141 | .hash(hash.attr,hash.value)
|
1142 | .range(range.attr,range.value)
|
1143 | .put(copy,function () { done(); })
|
1144 | .consumed(p.trigger.consumed)
|
1145 | .error(p.trigger.error);
|
1146 | })
|
1147 | .consumed(_collect(consume))
|
1148 | .error(done);
|
1149 | })
|
1150 | .consumed(_collect(consume))
|
1151 | .error(done);
|
1152 | else
|
1153 | clean();
|
1154 | },
|
1155 | done);
|
1156 | }),
|
1157 | { attrs: ['_id','_item','_txOp'],
|
1158 | consistent: true })
|
1159 | .error(p.trigger.error)
|
1160 | .consumed(_collect(consume))
|
1161 | .end(sync.end);
|
1162 | };
|
1163 |
|
1164 | if (tx.state=='pending')
|
1165 | _rollback(p.trigger.rolledback);
|
1166 | else
|
1167 | p.trigger.error(new Error("Invalid transaction state: "+tx.state));
|
1168 |
|
1169 | return p;
|
1170 | };
|
1171 |
|
1172 | p.trigger.consumed(consume);
|
1173 | p.trigger.transaction(tx);
|
1174 | });
|
1175 | })
|
1176 | .consumed(_collect(consume))
|
1177 | .error(p.trigger.error);
|
1178 |
|
1179 | };
|
1180 |
|
1181 | if (typeof txOpts=='string')
|
1182 | tab.hash('_id',txOpts)
|
1183 | .range('_item','_')
|
1184 | .get(init,{ consistent: true })
|
1185 | .consumed(_collect(consume))
|
1186 | .error(p.trigger.error);
|
1187 | else
|
1188 | {
|
1189 | if (opts.tx)
|
1190 | p.trigger.error(new Error('cannot start a transaction within a transaction'));
|
1191 | else
|
1192 | init({ _id: uuid(), _item: '_', state: 'pending' });
|
1193 | }
|
1194 | });
|
1195 |
|
1196 | return p;
|
1197 | };
|
1198 |
|
1199 | configureTables(cb);
|
1200 |
|
1201 | };
|