UNPKG

2.66 kBtext/coffeescriptView Raw
###*
# Expose a writable stream that collects the bulk commands written to it
# and executes them as a series of bulk requests.
###
4
###*
# Writable object-mode stream that buffers bulk commands and hands them to
# `bulkExec` in batches. May be called with or without `new`.
#
# Listen for the 'close' event (not 'finish') to know when the last batch
# has been executed.
#
# @param bulkExec closure invoked with the bulk cmds as an array and a
#   callback; the callback is called back with (error, response).
# @param highWaterMark number of bulk commands executed at once. 128 by
#   default (any falsy value, including 0, falls back to 128).
###

WritableBulk = (bulkExec, highWaterMark) ->
  if !(this instanceof WritableBulk)
    return new WritableBulk(bulkExec, highWaterMark)
  # autoDestroy is switched off because 'close' is emitted by hand below, once
  # the final flush has completed. On Node versions where autoDestroy defaults
  # to true the stream would otherwise destroy itself right after 'finish' and
  # emit 'close' before the last batch was executed (and then a second time).
  # Node versions that predate the option simply ignore it.
  Writable.call this, objectMode: true, autoDestroy: false
  @bulkExec = bulkExec
  @highWaterMark = highWaterMark or 128
  # Commands accumulated since the last flush.
  @bulk = []
  # Number of complete commands currently held in @bulk.
  @bulkCount = 0
  # Name of the action whose document payload is expected next, or false.
  @expectingPayload = false
  # When end is called we still need to flush, but we must not override end
  # ourselves. Consumers therefore have to wait for 'close' to know we are done.
  # Not great. See: https://github.com/joyent/node/issues/5315#issuecomment-16670354
  @on 'finish', =>
    @_flushBulk =>
      @emit 'close'
      return
    return
  return
30
'use strict'
{Writable} = require 'stream'

# WritableBulk inherits from stream.Writable; `constructor` is restored as a
# non-enumerable property pointing back at WritableBulk.
WritableBulk.prototype = Object.create Writable.prototype,
  constructor:
    value: WritableBulk

module.exports = WritableBulk
35
###*
# Buffer one piece of a bulk request and flush once enough commands are held.
#
# An index/create/update command is followed by a second chunk carrying its
# document; the command only counts once that document has been received.
# delete/alias/mapping/settings/template commands count immediately. Any other
# chunk makes the stream emit 'error' and is dropped.
#
# @param chunk a piece of a bulk request as json.
# @param enc unused.
# @param next called when the chunk has been buffered, or after the flush it
#   triggered has finished.
###

WritableBulk::_write = (chunk, enc, next) ->
  if @expectingPayload
    # This chunk is the document announced by the previous command.
    @bulkCount++
    @expectingPayload = false
  else
    # Commands that are followed by a document chunk.
    for action in ['index', 'create', 'update'] when chunk.hasOwnProperty(action)
      @expectingPayload = action
      break
    unless @expectingPayload
      # Commands that stand on their own.
      standalone = ['delete', 'alias', 'mapping', 'settings', 'template']
      recognised = standalone.some (command) -> chunk.hasOwnProperty(command)
      unless recognised
        @emit 'error', new Error('Unexpected chunk, not an ' + 'index/create/update/delete/alias/mapping/settings/template command and ' + 'not a document to index either')
        return next()
      @bulkCount++
  @bulk.push chunk
  return @_flushBulk(next) if @highWaterMark <= @bulkCount
  next()
  return
66
###*
# Hand the buffered commands to `bulkExec`, then reset the buffer.
#
# A request-level error, and every per-item error found in the response, is
# re-emitted as an 'error' event on the stream. When no complete command is
# buffered, nothing is sent and `callback` is scheduled with setImmediate.
#
# @param callback invoked without arguments once the batch has been processed.
###

WritableBulk::_flushBulk = (callback) ->
  return setImmediate(callback) unless @bulkCount
  @bulkExec @bulk, (e, resp) =>
    @emit 'error', e if e
    # A failed request may come back without a response object; guard the
    # access so the buffer is still reset and the callback still runs.
    if resp?.errors and resp.items
      for item in resp.items
        # Each item is keyed by the action it answers (only the first key is read).
        key = Object.keys(item)[0]
        failure = item[key].error
        @emit 'error', new Error(failure) if failure
    @bulk = []
    @bulkCount = 0
    @expectingPayload = false
    callback()
    return
  return
88
89# ---
90# generated by js2coffee 2.2.0