UNPKG

1.84 kBtext/coffeescriptView Raw
1ReadableSearch = require './readable-search'
2through2 = require 'through2'
3mixin = require './mixin'
4
# Build an object-mode stream that converts search hits into bulk entries.
#
# operdelete - when truthy, each hit yields one `delete` action entry;
#              otherwise each hit yields an `index` action entry followed
#              by the hit's `_source`.
#
# Returns a through2 object stream.
toBulk = (operdelete) ->
    through2.obj (hit, encoding, done) ->
        {_index, _type, _id} = hit
        meta = {_index, _type, _id}
        action = if operdelete then 'delete' else 'index'
        entry = {}
        entry[action] = meta
        @push entry
        # Only index actions are followed by a document body.
        @push hit._source unless operdelete
        done()
13
# Build an object-mode stream that maps every document through `fn`.
#
# fn - optional mapper called with each document. A falsy return value drops
#      the document from the stream. When `fn` is not a function (e.g. the
#      caller supplied no transform), documents are forwarded unchanged
#      instead of failing with a TypeError on the first document.
#
# Returns a through2 object stream.
transform = (fn) ->
    mapper = if typeof fn is 'function' then fn else (doc) -> doc
    through2.obj (doc, enc, callback) ->
        tdoc = mapper(doc)
        this.push tdoc if tdoc
        callback()
19
# Build an object-mode stream that serializes each incoming object as one
# line of JSON terminated by a newline.
#
# Returns a through2 object stream.
jsonStream = ->
    through2.obj (record, encoding, done) ->
        line = "#{JSON.stringify(record)}\n"
        @push line
        done()
23
24
# Create a stream of newline-delimited bulk entries from a scrolled search.
#
# client     - object providing `search(opts, cb)` and `scroll(opts, cb)`.
# _opts      - search options; combined with the defaults `scroll: '60s'`
#              and `size: 200` via `mixin`.
#              NOTE(review): which side wins on a key clash depends on
#              `mixin`, which is not visible here - confirm.
# operdelete - passed to `toBulk`; truthy selects delete actions.
# trans      - passed to `transform`; per-hit mapping function.
#
# Returns the last stream of the pipeline. In addition to the usual stream
# events it emits `progress` with `{from, total}` and re-emits errors from
# the underlying search reader.
module.exports = (client, _opts, operdelete, trans) ->

    opts = mixin _opts, {scroll:'60s', size:200}

    # need some kind of query
    if !opts.body and !opts.q
        opts.body = query:match_all:{}

    # body wins over q, not both at the same time
    if opts.body
        delete opts.q
    else
        delete opts.body

    scrollExec = do ->
        scrollId = null
        (from, callback) ->
            # Remember the scroll id of every response: the id may change
            # between calls and the most recent one must be used next.
            remember = (err, res) ->
                scrollId = res._scroll_id if res?._scroll_id
                callback(err, res)
            if scrollId
                # Use the same keep-alive as the initial search rather than
                # a hard-coded value.
                client.scroll({scrollId, scroll: opts.scroll ? '60s'}, remember)
            else
                client.search opts, remember

    readable = new ReadableSearch(scrollExec)
    # `stream` is assigned further down; the handler only runs later, once
    # the reader actually reports an error.
    readable.on 'error', (err) ->
        stream.emit 'error', err

    last = -1

    # Forwards hits untouched and reports progress whenever the reader's
    # `from` position has changed.
    progress = through2.obj (hit, enc, callback) ->
        this.push hit
        if readable.from != last
            last = readable.from
            stream.emit 'progress', {from:last, total:readable.total}
        callback()

    stream = readable
        .pipe(progress)
        .pipe(transform(trans))
        .pipe(toBulk(operdelete))
        .pipe(jsonStream())

    stream.on 'end', ->
        # Final progress report if the position moved since the last one.
        if readable.from != last
            stream.emit 'progress', {from:readable.total, total:readable.total}

    stream