1 |
|
2 | (function() {
|
3 | var CLEAN_PIPE, OPERS, WritableBulk, byline, fromJson, isNonStandard, isTwoRow, mixin, through2, toDoc, transform,
|
4 | slice = [].slice;
|
5 |
|
6 | WritableBulk = require('./writable-bulk');
|
7 |
|
8 | through2 = require('through2');
|
9 |
|
10 | byline = require('byline');
|
11 |
|
12 | mixin = require('./mixin');
|
13 |
|
/**
 * Reports whether an operation is one of index/update/create, i.e. the
 * operations whose header row is followed by a separate source row.
 *
 * @param {string|Object} t - an operation name, or a parsed header row
 *   keyed by operation name.
 * @returns {*} a strict boolean when `t` is a string. For any other input
 *   the result is only truthy/falsy: the first truthy value among
 *   `t.index`, `t.update`, `t.create`, otherwise a falsy value (`t` itself
 *   when `t` is falsy).
 */
isTwoRow = function(t) {
  var twoRowNames = ['index', 'update', 'create'];
  if (typeof t !== 'string') {
    // Header-row form: callers only test the result for truthiness.
    return t && (t.index || t.update || t.create);
  }
  return twoRowNames.indexOf(t) !== -1;
};
|
21 |
|
// Operation names that may key a header row. toDoc picks the FIRST name in
// this list that is present on a row, so the order is significant.
OPERS = [
  // operations isTwoRow treats as "header row + source row"
  'index',
  'update',
  'create',
  // single-row document operation
  'delete',
  // operations isNonStandard flags as not belonging in a plain bulk call
  'alias',
  'mapping',
  'settings',
  'template'
];
|
23 |
|
/**
 * Tells whether a batch contains at least one alias, mapping, settings or
 * template entry.
 *
 * @param {Array<Object>} bulk - batch entries, each keyed by operation name.
 * @returns {boolean} true as soon as one such entry is found, else false
 *   (also false for an empty batch).
 */
isNonStandard = function(bulk) {
  return bulk.some(function(entry) {
    return Boolean(entry.alias || entry.mapping || entry.settings || entry.template);
  });
};
|
34 |
|
// Sentinel object pushed by fromJson after it drops a broken record. toDoc
// recognises it by identity (===) and clears its own pending header, so this
// must stay a single shared instance.
CLEAN_PIPE = { clean: true };
|
38 |
|
/**
 * Builds an object-mode transform that parses one JSON document per incoming
 * line and re-emits the parsed rows, keeping header/source pairs together.
 *
 * Behaviour per line:
 *  - A row for which isTwoRow() is truthy is held back until the next row
 *    arrives; then both are pushed back-to-back.
 *  - Any other truthy row is pushed immediately.
 *  - If the row following a held header fails to parse, the pair is dropped,
 *    an 'info' message is reported through `emit`, CLEAN_PIPE is pushed, and
 *    subsequent lines are skipped until one starts with '{'.
 *  - If a line fails to parse while no header is held, the parse error is
 *    handed to the stream callback (so it surfaces as a stream 'error')
 *    instead of being thrown from inside the transform function.
 *
 * @param {function(string, string)} emit - called as emit('info', message)
 *   when a record is skipped.
 * @returns {Object} whatever through2.obj returns for the transform function.
 */
fromJson = function(emit) {
  // 123 is the character code of '{'. The comparison below is numeric, so it
  // assumes lines arrive as Buffers (byte values), not strings -- TODO confirm
  // against the upstream line splitter's configuration.
  var OPEN_BRACE = 123;
  var saved = null;      // header row waiting for its source row
  var find123 = false;   // true while resynchronising after a dropped record

  return through2.obj(function(line, enc, callback) {
    var json;

    if (find123) {
      if (line[0] !== OPEN_BRACE) {
        return callback();
      }
      find123 = false;
    }

    try {
      json = JSON.parse(line);
    } catch (ex) {
      if (!saved) {
        // Nothing to recover from: report through the callback rather than
        // throwing, which stream implementations must not do.
        return callback(ex);
      }
      emit('info', "Skipping record, JSON parse failed (" + ex.message + ") on line after:\n" + (JSON.stringify(saved)));
      saved = null;
      find123 = true;
      json = null;
    }

    if (json === null) {
      // Also reached for a line that is the JSON literal `null`.
      this.push(CLEAN_PIPE);
    } else if (saved !== null) {
      this.push(saved);
      this.push(json);
      saved = null;
    } else if (isTwoRow(json)) {
      saved = json;
    } else if (json) {
      this.push(json);
    }
    return callback();
  });
};
|
82 |
|
/**
 * Builds an object-mode transform that folds header rows (and, for two-row
 * operations, the source row that follows) into a single document per
 * operation.
 *
 * Each emitted document is `mixin(header, {_oper, _source})`, where `header`
 * is the value stored under the operation key of the header row, `_oper` is
 * the first name from OPERS present on that row, and `_source` is the
 * following row (undefined for single-row operations).
 *
 * Receiving CLEAN_PIPE discards any header that is still waiting for its
 * source row.
 *
 * @returns {Object} whatever through2.obj returns for the transform function.
 */
toDoc = function() {
  var pendingHead = null;

  // Combine a header row with its (optional) source row.
  var buildDoc = function(headRow, source) {
    var name = OPERS.find(function(candidate) {
      return headRow[candidate];
    });
    return mixin(headRow[name], {
      _oper: name,
      _source: source
    });
  };

  return through2.obj(function(row, enc, callback) {
    if (row === CLEAN_PIPE) {
      pendingHead = null;
      return callback();
    }
    if (pendingHead) {
      // `row` is the source belonging to the header received just before.
      this.push(buildDoc(pendingHead, row));
      pendingHead = null;
      return callback();
    }
    if (isTwoRow(row)) {
      pendingHead = row;
    } else {
      this.push(buildDoc(row));
    }
    return callback();
  });
};
|
113 |
|
/**
 * Builds an object-mode transform that runs every document through `trans`
 * and converts the result back into header (and source) rows.
 *
 * @param {*} operdelete - when truthy, every transformed document has its
 *   `_oper` overwritten with 'delete' (the object returned by `trans` is
 *   mutated).
 * @param {function(Object): Object} trans - user transform; a falsy result
 *   drops the document.
 * @param {?string} index - when not null/undefined, replaces the document's
 *   `_index` for mapping, settings and document operations.
 * @param {?string} type - when not null/undefined, replaces the document's
 *   `_type` for mapping and document operations.
 * @returns {Object} whatever through2.obj returns for the transform function.
 */
transform = function(operdelete, trans, index, type) {
  // Prefer the caller-supplied override; fall back to the document's own field.
  var resolve = function(override, doc, field) {
    return override != null ? override : doc[field];
  };

  return through2.obj(function(row, enc, callback) {
    var doc = trans(row);
    var header, action;

    if (!doc) {
      return callback();
    }
    if (operdelete) {
      doc._oper = 'delete';
    }

    switch (doc._oper) {
      case 'alias':
        header = {
          _name: doc._name,
          _index: doc._index
        };
        break;
      case 'mapping':
        header = {
          _index: resolve(index, doc, '_index'),
          _type: resolve(type, doc, '_type'),
          _mapping: doc._mapping
        };
        break;
      case 'settings':
        header = {
          _index: resolve(index, doc, '_index'),
          _settings: doc._settings
        };
        break;
      case 'template':
        header = {
          _name: doc._name,
          _template: doc._template
        };
        break;
      default:
        // index / update / create / delete (and anything unrecognised)
        header = {
          _id: doc._id,
          _index: resolve(index, doc, '_index'),
          _type: resolve(type, doc, '_type')
        };
    }

    action = {};
    action[doc._oper] = header;
    this.push(action);

    // Two-row operations carry their payload in a second row.
    if (isTwoRow(doc._oper)) {
      this.push(doc._source);
    }
    return callback();
  });
};
|
158 |
|
159 | module.exports = function(client, _opts, operdelete, trans, instream) {
|
160 | var bulkExec, count, fromJsonS, stream, writeAlias, writeBulk, writeMapping, writeSettings, writeTemplate;
|
161 | writeAlias = require('./write-alias')(client);
|
162 | writeMapping = require('./write-mapping')(client);
|
163 | writeSettings = require('./write-settings')(client);
|
164 | writeTemplate = require('./write-template')(client);
|
165 | writeBulk = require('./write-bulk')(client, _opts);
|
166 | bulkExec = function(bulk, callback) {
|
167 | var a, b, i, item, len, m, s, t;
|
168 | if (isNonStandard(bulk)) {
|
169 | a = [];
|
170 | m = [];
|
171 | s = [];
|
172 | t = [];
|
173 | b = [];
|
174 | for (i = 0, len = bulk.length; i < len; i++) {
|
175 | item = bulk[i];
|
176 | if (item.alias) {
|
177 | a.push(item);
|
178 | } else if (item.mapping) {
|
179 | m.push(item);
|
180 | } else if (item.settings) {
|
181 | s.push(item);
|
182 | } else if (item.template) {
|
183 | t.push(item);
|
184 | } else {
|
185 | b.push(item);
|
186 | }
|
187 | }
|
188 | if (a.length) {
|
189 | writeAlias(a, callback);
|
190 | }
|
191 | if (m.length) {
|
192 | writeMapping(m, callback);
|
193 | }
|
194 | if (s.length) {
|
195 | writeSettings(s, callback);
|
196 | }
|
197 | if (t.length) {
|
198 | writeTemplate(t, callback);
|
199 | }
|
200 | if (b.length) {
|
201 | return writeBulk(b, callback);
|
202 | }
|
203 | } else {
|
204 | return writeBulk(bulk, callback);
|
205 | }
|
206 | };
|
207 | count = 0;
|
208 | fromJsonS = fromJson(function() {
|
209 | var as;
|
210 | as = 1 <= arguments.length ? slice.call(arguments, 0) : [];
|
211 | return stream.emit.apply(stream, as);
|
212 | });
|
213 | stream = instream.pipe(byline.createStream()).pipe(fromJsonS).pipe(toDoc()).pipe(through2.obj(function(doc, enc, callback) {
|
214 | count++;
|
215 | this.push(doc);
|
216 | return callback();
|
217 | })).pipe(transform(operdelete, trans, _opts.index, _opts.type)).pipe(through2.obj(function(doc, enc, callback) {
|
218 | this.push(doc);
|
219 | return callback();
|
220 | })).pipe(new WritableBulk(function(bulk, callback) {
|
221 | stream.emit('progress', {
|
222 | count: count
|
223 | });
|
224 | return bulkExec(bulk, callback);
|
225 | }));
|
226 | return stream;
|
227 | };
|
228 |
|
229 | }).call(this);
|
230 |
|
231 |
|