UNPKG

1.93 kBtext/coffeescriptView Raw
1###*
2# Expose an elasticsearch query that returns hits or docs as a stream of hits or docs.
3#
4# Expect the query to be a JSON object where the from property defines the offset
5# and the limit defines the page size.
6# Expect the client to return a parsed JSON object.
7###
8
###*
# Object-mode Readable stream that emits, one at a time, the hits returned by
# a paginated query.
#
# @param queryExec an executable query function that takes 2 arguments: the
#   offset and a callback. The callback is invoked as (error, response), and
#   response.hits.total / response.hits.hits are read from the response.
# @param parseHit optional function applied to every hit before it is pushed
#   downstream; when omitted, hits are pushed unchanged.
###

ReadableHits = (queryExec, parseHit) ->
  # Support calling without `new`. Forward parseHit as well, otherwise the
  # caller's parser would be silently replaced by the identity function.
  unless this instanceof ReadableHits
    return new ReadableHits(queryExec, parseHit)
  Readable.call this, objectMode: true
  @queryExec = queryExec
  # Total hit count reported by the last response; -1 until a page arrives.
  @total = -1
  # Offset handed to queryExec for the next page.
  @from = 0
  # Cleared once no further page should be requested.
  @_next = true
  # current iteration through the page
  @_hits = []
  @_current = 0
  @parseHit = parseHit or identity
  return
26
# Default hit parser: hands the hit back untouched.
identity = (hit) -> hit
29
# NOTE(review): this directive is not the first statement of the file, so it
# presumably does not enable strict mode — confirm before relying on it.
'use strict'
{Readable} = require 'stream'
module.exports = ReadableHits
# Inherit from Readable while keeping `constructor` pointing at ReadableHits.
ReadableHits.prototype = Object.create Readable.prototype,
  constructor:
    value: ReadableHits
34
# Readable hook: advance the cursor within the current page and push the next
# hit; when the page is exhausted, either fetch another page or end the stream.
ReadableHits::_read = ->
  @_current += 1
  if @_current < @_hits.length
    # Still inside the buffered page.
    @_shift()
    return
  # Page exhausted and nothing more to fetch: signal end of stream.
  return @push(null) unless @_next
  @_fetchNextPage()
  return
45
# Request the page starting at offset @from through queryExec, then push its
# first hit. Ends the stream on error or when the page comes back empty.
ReadableHits::_fetchNextPage = ->
  @queryExec @from, (e, resp) =>
    # Restart the in-page cursor for whatever page we end up holding.
    @_current = 0
    if e
      # Clear the page buffer (`_hits`, the property every other method
      # reads), stop paging, report the error and end the stream.
      @_hits = []
      @_next = false
      @emit 'error', e
      return @push(null)
    @total = resp.hits.total
    @_hits = resp.hits.hits
    @from += @_hits.length
    # we got them all.
    @_next = false if @from >= @total
    unless @_hits.length
      # nothing here: end the stream.
      return @push(null)
    @_shift()
    return
  return
69
# Push the hit under the cursor downstream, after running it through parseHit.
ReadableHits::_shift = ->
  currentHit = @_hits[@_current]
  @push @parseHit(currentHit)
  return
73
# Mark the stream destroyed, stop requesting further pages and detach every
# piped destination. Calling it again once destroyed does nothing.
ReadableHits::destroy = ->
  unless @destroyed
    @destroyed = true
    @_next = false
    @unpipe()
  return
81
82# ---
83# generated by js2coffee 2.2.0