1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
# Object-mode Readable stream that emits search hits one at a time, loading
# them page by page through `queryExec`.
#
# queryExec - function(from, callback) called to load the page that starts at
#             offset `from`. The callback receives (error, response); the
#             response is read as `response.hits.total` and
#             `response.hits.hits`.
# parseHit  - optional function applied to every raw hit before it is pushed
#             downstream. Defaults to `identity`.
#
# Works with or without `new`.
ReadableHits = (queryExec, parseHit) ->
  unless this instanceof ReadableHits
    # Forward both arguments: passing only queryExec here silently replaced
    # a caller-supplied parseHit with the identity default.
    return new ReadableHits(queryExec, parseHit)
  Readable.call this, objectMode: true
  @queryExec = queryExec
  # Total hit count reported by the backend; -1 until the first page arrives.
  @total = -1
  # Offset handed to queryExec for the next page.
  @from = 0
  # False once no further page should be requested.
  @_next = true
  # Hits of the page currently buffered, and the cursor into that page.
  @_hits = []
  @_current = 0
  @parseHit = parseHit or identity
  return
|
26 |
|
# Default hit parser: hands the raw hit back untouched.
identity = (hit) -> hit
|
29 |
|
# NOTE(review): this string appears after other statements, so it is not in
# directive position and presumably does not enable strict mode. Confirm, and
# move it to the top of the file if strict mode is intended.
'use strict'
{ Readable } = require('stream')

# Inherit from Readable while keeping `constructor` pointing at ReadableHits.
ReadableHits.prototype = Object.create Readable.prototype,
  constructor:
    value: ReadableHits

# The module exports the stream constructor itself.
module.exports = ReadableHits
|
34 |
|
# Readable hook: advance the cursor and supply one more hit.
#
# Pushes the next buffered hit when one is available; otherwise requests the
# next page, or ends the stream when no further page is expected.
ReadableHits::_read = ->
  @_current++
  # Cursor still inside the buffered page: emit the hit under it.
  if @_current < @_hits.length
    @_shift()
    return
  # Page exhausted and nothing left to fetch: signal end of stream.
  unless @_next
    return @push(null)
  @_fetchNextPage()
  return
|
45 |
|
# Load the page starting at offset `@from` through `@queryExec` and push its
# first hit.
#
# On error: clears the buffered page, stops paging, emits 'error' and ends the
# stream. On success: records the reported total, buffers the new hits,
# advances `@from` by the page size, and ends the stream if the page is empty.
ReadableHits::_fetchNextPage = ->
  # Bound callback keeps `this` pointing at the stream.
  @queryExec @from, (err, resp) =>
    # A fresh page (or a failure) always restarts the cursor.
    @_current = 0
    if err
      # Reset the real page buffer; the original assigned to `hits`, a typo
      # that left the stale page in `_hits`.
      @_hits = []
      @_next = false
      @emit 'error', err
      return @push(null)
    @total = resp.hits.total
    @_hits = resp.hits.hits
    @from += @_hits.length
    # Every reported hit has been fetched: do not request another page.
    @_next = false if @from >= @total
    # An empty page ends the stream regardless of the reported total.
    return @push(null) unless @_hits.length
    @_shift()
    return
  return
|
69 |
|
# Push the hit under the cursor downstream after running it through
# `@parseHit`.
ReadableHits::_shift = ->
  rawHit = @_hits[@_current]
  parsed = @parseHit(rawHit)
  @push parsed
  return
|
73 |
|
# Stop the stream: mark it destroyed, prevent further page requests and detach
# every piped destination. Calling it again is a no-op.
ReadableHits::destroy = ->
  unless @destroyed
    @destroyed = true
    # Makes the next _read end the stream instead of fetching another page.
    @_next = false
    @unpipe()
  return
|
81 |
|
82 |
|
83 |
|