UNPKG

9.11 kBJavaScriptView Raw
1/*eslint no-magic-numbers: "off"*/
2/*eslint no-invalid-this: "off"*/
3const expect = require('chai').expect;
4const Promise = require('bluebird');
5const TestConfig = require('../config');
6const Utils = require('../utils');
7const Subtask = require('../../app/models/subtask');
8const Subtasks = require('../../app/services/subtasks');
9const Transfer = require('../../app/services/transfer');
10const createEsClient = require('../../config/elasticsearch');
11const createRedisClient = require('../../config/redis');
12const config = require('../../config/index');
13const to_bytes = require('../../config/utils').to_bytes;
14
15const log = config.log;
16
17Promise.longStackTraces();
18Promise.onPossiblyUnhandledRejection((error) => log.error('Likely error: ', error.stack));
19
20describe('subtasks service', function () {
21 this.timeout(100000);
22
23 let source = null;
24 let redis = null;
25 let subtasks = null;
26 let utils = null;
27
28 before((done) => {
29 source = createEsClient(TestConfig.elasticsearch.source);
30 redis = createRedisClient(TestConfig.redis.host, TestConfig.redis.port);
31 subtasks = new Subtasks(redis);
32 utils = new Utils();
33
34 utils.deleteAllTemplates(source)
35 .finally(() => utils.deleteAllIndices(source))
36 .finally(() => redis.flushdb())
37 .finally(() => done());
38 });
39
40 afterEach((done) => {
41 utils.deleteAllTemplates(source)
42 .finally(() => utils.deleteAllIndices(source))
43 .finally(() => redis.flushdb())
44 .finally(() => done());
45 });
46
47 const TEST_INDEX = 'myindex1';
48 const TEST_TYPE = 'mytype1';
49 const fakeTask = {
50 source: TestConfig.elasticsearch.source,
51 destination: TestConfig.elasticsearch.destination,
52 transfer: {
53 documents: {
54 fromIndices: TEST_INDEX
55 }
56 }
57 };
58 const loadedFilters = {};
59 const createIndex = (shards) => source.indices.create({
60 index: TEST_INDEX,
61 body: {
62 settings: {
63 index: {
64 number_of_shards: shards,
65 number_of_replicas: 0
66 }
67 },
68 mappings: {[TEST_TYPE]: {_size: {enabled: true}}}
69 }
70 });
71
72 const upload = (maxIterations, chunkSize, minSize, maxSize) => {
73 const buildBody = () => {
74 const body = [];
75 for (let i = 0; i < chunkSize; i++) {
76 body.push({index: {_index: TEST_INDEX, _type: TEST_TYPE}});
77 body.push({size: minSize + Math.floor(Math.random() * (maxSize - minSize))});
78 }
79 return body;
80 };
81 const uploadInternal = (index) => index >= maxIterations
82 ? null
83 : source.bulk({refresh: true, body: buildBody()})
84 .then(() => source.indices.refresh({index: TEST_INDEX}))
85 .then(() => uploadInternal(index + 1));
86
87 return uploadInternal(0);
88 };
89 const uploadExact = (chunkSize, size) => {
90 const buildBody = () => {
91 const body = [];
92 for (let i = 0; i < chunkSize; i++) {
93 body.push({index: {_index: TEST_INDEX, _type: TEST_TYPE}});
94 body.push({size});
95 }
96 return body;
97 };
98 return source.bulk({refresh: true, body: buildBody()})
99 .then(() => source.indices.refresh({index: TEST_INDEX}));
100 };
101
102 const assertSubtask = (subtask, flushSize, minSize, maxSize) => {
103 expect(subtask.source).to.be.equals(fakeTask.source);
104 expect(subtask.destination).to.be.equals(fakeTask.destination);
105 expect(subtask.mutators).to.be.equals(fakeTask.mutators);
106 expect(subtask.transfer.flushSize).to.be.equals(flushSize);
107 expect(subtask.transfer.documents.index).to.be.equals(TEST_INDEX);
108 expect(subtask.transfer.documents.type).to.be.equals(TEST_TYPE);
109 expect(subtask.transfer.documents.minSize).to.be.equals(minSize);
110 expect(subtask.transfer.documents.maxSize).to.be.equals(maxSize);
111
112 };
113
114 it('pick bounds - no records', (done) => {
115 createIndex(1)
116 .then(() => Transfer.getIndices(source, fakeTask.transfer.documents.fromIndices))
117 .then((allIndices) => subtasks.filterDocumentSubtasks(fakeTask, allIndices, loadedFilters, 'size'))
118 .then((actual) => {
119 expect(actual.length).to.be.equals(1);
120 assertSubtask(actual[0], Subtask.DEFAULT_FLUSH_SIZE, -1, -1);
121 })
122 .then(() => done())
123 .catch(done);
124 });
125
126 it('pick bounds - all records same size', (done) => {
127 createIndex(1)
128 .then(() => uploadExact(100, to_bytes(100, 'B')))
129 .then(() => Transfer.getIndices(source, fakeTask.transfer.documents.fromIndices))
130 .then((allIndices) => subtasks.filterDocumentSubtasks(fakeTask, allIndices, loadedFilters, 'size'))
131 .then((actual) => {
132 expect(actual.length).to.be.equals(1);
133 assertSubtask(actual[0], 524288, -1, -1);
134 })
135 .then(() => done())
136 .catch(done);
137 });
138
139 it('pick bounds - nothing above 1KB', (done) => {
140 createIndex(1)
141 .then(() => upload(10, 2000, to_bytes(1, 'B'), to_bytes(1, 'KB')))
142 .then(() => uploadExact(1, to_bytes(1, 'KB')))
143 .then(() => Transfer.getIndices(source, fakeTask.transfer.documents.fromIndices))
144 .then((allIndices) => subtasks.filterDocumentSubtasks(fakeTask, allIndices, loadedFilters, 'size'))
145 .then((actual) => {
146 expect(actual.length).to.be.equals(3);
147 assertSubtask(actual[0], 85667, to_bytes(0, 'B'), to_bytes(613, 'B'));
148 assertSubtask(actual[1], 57111, to_bytes(613, 'B'), to_bytes(919, 'B'));
149 assertSubtask(actual[2], 51200, to_bytes(919, 'B'), to_bytes(1, 'KB') + 1);
150 })
151 .then(() => done())
152 .catch(done);
153 });
154
155 it('pick bounds - nothing above 10KB', (done) => {
156 createIndex(1)
157 .then(() => upload(10, 1000, to_bytes(1, 'B'), to_bytes(1, 'KB')))
158 .then(() => upload(10, 100, to_bytes(7, 'KB'), to_bytes(8, 'KB')))
159 .then(() => upload(10, 1000, to_bytes(9.2, 'KB'), to_bytes(10, 'KB')))
160 .then(() => uploadExact(1, to_bytes(6, 'KB')))
161 .then(() => uploadExact(1, to_bytes(9, 'KB')))
162 .then(() => uploadExact(1, to_bytes(10, 'KB')))
163 .then(() => Transfer.getIndices(source, fakeTask.transfer.documents.fromIndices))
164 .then((allIndices) => subtasks.filterDocumentSubtasks(fakeTask, allIndices, loadedFilters, 'size'))
165 .then((actual) => {
166 expect(actual.length).to.be.equals(3);
167 assertSubtask(actual[0], 8533, 0, 6145);
168 assertSubtask(actual[1], 5688, 6145, 9217);
169 assertSubtask(actual[2], 5120, 9217, to_bytes(10, 'KB') + 1);
170 })
171 .then(() => done())
172 .catch(done);
173 });
174
175 it('pick bounds - nothing above 10KB and 4 shards', (done) => {
176 createIndex(4)
177 .then(() => upload(10, 1000, to_bytes(1, 'B'), to_bytes(1, 'KB')))
178 .then(() => upload(10, 100, to_bytes(7, 'KB'), to_bytes(8, 'KB')))
179 .then(() => upload(10, 1000, to_bytes(9.2, 'KB'), to_bytes(10, 'KB')))
180 .then(() => uploadExact(1, to_bytes(6, 'KB')))
181 .then(() => uploadExact(1, to_bytes(9, 'KB')))
182 .then(() => uploadExact(1, to_bytes(10, 'KB')))
183 .then(() => Transfer.getIndices(source, fakeTask.transfer.documents.fromIndices))
184 .then((allIndices) => subtasks.filterDocumentSubtasks(fakeTask, allIndices, loadedFilters, 'size'))
185 .then((actual) => {
186 expect(actual.length).to.be.equals(3);
187 assertSubtask(actual[0], 2133, 0, 6145);
188 assertSubtask(actual[1], 1422, 6145, 9217);
189 assertSubtask(actual[2], 1280, 9217, to_bytes(10, 'KB') + 1);
190 })
191 .then(() => done())
192 .catch(done);
193 });
194
195 it('pick bounds - nothing above 50MB', (done) => {
196 createIndex(1)
197 .then(() => upload(10, 1000, to_bytes(20, 'B'), to_bytes(10, 'KB')))
198 .then(() => upload(50, 5000, to_bytes(10, 'KB'), to_bytes(200, 'KB')))
199 .then(() => upload(1, 100, to_bytes(20, 'MB'), to_bytes(50, 'MB')))
200 .then(() => uploadExact(1, to_bytes(50, 'MB')))
201 .then(() => Transfer.getIndices(source, fakeTask.transfer.documents.fromIndices))
202 .then((allIndices) => subtasks.filterDocumentSubtasks(fakeTask, allIndices, loadedFilters, 'size'))
203 .then((actual) => {
204 expect(actual.length).to.be.equals(2);
205 assertSubtask(actual[0], 100, 0, 524288);
206 assertSubtask(actual[1], 1, 1048576, to_bytes(50, 'MB') + 1);
207 })
208 .then(() => done())
209 .catch(done);
210 });
211
212 it('pick bounds - nothing above 50MB', (done) => {
213 createIndex(1)
214 .then(() => upload(10, 1000, to_bytes(20, 'B'), to_bytes(10, 'KB')))
215 .then(() => upload(50, 5000, to_bytes(10, 'KB'), to_bytes(200, 'KB')))
216 .then(() => upload(1, 100, to_bytes(20, 'MB'), to_bytes(500, 'MB')))
217 .then(() => uploadExact(1, to_bytes(500, 'MB')))
218 .then(() => Transfer.getIndices(source, fakeTask.transfer.documents.fromIndices))
219 .then((allIndices) => subtasks.filterDocumentSubtasks(fakeTask, allIndices, loadedFilters, 'size'))
220 .then((actual) => {
221 expect(actual.length).to.be.equals(2);
222 assertSubtask(actual[0], 100, 0, 524288);
223 assertSubtask(actual[1], 1, 1048576, to_bytes(500, 'MB') + 1);
224 })
225 .then(() => done())
226 .catch(done);
227 });
228});