1 |
|
2 |
|
3 | const expect = require('chai').expect;
|
4 | const Promise = require('bluebird');
|
5 | const TestConfig = require('../config');
|
6 | const Utils = require('../utils');
|
7 | const Subtask = require('../../app/models/subtask');
|
8 | const Subtasks = require('../../app/services/subtasks');
|
9 | const Transfer = require('../../app/services/transfer');
|
10 | const createEsClient = require('../../config/elasticsearch');
|
11 | const createRedisClient = require('../../config/redis');
|
12 | const config = require('../../config/index');
|
13 | const to_bytes = require('../../config/utils').to_bytes;
|
14 |
|
// Logger used by this test file, taken from the application configuration.
const log = config.log;

// Bluebird diagnostics: keep long stack traces and report rejections that
// nobody handled instead of dropping them silently.
Promise.longStackTraces();
Promise.onPossiblyUnhandledRejection((error) => {
    // A rejection reason is not guaranteed to be an Error (it may be a string,
    // or even undefined), so fall back to logging the raw reason when there is
    // no stack to print.
    const details = error && error.stack ? error.stack : error;
    log.error('Likely error: ', details);
});
|
19 |
|
20 | describe('subtasks service', function () {
|
21 | this.timeout(100000);
|
22 |
|
// Handles shared by every test in the suite; they are assigned in before().
let source = null;
let redis = null;
let subtasks = null;
let utils = null;

/**
 * Removes all templates and indices through the source client and flushes
 * the redis database.
 *
 * The steps are chained with finally() so that each one still runs when an
 * earlier step rejected (best-effort cleanup).
 *
 * @returns the promise of the whole cleanup chain
 */
const resetBackends = () => utils.deleteAllTemplates(source)
    .finally(() => utils.deleteAllIndices(source))
    .finally(() => redis.flushdb());

// Create the clients and the service under test once, then start from a clean state.
before((done) => {
    source = createEsClient(TestConfig.elasticsearch.source);
    redis = createRedisClient(TestConfig.redis.host, TestConfig.redis.port);
    subtasks = new Subtasks(redis);
    utils = new Utils();

    // done() is called whether or not the cleanup succeeded.
    resetBackends().finally(() => done());
});

// Leave nothing behind for the next test.
afterEach((done) => {
    resetBackends().finally(() => done());
});
|
46 |
|
// Index name and mapping type that the helpers in this suite write to.
const TEST_INDEX = 'myindex1';
const TEST_TYPE = 'mytype1';

// Task description handed to filterDocumentSubtasks(): source and destination
// come from the test configuration, documents are taken from TEST_INDEX.
const fakeTask = {
    source: TestConfig.elasticsearch.source,
    destination: TestConfig.elasticsearch.destination,
    transfer: {
        documents: {fromIndices: TEST_INDEX}
    }
};

// Empty filter set passed to filterDocumentSubtasks() in every test.
const loadedFilters = {};
|
/**
 * Creates TEST_INDEX through the source client with zero replicas and the
 * `_size` mapping field enabled for TEST_TYPE.
 *
 * @param shards value used for `number_of_shards`
 * @returns whatever `source.indices.create()` returns
 */
const createIndex = (shards) => {
    const settings = {
        index: {number_of_shards: shards, number_of_replicas: 0}
    };

    const mappings = {};
    mappings[TEST_TYPE] = {_size: {enabled: true}};

    return source.indices.create({
        index: TEST_INDEX,
        body: {settings, mappings}
    });
};
|
71 |
|
/**
 * Bulk-indexes `maxIterations` batches of `chunkSize` documents into
 * TEST_INDEX, one batch after the other. Every document has a single `size`
 * field set to `minSize` plus a random whole-number offset smaller than
 * `maxSize - minSize`.
 *
 * @param maxIterations number of bulk requests to send
 * @param chunkSize     number of documents per bulk request
 * @param minSize       smallest `size` value to generate
 * @param maxSize       exclusive upper limit for the generated `size`
 * @returns null when no batch is left to send, otherwise the promise chain
 *          of the remaining batches
 */
const upload = (maxIterations, chunkSize, minSize, maxSize) => {
    const randomSize = () => minSize + Math.floor(Math.random() * (maxSize - minSize));

    // One bulk payload: an action line followed by its document, chunkSize times.
    const makeBatch = () => {
        const lines = [];
        for (let doc = 0; doc < chunkSize; doc += 1) {
            lines.push(
                {index: {_index: TEST_INDEX, _type: TEST_TYPE}},
                {size: randomSize()}
            );
        }
        return lines;
    };

    // Sends batch number `iteration`, refreshes the index, then moves on to the next one.
    const sendFrom = (iteration) => {
        if (iteration >= maxIterations) {
            return null;
        }
        return source.bulk({refresh: true, body: makeBatch()})
            .then(() => source.indices.refresh({index: TEST_INDEX}))
            .then(() => sendFrom(iteration + 1));
    };

    return sendFrom(0);
};
|
/**
 * Bulk-indexes `chunkSize` documents into TEST_INDEX in a single request,
 * each with its `size` field set to exactly `size`, then refreshes the index.
 *
 * @param chunkSize number of documents to index
 * @param size      value stored in every document's `size` field
 * @returns the promise chain ending with the index refresh
 */
const uploadExact = (chunkSize, size) => {
    // Bulk payload: an action line followed by its document, chunkSize times.
    const lines = [];
    for (let doc = 0; doc < chunkSize; doc += 1) {
        lines.push(
            {index: {_index: TEST_INDEX, _type: TEST_TYPE}},
            {size}
        );
    }

    const refreshIndex = () => source.indices.refresh({index: TEST_INDEX});

    return source.bulk({refresh: true, body: lines}).then(refreshIndex);
};
|
101 |
|
/**
 * Asserts that a generated subtask mirrors fakeTask (source, destination,
 * mutators), targets TEST_INDEX / TEST_TYPE, and carries the expected flush
 * size and document size bounds.
 *
 * @param subtask   subtask object under inspection
 * @param flushSize expected `transfer.flushSize`
 * @param minSize   expected `transfer.documents.minSize`
 * @param maxSize   expected `transfer.documents.maxSize`
 */
const assertSubtask = (subtask, flushSize, minSize, maxSize) => {
    const same = (actual, expected) => expect(actual).to.be.equals(expected);

    // Fields copied over from the parent task.
    same(subtask.source, fakeTask.source);
    same(subtask.destination, fakeTask.destination);
    same(subtask.mutators, fakeTask.mutators);

    // Transfer settings computed for this subtask.
    const transfer = subtask.transfer;
    same(transfer.flushSize, flushSize);

    const documents = transfer.documents;
    same(documents.index, TEST_INDEX);
    same(documents.type, TEST_TYPE);
    same(documents.minSize, minSize);
    same(documents.maxSize, maxSize);
};
|
113 |
|
// Empty index: a single subtask is expected, using Subtask.DEFAULT_FLUSH_SIZE
// and -1 for both minSize and maxSize.
// The promise is returned to Mocha so that any rejection fails the test,
// whatever the rejection reason is.
it('pick bounds - no records', () => {
    return createIndex(1)
        .then(() => Transfer.getIndices(source, fakeTask.transfer.documents.fromIndices))
        .then((allIndices) => subtasks.filterDocumentSubtasks(fakeTask, allIndices, loadedFilters, 'size'))
        .then((actual) => {
            expect(actual.length).to.be.equals(1);
            assertSubtask(actual[0], Subtask.DEFAULT_FLUSH_SIZE, -1, -1);
        });
});
|
125 |
|
// 100 documents that all report the same size (100 B): a single subtask is
// expected, with a flush size of 524288 and -1 for both minSize and maxSize.
// The promise is returned to Mocha so that any rejection fails the test,
// whatever the rejection reason is.
it('pick bounds - all records same size', () => {
    return createIndex(1)
        .then(() => uploadExact(100, to_bytes(100, 'B')))
        .then(() => Transfer.getIndices(source, fakeTask.transfer.documents.fromIndices))
        .then((allIndices) => subtasks.filterDocumentSubtasks(fakeTask, allIndices, loadedFilters, 'size'))
        .then((actual) => {
            expect(actual.length).to.be.equals(1);
            assertSubtask(actual[0], 524288, -1, -1);
        });
});
|
138 |
|
// 10 x 2000 documents with random sizes starting at 1 B and staying below
// 1 KB, plus one document of exactly 1 KB so that the largest size present is
// known. Three subtasks with fixed bounds and flush sizes are expected.
// The promise is returned to Mocha so that any rejection fails the test,
// whatever the rejection reason is.
it('pick bounds - nothing above 1KB', () => {
    return createIndex(1)
        .then(() => upload(10, 2000, to_bytes(1, 'B'), to_bytes(1, 'KB')))
        .then(() => uploadExact(1, to_bytes(1, 'KB')))
        .then(() => Transfer.getIndices(source, fakeTask.transfer.documents.fromIndices))
        .then((allIndices) => subtasks.filterDocumentSubtasks(fakeTask, allIndices, loadedFilters, 'size'))
        .then((actual) => {
            expect(actual.length).to.be.equals(3);
            assertSubtask(actual[0], 85667, to_bytes(0, 'B'), to_bytes(613, 'B'));
            assertSubtask(actual[1], 57111, to_bytes(613, 'B'), to_bytes(919, 'B'));
            // The last upper bound is the largest document size plus one.
            assertSubtask(actual[2], 51200, to_bytes(919, 'B'), to_bytes(1, 'KB') + 1);
        });
});
|
154 |
|
// Three clusters of random sizes (up to 1 KB, 7-8 KB, 9.2-10 KB) plus single
// documents of exactly 6 KB, 9 KB and 10 KB, all on one shard. Three subtasks
// with fixed bounds and flush sizes are expected.
// The promise is returned to Mocha so that any rejection fails the test,
// whatever the rejection reason is.
it('pick bounds - nothing above 10KB', () => {
    return createIndex(1)
        .then(() => upload(10, 1000, to_bytes(1, 'B'), to_bytes(1, 'KB')))
        .then(() => upload(10, 100, to_bytes(7, 'KB'), to_bytes(8, 'KB')))
        .then(() => upload(10, 1000, to_bytes(9.2, 'KB'), to_bytes(10, 'KB')))
        .then(() => uploadExact(1, to_bytes(6, 'KB')))
        .then(() => uploadExact(1, to_bytes(9, 'KB')))
        .then(() => uploadExact(1, to_bytes(10, 'KB')))
        .then(() => Transfer.getIndices(source, fakeTask.transfer.documents.fromIndices))
        .then((allIndices) => subtasks.filterDocumentSubtasks(fakeTask, allIndices, loadedFilters, 'size'))
        .then((actual) => {
            expect(actual.length).to.be.equals(3);
            assertSubtask(actual[0], 8533, 0, 6145);
            assertSubtask(actual[1], 5688, 6145, 9217);
            // The last upper bound is the largest document size plus one.
            assertSubtask(actual[2], 5120, 9217, to_bytes(10, 'KB') + 1);
        });
});
|
174 |
|
// Same uploads as the single-shard 10 KB case, but the index is created with
// 4 shards. The expected bounds are identical; the expected flush sizes are
// smaller (2133 / 1422 / 1280).
// The promise is returned to Mocha so that any rejection fails the test,
// whatever the rejection reason is.
it('pick bounds - nothing above 10KB and 4 shards', () => {
    return createIndex(4)
        .then(() => upload(10, 1000, to_bytes(1, 'B'), to_bytes(1, 'KB')))
        .then(() => upload(10, 100, to_bytes(7, 'KB'), to_bytes(8, 'KB')))
        .then(() => upload(10, 1000, to_bytes(9.2, 'KB'), to_bytes(10, 'KB')))
        .then(() => uploadExact(1, to_bytes(6, 'KB')))
        .then(() => uploadExact(1, to_bytes(9, 'KB')))
        .then(() => uploadExact(1, to_bytes(10, 'KB')))
        .then(() => Transfer.getIndices(source, fakeTask.transfer.documents.fromIndices))
        .then((allIndices) => subtasks.filterDocumentSubtasks(fakeTask, allIndices, loadedFilters, 'size'))
        .then((actual) => {
            expect(actual.length).to.be.equals(3);
            assertSubtask(actual[0], 2133, 0, 6145);
            assertSubtask(actual[1], 1422, 6145, 9217);
            // The last upper bound is the largest document size plus one.
            assertSubtask(actual[2], 1280, 9217, to_bytes(10, 'KB') + 1);
        });
});
|
194 |
|
// Many small documents (20 B up to 200 KB), 100 large ones between 20 MB and
// 50 MB, and one of exactly 50 MB. Two subtasks are expected: one for sizes
// 0..524288 and one from 1048576 up to the largest size plus one.
// The promise is returned to Mocha so that any rejection fails the test,
// whatever the rejection reason is.
it('pick bounds - nothing above 50MB', () => {
    return createIndex(1)
        .then(() => upload(10, 1000, to_bytes(20, 'B'), to_bytes(10, 'KB')))
        .then(() => upload(50, 5000, to_bytes(10, 'KB'), to_bytes(200, 'KB')))
        .then(() => upload(1, 100, to_bytes(20, 'MB'), to_bytes(50, 'MB')))
        .then(() => uploadExact(1, to_bytes(50, 'MB')))
        .then(() => Transfer.getIndices(source, fakeTask.transfer.documents.fromIndices))
        .then((allIndices) => subtasks.filterDocumentSubtasks(fakeTask, allIndices, loadedFilters, 'size'))
        .then((actual) => {
            expect(actual.length).to.be.equals(2);
            assertSubtask(actual[0], 100, 0, 524288);
            assertSubtask(actual[1], 1, 1048576, to_bytes(50, 'MB') + 1);
        });
});
|
211 |
|
// Many small documents (20 B up to 200 KB), 100 large ones between 20 MB and
// 500 MB, and one of exactly 500 MB. Two subtasks are expected: one for sizes
// 0..524288 and one from 1048576 up to the largest size plus one.
// The title says 500MB (the largest size uploaded here) so that it no longer
// duplicates the title of the 50 MB case in test reports.
// The promise is returned to Mocha so that any rejection fails the test,
// whatever the rejection reason is.
it('pick bounds - nothing above 500MB', () => {
    return createIndex(1)
        .then(() => upload(10, 1000, to_bytes(20, 'B'), to_bytes(10, 'KB')))
        .then(() => upload(50, 5000, to_bytes(10, 'KB'), to_bytes(200, 'KB')))
        .then(() => upload(1, 100, to_bytes(20, 'MB'), to_bytes(500, 'MB')))
        .then(() => uploadExact(1, to_bytes(500, 'MB')))
        .then(() => Transfer.getIndices(source, fakeTask.transfer.documents.fromIndices))
        .then((allIndices) => subtasks.filterDocumentSubtasks(fakeTask, allIndices, loadedFilters, 'size'))
        .then((actual) => {
            expect(actual.length).to.be.equals(2);
            assertSubtask(actual[0], 100, 0, 524288);
            assertSubtask(actual[1], 1, 1048576, to_bytes(500, 'MB') + 1);
        });
});
|
228 | });
|