UNPKG

2.68 kBJavaScriptView Raw
1// Generated by CoffeeScript 1.11.1
2(function() {
3 var ReadableSearch, jsonStream, mixin, through2, toBulk, transform;
4
5 ReadableSearch = require('./readable-search');
6
7 through2 = require('through2');
8
9 mixin = require('./mixin');
10
11 toBulk = function(operdelete) {
12 return through2.obj(function(doc, enc, callback) {
13 var idx;
14 idx = {
15 _index: doc._index,
16 _type: doc._type,
17 _id: doc._id
18 };
19 if (operdelete) {
20 this.push({
21 "delete": idx
22 });
23 } else {
24 this.push({
25 index: idx
26 });
27 this.push(doc._source);
28 }
29 return callback();
30 });
31 };
32
33 transform = function(fn) {
34 return through2.obj(function(doc, enc, callback) {
35 var tdoc;
36 tdoc = fn(doc);
37 if (tdoc) {
38 this.push(tdoc);
39 }
40 return callback();
41 });
42 };
43
44 jsonStream = function() {
45 return through2.obj(function(chunk, enc, callback) {
46 this.push(JSON.stringify(chunk) + "\n");
47 return callback();
48 });
49 };
50
51 module.exports = function(client, _opts, operdelete, trans) {
52 var last, opts, readable, scrollExec, stream;
53 opts = mixin(_opts, {
54 scroll: '60s',
55 size: 200
56 });
57 if (!opts.body && !opts.q) {
58 opts.body = {
59 query: {
60 match_all: {}
61 }
62 };
63 }
64 if (opts.body) {
65 delete opts.q;
66 } else {
67 delete opts.body;
68 }
69 scrollExec = (function() {
70 var scrollId;
71 scrollId = null;
72 return function(from, callback) {
73 if (scrollId) {
74 return client.scroll({
75 scrollId: scrollId,
76 scroll: '60s'
77 }, callback);
78 } else {
79 return client.search(opts, function(err, res) {
80 scrollId = res != null ? res._scroll_id : void 0;
81 return callback(err, res);
82 });
83 }
84 };
85 })();
86 readable = new ReadableSearch(scrollExec).on('error', function(err) {
87 return stream.emit('error', err);
88 });
89 last = -1;
90 stream = readable.pipe(through2.obj(function(hit, enc, callback) {
91 this.push(hit);
92 if (readable.from !== last) {
93 last = readable.from;
94 stream.emit('progress', {
95 from: last,
96 total: readable.total
97 });
98 }
99 return callback();
100 })).pipe(transform(trans)).pipe(toBulk(operdelete)).pipe(jsonStream()).on('end', function() {
101 if (readable.from !== last) {
102 return stream.emit('progress', {
103 from: readable.total,
104 total: readable.total
105 });
106 }
107 });
108 return stream;
109 };
110
111}).call(this);
112
113//# sourceMappingURL=reader.js.map