1 | ReadableSearch = require './readable-search'
|
2 | through2 = require 'through2'
|
3 | mixin = require './mixin'
|
4 |
|
5 | toBulk = (operdelete) -> through2.obj (doc, enc, callback) ->
|
6 | idx = {_index: doc._index, _type:doc._type, _id:doc._id}
|
7 | if operdelete
|
8 | this.push delete:idx
|
9 | else
|
10 | this.push index:idx
|
11 | this.push doc._source
|
12 | callback()
|
13 |
|
14 | transform = (fn) -> through2.obj (doc, enc, callback) ->
|
15 | tdoc = fn(doc)
|
16 | if tdoc
|
17 | this.push tdoc
|
18 | callback()
|
19 |
|
20 | jsonStream = -> through2.obj (chunk, enc, callback) ->
|
21 | this.push(JSON.stringify(chunk) + "\n")
|
22 | callback()
|
23 |
|
24 |
|
25 | module.exports = (client, _opts, operdelete, trans) ->
|
26 |
|
27 | opts = mixin _opts, {scroll:'60s', size:200}
|
28 |
|
29 |
|
30 | if !opts.body and !opts.q
|
31 | opts.body = query:match_all:{}
|
32 |
|
33 |
|
34 | if opts.body
|
35 | delete opts.q
|
36 | else
|
37 | delete opts.body
|
38 |
|
39 | scrollExec = do ->
|
40 | scrollId = null
|
41 | (from, callback) ->
|
42 | if scrollId
|
43 | client.scroll({scrollId, scroll:'60s'}, callback)
|
44 | else
|
45 | client.search opts, (err, res) ->
|
46 | scrollId = res?._scroll_id
|
47 | callback(err, res)
|
48 |
|
49 | readable = new ReadableSearch scrollExec
|
50 | .on 'error', (err) ->
|
51 | stream.emit 'error', err
|
52 |
|
53 | last = -1
|
54 |
|
55 | stream = readable
|
56 | .pipe through2.obj (hit, enc, callback) ->
|
57 | this.push hit
|
58 | if readable.from != last
|
59 | last = readable.from
|
60 | stream.emit 'progress', {from:last, total:readable.total}
|
61 | callback()
|
62 | .pipe transform(trans)
|
63 | .pipe toBulk(operdelete)
|
64 | .pipe jsonStream()
|
65 | .on 'end', ->
|
66 | if readable.from != last
|
67 | stream.emit 'progress', {from:readable.total, total:readable.total}
|
68 |
|
69 | stream
|