1 | WritableBulk = require './writable-bulk'
|
2 | through2 = require 'through2'
|
3 | byline = require 'byline'
|
4 | mixin = require './mixin'
|
5 |
|
# Tell whether an operation occupies two rows (a header row followed by a
# source row).
#
# `t` may be an operation name (string) or a bulk header object:
#   - for a string, returns true/false depending on whether it is one of
#     'index', 'update' or 'create';
#   - for anything else, returns a truthy value when `t` has a truthy
#     `index`, `update` or `create` property (the first such property value),
#     and a falsy value otherwise (including `t` itself when `t` is falsy).
isTwoRow = (t) ->
    unless typeof t is 'string'
        return t and (t.index or t.update or t.create)
    t is 'index' or t is 'update' or t is 'create'
|
11 |
|
# Operation names recognised as keys of a bulk header row. `toDoc` picks the
# first name in this list that is a truthy key of the row, so order matters.
OPERS = [
    'index'
    'update'
    'create'
    'delete'
    'alias'
    'mapping'
    'settings'
    'template'
]
|
13 |
|
14 |
|
# Return true when at least one row of `bulk` carries a truthy `alias`,
# `mapping`, `settings` or `template` key, false otherwise.
isNonStandard = (bulk) ->
    for row in bulk
        if row.alias or row.mapping or row.settings or row.template
            return true
    false
|
18 |
|
19 |
|
# Sentinel object pushed by `fromJson` after a skipped record; `toDoc`
# recognises it by identity and resets its pending header.
CLEAN_PIPE = { clean: true }
|
21 |
|
# Build an object-mode transform that parses one JSON value per incoming line.
#
# emit - function called as emit('info', message) when a record is skipped.
#
# Behaviour per line:
#   - a two-row header (see isTwoRow) is held back until the next line parses,
#     then header and that value are pushed together;
#   - if parsing fails while a header is held, the record is skipped: an
#     'info' message is emitted, CLEAN_PIPE is pushed, and subsequent lines
#     are ignored until one whose first element is 123 (the byte for '{');
#   - if parsing fails with no header held, the parse error is rethrown;
#   - a parsed JSON `null` also results in CLEAN_PIPE being pushed;
#   - any other truthy value is pushed as is; falsy values are dropped.
fromJson = (emit) ->
    pendingHeader = null
    resyncing = false
    through2.obj (line, enc, callback) ->
        if resyncing
            # Still skipping after a bad record: wait for a line starting with '{'.
            return callback() unless line[0] is 123
            resyncing = false

        # Stays null when the parse fails and the record is skipped.
        parsed = null
        try
            parsed = JSON.parse(line)
        catch err
            throw err unless pendingHeader
            emit 'info', "Skipping record, JSON parse failed (#{err.message}) on line after:\n#{JSON.stringify(pendingHeader)}"
            pendingHeader = null
            resyncing = true

        switch
            when parsed is null
                @push CLEAN_PIPE
            when pendingHeader isnt null
                @push pendingHeader
                @push parsed
                pendingHeader = null
            when isTwoRow(parsed)
                pendingHeader = parsed
            when parsed
                @push parsed
        callback()
|
56 |
|
# Build an object-mode transform that combines bulk rows into documents.
#
# A two-row header (see isTwoRow) is held until the next row arrives and is
# then combined with it as its source; any other row is converted on its own.
# Receiving CLEAN_PIPE discards a held header.
toDoc = ->
    header = null

    # Find the operation name of `oper` (first OPERS entry that is a truthy key)
    # and pass its value to `mixin` together with {_oper, _source}.
    # NOTE(review): presumably `mixin` merges those fields into the header
    # value - confirm against ./mixin.
    build = (oper, source) ->
        name = OPERS.find (candidate) -> oper[candidate]
        mixin oper[name], {_oper: name, _source: source}

    through2.obj (row, enc, callback) ->
        if row is CLEAN_PIPE
            header = null
        else if header
            @push build(header, row)
            header = null
        else if isTwoRow(row)
            header = row
        else
            @push build(row)
        callback()
|
75 |
|
# Build an object-mode transform that maps each document through `trans` and
# emits the corresponding bulk rows.
#
# operdelete - when truthy, every transformed document's _oper is set to 'delete'
# trans      - function(row) returning the document to write, or a falsy
#              value to drop the row
# index      - when not null/undefined, overrides the document's _index
#              (not applied to 'alias' or 'template' rows)
# type       - when not null/undefined, overrides the document's _type
#
# For each kept document a header row {<oper>: {...}} is pushed; for two-row
# operations (see isTwoRow) the document's _source is pushed right after.
transform = (operdelete, trans, index, type) ->
    through2.obj (row, enc, callback) ->
        doc = trans(row)
        return callback() unless doc

        doc._oper = 'delete' if operdelete
        kind = doc._oper

        header = {}
        header[kind] = switch kind
            when 'alias'
                {_name: doc._name, _index: doc._index}
            when 'mapping'
                {_index: (index ? doc._index), _type: (type ? doc._type), _mapping: doc._mapping}
            when 'settings'
                {_index: (index ? doc._index), _settings: doc._settings}
            when 'template'
                {_name: doc._name, _template: doc._template}
            else
                {_id: doc._id, _index: (index ? doc._index), _type: (type ? doc._type)}

        @push header
        @push doc._source if isTwoRow(kind)
        callback()
|
99 |
|
# Build the import pipeline and return its final stream.
#
# client     - passed to each ./write-* factory
# _opts      - options; passed to ./write-bulk, and _opts.index / _opts.type
#              are handed to `transform` as overrides
# operdelete - when truthy, every document is written as a 'delete'
# trans      - per-document transform function (see `transform`)
# instream   - readable stream of newline-delimited JSON
#
# Returns the WritableBulk at the end of the pipe chain. It emits 'progress'
# with {count} before each bulk is executed, and 'info' when `fromJson`
# skips a record.
module.exports = (client, _opts, operdelete, trans, instream) ->

    writeAlias    = require('./write-alias') client
    writeMapping  = require('./write-mapping') client
    writeSettings = require('./write-settings') client
    writeTemplate = require('./write-template') client
    writeBulk     = require('./write-bulk') client, _opts

    # Execute one bulk and invoke `callback` exactly once.
    #
    # A bulk with only standard rows goes straight to writeBulk. Otherwise
    # rows are grouped by kind and each non-empty group is written in turn
    # (alias, mapping, settings, template, bulk). Previously every non-empty
    # group received `callback` directly, so a bulk mixing kinds invoked the
    # stream callback more than once.
    # NOTE(review): assumes the writers use node-style callbacks whose first
    # argument is the error - confirm against the ./write-* modules.
    bulkExec = (bulk, callback) ->
        return writeBulk bulk, callback unless isNonStandard(bulk)

        aliases = []
        mappings = []
        settings = []
        templates = []
        standard = []
        for item in bulk
            if item.alias
                aliases.push item
            else if item.mapping
                mappings.push item
            else if item.settings
                settings.push item
            else if item.template
                templates.push item
            else
                standard.push item

        steps = []
        steps.push [writeAlias, aliases] if aliases.length
        steps.push [writeMapping, mappings] if mappings.length
        steps.push [writeSettings, settings] if settings.length
        steps.push [writeTemplate, templates] if templates.length
        steps.push [writeBulk, standard] if standard.length

        # isNonStandard(bulk) guarantees at least one step, but stay safe.
        return callback() unless steps.length

        runStep = (i) ->
            [write, items] = steps[i]
            # The last writer gets the real callback so its arguments are
            # forwarded untouched, as in the single-group case before.
            return write items, callback if i is steps.length - 1
            write items, (err) ->
                return callback(err) if err
                runStep i + 1
        runStep 0

    # Number of documents that have passed the counting stage so far.
    count = 0

    # `stream` is assigned below; the closure only runs once data flows.
    fromJsonS = fromJson (as...) -> stream.emit as...

    stream = instream
        .pipe byline.createStream()
        .pipe fromJsonS
        .pipe toDoc()
        .pipe through2.obj (doc, enc, callback) ->
            count++
            this.push doc
            callback()
        .pipe transform(operdelete, trans, _opts.index, _opts.type)
        .pipe through2.obj (doc, enc, callback) ->
            # Plain pass-through stage.
            this.push doc
            callback()
        .pipe new WritableBulk (bulk, callback) ->
            stream.emit 'progress', {count}
            bulkExec(bulk, callback)

    stream
|