1 |
|
# Builds a function that applies index settings for every entry of `bulk`,
# strictly one entry after the other.
#
# client   - object exposing `indices.close`, `indices.open`,
#            `indices.putSettings` and `indices.create`; each is called with a
#            single options object and must return a promise.
# bulk     - array of entries; each entry's `settings` carries `_index` and
#            `_settings`. NOTE: `number_of_replicas` and `number_of_shards` are
#            deleted from `_settings.index` on the entry itself.
# callback - node-style callback, invoked exactly once with `(null, {})` on
#            success or `(err)` on the first failure.
#
# Returns the promise of the whole run. It rejects if `callback` itself throws.
module.exports = (client) -> (bulk, callback) ->

  # Applies the settings of a single bulk entry; resolves once they are stored.
  applyEntry = (entry) ->
    t = entry.settings
    {number_of_replicas, number_of_shards} = t._settings?.index ? {}

    # Both values are taken out of the update body; they are only sent with
    # the create request issued in the 404 branch below.
    if t._settings?.index
      delete t._settings.index.number_of_replicas
      delete t._settings.index.number_of_shards

    # The index is closed around the update only when analysis settings are present.
    hasAnalysis = Boolean t._settings?.analysis

    closeIndex = ->
      if hasAnalysis then client.indices.close {index: t._index} else Promise.resolve()

    reopenIndex = ->
      if hasAnalysis then client.indices.open {index: t._index} else Promise.resolve()

    # close (if needed) -> putSettings -> open (if needed); resolves with the
    # putSettings response.
    attempt = ->
      onUpdated = (res) ->
        reopenIndex().then -> res

      # The update failed after the index may have been closed: reopen it on a
      # best-effort basis, then surface the original error either way.
      onUpdateFailed = (err) ->
        rethrow = -> throw err
        reopenIndex().then rethrow, rethrow

      update = ->
        client.indices.putSettings({index: t._index, body: t._settings}).then onUpdated, onUpdateFailed

      closeIndex().then(update).catch (err) ->
        # Retried immediately, with no delay and no attempt limit, for as long
        # as the client keeps reporting this error type.
        if err.body?.error?.type == 'index_primary_shard_not_allocated_exception'
          attempt()
        else
          throw err

    attempt().catch (err) ->
      throw err unless err.status == 404
      # 404: create the index first, then apply the settings once more.
      opts = {index: t._index, body: {}}
      opts.body.number_of_replicas = number_of_replicas if number_of_replicas?
      opts.body.number_of_shards = number_of_shards if number_of_shards?
      client.indices.create(opts).then -> attempt()

  succeed = -> callback null, {}
  fail = (err) -> callback err

  queue = bulk.reduce ((p, c) -> p.then -> applyEntry c), Promise.resolve()

  # Both handlers go into one `then`, so an exception thrown by `callback` on
  # success is not routed back into `callback` a second time.
  queue.then succeed, fail
|