UNPKG

6.22 kBJavaScriptView Raw
1// Generated by CoffeeScript 1.11.1
2(function() {
3 var CLEAN_PIPE, OPERS, WritableBulk, byline, fromJson, isNonStandard, isTwoRow, mixin, through2, toDoc, transform,
4 slice = [].slice;
5
6 WritableBulk = require('./writable-bulk');
7
8 through2 = require('through2');
9
10 byline = require('byline');
11
12 mixin = require('./mixin');
13
/**
 * Tells whether an operation occupies two rows: a header row followed by a
 * source row. That is the case for `index`, `update` and `create`.
 *
 * @param {string|Object} t - Either an operation name or a bulk header object.
 * @returns {*} For a string, a strict boolean. For anything else, `t` itself
 *   when it is falsy, otherwise the first truthy value among `t.index`,
 *   `t.update` and `t.create` (or the last falsy one when none is set).
 */
isTwoRow = function(t) {
  var twoRowNames;
  if (typeof t !== 'string') {
    // Mirror `t && (...)`: a falsy input is handed back unchanged.
    if (!t) {
      return t;
    }
    return t.index || t.update || t.create;
  }
  twoRowNames = ['index', 'update', 'create'];
  return twoRowNames.indexOf(t) !== -1;
};
21
// Operation names recognised on a bulk header. Order matters: toDoc picks the
// first name in this list that is set on the header object.
OPERS = [
  'index',
  'update',
  'create',
  'delete',
  'alias',
  'mapping',
  'settings',
  'template'
];
23
/**
 * Reports whether a bulk contains at least one item that is not a plain
 * document operation, i.e. one carrying `alias`, `mapping`, `settings` or
 * `template`.
 *
 * @param {Array<Object>} bulk - Operation rows to inspect.
 * @returns {boolean} `true` as soon as one such item is found, else `false`.
 */
isNonStandard = function(bulk) {
  return bulk.some(function(entry) {
    return entry.alias || entry.mapping || entry.settings || entry.template;
  });
};
34
// Sentinel row: fromJson pushes it after skipping an unparsable record, and
// toDoc recognises it by identity and forgets any header it was holding.
CLEAN_PIPE = { clean: true };
38
/**
 * Creates an object-mode transform that parses one JSON document per incoming
 * line and pairs two-row headers (see isTwoRow) with the line that follows.
 *
 * Behaviour per line:
 * - A two-row header is held back until the next line arrives; both are then
 *   pushed together, header first.
 * - Any other truthy parsed value is pushed straight through.
 * - If the line after a held header fails to parse, the record is dropped: an
 *   'info' message is emitted, CLEAN_PIPE is pushed, and following lines are
 *   ignored until one starts with `{`.
 * - If a line fails to parse while no header is held, the parse error is
 *   reported through the transform callback rather than thrown synchronously
 *   out of the stream machinery.
 *
 * @param {Function} emit - Called as `emit('info', message)` when a record is
 *   skipped.
 * @returns {Object} The through2 object-mode transform stream.
 */
fromJson = function(emit) {
  var find123, saved;
  // Header waiting for its source line, or null.
  saved = null;
  // True while resynchronising after a skipped record.
  find123 = false;
  return through2.obj(function(line, enc, callback) {
    var first, json;
    if (find123) {
      // 123 is the character code of '{'. Lines presumably arrive as Buffers
      // from byline; string chunks are handled as well so the check cannot
      // silently swallow the rest of the input.
      first = typeof line === 'string' ? line.charCodeAt(0) : line[0];
      if (first !== 123) {
        return callback();
      }
      find123 = false;
    }
    try {
      json = JSON.parse(line);
    } catch (ex) {
      if (!saved) {
        // Nothing to skip relative to: surface the failure as a stream error.
        return callback(ex);
      }
      emit('info', "Skipping record, JSON parse failed (" + ex.message + ") on line after:\n" + (JSON.stringify(saved)));
      saved = null;
      find123 = true;
      json = null;
    }
    if (json === null) {
      this.push(CLEAN_PIPE);
    } else if (saved !== null) {
      this.push(saved);
      this.push(json);
      saved = null;
    } else if (isTwoRow(json)) {
      saved = json;
    } else if (json) {
      this.push(json);
    }
    return callback();
  });
};
82
/**
 * Creates an object-mode transform that turns bulk rows into single documents.
 *
 * A two-row header (see isTwoRow) is held until the next row arrives and the
 * pair is merged; any other row is converted on its own. A CLEAN_PIPE row
 * discards whatever header is currently held.
 *
 * Each emitted document is `mixin(header[name], { _oper: name, _source: body })`
 * where `name` is the first entry of OPERS that is set on the header.
 *
 * @returns {Object} The through2 object-mode transform stream.
 */
toDoc = function() {
  var pending = null;
  var merge = function(header, body) {
    var name = OPERS.find(function(candidate) {
      return header[candidate];
    });
    return mixin(header[name], {
      _oper: name,
      _source: body
    });
  };
  return through2.obj(function(row, enc, callback) {
    if (row === CLEAN_PIPE) {
      pending = null;
      return callback();
    }
    if (pending) {
      this.push(merge(pending, row));
      pending = null;
      return callback();
    }
    if (isTwoRow(row)) {
      pending = row;
    } else {
      // Single-row operation: there is no source row to attach.
      this.push(merge(row));
    }
    return callback();
  });
};
113
/**
 * Creates an object-mode transform that maps each document through `trans`
 * and re-emits it as bulk rows: one header row, followed by the document's
 * `_source` when the operation is a two-row one (see isTwoRow).
 *
 * Documents for which `trans` returns a falsy value are dropped.
 *
 * @param {*} operdelete - When truthy, every document's `_oper` is overwritten
 *   with 'delete'.
 * @param {Function} trans - Called with each document; its return value is the
 *   document actually written.
 * @param {*} index - When not null/undefined, overrides the document `_index`
 *   (not applied to alias and template headers).
 * @param {*} type - When not null/undefined, overrides the document `_type`.
 * @returns {Object} The through2 object-mode transform stream.
 */
transform = function(operdelete, trans, index, type) {
  var hasIndex = index != null;
  var hasType = type != null;
  return through2.obj(function(row, enc, callback) {
    var doc = trans(row);
    var header;
    if (!doc) {
      return callback();
    }
    if (operdelete) {
      doc._oper = 'delete';
    }
    header = {};
    switch (doc._oper) {
      case 'alias':
        header.alias = {
          _name: doc._name,
          _index: doc._index
        };
        break;
      case 'mapping':
        header.mapping = {
          _index: hasIndex ? index : doc._index,
          _type: hasType ? type : doc._type,
          _mapping: doc._mapping
        };
        break;
      case 'settings':
        header.settings = {
          _index: hasIndex ? index : doc._index,
          _settings: doc._settings
        };
        break;
      case 'template':
        header.template = {
          _name: doc._name,
          _template: doc._template
        };
        break;
      default:
        header[doc._oper] = {
          _id: doc._id,
          _index: hasIndex ? index : doc._index,
          _type: hasType ? type : doc._type
        };
    }
    this.push(header);
    if (isTwoRow(doc._oper)) {
      this.push(doc._source);
    }
    return callback();
  });
};
158
159 module.exports = function(client, _opts, operdelete, trans, instream) {
160 var bulkExec, count, fromJsonS, stream, writeAlias, writeBulk, writeMapping, writeSettings, writeTemplate;
161 writeAlias = require('./write-alias')(client);
162 writeMapping = require('./write-mapping')(client);
163 writeSettings = require('./write-settings')(client);
164 writeTemplate = require('./write-template')(client);
165 writeBulk = require('./write-bulk')(client, _opts);
166 bulkExec = function(bulk, callback) {
167 var a, b, i, item, len, m, s, t;
168 if (isNonStandard(bulk)) {
169 a = [];
170 m = [];
171 s = [];
172 t = [];
173 b = [];
174 for (i = 0, len = bulk.length; i < len; i++) {
175 item = bulk[i];
176 if (item.alias) {
177 a.push(item);
178 } else if (item.mapping) {
179 m.push(item);
180 } else if (item.settings) {
181 s.push(item);
182 } else if (item.template) {
183 t.push(item);
184 } else {
185 b.push(item);
186 }
187 }
188 if (a.length) {
189 writeAlias(a, callback);
190 }
191 if (m.length) {
192 writeMapping(m, callback);
193 }
194 if (s.length) {
195 writeSettings(s, callback);
196 }
197 if (t.length) {
198 writeTemplate(t, callback);
199 }
200 if (b.length) {
201 return writeBulk(b, callback);
202 }
203 } else {
204 return writeBulk(bulk, callback);
205 }
206 };
207 count = 0;
208 fromJsonS = fromJson(function() {
209 var as;
210 as = 1 <= arguments.length ? slice.call(arguments, 0) : [];
211 return stream.emit.apply(stream, as);
212 });
213 stream = instream.pipe(byline.createStream()).pipe(fromJsonS).pipe(toDoc()).pipe(through2.obj(function(doc, enc, callback) {
214 count++;
215 this.push(doc);
216 return callback();
217 })).pipe(transform(operdelete, trans, _opts.index, _opts.type)).pipe(through2.obj(function(doc, enc, callback) {
218 this.push(doc);
219 return callback();
220 })).pipe(new WritableBulk(function(bulk, callback) {
221 stream.emit('progress', {
222 count: count
223 });
224 return bulkExec(bulk, callback);
225 }));
226 return stream;
227 };
228
229}).call(this);
230
231//# sourceMappingURL=writer.js.map