1 |
|
2 | (function() {
|
3 | var ReadableSearch, jsonStream, mixin, through2, toBulk, transform;
|
4 |
|
5 | ReadableSearch = require('./readable-search');
|
6 |
|
7 | through2 = require('through2');
|
8 |
|
9 | mixin = require('./mixin');
|
10 |
|
11 | toBulk = function(operdelete) {
|
12 | return through2.obj(function(doc, enc, callback) {
|
13 | var idx;
|
14 | idx = {
|
15 | _index: doc._index,
|
16 | _type: doc._type,
|
17 | _id: doc._id
|
18 | };
|
19 | if (operdelete) {
|
20 | this.push({
|
21 | "delete": idx
|
22 | });
|
23 | } else {
|
24 | this.push({
|
25 | index: idx
|
26 | });
|
27 | this.push(doc._source);
|
28 | }
|
29 | return callback();
|
30 | });
|
31 | };
|
32 |
|
33 | transform = function(fn) {
|
34 | return through2.obj(function(doc, enc, callback) {
|
35 | var tdoc;
|
36 | tdoc = fn(doc);
|
37 | if (tdoc) {
|
38 | this.push(tdoc);
|
39 | }
|
40 | return callback();
|
41 | });
|
42 | };
|
43 |
|
44 | jsonStream = function() {
|
45 | return through2.obj(function(chunk, enc, callback) {
|
46 | this.push(JSON.stringify(chunk) + "\n");
|
47 | return callback();
|
48 | });
|
49 | };
|
50 |
|
51 | module.exports = function(client, _opts, operdelete, trans) {
|
52 | var last, opts, readable, scrollExec, stream;
|
53 | opts = mixin(_opts, {
|
54 | scroll: '60s',
|
55 | size: 200
|
56 | });
|
57 | if (!opts.body && !opts.q) {
|
58 | opts.body = {
|
59 | query: {
|
60 | match_all: {}
|
61 | }
|
62 | };
|
63 | }
|
64 | if (opts.body) {
|
65 | delete opts.q;
|
66 | } else {
|
67 | delete opts.body;
|
68 | }
|
69 | scrollExec = (function() {
|
70 | var scrollId;
|
71 | scrollId = null;
|
72 | return function(from, callback) {
|
73 | if (scrollId) {
|
74 | return client.scroll({
|
75 | scrollId: scrollId,
|
76 | scroll: '60s'
|
77 | }, callback);
|
78 | } else {
|
79 | return client.search(opts, function(err, res) {
|
80 | scrollId = res != null ? res._scroll_id : void 0;
|
81 | return callback(err, res);
|
82 | });
|
83 | }
|
84 | };
|
85 | })();
|
86 | readable = new ReadableSearch(scrollExec).on('error', function(err) {
|
87 | return stream.emit('error', err);
|
88 | });
|
89 | last = -1;
|
90 | stream = readable.pipe(through2.obj(function(hit, enc, callback) {
|
91 | this.push(hit);
|
92 | if (readable.from !== last) {
|
93 | last = readable.from;
|
94 | stream.emit('progress', {
|
95 | from: last,
|
96 | total: readable.total
|
97 | });
|
98 | }
|
99 | return callback();
|
100 | })).pipe(transform(trans)).pipe(toBulk(operdelete)).pipe(jsonStream()).on('end', function() {
|
101 | if (readable.from !== last) {
|
102 | return stream.emit('progress', {
|
103 | from: readable.total,
|
104 | total: readable.total
|
105 | });
|
106 | }
|
107 | });
|
108 | return stream;
|
109 | };
|
110 |
|
111 | }).call(this);
|
112 |
|
113 |
|