UNPKG

53.1 kBJavaScriptView Raw
1var dyno= require('./lib/dyn.js'),
2 diff = require('deep-diff').diff,
3 uuid= require('node-uuid').v4,
4 _= require('underscore'),
5 cclone= require('circularclone'),
6 async= require('async');
7
8var _parser= require('./lib/parser'),
9 _finder= require('./lib/finder'),
10 _refiner= require('./lib/refiner'),
11 _index= require('./lib/indexer'),
12 _modify= require('./lib/capacity');
13
// _nu: true when the value is anything other than undefined.
const _nu= function (v)
    {
        return typeof v!='undefined';
    },
    // _isobject: true for non-null, non-array objects. `typeof null` is
    // 'object', so null has to be ruled out explicitly.
    _isobject= function (v)
    {
        return v!==null&&typeof v=='object'&&!Array.isArray(v);
    },
    // _isobjectarr: true for a non-empty array whose first element is a
    // non-null object.
    _isobjectarr= function (v)
    {
        return Array.isArray(v)&&v.length>0&&v[0]!==null&&typeof v[0]=='object';
    },
    // _collect: returns a listener that adds consumed capacity into `consume`,
    // a map of table name -> { read, write }. The listener accepts either a
    // single report ({ table, read, write }) or a map keyed by table name.
    _collect= function (consume)
    {
        var add= function (table,read,write)
            {
                var c= consume[table];

                if (!c)
                    c= consume[table]= { read: 0, write: 0 };

                // a missing counter contributes 0 instead of turning the total into NaN
                c.read+= read || 0;
                c.write+= write || 0;
            };

        return function (cons)
        {
            if (cons.table)
                add(cons.table,cons.read,cons.write);
            else
                Object.keys(cons).forEach(function (table)
                {
                    var tcons= cons[table];

                    add(table,tcons.read,tcons.write);
                });
        };
    };
53
54var dyngo= module.exports= function (opts,cb)
55{
var defaults= { hints: true };

// single-argument form: the only argument is the callback
if (!cb)
{
    cb= opts;
    opts= defaults;
}

// fill in any option the caller left undefined
opts= _.defaults(opts || defaults,defaults);

// settings handed to the dynamo wrapper: the transaction (if any) plus the tx table
var txSettings= _.extend(opts.tx || {},{ txTable: opts.txTable });

var dyn= dyno(opts.dynamo,txSettings);
var finder= _finder(dyn);
var parser= _parser(dyn,opts);

// the object handed to the callback; it also receives the transaction's own fields
var db= _.extend({ _dyn: dyn },opts.tx,{ txTable: opts.txTable });

// map a table name through opts.tables, falling back to the name itself
var _alias= function (table)
{
    var aliases= opts.tables || {};

    return aliases[table] || table;
};
75
// Clone obj through cclone with a filter that answers undefined for every
// key starting with '_' (except '_id'); the clone is delivered on the
// promise's 'clean' event on the next tick.
db.cleanup= function (obj)
{
    var promise= dyn.promise('clean'),
        publicOnly= function (key,value)
        {
            var internal= key.indexOf&&key.indexOf('_')==0&&key!='_id';

            return internal ? undefined : value;
        };

    process.nextTick(function ()
    {
        promise.trigger.clean(cclone(obj,publicOnly));
    });

    return promise;
};
93
94
95 var configureTable= function (table)
96 {
97 table.ensuredIndexes= [];
98
/**
 * Start a query against this table.
 *
 * The query is parsed on the next tick, so the modifier methods attached
 * to the returned promise (sort, limit, window, skip, noderef, count) can
 * be chained synchronously and are recorded in `modifiers` before
 * parser.parse runs.
 *
 * @param cond        query condition, passed to the parser
 * @param projection  projection, passed to the parser
 * @param identity    passed through to the parser
 * @returns the promise exposing 'results', 'count', 'end' and 'consumed',
 *          plus the chainable modifier methods
 */
table.find= function (cond,projection,identity)
{
    var p, modifiers= {}, table= this;

    // `this` may be the copy produced by table.consistent()
    modifiers.$consistent= !!table.$consistent;

    p= dyn.promise(['results','count','end'],null,'consumed');

    process.nextTick(function ()
    {
        parser
            .parse(table,modifiers,cond,projection,identity)
            .parsed(function (query)
            {
                // keep these local: they were previously assigned without
                // `var`, leaking as globals shared by every query
                var refiner= _refiner(dyn,query),
                    cursor= finder.find(query);

                cursor.chain(refiner);
                refiner.chain(p);
            })
            .error(p.trigger.error);
    });

    p.sort= function (o)
    {
        modifiers.orderby= o;
        return p;
    };

    p.limit= function (n)
    {
        modifiers.limit= n;
        return p;
    };

    p.window= function (n)
    {
        modifiers.window= n;
        return p;
    };

    p.skip= function (n)
    {
        modifiers.skip= n;
        return p;
    };

    p.noderef= function ()
    {
        modifiers.noderef= true;
        return p;
    };

    var origCount= _.bind(p.count,p);

    // count(fn) delegates to the promise's own count; count() with no
    // argument turns the query into a count query instead
    p.count= function (fn)
    {
        if (fn)
            origCount(fn);
        else
            modifiers.count= true;

        return p;
    };

    return p;
};
165
// Same arguments as find(); runs the query with limit(1) and fires
// 'result' with the first item, or 'notfound' when nothing came back.
table.findOne= function ()
{
    var self= this,
        promise= dyn.promise('result','notfound','consumed'),
        cursor= self.find.apply(self,arguments).limit(1);

    cursor.results(function (items)
    {
        if (items.length!=0)
            promise.trigger.result(items[0]);
        else
            promise.trigger.notfound();
    })
    .consumed(promise.trigger.consumed)
    .error(promise.trigger.error);

    return promise;
};
184
// Shallow copy of the table carrying $consistent, which find() copies
// into the query modifiers.
table.consistent= function ()
{
    var view= { $consistent: true };

    return _.extend(view,table);
};
189
/**
 * Save one object or an array of objects, together with nested objects,
 * arrays of objects and index entries.
 *
 * Each object is flattened into a list of put/del operations grouped by
 * table name (`gops`), which `_mput` then executes. The promise fires
 * 'success', 'updatedsinceread' (when a write fails with code 'notfound')
 * or 'error', and always 'consumed' with the per-table capacity map.
 *
 * @param _obj      object or array of objects to save
 * @param isCreate  when truthy, puts are issued with `exists: false`
 * @returns the promise
 */
table.save= function (_obj,isCreate)
{
    var objs= _obj ? (Array.isArray(_obj) ? _obj : [_obj]) : [],
        consume= {},
        p= dyn.promise(null,'updatedsinceread','consumed'), found= false;

    process.nextTick(function ()
    {

        async.forEach(objs,
        function (obj,done)
        {
            var gops= {},
                ops= gops[table._dynamo.TableName]= [],
                // assign identity, position and a bumped revision; reset _refs
                _hashrange= function (obj)
                {
                    obj._id= obj._id || uuid();
                    obj._pos= obj._pos || 0;
                    obj._rev= (obj._rev || 0)+1;
                    obj._refs= [];
                },
                // queue the index 'put' operations for obj
                // (this local _index shadows the module-level indexer factory)
                _index= function (obj)
                {
                    table.indexes.forEach(function (index)
                    {
                        var iops= index.update(obj,'put') || {};

                        _.keys(iops).forEach(function (table)
                        {
                            var tops= gops[table]= gops[table] || [];
                            tops.push.apply(tops,_.collect(iops[table],function (op) { op.index= true; return op; }));
                            tops.index= true;
                        });
                    });
                },
                // queue the deletion of item and of its index entries
                _remove= function (item)
                {
                    ops.push({ op: 'del', item: { _id: item._id, _pos: item._pos } });

                    table.indexes.forEach(function (index)
                    {
                        var iops= index.update(item,'del') || {};

                        _.keys(iops).forEach(function (table)
                        {
                            var tops= gops[table]= gops[table] || [];
                            tops.push.apply(tops,_.collect(iops[table],function (op) { op.index= true; return op; }));
                        });
                    });
                },
                // walk obj, recursing into nested objects and arrays of
                // objects, and queue the put for obj itself
                _save= function (obj,isCreate)
                {
                    var _keys= _.keys(obj),
                        _omit= ['_old'],
                        diffs= diff(obj._old || {},
                                    obj,
                                    function (path,key) { return key=='_old'; });

                    // nothing to do when there is no difference from the _old
                    // snapshot, or when the snapshot's _rev is below the current _rev
                    if (!diffs || diffs.length==0
                        || (obj._old || {_rev: 0})._rev<obj._rev
                       ) return;

                    _hashrange(obj);

                    _keys.forEach(function (key)
                    {
                        // '___<field>' and '__<field>' are bookkeeping keys; drop
                        // them when the field they refer to no longer has the
                        // matching shape
                        if (key.indexOf('___')==0)
                        {
                            if (!_isobjectarr(obj[key.substring(3)]))
                            {
                                _omit.push(key);
                                return;
                            }
                        }
                        else
                        if (key.indexOf('__')==0)
                        {
                            if (!_isobject(obj[key.substring(2)]))
                            {
                                _omit.push(key);
                                return;
                            }
                        }

                        var type= typeof obj[key];

                        if (type=='object'&&!_.contains(['_old','_refs'],key))
                        {
                            var desc= obj[key];

                            if (desc==null)
                                _omit.push(key);
                            else
                            if (desc instanceof Date)
                            { /* let dyn convert */ }
                            else
                            if (_.keys(desc).length==0)
                                _omit.push(key);
                            else
                            if (Array.isArray(desc))
                            {
                                if (desc.length)
                                {
                                    if (typeof desc[0]=='object')
                                    {
                                        // the array's elements are stored under a shared _id kept in '___<key>'
                                        var _id= obj['___'+key]= obj['___'+key] || uuid();

                                        if (obj._old)
                                        {
                                            var old= obj._old[key];

                                            // the array shrank: remove old elements that have no counterpart
                                            if (old&&old.length>desc.length)
                                                old.forEach(function (oitem,idx)
                                                {
                                                    if (oitem._id==_id)
                                                    {
                                                        if (!_.findWhere(desc,{ _pos: oitem._pos }))
                                                            _remove(oitem);
                                                    }
                                                    else
                                                    {
                                                        if (!_.findWhere(desc,{ _pos: idx }))
                                                            _remove({ _id: _id, _pos: idx, _ref: oitem._id+'$:$'+oitem._pos });
                                                    }
                                                });
                                        }

                                        desc.forEach(function (val, pos)
                                        {
                                            if (val._id&&val._id!=_id)
                                            {
                                                // element with its own identity: save it and store a reference row
                                                _save(val);
                                                _save({ _id: _id, _pos: pos, _ref: val._id+'$:$'+val._pos });
                                                obj._refs.push(val._id);
                                            }
                                            else
                                            {
                                                val._id= _id;

                                                // element moved: remove the row at its old position first
                                                if (!isNaN(val._pos)&&val._pos!=pos)
                                                {
                                                    delete val['_old'];
                                                    delete val['_rev'];
                                                    _remove(val);
                                                }

                                                val._pos= pos;
                                                _save(val);
                                            }
                                        });

                                        _omit.push(key);
                                    }
                                }
                                else
                                {
                                    // NOTE(review): this branch looks unreachable. An empty array
                                    // has no keys, so it is already omitted by the
                                    // `_.keys(desc).length==0` check above. Confirm before relying on it.
                                    var _id= obj['___'+key];

                                    if (_id&&obj._old[key].length)
                                        obj._old[key].forEach(_remove);

                                    _omit.push(key);
                                    _omit.push('___'+key);
                                }
                            }
                            else
                            {
                                // nested object: save it and keep a '<_id>$:$<_pos>' reference under '__<key>'
                                _save(desc);
                                obj['__'+key]= desc._id+'$:$'+desc._pos;
                                obj._refs.push(desc._id);
                                _omit.push(key);
                            }
                        }
                        else
                        if (type=='string'&&!obj[key])
                            _omit.push(key);
                        else
                        if (type=='number'&&isNaN(obj[key]))
                            _omit.push(key);
                    });

                    if (!obj._refs.length)
                        delete obj['_refs'];
                    else
                        obj._refs= _.uniq(obj._refs);

                    _index(obj); // index after _ fields are set so they are indexable too

                    ops.unshift({ op: 'put', item: obj, omit: _omit, isCreate: isCreate }); // the aggregate's put goes ahead of its "contained" objects, so that the aggregate version protects the rest
                },
                // execute the queued operations: tables in parallel, each table's operations in order
                _mput= function (gops,done)
                {
                    async.forEach(_.keys(gops),
                    function (_table,done)
                    {
                        var tops= gops[_table];

                        async.forEachSeries(tops, // forEachSeries: when deleting elements from an array, the deletes of the old item _pos must be done before the new item _pos is put
                        function (op,done)
                        {
                            var tab= dyn.table(_table),
                                obj= op.item;

                            if (op.index)
                                tab.hash('_hash',obj._hash)
                                   .range('_range',obj._range);
                            else
                                tab.hash('_id',obj._id)
                                   .range('_pos',obj._pos);

                            // NOTE(review): `isCreate` below is table.save's argument, not
                            // op.isCreate, so it applies to every put of this call. Confirm intent.
                            if (op.op=='put')
                                tab.put(_.omit(obj,op.omit),
                                function ()
                                {
                                    obj._old= cclone(_.omit(obj,'_old'));
                                    done();
                                },
                                { expected: obj._old&&_nu(obj._old._rev) ? { _rev: obj._old._rev } : undefined,
                                  exists: isCreate ? false : undefined })
                                .consumed(_collect(consume))
                                .error(done);
                            else
                            if (op.op=='del')
                                // deletes consume capacity too; report it like the puts do
                                tab.delete(done)
                                   .consumed(_collect(consume))
                                   .error(done);
                            else
                                done(new Error('unknown update type:'+op.op));
                        },
                        done);
                    },
                    done);
                };


            _save(obj,isCreate);

            // drop tables that ended up with no operations
            _.keys(gops).forEach(function (table)
            {
                if (gops[table].length==0)
                    delete gops[table];
                else
                    found= true;
            });

            if (found)
                _mput(gops,done);
            else
                process.nextTick(done);

        },
        function (err)
        {
            p.trigger.consumed(consume);

            if (err)
            {
                if (err.code=='notfound')
                    p.trigger.updatedsinceread();
                else
                    p.trigger.error(err);
            }
            else
                p.trigger.success();
        });

    });

    return p;
};
459
// save() in create mode; an error with code 'found' is reported on the
// 'exists' event rather than as an error.
table.create= function (obj)
{
    var promise= dyn.promise(null,'exists','consumed'),
        onError= function (err)
        {
            if (err.code!='found')
                promise.trigger.error(err);
            else
                promise.trigger.exists();
        };

    table.save(obj,true)
         .success(promise.trigger.success)
         .consumed(promise.trigger.consumed)
         .error(onError);

    return promise;
};
477
// Synchronous counterpart of ensureIndex: registers the index on this
// table object without calling index.ensure().
table.enableIndex= function (fields)
{
    var created= _index(dyn,table,fields,opts);

    table.indexes.push(created);
    table.ensuredIndexes.push(fields);
};
484
// Build an index for `fields`, call its ensure() and, once that succeeds,
// register it on the table. Fails when no index can be built for the fields.
table.ensureIndex= function (fields)
{
    var promise= dyn.promise();

    process.nextTick(function ()
    {
        var index= _index(dyn,table,fields,opts);

        if (!index)
        {
            promise.trigger.error(new Error('no known index type can index those fields'));
            return;
        }

        index.ensure(function (err)
        {
            if (err)
            {
                promise.trigger.error(err);
                return;
            }

            table.indexes.push(index);
            table.ensuredIndexes.push(fields);
            promise.trigger.success();
        });
    });

    return promise;
};
511
/**
 * Delete every item matching `filter`, together with its index entries.
 *
 * @param filter  query condition passed to table.find
 * @returns a promise firing 'success' or 'error', preceded by 'consumed'
 *          with the per-table capacity map
 */
table.remove= function (filter)
{
    var p= dyn.promise(null,null,'consumed'),
        consume= {},
        // One accumulator for every source. The previous ad-hoc version added
        // `cons.read`/`cons.write` onto the empty map itself, producing NaN.
        _consumed= _collect(consume),
        _error= function (err)
        {
            p.trigger.consumed(consume);
            p.trigger.error(err);
        },
        _success= function ()
        {
            p.trigger.consumed(consume);
            p.trigger.success();
        },
        sync= dyn.syncResults(function (err)
        {
            if (err)
                _error(err);
            else
                _success();
        }),
        // without indexes only the key is needed; with indexes the whole item is fetched
        cursor= table.find(filter,table.indexes.length ? undefined : { _id: 1, _pos: 1 }),
        // remove the item's index entries and the item itself, in parallel
        _deleteItem= function (obj,done)
        {
            async.parallel([
            function (done)
            {
                async.forEach(table.indexes,
                function (index,done)
                {
                    index.remove(obj).success(done).error(done).consumed(_consumed);
                },done);
            },
            function (done)
            {
                dyn.table(table._dynamo.TableName)
                   .hash('_id',obj._id)
                   .range('_pos',obj._pos)
                   .delete(done)
                   .consumed(_consumed)
                   .error(done);
            }],
            done);
        };

    cursor.results(sync.results(function (items,done)
    {
        async.forEach(items,_deleteItem,done);
    }))
    .consumed(_consumed)
    .error(_error)
    .end(sync.end);

    return p;
};
572
/**
 * Apply `update` to every item matching `query` (read consistently) and
 * save each modified item.
 *
 * Supports `{ $set: {...} }` (fields merged into the item) and
 * `{ $unset: {...} }` (the listed keys removed). When both are present
 * only $set is applied.
 *
 * @param query   query condition passed to find
 * @param update  update descriptor
 * @returns a promise firing 'success' or 'error', preceded by 'consumed'
 *          with the per-table capacity map
 */
table.update= function (query,update)
{
    var p= dyn.promise(null,null,'consumed'),
        cursor= table.consistent().find(query),
        consume= {},
        // shared accumulator instead of a private copy of the same loop
        _consumed= _collect(consume),
        _error= function (err)
        {
            p.trigger.consumed(consume);
            p.trigger.error(err);
        },
        _success= function ()
        {
            p.trigger.consumed(consume);
            p.trigger.success();
        },
        sync= dyn.syncResults(function (err)
        {
            if (err)
                _error(err);
            else
                _success();
        }),
        _updateItem= function (item,done)
        {
            // `update&&` guard: a missing descriptor is reported as an unknown
            // update type rather than throwing inside the iterator
            if (update&&update.$set)
                table.save(_.extend(item,update.$set))
                     .success(done)
                     .consumed(_consumed)
                     .error(done);
            else
            if (update&&update.$unset)
                table.save(_.omit(item,_.keys(update.$unset)))
                     .success(done)
                     .consumed(_consumed)
                     .error(done);
            else
                done(new Error('unknown update type'));
        },
        _updateItems= function (items,done)
        {
            async.forEach(items,_updateItem,done);
        };


    cursor
        .results(sync.results(_updateItems))
        .consumed(_consumed)
        .error(_error)
        .end(sync.end);

    return p;
};
638
// Hand the table name and the read/write values to the capacity module.
table.modify= function (read,write)
{
    var tableName= table._dynamo.TableName;

    return _modify(dyn,tableName,read,write);
};
643
// Drop the table's indexes, then delete the table and poll describeTable
// every 5 seconds until it answers ResourceNotFoundException; only then
// is the table detached from db and 'success' fired.
table.drop= function ()
{
    var promise= dyn.promise();

    // shared reply handling for deleteTable and for every describeTable poll
    function onReply(err)
    {
        if (!err)
        {
            // the table is still there, look again later
            setTimeout(poll,5000);
            return;
        }

        if (err.code!='ResourceNotFoundException')
        {
            promise.trigger.error(err);
            return;
        }

        delete db[_alias(table._dynamo.TableName)];
        promise.trigger.success();
    }

    function poll()
    {
        dyn.describeTable(table._dynamo.TableName,function (err,data)
        {
            onReply(err);
        });
    }

    async.forEach(table.indexes,
    function (index,done)
    {
        index.drop(done);
    },
    function (err)
    {
        // an error from the index drops is not inspected: the table is deleted regardless
        if (opts.hints) console.log('This may take a while...'.yellow);

        dyn.deleteTable(table._dynamo.TableName,function (err)
        {
            onReply(err);
        });
    });

    return promise;
};
694
695 return table;
696 },
// Describe every known table and expose, under its alias on db, each one
// whose key schema is HASH '_id' + RANGE '_pos'. The table list comes from
// opts.tables (real name -> alias) or, when absent, from dyn.listTables.
// Calls cb(err) on failure, cb(null, db) otherwise.
configureTables= function (cb)
{
    var hasDyngoKeys= function (schema)
        {
            var hash= _.findWhere(schema,{ KeyType: 'HASH' }),
                range= _.findWhere(schema,{ KeyType: 'RANGE' });

            return hash&&hash.AttributeName&&hash.AttributeName=='_id'&&range&&range.AttributeName=='_pos';
        },
        configure= function (tables)
        {
            var describeOne= function (name,done)
                {
                    dyn.describeTable(name,function (err,data)
                    {
                        if (!err&&hasDyngoKeys(data.Table.KeySchema))
                            db[tables[name]]= configureTable({ _dynamo: data.Table, indexes: [] });

                        done(err);
                    });
                },
                finished= function (err)
                {
                    cb(err,err ? null : db);
                };

            async.forEach(Object.keys(tables),describeOne,finished);
        };

    if (opts.tables)
    {
        configure(opts.tables);
        return;
    }

    dyn.listTables(function (err,list)
    {
        if (err)
        {
            cb(err);
            return;
        }

        // no aliases configured: every table is exposed under its own name
        var byName= {};

        list.forEach(function (name) { byName[name]= name; });
        configure(byName);
    });
};
739
// Create a table keyed by _id (S) / _pos (N), probe it with a query every
// 5 seconds until it stops answering ResourceNotFoundException, then
// describe it and expose it as db[name].
db.createCollection= function (name)
{
    var promise= dyn.promise();

    // the table answers queries: describe it and attach it to db
    function attach()
    {
        dyn.describeTable(name,function (err,data)
        {
            if (err)
            {
                promise.trigger.error(err);
                return;
            }

            db[name]= configureTable({ _dynamo: data.Table, indexes: [] });
            promise.trigger.success();
        });
    }

    // probe query against the new table; re-armed while the table is missing
    function probe()
    {
        dyn.table(name)
           .hash('_id','xx')
           .query(function ()
           {
               attach();
           })
           .error(function (err)
           {
               var code= err.code;

               if (code=='ResourceNotFoundException')
                   setTimeout(probe,5000);
               else
               if (code=='notfound')
                   attach();
               else
                   promise.trigger.error(err);
           });
    }

    if (opts.hints) console.log('This may take a while...'.yellow);

    dyn.table(name)
       .hash('_id','S')
       .range('_pos','N')
       .create(probe)
       .error(function (err)
       {
           var inUse= err.code=='ResourceInUseException';

           promise.trigger.error(inUse ? new Error('the collection exists') : err);
       });

    return promise;
};
792
// Make sure the transaction table (default name 'dyngo-transaction-table',
// keyed by _id (S) / _item (S)) exists, then publish it as db.txTable with
// its own modify() and drop(). An already existing table is not an error.
db.ensureTransactionTable= function (topts)
{
    topts= _.defaults(topts || {},{ name: 'dyngo-transaction-table' });

    var promise= dyn.promise();

    // drop() of the published tx table: delete it, then poll describeTable
    // every 5 seconds until the table is reported missing
    function makeDrop(data)
    {
        return function ()
        {
            var dropped= dyn.promise();

            function onReply(err)
            {
                if (!err)
                {
                    setTimeout(poll,5000);
                    return;
                }

                if (err.code!='ResourceNotFoundException')
                {
                    dropped.trigger.error(err);
                    return;
                }

                delete db.txTable;
                dropped.trigger.success();
            }

            function poll()
            {
                dyn.describeTable(data.Table.TableName,function (err,data)
                {
                    onReply(err);
                });
            }

            if (opts.hints) console.log('This may take a while...'.yellow);

            dyn.deleteTable(data.Table.TableName,function (err)
            {
                onReply(err);
            });

            return dropped;
        };
    }

    // the table exists and answers: describe it and publish db.txTable
    function publish()
    {
        dyn.describeTable(topts.name,function (err,data)
        {
            if (err)
            {
                promise.trigger.error(err);
                return;
            }

            db.txTable= { _dynamo: data.Table, indexes: [] };

            db.txTable.modify= function (read,write)
            {
                return _modify(dyn,data.Table.TableName,read,write);
            };

            db.txTable.drop= makeDrop(data);

            promise.trigger.success();
        });
    }

    // probe query; re-armed while the table is still missing
    function probe()
    {
        dyn.table(topts.name)
           .hash('_id','xx')
           .query(function ()
           {
               publish();
           })
           .error(function (err)
           {
               var code= err.code;

               if (code=='ResourceNotFoundException')
                   setTimeout(probe,5000);
               else
               if (code=='notfound')
                   publish();
               else
                   promise.trigger.error(err);
           });
    }

    if (opts.hints) console.log('This may take a while...'.yellow);

    dyn.table(topts.name)
       .hash('_id','S')
       .range('_item','S')
       .create(probe)
       .error(function (err)
       {
           // the table being there already is exactly what was asked for
           if (err.code!='ResourceInUseException')
               promise.trigger.error(err);
           else
               publish();
       });

    return promise;
};
892
/**
 * Open a transaction, or resume one when `txOpts` is a transaction id.
 *
 * The transaction record is written to the transaction table under
 * (_id, _item: '_'), then a nested dyngo instance is created with
 * `tx`/`txTable` options. That instance is delivered on the 'transaction'
 * event and carries commit() and rollback().
 *
 * @param txOpts  a transaction id (string) to resume, or options merged
 *                into the nested instance's options
 * @returns a promise firing 'transaction', 'consumed' or 'error'
 */
db.transaction= function (txOpts)
{
    var p= dyn.promise('transaction',null,'consumed'),
        consume= {};

    process.nextTick(function ()
    {
        if (!db.txTable)
        {
            p.trigger.error(new Error('no transaction table defined'));
            return;
        }

        var tab= dyn.table(db.txTable._dynamo.TableName),
            // write the transaction record and build the transactional db around it
            init= function (tx)
            {
                dyn.table(db.txTable._dynamo.TableName)
                   .hash('_id',tx._id)
                   .range('_item','_')
                   .put(tx,function ()
                   {
                       var dopts= _.extend({ tx: tx, txTable: db.txTable },opts,txOpts);

                       dyngo(dopts,
                       function (err,tx) // from here on `tx` is the nested db instance
                       {
                           if (err)
                           {
                               p.trigger.error(err);
                               return;
                           }

                           // mirror the indexes already ensured on the parent's tables
                           _.filter(_.keys(tx),function (key) { return !!tx[key].find; })
                           .forEach(function (tableName)
                           {
                               db[tableName].ensuredIndexes.forEach(tx[tableName].enableIndex); // use enableIndex (sync), do not ensure
                           });

                           dopts.tx.transaction= _.bind(db.transaction,db);

                           // pending -> committed -> (release every target) -> completed
                           tx.commit= function ()
                           {
                               var p= dyn.promise('committed','rolledback','consumed'),
                                   consume= {},
                                   // flip the record from 'pending' to 'committed'
                                   _commit= function (cb)
                                   {
                                       dyn.table(db.txTable._dynamo.TableName)
                                          .hash('_id',tx._id)
                                          .range('_item','_')
                                          .updateItem({ update: { state: { action: 'PUT', value: 'committed' } },
                                                        expected: { state: 'pending' } },
                                          function ()
                                          {
                                              tx.state= 'committed';
                                              cb();
                                          })
                                          .consumed(_collect(consume))
                                          .error(function (err)
                                          {
                                              if (err.code=='notfound')
                                                  p.trigger.rolledback(true);
                                              else
                                                  p.trigger.error(err);
                                          });
                                   },
                                   // for every 'target::' row: delete the item, or strip its _tx* markers
                                   _complete= function (cb)
                                   {
                                       var sync= dyn.syncResults(function (err)
                                       {
                                           if (err)
                                               p.trigger.error(err);
                                           else
                                               cb();
                                       });

                                       dyn.table(db.txTable._dynamo.TableName)
                                          .hash('_id',tx._id)
                                          .range('_item','target::','BEGINS_WITH')
                                          .query(sync.results(function (items,done)
                                          {
                                              async.forEach(items,
                                              function (item,done)
                                              {
                                                  // _item is 'target::<table>::<hash attr>::<hash value>::<range attr>::<range value>'
                                                  var _item= item._item.split('::'),
                                                      table= _item[1],
                                                      hash= { attr: _item[2], value: _item[3] },
                                                      range= { attr: _item[4], value: _item[4]=='_pos' ? +_item[5] : _item[5] };

                                                  if (item._txOp=='delete')
                                                  {
                                                      dyn.table(table)
                                                         .hash(hash.attr,hash.value)
                                                         .range(range.attr,range.value)
                                                         .delete(function () { done(); },{ expected: { _tx: tx._id } })
                                                         .consumed(_collect(consume))
                                                         .error(done);
                                                  }
                                                  else
                                                      dyn.table(table)
                                                         .hash(hash.attr,hash.value)
                                                         .range(range.attr,range.value)
                                                         .updateItem({ update: { _txTransient: { action: 'DELETE' },
                                                                                 _txApplied: { action: 'DELETE' },
                                                                                 _txDeleted: { action: 'DELETE' },
                                                                                 _txLocked: { action: 'DELETE' },
                                                                                 _tx: { action: 'DELETE' } } },
                                                         function () { done(); })
                                                         .consumed(_collect(consume))
                                                         .error(done);

                                              },
                                              done);
                                          }),
                                          { attrs: ['_id','_item','_txOp'],
                                            consistent: true })
                                          .error(p.trigger.error)
                                          .consumed(_collect(consume))
                                          .end(sync.end);
                                   },
                                   // flip the record from 'committed' to 'completed'
                                   _clean= function (cb)
                                   {
                                       dyn.table(db.txTable._dynamo.TableName)
                                          .hash('_id',tx._id)
                                          .range('_item','_')
                                          .updateItem({ update: { state: { action: 'PUT', value: 'completed' } },
                                                        expected: { state: 'committed' } },
                                          function ()
                                          {
                                              tx.state= 'completed';
                                              cb();
                                          })
                                          // aggregate like every other step; 'consumed' is fired once, by _committed
                                          .consumed(_collect(consume))
                                          .error(p.trigger.error);
                                   },
                                   _committed= function ()
                                   {
                                       p.trigger.consumed(consume);
                                       p.trigger.committed();
                                   };

                               if (tx.state=='pending')
                                   _commit(function ()
                                   {
                                       _complete(function ()
                                       {
                                           _clean(_committed);
                                       });
                                   });
                               else
                               if (tx.state=='committed') // resumed after the state flip: finish the remaining steps
                                   _complete(function ()
                                   {
                                       _clean(_committed);
                                   });
                               else
                                   p.trigger.error(new Error("Invalid transaction state: "+tx.state));

                               return p;
                           };

                           // undo every target of a pending transaction, then mark it 'rolledback'
                           tx.rollback= function ()
                           {
                               var p= dyn.promise('rolledback',null,'consumed'),
                                   consume= {},
                                   _rollback= function (cb)
                                   {
                                       var sync= dyn.syncResults(function (err)
                                       {
                                           if (err)
                                               p.trigger.error(err);
                                           else
                                               dyn.table(db.txTable._dynamo.TableName)
                                                  .hash('_id',tx._id)
                                                  .range('_item','_')
                                                  .updateItem({ update: { state: { action: 'PUT', value: 'rolledback' } },
                                                                expected: { state: 'pending' } },
                                                  function ()
                                                  {
                                                      tx.state= 'rolledback';
                                                      cb();
                                                  })
                                                  .consumed(_collect(consume))
                                                  .error(p.trigger.error);
                                       });

                                       dyn.table(db.txTable._dynamo.TableName)
                                          .hash('_id',tx._id)
                                          .range('_item','target::','BEGINS_WITH')
                                          .query(sync.results(function (items,done)
                                          {
                                              async.forEach(items,
                                              function (item,done)
                                              {
                                                  var _item= item._item.split('::'),
                                                      table= _item[1],
                                                      hash= { attr: _item[2], value: _item[3] },
                                                      range= { attr: _item[4], value: _item[4]=='_pos' ? +_item[5] : _item[5] },
                                                      // strip the _tx* markers from the target item
                                                      clean= function ()
                                                      {
                                                          dyn.table(table)
                                                             .hash(hash.attr,hash.value)
                                                             .range(range.attr,range.value)
                                                             .updateItem({ update: { _txTransient: { action: 'DELETE' },
                                                                                     _txApplied: { action: 'DELETE' },
                                                                                     _txDeleted: { action: 'DELETE' },
                                                                                     _txLocked: { action: 'DELETE' },
                                                                                     _tx: { action: 'DELETE' } } },
                                                             function () { done(); })
                                                             .consumed(_collect(consume))
                                                             .error(done);
                                                      };

                                                  if (item._txOp=='put')
                                                      dyn.table(table)
                                                         .hash(hash.attr,hash.value)
                                                         .range(range.attr,range.value)
                                                         .get(function (item)
                                                         {
                                                             if (item._txTransient)
                                                                 // the item carries _txTransient: delete it
                                                                 dyn.table(table)
                                                                    .hash(hash.attr,hash.value)
                                                                    .range(range.attr,range.value)
                                                                    .delete(function () { done(); })
                                                                    .consumed(_collect(consume))
                                                                    .error(done);
                                                             else
                                                                 // otherwise restore the 'copy::' row kept in the transaction table
                                                                 dyn.table(db.txTable._dynamo.TableName)
                                                                    .hash('_id',tx._id)
                                                                    .range('_item',['copy',
                                                                                    table,
                                                                                    hash.attr,
                                                                                    hash.value,
                                                                                    range.attr,
                                                                                    range.value].join('::'))
                                                                    .get(function (copy)
                                                                    {
                                                                        copy= _.omit(copy,['_id',
                                                                                           '_item',
                                                                                           '_txLocked',
                                                                                           '_txApplied',
                                                                                           '_txDeleted',
                                                                                           '_txTransient',
                                                                                           '_tx']);

                                                                        copy[hash.attr]= hash.value;
                                                                        copy[range.attr]= range.value;

                                                                        // Aggregate consumption and route failures through
                                                                        // `done` like every sibling branch. Previously this
                                                                        // fired 'consumed' directly and never called `done`
                                                                        // on error.
                                                                        dyn.table(table)
                                                                           .hash(hash.attr,hash.value)
                                                                           .range(range.attr,range.value)
                                                                           .put(copy,function () { done(); })
                                                                           .consumed(_collect(consume))
                                                                           .error(done);
                                                                    })
                                                                    .consumed(_collect(consume))
                                                                    .error(done);
                                                         })
                                                         .consumed(_collect(consume))
                                                         .error(done);
                                                  else
                                                      clean();
                                              },
                                              done);
                                          }),
                                          { attrs: ['_id','_item','_txOp'],
                                            consistent: true })
                                          .error(p.trigger.error)
                                          .consumed(_collect(consume))
                                          .end(sync.end);
                                   };

                               if (tx.state=='pending')
                                   _rollback(p.trigger.rolledback);
                               else
                                   p.trigger.error(new Error("Invalid transaction state: "+tx.state));

                               return p;
                           };

                           p.trigger.consumed(consume);
                           p.trigger.transaction(tx);
                       });
                   })
                   .consumed(_collect(consume))
                   .error(p.trigger.error);

            };

        if (typeof txOpts=='string')
            // resume: load the stored transaction record by id
            tab.hash('_id',txOpts)
               .range('_item','_')
               .get(init,{ consistent: true })
               .consumed(_collect(consume))
               .error(p.trigger.error);
        else
        {
            if (opts.tx)
                p.trigger.error(new Error('cannot start a transaction within a transaction'));
            else
                init({ _id: uuid(), _item: '_', state: 'pending' });
        }
    });

    return p;
};
1198
1199 configureTables(cb);
1200
1201};