UNPKG

3.03 kBJavaScriptView Raw
1// Generated by CoffeeScript 1.11.1
2
/**
 * Expose a writable stream that buffers the bulk commands written to it
 * and executes them as a series of bulk requests.
 */
6
7
/**
 * @param bulkExec closure invoked with the buffered bulk commands (an array) and a completion callback
 * @param highWaterMark number of bulk commands buffered before a bulk request is executed. 128 by default.
 */
12
13(function() {
14 var Writable, WritableBulk;
15
/**
 * Object-mode writable stream that buffers bulk commands and hands them to
 * `bulkExec` in batches. May be called with or without `new`.
 *
 * Once the stream has finished, any commands still buffered are flushed and
 * only then is 'close' emitted, so 'close' signals that every command was
 * handed to `bulkExec`.
 *
 * @param bulkExec function(cmds, callback) executing one bulk request;
 *                 `callback` is later invoked by it as callback(err, resp).
 * @param highWaterMark number of bulk commands buffered before a bulk request
 *                      is executed; any falsy value falls back to 128.
 */
WritableBulk = function(bulkExec, highWaterMark) {
  var self;
  if (!(this instanceof WritableBulk)) {
    return new WritableBulk(bulkExec, highWaterMark);
  }
  Writable.call(this, {
    objectMode: true,
    // 'close' is emitted manually below, after the last flush. On Node
    // versions where autoDestroy defaults to true the stream would otherwise
    // emit its own 'close' right after 'finish', i.e. before the remaining
    // commands were flushed, and then a second one from the handler below.
    // Older Node versions ignore this option.
    autoDestroy: false
  });
  this.bulkExec = bulkExec;
  this.highWaterMark = highWaterMark || 128;
  // Chunks (action lines and their payloads) waiting for the next bulk request.
  this.bulk = [];
  // Number of complete commands in `bulk`; an action line that is still
  // waiting for its payload is not counted yet.
  this.bulkCount = 0;
  // false, or the name of the action whose payload is expected next.
  this.expectingPayload = false;
  self = this;
  this.on('finish', function() {
    self._flushBulk(function() {
      self.emit('close');
    });
  });
};
34
// NOTE(review): this string follows another statement inside the enclosing
// function, so it is a plain expression statement rather than a strict-mode
// directive. It is kept as emitted by the CoffeeScript compiler.
'use strict';

Writable = require('stream').Writable;

module.exports = WritableBulk;

// Inherit from stream.Writable; `constructor` is restored as a
// non-enumerable property pointing back at WritableBulk.
WritableBulk.prototype = Object.create(Writable.prototype);
Object.defineProperty(WritableBulk.prototype, 'constructor', {
  value: WritableBulk
});
46
47
/**
 * Buffer one piece of a bulk request and flush once `highWaterMark` complete
 * commands are queued.
 *
 * A chunk is either an action line or the payload that follows an
 * index/create/update action line. delete/alias/mapping/settings/template
 * lines are complete commands on their own. Anything else that arrives while
 * no payload is expected is reported through an 'error' event and dropped.
 *
 * @param chunk a piece of a bulk request as json.
 * @param enc unused (the stream runs in object mode).
 * @param next callback signalling that the chunk has been handled.
 */
WritableBulk.prototype._write = function(chunk, enc, next) {
  var hasOwn = Object.prototype.hasOwnProperty;
  var payloadActions = ['index', 'create', 'update'];
  var standaloneActions = ['delete', 'alias', 'mapping', 'settings', 'template'];

  // Returns the first candidate that is an own property of the chunk, or
  // false. Going through Object.prototype keeps this working for chunks
  // created with Object.create(null) or carrying their own `hasOwnProperty`.
  var firstOwnKey = function(candidates) {
    var i;
    if (chunk === null || chunk === undefined) {
      return false;
    }
    for (i = 0; i < candidates.length; i++) {
      if (hasOwn.call(chunk, candidates[i])) {
        return candidates[i];
      }
    }
    return false;
  };

  if (this.expectingPayload) {
    // This chunk is the payload completing the previous action line.
    this.bulkCount++;
    this.expectingPayload = false;
  } else {
    this.expectingPayload = firstOwnKey(payloadActions);
    if (!this.expectingPayload) {
      if (!firstOwnKey(standaloneActions)) {
        this.emit('error', new Error('Unexpected chunk, not an ' + 'index/create/update/delete/alias/mapping/settings/template command and ' + 'not a document to index either'));
        return next();
      }
      // Standalone command: complete without a payload.
      this.bulkCount++;
    }
  }
  this.bulk.push(chunk);
  if (this.highWaterMark <= this.bulkCount) {
    return this._flushBulk(next);
  }
  next();
};
81
/**
 * Execute the buffered commands as one bulk request, then reset the buffer.
 *
 * A request-level error and every per-item error found in the response are
 * reported through 'error' events; the buffer is reset and `callback` is
 * invoked afterwards in either case.
 *
 * @param callback invoked with no arguments once the request has completed,
 *                 or on a later tick when no complete command is buffered.
 */
WritableBulk.prototype._flushBulk = function(callback) {
  var self = this;
  if (!this.bulkCount) {
    // Nothing complete to send; stay asynchronous for the caller.
    return setImmediate(callback);
  }
  this.bulkExec(this.bulk, function(e, resp) {
    var items, i, item, key, itemError;
    if (e) {
      self.emit('error', e);
    }
    // A failed request may come back without a response at all, so guard the
    // access instead of throwing inside the callback (which would skip the
    // reset below and never call `callback`).
    items = resp && resp.errors && resp.items ? resp.items : [];
    for (i = 0; i < items.length; i++) {
      item = items[i];
      // Each item is keyed by its action name, e.g. { index: { ... } }.
      key = item ? Object.keys(item)[0] : undefined;
      itemError = key !== undefined && item[key] ? item[key].error : null;
      if (itemError) {
        // The error may be a plain string or a structured object; serialise
        // objects so the message is not just "[object Object]".
        self.emit('error', new Error(typeof itemError === 'string' ? itemError : JSON.stringify(itemError)));
      }
    }
    self.bulk = [];
    self.bulkCount = 0;
    self.expectingPayload = false;
    callback();
  });
};
110
111}).call(this);
112
113//# sourceMappingURL=writable-bulk.js.map