UNPKG

3.6 kBJavaScriptView Raw
1var colors= require('colors'),
2 _= require('underscore'),
3 async= require('async');
4
5const M= 1048576;
6
7module.exports= function (dyn)
8{
9 var finder= {};
10
11 finder.canFind= function (query)
12 {
13 return query.$supported;
14 };
15
16 finder.find= function (query)
17 {
18 if (query.opts.hints)
19 console.log(('SCAN on '+query.table.name+' for '+JSON.stringify(query.cond,null,2)).red);
20
21 var p= dyn.promise(['results','count','end'],null,'consumed'),
22 avgItemSize= Math.ceil(query.table._dynamo.TableSizeBytes/query.table._dynamo.ItemCount),
23 perWorker= (M/avgItemSize)*80/100,
24 workers= Math.ceil(query.table._dynamo.ItemCount/perWorker);
25
26 if (!workers) workers= 1;
27 else
28 if (workers>query.table._dynamo.ProvisionedThroughput.ReadCapacityUnits)
29 workers= query.table._dynamo.ProvisionedThroughput.ReadCapacityUnits;
30
31 // FIXME: implement opts.maxworkers ? better divide & conquer algorithm
32
33 var filter= {};
34
35 Object.keys(query.$filter).forEach(function (fieldName)
36 {
37 var field= query.$filter[fieldName];
38
39 if (field.op!='REGEXP')
40 {
41 filter[fieldName]= field;
42 delete query.$filter[fieldName];
43 query.$filtered.push(fieldName);
44 }
45 });
46
47 query.counted= query.canCount()&&query.count;
48
49 if (query.counted)
50 {
51
52 var count= 0,
53 progress= [],
54 _progress= function (segment, pcount)
55 {
56 progress[segment]= pcount;
57
58 process.stdout.write(('\r'+_.reduce(progress,function (memo,num) { return memo+num; },0)).yellow);
59 };
60
61 async.forEach(_.range(workers),
62 function (segment,done)
63 {
64 var sp= dyn.table(query.table.name)
65 .scan(function (wcount)
66 {
67 count+=wcount;
68 done();
69 },
70 {
71 filter: filter,
72 attrs: query.projection.include,
73 limit: query.window,
74 count: query.count,
75 segment: { no: segment, of: workers }
76 })
77 .consumed(p.trigger.consumed)
78 .error(done);
79
80 if (dyn.iscli())
81 sp.progress(function (pcount) { _progress(segment,pcount); })
82 },
83 function (err)
84 {
85 if (err)
86 p.trigger.error(err);
87 else
88 p.trigger.count(count);
89 });
90 }
91 else
92 {
93 async.forEach(_.range(workers),
94 function (segment,done)
95 {
96 dyn.table(query.table.name)
97 .scan(p.trigger.results,
98 {
99 filter: filter,
100 attrs: query.projection.include,
101 segment: { no: segment, of: workers },
102 limit: query.window
103 })
104 .consumed(p.trigger.consumed)
105 .end(done)
106 .error(done);
107 },
108 function (err)
109 {
110 if (err)
111 p.trigger.error(err);
112 else
113 p.trigger.end();
114 });
115 }
116
117 return p;
118 };
119
120 return finder;
121};