UNPKG

5.13 kBtext/coffeescriptView Raw
1WritableBulk = require './writable-bulk'
2through2 = require 'through2'
3byline = require 'byline'
4mixin = require './mixin'
5
6isTwoRow = (t) ->
7 if typeof t == 'string'
8 t in ['index', 'update', 'create']
9 else
10 t and (t.index or t.update or t.create)
11
12OPERS = ['index', 'update', 'create', 'delete', 'alias', 'mapping', 'settings', 'template']
13
14# test if bulk contains any non-standard bulk operations (alias, mapping or settings)
15isNonStandard = (bulk) ->
16 return true for b in bulk when b.alias or b.mapping or b.settings or b.template
17 false
18
19# value passed down the pipe to clean the state
20CLEAN_PIPE = {clean:true}
21
22fromJson = (emit) ->
23 saved = null # index are saved with next
24 find123 = false # 123 is {
25 through2.obj (line, enc, callback) ->
26 if find123
27 if line[0] isnt 123
28 return callback()
29 else
30 find123 = false
31 json = try
32 JSON.parse(line)
33 catch ex
34 if saved
35 {message} = ex
36 emit 'info', "Skipping record, JSON parse failed (#{message})
37 on line after:\n#{JSON.stringify(saved)}"
38 saved = null
39 find123 = true # skip data until we find the next { at a start of a line
40 null # null to json
41 else
42 throw ex
43 if json == null
44 # null is skipped, may be from catching an exception above
45 # we must clean the pipe
46 this.push CLEAN_PIPE
47 else if saved != null
48 this.push saved
49 this.push json
50 saved = null
51 else if isTwoRow(json)
52 saved = json
53 else if json
54 this.push json
55 callback()
56
57toDoc = ->
58 d = (oper, source) ->
59 opername = OPERS.find (o) -> oper[o]
60 head = oper[opername]
61 mixin head, _oper:opername, _source:source
62 saved = null
63 through2.obj (row, enc, callback) ->
64 if row == CLEAN_PIPE
65 saved = null
66 else if saved
67 this.push d(saved, row)
68 saved = null
69 else
70 if isTwoRow(row)
71 saved = row
72 else
73 this.push d(row)
74 callback()
75
76transform = (operdelete, trans, index, type) ->
77 through2.obj (row, enc, callback) ->
78 if t = trans(row)
79 t._oper = 'delete' if operdelete
80 oper = {}
81 if t._oper == 'alias'
82 oper.alias = {_name:t._name, _index:t._index}
83 else if t._oper == 'mapping'
84 oper.mapping =
85 {_index:(index ? t._index), _type:(type ? t._type), _mapping:t._mapping}
86 else if t._oper == 'settings'
87 oper.settings =
88 {_index:(index ? t._index), _settings:t._settings}
89 else if t._oper == 'template'
90 # name is not overridable by the url
91 oper.template =
92 {_name:t._name, _template:t._template}
93 else
94 oper[t._oper] = {_id:t._id, _index:(index ? t._index), _type:(type ? t._type)}
95 this.push oper
96 if isTwoRow(t._oper)
97 this.push t._source
98 callback()
99
100module.exports = (client, _opts, operdelete, trans, instream) ->
101
102 writeAlias = require('./write-alias') client
103 writeMapping = require('./write-mapping') client
104 writeSettings = require('./write-settings') client
105 writeTemplate = require('./write-template') client
106 writeBulk = require('./write-bulk') client, _opts
107
108 bulkExec = (bulk, callback) ->
109 # we try if any items are non-standard in which case
110 # we must separate the bulk in different buckets
111 # otherwise we just keep it intact
112 if isNonStandard(bulk)
113 a = []; m = []; s = []; t = []; b = []
114 for item in bulk
115 if item.alias
116 a.push item
117 else if item.mapping
118 m.push item
119 else if item.settings
120 s.push item
121 else if item.template
122 t.push item
123 else
124 b.push item
125 writeAlias(a, callback) if a.length
126 writeMapping(m, callback) if m.length
127 writeSettings(s, callback) if s.length
128 writeTemplate(t, callback) if t.length
129 writeBulk(b, callback) if b.length
130 else
131 writeBulk bulk, callback
132
133 count = 0
134
135 fromJsonS = fromJson (as...) -> stream.emit as...
136
137 stream = instream
138 .pipe byline.createStream() # ensure we get whole lines
139 .pipe fromJsonS
140 .pipe toDoc()
141 .pipe through2.obj (doc, enc, callback) ->
142 count++
143 this.push doc
144 callback()
145 .pipe transform(operdelete, trans, _opts.index, _opts.type)
146 .pipe through2.obj (doc, enc, callback) ->
147 this.push doc
148 callback()
149 .pipe new WritableBulk (bulk, callback) ->
150 stream.emit 'progress', {count}
151 bulkExec(bulk, callback)
152
153 stream