1 | var colors= require('colors'),
|
2 | _= require('underscore'),
|
3 | async= require('async');
|
4 |
|
5 | const M= 1048576;
|
6 |
|
7 | module.exports= function (dyn)
|
8 | {
|
9 | var finder= {};
|
10 |
|
11 | finder.canFind= function (query)
|
12 | {
|
13 | return query.$supported;
|
14 | };
|
15 |
|
16 | finder.find= function (query)
|
17 | {
|
18 | if (query.opts.hints)
|
19 | console.log(('SCAN on '+query.table.name+' for '+JSON.stringify(query.cond,null,2)).red);
|
20 |
|
21 | var p= dyn.promise(['results','count','end'],null,'consumed'),
|
22 | avgItemSize= Math.ceil(query.table._dynamo.TableSizeBytes/query.table._dynamo.ItemCount),
|
23 | perWorker= (M/avgItemSize)*80/100,
|
24 | workers= Math.ceil(query.table._dynamo.ItemCount/perWorker);
|
25 |
|
26 | if (!workers) workers= 1;
|
27 | else
|
28 | if (workers>query.table._dynamo.ProvisionedThroughput.ReadCapacityUnits)
|
29 | workers= query.table._dynamo.ProvisionedThroughput.ReadCapacityUnits;
|
30 |
|
31 |
|
32 |
|
33 | var filter= {};
|
34 |
|
35 | Object.keys(query.$filter).forEach(function (fieldName)
|
36 | {
|
37 | var field= query.$filter[fieldName];
|
38 |
|
39 | if (field.op!='REGEXP')
|
40 | {
|
41 | filter[fieldName]= field;
|
42 | delete query.$filter[fieldName];
|
43 | query.$filtered.push(fieldName);
|
44 | }
|
45 | });
|
46 |
|
47 | query.counted= query.canCount()&&query.count;
|
48 |
|
49 | if (query.counted)
|
50 | {
|
51 |
|
52 | var count= 0,
|
53 | progress= [],
|
54 | _progress= function (segment, pcount)
|
55 | {
|
56 | progress[segment]= pcount;
|
57 |
|
58 | process.stdout.write(('\r'+_.reduce(progress,function (memo,num) { return memo+num; },0)).yellow);
|
59 | };
|
60 |
|
61 | async.forEach(_.range(workers),
|
62 | function (segment,done)
|
63 | {
|
64 | var sp= dyn.table(query.table.name)
|
65 | .scan(function (wcount)
|
66 | {
|
67 | count+=wcount;
|
68 | done();
|
69 | },
|
70 | {
|
71 | filter: filter,
|
72 | attrs: query.projection.include,
|
73 | limit: query.window,
|
74 | count: query.count,
|
75 | segment: { no: segment, of: workers }
|
76 | })
|
77 | .consumed(p.trigger.consumed)
|
78 | .error(done);
|
79 |
|
80 | if (dyn.iscli())
|
81 | sp.progress(function (pcount) { _progress(segment,pcount); })
|
82 | },
|
83 | function (err)
|
84 | {
|
85 | if (err)
|
86 | p.trigger.error(err);
|
87 | else
|
88 | p.trigger.count(count);
|
89 | });
|
90 | }
|
91 | else
|
92 | {
|
93 | async.forEach(_.range(workers),
|
94 | function (segment,done)
|
95 | {
|
96 | dyn.table(query.table.name)
|
97 | .scan(p.trigger.results,
|
98 | {
|
99 | filter: filter,
|
100 | attrs: query.projection.include,
|
101 | segment: { no: segment, of: workers },
|
102 | limit: query.window
|
103 | })
|
104 | .consumed(p.trigger.consumed)
|
105 | .end(done)
|
106 | .error(done);
|
107 | },
|
108 | function (err)
|
109 | {
|
110 | if (err)
|
111 | p.trigger.error(err);
|
112 | else
|
113 | p.trigger.end();
|
114 | });
|
115 | }
|
116 |
|
117 | return p;
|
118 | };
|
119 |
|
120 | return finder;
|
121 | };
|