ductile

Elasticsearch Bulk Loader

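The loader reads newline-delimited JSON in the spirit of the Elasticsearch bulk API: index, update and create operations span two lines (an action line followed by the source document), while delete, alias, mapping and settings operations take a single line. A plausible input sketch, inferred from the fromJson/toDoc code below; the index names, ids and fields are made up:

{"index":{"_index":"books","_type":"book","_id":"1"}}
{"title":"Moby-Dick","author":"Herman Melville"}
{"delete":{"_index":"books","_type":"book","_id":"2"}}
{"alias":{"_name":"library","_index":"books"}}
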
WritableBulk = require './writable-bulk'
through2 = require 'through2'
byline = require 'byline'
mixin = require './mixin'

# index/update/create operations span two rows: an action line followed by a source line
isTwoRow = (t) ->
  if typeof t == 'string'
    t in ['index', 'update', 'create']
  else
    t and (t.index or t.update or t.create)

OPERS = ['index', 'update', 'create', 'delete', 'alias', 'mapping', 'settings']

# test if bulk contains any non-standard bulk operations (alias, mapping or settings)
isNonStandard = (bulk) ->
  return true for b in bulk when b.alias or b.mapping or b.settings
  false

# value passed down the pipe to clean the state
CLEAN_PIPE = {clean:true}

# parse raw lines into JSON objects, pairing a two-row action with the source line that follows it
fromJson = (emit) ->
  saved = null    # a two-row action line is saved and emitted together with the next line
  find123 = false # 123 is the character code of '{'
  through2.obj (line, enc, callback) ->
    if find123
      if line[0] isnt 123
        return callback()
      else
        find123 = false
    json =
      try
        JSON.parse(line)
      catch ex
        if saved
          {message} = ex
          emit 'info', "Skipping record, JSON parse failed (#{message}) on line after:\n#{JSON.stringify(saved)}"
          saved = null
          find123 = true # skip data until we find the next { at a start of a line
          null           # null to json
        else
          throw ex
    if json == null
      # null is skipped, may be from catching an exception above
      # we must clean the pipe
      this.push CLEAN_PIPE
    else if saved != null
      this.push saved
      this.push json
      saved = null
    else if isTwoRow(json)
      saved = json
    else if json
      this.push json
    callback()

# merge an action row and its source row into a single doc carrying _oper and _source
toDoc = ->
  d = (oper, source) ->
    opername = OPERS.find (o) -> oper[o]
    head = oper[opername]
    mixin head, _oper:opername, _source:source
  saved = null
  through2.obj (row, enc, callback) ->
    if row == CLEAN_PIPE
      saved = null
    else if saved
      this.push d(saved, row)
      saved = null
    else if isTwoRow(row)
      saved = row
    else
      this.push d(row)
    callback()

# apply the user transform and rebuild the bulk operation (plus the source row for two-row operations)
transform = (operdelete, trans, index, type) ->
  through2.obj (row, enc, callback) ->
    if t = trans(row)
      t._oper = 'delete' if operdelete
      oper = {}
      if t._oper == 'alias'
        oper.alias = {_name:t._name, _index:t._index}
      else if t._oper == 'mapping'
        oper.mapping = {_index:(index ? t._index), _type:(type ? t._type), _mapping:t._mapping}
      else if t._oper == 'settings'
        oper.settings = {_index:(index ? t._index), _settings:t._settings}
      else
        oper[t._oper] = {_id:t._id, _index:(index ? t._index), _type:(type ? t._type)}
      this.push oper
      if isTwoRow(t._oper)
        this.push t._source
    callback()

# wire the input stream through the parse/transform pipeline into bulked Elasticsearch writes
module.exports = (client, _opts, operdelete, trans, instream) ->
  writeAlias = require('./write-alias') client
  writeMapping = require('./write-mapping') client
  writeSettings = require('./write-settings') client
  writeBulk = require('./write-bulk') client, _opts

  bulkExec = (bulk, callback) ->
    # we test whether any items are non-standard, in which case
    # we must separate the bulk into different buckets;
    # otherwise we just keep it intact
    if isNonStandard(bulk)
      a = []; m = []; s = []; b = []
      for item in bulk
        if item.alias
          a.push item
        else if item.mapping
          m.push item
        else if item.settings
          s.push item
        else
          b.push item
      writeAlias(a, callback) if a.length
      writeMapping(m, callback) if m.length
      writeSettings(s, callback) if s.length
      writeBulk(b, callback) if b.length
    else
      writeBulk bulk, callback

  count = 0
  fromJsonS = fromJson (as...) -> stream.emit as...
  stream = instream
    .pipe byline.createStream() # ensure we get whole lines
    .pipe fromJsonS
    .pipe toDoc()
    .pipe through2.obj (doc, enc, callback) ->
      count++
      this.push doc
      callback()
    .pipe transform(operdelete, trans, _opts.index, _opts.type)
    .pipe through2.obj (doc, enc, callback) ->
      this.push doc
      callback()
    .pipe new WritableBulk (bulk, callback) ->
      stream.emit 'progress', {count}
      bulkExec(bulk, callback)
  stream
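
A minimal sketch of how the exported function might be wired up, based only on the signature above. The client construction, the './load' require path, the 'bulk.json' file name and any option fields beyond index/type are assumptions, not something this file defines:

# hypothetical wiring; the module path, client and file name are made up
fs = require 'fs'
elasticsearch = require 'elasticsearch'   # assuming the legacy elasticsearch client used by the write-* helpers
load = require './load'                   # this module; the actual path may differ

client = new elasticsearch.Client host: 'localhost:9200'
opts = {index: 'books', type: 'book'}     # fallback _index/_type for rows that carry none
identity = (row) -> row                   # trans: per-row transform; return a falsy value to drop a row

stream = load client, opts, false, identity, fs.createReadStream('bulk.json')
stream.on 'progress', ({count}) -> console.log "read #{count} docs"
stream.on 'info', (msg) -> console.warn msg

The returned value is the tail of the pipeline, so 'progress' and 'info' events (emitted in the code above) can be listened for directly on it; passing true for the operdelete argument would force every transformed row into a delete operation.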