1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 | WritableBulk = (bulkExec, highWaterMark) ->
|
11 | if !(this instanceof WritableBulk)
|
12 | return new WritableBulk(bulkExec, highWaterMark)
|
13 | Writable.call this, objectMode: true
|
14 | @bulkExec = bulkExec
|
15 | @highWaterMark = highWaterMark or 128
|
16 | @bulk = []
|
17 | @bulkCount = 0
|
18 | @expectingPayload = false
|
19 |
|
20 |
|
21 |
|
22 | @on 'finish', (->
|
23 | @_flushBulk (->
|
24 | @emit 'close'
|
25 | return
|
26 | ).bind(this)
|
27 | return
|
28 | ).bind(this)
|
29 | return
|
30 |
|
31 | 'use strict'
|
32 | Writable = require('stream').Writable
|
33 | module.exports = WritableBulk
|
34 | WritableBulk.prototype = Object.create(Writable.prototype, constructor: value: WritableBulk)
|
35 |
|
36 |
|
37 |
|
38 |
|
39 |
|
40 | WritableBulk::_write = (chunk, enc, next) ->
|
41 | if @expectingPayload
|
42 | @bulkCount++
|
43 | @expectingPayload = false
|
44 | else
|
45 | willExpectPayload = [
|
46 | 'index'
|
47 | 'create'
|
48 | 'update'
|
49 | ]
|
50 | i = 0
|
51 | while i < willExpectPayload.length
|
52 | if chunk.hasOwnProperty(willExpectPayload[i])
|
53 | @expectingPayload = willExpectPayload[i]
|
54 | break
|
55 | i++
|
56 | if !@expectingPayload
|
57 | if !chunk.hasOwnProperty('delete') and !chunk.hasOwnProperty('alias') and !chunk.hasOwnProperty('mapping') and !chunk.hasOwnProperty('settings') and !chunk.hasOwnProperty('template')
|
58 | @emit 'error', new Error('Unexpected chunk, not an ' + 'index/create/update/delete/alias/mapping/settings/template command and ' + 'not a document to index either')
|
59 | return next()
|
60 | @bulkCount++
|
61 | @bulk.push chunk
|
62 | if @highWaterMark <= @bulkCount
|
63 | return @_flushBulk(next)
|
64 | next()
|
65 | return
|
66 |
|
67 | WritableBulk::_flushBulk = (callback) ->
|
68 | if !@bulkCount
|
69 | return setImmediate(callback)
|
70 | self = this
|
71 | @bulkExec @bulk, (e, resp) ->
|
72 | if e
|
73 | self.emit 'error', e
|
74 | if resp.errors and resp.items
|
75 | i = 0
|
76 | while i < resp.items.length
|
77 | bulkItemResp = resp.items[i]
|
78 | key = Object.keys(bulkItemResp)[0]
|
79 | if bulkItemResp[key].error
|
80 | self.emit 'error', new Error(bulkItemResp[key].error)
|
81 | i++
|
82 | self.bulk = []
|
83 | self.bulkCount = 0
|
84 | self.expectingPayload = false
|
85 | callback()
|
86 | return
|
87 | return
|
88 |
|
89 |
|
90 |
|