1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 | (function() {
|
14 | var Writable, WritableBulk;
|
15 |
|
/**
 * Object-mode writable stream that buffers incoming chunks and hands them to
 * `bulkExec` in batches.
 *
 * May be called with or without `new`.
 *
 * @param {Function} bulkExec - Invoked by `_flushBulk` as
 *   `bulkExec(bulk, function(err, resp) {...})` with the buffered chunks.
 * @param {number} [highWaterMark=128] - Number of counted bulk operations
 *   that triggers a flush; any falsy value falls back to 128.
 * @returns {WritableBulk|undefined} A new instance when called without `new`.
 */
WritableBulk = function(bulkExec, highWaterMark) {
  var stream = this;

  // Allow factory-style invocation: WritableBulk(exec, n).
  if (!(stream instanceof WritableBulk)) {
    return new WritableBulk(bulkExec, highWaterMark);
  }

  Writable.call(stream, { objectMode: true });

  stream.bulkExec = bulkExec;
  stream.highWaterMark = highWaterMark ? highWaterMark : 128;
  stream.bulk = [];
  stream.bulkCount = 0;
  stream.expectingPayload = false;

  // Emitted once whatever is still buffered at 'finish' has been flushed.
  function announceClose() {
    stream.emit('close');
  }

  stream.on('finish', function onFinish() {
    stream._flushBulk(announceClose);
  });
};
|
34 |
|
35 | 'use strict';
|
36 |
|
37 | Writable = require('stream').Writable;
|
38 |
|
39 | module.exports = WritableBulk;
|
40 |
|
// Inherit from Writable, then restore `constructor` on the fresh prototype
// as a non-enumerable, read-only, non-configurable property.
WritableBulk.prototype = Object.create(Writable.prototype);
Object.defineProperty(WritableBulk.prototype, 'constructor', {
  value: WritableBulk,
  writable: false,
  enumerable: false,
  configurable: false
});
|
46 |
|
47 |
|
48 | |
49 |
|
50 |
|
51 |
|
/**
 * Buffers one chunk and flushes once enough operations are buffered.
 *
 * A chunk is treated as one of:
 *  - a command with own key `index`, `create` or `update`; the NEXT chunk is
 *    then taken as that command's payload, whatever it contains;
 *  - a standalone command with own key `delete`, `alias`, `mapping`,
 *    `settings` or `template`;
 *  - anything else, which is reported through an 'error' event and dropped.
 *
 * `bulkCount` is incremented once per completed operation (command + payload,
 * or standalone command), so a flush never separates a command from its
 * payload.
 *
 * Own-key checks go through `Object.prototype.hasOwnProperty.call`, so chunks
 * without `Object.prototype` in their chain, or with their own
 * `hasOwnProperty` field, are classified rather than throwing.
 *
 * @param {*} chunk - The written object.
 * @param {string} enc - Unused.
 * @param {Function} next - Called when the chunk is handled; passed on to
 *   `_flushBulk` when a flush is triggered.
 */
WritableBulk.prototype._write = function(chunk, enc, next) {
  var payloadCommands = ['index', 'create', 'update'];
  var standaloneCommands = ['delete', 'alias', 'mapping', 'settings', 'template'];
  var hasOwn = Object.prototype.hasOwnProperty;
  var isStandalone, i;

  // Safe own-property test; null/undefined chunks own nothing.
  function owns(key) {
    return chunk != null && hasOwn.call(chunk, key);
  }

  if (this.expectingPayload) {
    // This chunk is the payload of the command buffered just before it.
    this.bulkCount++;
    this.expectingPayload = false;
  } else {
    for (i = 0; i < payloadCommands.length; i++) {
      if (owns(payloadCommands[i])) {
        this.expectingPayload = payloadCommands[i];
        break;
      }
    }
    if (!this.expectingPayload) {
      isStandalone = standaloneCommands.some(owns);
      if (!isStandalone) {
        this.emit('error', new Error('Unexpected chunk, not an ' + 'index/create/update/delete/alias/mapping/settings/template command and ' + 'not a document to index either'));
        return next();
      }
      this.bulkCount++;
    }
  }

  this.bulk.push(chunk);
  if (this.highWaterMark <= this.bulkCount) {
    return this._flushBulk(next);
  }
  next();
};
|
81 |
|
/**
 * Sends the buffered chunks to `bulkExec` and resets the buffer.
 *
 * When no operation has been counted (`bulkCount` is 0), `bulkExec` is not
 * called and `callback` is deferred with `setImmediate`.
 *
 * Failures are reported as 'error' events on the stream:
 *  - the error passed to the `bulkExec` callback, if any;
 *  - one error per entry of `resp.items` carrying an `error` field, when
 *    `resp.errors` is truthy. String errors become the message as-is;
 *    non-string errors are JSON-serialised so the message stays readable
 *    instead of "[object Object]".
 *
 * The buffer is reset and `callback` is invoked (without arguments) whether
 * or not errors were reported, including when `bulkExec` fails without
 * providing a response.
 *
 * NOTE(review): the response shape (`errors`, `items[n][action].error`)
 * presumably follows the Elasticsearch bulk API — confirm against bulkExec.
 *
 * @param {Function} callback - Called once the flush is complete.
 */
WritableBulk.prototype._flushBulk = function(callback) {
  var self = this;

  if (!this.bulkCount) {
    return setImmediate(callback);
  }

  this.bulkExec(this.bulk, function(e, resp) {
    var items, i, item, action, itemError;

    if (e) {
      self.emit('error', e);
    }

    // `resp` may be missing when the request itself failed; guard before
    // dereferencing it so the buffer is still reset and callback still runs.
    items = resp && resp.errors && resp.items;
    if (items) {
      for (i = 0; i < items.length; i++) {
        item = items[i];
        // Each item is keyed by its action name; read the first key.
        action = Object.keys(item)[0];
        itemError = item[action] && item[action].error;
        if (itemError) {
          self.emit('error', new Error(
            typeof itemError === 'string' ? itemError : JSON.stringify(itemError)
          ));
        }
      }
    }

    self.bulk = [];
    self.bulkCount = 0;
    self.expectingPayload = false;
    callback();
  });
};
|
110 |
|
111 | }).call(this);
|
112 |
|
113 |
|