UNPKG

31.7 kBJavaScriptView Raw
1/*eslint no-magic-numbers: "off"*/
2/*eslint no-invalid-this: "off"*/
3const _ = require('lodash');
4const expect = require('chai').expect;
5const Promise = require('bluebird');
6const TestConfig = require('../config');
7const Utils = require('../utils');
8const Subtasks = require('../../app/services/subtasks');
9const Tasks = require('../../app/services/tasks');
10const createEsClient = require('../../config/elasticsearch');
11const createRedisClient = require('../../config/redis');
12const config = require('../../config/index');
13
14const log = config.log;
15
16Promise.longStackTraces();
17Promise.onPossiblyUnhandledRejection((error) => log.error('Likely error: ', error.stack));
18
19const TASK_NAME = 'expectedTask';
20const MOCK_ALL_INDICES = [
21 {
22 name: 'index_number_1',
23 aliases: {},
24 mappings: {
25 newtype_2: {
26 properties: {
27 something: {
28 type: 'string'
29 }
30 }
31 },
32 newtype: {
33 properties: {
34 something2: {
35 type: 'string'
36 }
37 }
38 },
39 oldtype: {
40 properties: {
41 something3: {
42 type: 'string'
43 }
44 }
45 }
46 },
47 settings: {
48 index: {
49 creation_date: 1461452305587,
50 number_of_shards: 5,
51 number_of_replicas: 1,
52 uuid: 'wTr18aKmRKWqV80B62ZEJA',
53 version: {
54 created: 2020299
55 }
56 }
57 },
58 warmers: {}
59 },
60 {
61 name: 'index_number_2',
62 aliases: {},
63 mappings: {
64 newtype_3: {
65 properties: {
66 something: {
67 type: 'string'
68 }
69 }
70 },
71 newtype: {
72 properties: {
73 something: {
74 type: 'string'
75 }
76 }
77 }
78 },
79 settings: {
80 index: {
81 creation_date: 1461452305587,
82 number_of_shards: 5,
83 number_of_replicas: 1,
84 uuid: 'wTr18aKmRKWqV80B62ZEJA',
85 version: {
86 created: 2020299
87 }
88 }
89 },
90 warmers: {}
91 }
92];
93
94describe('subtasks service', function () {
95 this.timeout(5000);
96
97 let source = null;
98 let redis = null;
99 let tasks = null;
100 let subtasks = null;
101 let utils = null;
102
103 before((done) => {
104 source = createEsClient(TestConfig.elasticsearch.source);
105 redis = createRedisClient(TestConfig.redis.host, TestConfig.redis.port);
106 tasks = new Tasks(redis);
107 subtasks = new Subtasks(redis);
108 utils = new Utils();
109
110 utils.deleteAllTemplates(source)
111 .finally(() => utils.deleteAllIndices(source))
112 .finally(() => redis.flushdb())
113 .finally(() => done());
114 });
115
116 afterEach((done) => {
117 utils.deleteAllTemplates(source)
118 .finally(() => utils.deleteAllIndices(source))
119 .finally(() => redis.flushdb())
120 .finally(() => done());
121 });
122
123 it('should get subtasks in the same order they were added', (done) => {
124 const expected = [
125 {
126 source: TestConfig.elasticsearch.source,
127 destination: TestConfig.elasticsearch.destination,
128 transfer: {
129 documents: {
130 index: 'myindex1',
131 type: 'mytype1'
132 }
133 },
134 count: 10
135 },
136 {
137 source: TestConfig.elasticsearch.source,
138 destination: TestConfig.elasticsearch.destination,
139 transfer: {
140 documents: {
141 index: 'myindex2',
142 type: 'mytype1'
143 }
144 },
145 count: 20
146 },
147 {
148 source: TestConfig.elasticsearch.source,
149 destination: TestConfig.elasticsearch.destination,
150 transfer: {
151 documents: {
152 index: 'myindex1',
153 type: 'mytype4'
154 }
155 },
156 count: 1
157 }
158 ];
159
160 subtasks.queue(TASK_NAME, expected)
161 .then(() => subtasks.getTotal(TASK_NAME))
162 .then((total) => expect(total).to.be.equals(31))
163 .then(() => subtasks.fetch(TASK_NAME))
164 .then((subtask) => {
165 expect(subtask.transfer.documents.index).to.be.equals(expected[0].transfer.documents.index);
166 expect(subtask.transfer.documents.type).to.be.equals(expected[0].transfer.documents.type);
167 expect(subtask.count).to.be.equals(expected[0].count);
168 })
169 .then(() => subtasks.fetch(TASK_NAME))
170 .then((subtask) => {
171 expect(subtask.transfer.documents.index).to.be.equals(expected[1].transfer.documents.index);
172 expect(subtask.transfer.documents.type).to.be.equals(expected[1].transfer.documents.type);
173 expect(subtask.count).to.be.equals(expected[1].count);
174 })
175 .then(() => subtasks.fetch(TASK_NAME))
176 .then((subtask) => {
177 expect(subtask.transfer.documents.index).to.be.equals(expected[2].transfer.documents.index);
178 expect(subtask.transfer.documents.type).to.be.equals(expected[2].transfer.documents.type);
179 expect(subtask.count).to.be.equals(expected[2].count);
180 })
181 .then(() => subtasks.fetch(TASK_NAME))
182 .then((subtask) => expect(subtask).to.be.null)
183 .then(() => done())
184 .catch(done);
185 });
186
187 it('should not add the same subtask twice', (done) => {
188 const expected = [
189 {
190 source: TestConfig.elasticsearch.source,
191 destination: TestConfig.elasticsearch.destination,
192 transfer: {
193 documents: {
194 index: 'myindex1',
195 type: 'mytype1'
196 }
197 },
198 count: 22
199 },
200 {
201 source: TestConfig.elasticsearch.source,
202 destination: TestConfig.elasticsearch.destination,
203 transfer: {
204 documents: {
205 index: 'myindex1',
206 type: 'mytype1'
207 }
208 },
209 count: 22
210 }
211 ];
212
213 subtasks.queue(TASK_NAME, expected)
214 .then(() => subtasks.getTotal(TASK_NAME))
215 .then((total) => expect(total).to.be.equals(22))
216 .then(() => subtasks.fetch(TASK_NAME))
217 .then((subtask) => {
218 expect(subtask.transfer.documents.index).to.be.equals(expected[0].transfer.documents.index);
219 expect(subtask.transfer.documents.type).to.be.equals(expected[0].transfer.documents.type);
220 expect(subtask.count).to.be.equals(expected[0].count);
221 })
222 .then(() => subtasks.fetch(TASK_NAME))
223 .then((subtask) => expect(subtask).to.be.null)
224 .then(() => done())
225 .catch(done);
226 });
227
228 it('should get all completed subtasks', (done) => {
229 const expected = [
230 {
231 source: TestConfig.elasticsearch.source,
232 destination: TestConfig.elasticsearch.destination,
233 transfer: {
234 documents: {
235 index: 'myindex1',
236 type: 'mytype1'
237 }
238 },
239 count: 10
240 },
241 {
242 source: TestConfig.elasticsearch.source,
243 destination: TestConfig.elasticsearch.destination,
244 transfer: {
245 documents: {
246 index: 'myindex2',
247 type: 'mytype1'
248 }
249 },
250 count: 20
251 },
252 {
253 source: TestConfig.elasticsearch.source,
254 destination: TestConfig.elasticsearch.destination,
255 transfer: {
256 documents: {
257 index: 'myindex1',
258 type: 'mytype4'
259 }
260 },
261 count: 1
262 }
263 ];
264
265 Promise.each(expected, (subtask) => subtasks.complete(TASK_NAME, subtask))
266 .then(() => subtasks.getCompleted(TASK_NAME))
267 .then((completedSubtasks) => {
268 let target = _.find(completedSubtasks, {count: expected[0].count});
269 expect(target.transfer.documents.index).to.be.equals(expected[0].transfer.documents.index);
270 expect(target.transfer.documents.type).to.be.equals(expected[0].transfer.documents.type);
271 expect(target.count).to.be.equals(expected[0].count);
272
273 target = _.find(completedSubtasks, {count: expected[1].count});
274 expect(target.transfer.documents.index).to.be.equals(expected[1].transfer.documents.index);
275 expect(target.transfer.documents.type).to.be.equals(expected[1].transfer.documents.type);
276 expect(target.count).to.be.equals(expected[1].count);
277
278 target = _.find(completedSubtasks, {count: expected[2].count});
279 expect(target.transfer.documents.index).to.be.equals(expected[2].transfer.documents.index);
280 expect(target.transfer.documents.type).to.be.equals(expected[2].transfer.documents.type);
281 expect(target.count).to.be.equals(expected[2].count);
282 })
283 .then(() => done())
284 .catch(done);
285 });
286
287 it('should get completed subtask count', (done) => {
288 const expected = [
289 {
290 source: TestConfig.elasticsearch.source,
291 destination: TestConfig.elasticsearch.destination,
292 transfer: {
293 documents: {
294 index: 'myindex1',
295 type: 'mytype1'
296 }
297 },
298 count: 10
299 },
300 {
301 source: TestConfig.elasticsearch.source,
302 destination: TestConfig.elasticsearch.destination,
303 transfer: {
304 documents: {
305 index: 'myindex2',
306 type: 'mytype1'
307 }
308 },
309 count: 20
310 },
311 {
312 source: TestConfig.elasticsearch.source,
313 destination: TestConfig.elasticsearch.destination,
314 transfer: {
315 documents: {
316 index: 'myindex1',
317 type: 'mytype4'
318 }
319 },
320 count: 1
321 }
322 ];
323
324 Promise.each(expected, (subtask) => subtasks.complete(TASK_NAME, subtask))
325 .then(() => subtasks.countCompleted(TASK_NAME))
326 .then((completedCount) => expect(completedCount).to.be.equals(31))
327 .then(() => done())
328 .catch(done);
329 });
330
331 it('should clear completed subtasks', (done) => {
332 const expected = [
333 {
334 source: TestConfig.elasticsearch.source,
335 destination: TestConfig.elasticsearch.destination,
336 transfer: {
337 documents: {
338 index: 'myindex1',
339 type: 'mytype1'
340 }
341 },
342 count: 10
343 },
344 {
345 source: TestConfig.elasticsearch.source,
346 destination: TestConfig.elasticsearch.destination,
347 transfer: {
348 documents: {
349 index: 'myindex2',
350 type: 'mytype1'
351 }
352 },
353 count: 20
354 },
355 {
356 source: TestConfig.elasticsearch.source,
357 destination: TestConfig.elasticsearch.destination,
358 transfer: {
359 documents: {
360 index: 'myindex1',
361 type: 'mytype4'
362 }
363 },
364 count: 1
365 }
366 ];
367
368 Promise.each(expected, (subtask) => subtasks.complete(TASK_NAME, subtask))
369 .then(() => subtasks.clearCompleted(TASK_NAME))
370 .then(() => subtasks.countCompleted(TASK_NAME))
371 .then((completedCount) => expect(completedCount).to.be.equals(0))
372 .then(() => done())
373 .catch(done);
374 });
375
376 it('should return an empty array when there are no completed subtasks', (done) => {
377 subtasks.getCompleted(TASK_NAME)
378 .then((completedSubtasks) => expect(completedSubtasks).to.be.empty)
379 .then(() => done())
380 .catch(done);
381 });
382
383 it('should return an empty array when there are no backlog subtasks', (done) => {
384 subtasks.getBacklog(TASK_NAME)
385 .then((backlogSubtasks) => expect(backlogSubtasks).to.be.empty)
386 .then(() => done())
387 .catch(done);
388 });
389
390 it('should return all backlog subtasks', (done) => {
391 const expected = [
392 {
393 source: TestConfig.elasticsearch.source,
394 destination: TestConfig.elasticsearch.destination,
395 transfer: {
396 documents: {
397 index: 'myindex1',
398 type: 'mytype1'
399 }
400 },
401 count: 10
402 },
403 {
404 source: TestConfig.elasticsearch.source,
405 destination: TestConfig.elasticsearch.destination,
406 transfer: {
407 documents: {
408 index: 'myindex2',
409 type: 'mytype1'
410 }
411 },
412 count: 20
413 },
414 {
415 source: TestConfig.elasticsearch.source,
416 destination: TestConfig.elasticsearch.destination,
417 transfer: {
418 documents: {
419 index: 'myindex1',
420 type: 'mytype4'
421 }
422 },
423 count: 1
424 }
425 ];
426
427 subtasks.queue(TASK_NAME, expected)
428 .then(() => subtasks.getTotal(TASK_NAME))
429 .then((total) => expect(total).to.be.equals(31))
430 .then(() => subtasks.getBacklog(TASK_NAME))
431 .then((backlogSubtasks) => {
432 let target = _.find(backlogSubtasks, {count: expected[0].count});
433 expect(target.transfer.documents.index).to.be.equals(expected[0].transfer.documents.index);
434 expect(target.transfer.documents.type).to.be.equals(expected[0].transfer.documents.type);
435 expect(target.count).to.be.equals(expected[0].count);
436
437 target = _.find(backlogSubtasks, {count: expected[1].count});
438 expect(target.transfer.documents.index).to.be.equals(expected[1].transfer.documents.index);
439 expect(target.transfer.documents.type).to.be.equals(expected[1].transfer.documents.type);
440 expect(target.count).to.be.equals(expected[1].count);
441
442 target = _.find(backlogSubtasks, {count: expected[2].count});
443 expect(target.transfer.documents.index).to.be.equals(expected[2].transfer.documents.index);
444 expect(target.transfer.documents.type).to.be.equals(expected[2].transfer.documents.type);
445 expect(target.count).to.be.equals(expected[2].count);
446 })
447 .then(() => done())
448 .catch(done);
449 });
450
451 it('should clear all backlog subtasks', (done) => {
452 const expected = [
453 {
454 source: TestConfig.elasticsearch.source,
455 destination: TestConfig.elasticsearch.destination,
456 transfer: {
457 documents: {
458 index: 'myindex1',
459 type: 'mytype1'
460 }
461 },
462 count: 10
463 },
464 {
465 source: TestConfig.elasticsearch.source,
466 destination: TestConfig.elasticsearch.destination,
467 transfer: {
468 documents: {
469 index: 'myindex2',
470 type: 'mytype1'
471 }
472 },
473 count: 20
474 },
475 {
476 source: TestConfig.elasticsearch.source,
477 destination: TestConfig.elasticsearch.destination,
478 transfer: {
479 documents: {
480 index: 'myindex1',
481 type: 'mytype4'
482 }
483 },
484 count: 1
485 }
486 ];
487
488 subtasks.queue(TASK_NAME, expected)
489 .then(() => subtasks.getTotal(TASK_NAME))
490 .then((total) => expect(total).to.be.equals(31))
491 .then(() => subtasks.clearBacklog(TASK_NAME))
492 .then(() => subtasks.getBacklog(TASK_NAME))
493 .then((backlogSubtasks) => expect(backlogSubtasks).to.be.empty)
494 .then(() => done())
495 .catch(done);
496 });
497
498 it('should return count of zero for empty completed', (done) => {
499 subtasks.countCompleted(TASK_NAME)
500 .then((count) => expect(count).to.be.equals(0))
501 .then(() => done())
502 .catch(done);
503 });
504
505 it('should return count of zero for empty backlog', (done) => {
506 subtasks.countBacklog(TASK_NAME)
507 .then((count) => expect(count).to.be.equals(0))
508 .then(() => done())
509 .catch(done);
510 });
511
512 it('should return total count of subtasks in backlog', (done) => {
513 const expected = [
514 {
515 source: TestConfig.elasticsearch.source,
516 destination: TestConfig.elasticsearch.destination,
517 transfer: {
518 documents: {
519 index: 'myindex1',
520 type: 'mytype1'
521 }
522 },
523 count: 10
524 },
525 {
526 source: TestConfig.elasticsearch.source,
527 destination: TestConfig.elasticsearch.destination,
528 transfer: {
529 documents: {
530 index: 'myindex2',
531 type: 'mytype1'
532 }
533 },
534 count: 20
535 },
536 {
537 source: TestConfig.elasticsearch.source,
538 destination: TestConfig.elasticsearch.destination,
539 transfer: {
540 documents: {
541 index: 'myindex1',
542 type: 'mytype4'
543 }
544 },
545 count: 1
546 }
547 ];
548
549 subtasks.queue(TASK_NAME, expected)
550 .then(() => subtasks.getTotal(TASK_NAME))
551 .then((total) => expect(total).to.be.equals(31))
552 .then(() => subtasks.countBacklog(TASK_NAME))
553 .then((backlogTotal) => expect(backlogTotal).to.be.equals(expected.reduce((total, subtask) => total + subtask.count, 0)))
554 .then(() => done())
555 .catch(done);
556 });
557
558 it('should get counts for provided jobs', (done) => {
559 const expected = [
560 {
561 source: TestConfig.elasticsearch.source,
562 destination: TestConfig.elasticsearch.destination,
563 transfer: {
564 documents: {
565 index: 'myindex1',
566 type: 'mytype1'
567 }
568 }
569 },
570 {
571 source: TestConfig.elasticsearch.source,
572 destination: TestConfig.elasticsearch.destination,
573 transfer: {
574 documents: {
575 index: 'myindex3',
576 type: 'mytype3'
577 }
578 }
579 }
580 ];
581
582 utils.addData(source)
583 .then(() => Promise.map(expected, (subtask) => subtasks.addCount(source, subtask)))
584 .then((subtasksWithCount) => {
585 expect(subtasksWithCount.length).to.be.equals(2);
586
587 let filter = {transfer: {documents: {index: 'myindex1'}}};
588 let target = _.find(subtasksWithCount, filter);
589 expect(target.count).to.be.equals(2);
590
591 filter = {transfer: {documents: {index: 'myindex3'}}};
592 target = _.find(subtasksWithCount, filter);
593 expect(target.count).to.be.equals(3);
594 })
595 .then(() => done())
596 .catch(done);
597 });
598
599 it('should filter out documents by index regex', (done) => {
600 const fakeTask = {
601 source: TestConfig.elasticsearch.source,
602 destination: TestConfig.elasticsearch.destination,
603 mutators: 'path/to/mutators'
604 };
605
606 const filterFunctions = {
607 index: [
608 {
609 predicate: (index) => index.name === 'index_number_1'
610 }
611 ]
612 };
613
614 subtasks.filterDocumentSubtasks(fakeTask, MOCK_ALL_INDICES, filterFunctions)
615 .then((actual) => {
616 expect(actual.length).to.be.equals(3);
617 expect(actual[0].source).to.be.equals(fakeTask.source);
618 expect(actual[0].destination).to.be.equals(fakeTask.destination);
619 expect(actual[0].mutators).to.be.equals(fakeTask.mutators);
620 expect(actual[0].transfer.documents.index).to.be.equals('index_number_1');
621 expect(actual[0].transfer.documents.type).to.be.oneOf([
622 'newtype',
623 'oldtype',
624 'newtype_2'
625 ]);
626
627 expect(actual[1].source).to.be.equals(fakeTask.source);
628 expect(actual[1].destination).to.be.equals(fakeTask.destination);
629 expect(actual[1].mutators).to.be.equals(fakeTask.mutators);
630 expect(actual[1].transfer.documents.index).to.be.equals('index_number_1');
631 expect(actual[1].transfer.documents.type).to.be.oneOf([
632 'newtype',
633 'oldtype',
634 'newtype_2'
635 ]);
636
637 expect(actual[2].source).to.be.equals(fakeTask.source);
638 expect(actual[2].destination).to.be.equals(fakeTask.destination);
639 expect(actual[2].mutators).to.be.equals(fakeTask.mutators);
640 expect(actual[2].transfer.documents.index).to.be.equals('index_number_1');
641 expect(actual[2].transfer.documents.type).to.be.oneOf([
642 'newtype',
643 'oldtype',
644 'newtype_2'
645 ]);
646 })
647 .then(() => done())
648 .catch(done);
649 });
650
651 it('should filter out documents by type regex', (done) => {
652 const fakeTask = {
653 source: TestConfig.elasticsearch.source,
654 destination: TestConfig.elasticsearch.destination,
655 mutators: 'path/to/mutators'
656 };
657
658 const filterFunctions = {
659 type: [
660 {
661 predicate: (type) => type.name === 'newtype'
662 }
663 ]
664 };
665
666 subtasks.filterDocumentSubtasks(fakeTask, MOCK_ALL_INDICES, filterFunctions)
667 .then((actual) => {
668 expect(actual.length).to.be.equals(2);
669 expect(actual[0].source).to.be.equals(fakeTask.source);
670 expect(actual[0].destination).to.be.equals(fakeTask.destination);
671 expect(actual[0].mutators).to.be.equals(fakeTask.mutators);
672 expect(actual[0].transfer.documents.index).to.be.oneOf([
673 'index_number_1',
674 'index_number_2'
675 ]);
676 expect(actual[0].transfer.documents.type).to.be.equals('newtype');
677
678 expect(actual[1].source).to.be.equals(fakeTask.source);
679 expect(actual[1].destination).to.be.equals(fakeTask.destination);
680 expect(actual[1].mutators).to.be.equals(fakeTask.mutators);
681 expect(actual[1].transfer.documents.index).to.be.oneOf([
682 'index_number_1',
683 'index_number_2'
684 ]);
685 expect(actual[1].transfer.documents.type).to.be.equals('newtype');
686 })
687 .then(() => done())
688 .catch(done);
689 });
690
691 it('should prep subtasks backlog considering completed jobs', (done) => {
692 const completedSubtask = {
693 source: TestConfig.elasticsearch.source,
694 destination: TestConfig.elasticsearch.destination,
695 transfer: {
696 documents: {
697 index: 'myindex1',
698 type: 'mytype1'
699 }
700 },
701 count: 10
702 };
703
704 const taskParams = {
705 source: TestConfig.elasticsearch.source,
706 destination: TestConfig.elasticsearch.destination,
707 transfer: {
708 documents: {
709 fromIndices: '*'
710 }
711 }
712 };
713
714 source.indices.create({index: 'myindex1'})
715 .then(() => source.bulk({
716 refresh: true,
717 body: [
718 {
719 index: {
720 _index: 'myindex1',
721 _type: 'mytype1'
722 }
723 },
724 {someField1: 'somedata1'},
725 {
726 index: {
727 _index: 'myindex1',
728 _type: 'mytype1'
729 }
730 },
731 {someField1: 'somedata3'}
732 ]
733 }))
734 .then(() => subtasks.complete(TASK_NAME, completedSubtask))
735 .then(() => subtasks.buildBacklog(TASK_NAME, taskParams))
736 .then(() => subtasks.fetch(TASK_NAME))
737 .then((subtask) => expect(subtask).to.be.null)
738 .then(() => done())
739 .catch(done);
740 });
741
742 it('should prep job backlog with no completed jobs', (done) => {
743 const taskParams = {
744 source: TestConfig.elasticsearch.source,
745 destination: TestConfig.elasticsearch.destination,
746 transfer: {
747 documents: {
748 fromIndices: '*'
749 }
750 }
751 };
752
753 source.indices.create({index: 'myindex1'})
754 .then(() => source.bulk({
755 refresh: true,
756 body: [
757 {
758 index: {
759 _index: 'myindex1',
760 _type: 'mytype1'
761 }
762 },
763 {someField1: 'somedata1'},
764 {
765 index: {
766 _index: 'myindex1',
767 _type: 'mytype1'
768 }
769 },
770 {someField1: 'somedata3'}
771 ]
772 }))
773 .then(() => subtasks.buildBacklog(TASK_NAME, taskParams))
774 .then(() => subtasks.fetch(TASK_NAME))
775 .then((subtask) => {
776 expect(subtask.transfer.documents.index).to.be.equals('myindex1');
777 expect(subtask.transfer.documents.type).to.be.equals('mytype1');
778 })
779 .then(() => subtasks.getTotal(TASK_NAME))
780 .then((total) => expect(total).to.equal(2))
781 .then(() => subtasks.fetch(TASK_NAME))
782 .then((subtask) => expect(subtask).to.be.null)
783 .then(() => done())
784 .catch(done);
785 });
786
787 it('should keep track of progress for a single task', (done) => {
788 const subtask = {
789 source: TestConfig.elasticsearch.source,
790 destination: TestConfig.elasticsearch.destination,
791 transfer: {
792 documents: {
793 index: 'myindex1',
794 type: 'mytype1'
795 }
796 },
797 count: 10
798 };
799
800 const progressUpdate = {
801 tick: 10,
802 total: 20,
803 transferred: 10
804 };
805
806 subtasks.updateProgress(TASK_NAME, subtask, progressUpdate)
807 .then(() => subtasks.getProgress(TASK_NAME, subtask))
808 .then((progress) => {
809 expect(progress.tick).to.be.equals(10);
810 expect(progress.total).to.be.equals(20);
811 expect(progress.transferred).to.be.equals(10);
812 expect(progress.lastModified).to.not.be.undefined;
813
814 progressUpdate.tick = 5;
815 progressUpdate.transferred = 15;
816 })
817 .then(() => subtasks.updateProgress(TASK_NAME, subtask, progressUpdate))
818 .then(() => subtasks.getProgress(TASK_NAME, subtask))
819 .then((progress) => {
820 expect(progress.tick).to.be.equals(5);
821 expect(progress.total).to.be.equals(20);
822 expect(progress.transferred).to.be.equals(15);
823 expect(progress.lastModified).to.not.be.undefined;
824 })
825 .then(() => done())
826 .catch(done);
827 });
828
829 it('should keep track of progress for multiple tasks', (done) => {
830 const subtask1 = {
831 source: TestConfig.elasticsearch.source,
832 destination: TestConfig.elasticsearch.destination,
833 transfer: {
834 documents: {
835 index: 'myindex1',
836 type: 'mytype1'
837 }
838 },
839 count: 10
840 };
841
842 const subtask2 = {
843 source: TestConfig.elasticsearch.source,
844 destination: TestConfig.elasticsearch.destination,
845 transfer: {
846 documents: {
847 index: 'myindex3',
848 type: 'mytype1'
849 }
850 },
851 count: 25
852 };
853
854 const progressUpdate = {
855 tick: 10,
856 total: 20,
857 transferred: 10
858 };
859
860 subtasks.updateProgress(TASK_NAME, subtask1, progressUpdate)
861 .then(() => subtasks.getProgress(TASK_NAME, subtask1))
862 .then((progress) => {
863 expect(progress.tick).to.be.equals(10);
864 expect(progress.total).to.be.equals(20);
865 expect(progress.transferred).to.be.equals(10);
866 expect(progress.lastModified).to.not.be.undefined;
867
868 progressUpdate.tick = 5;
869 progressUpdate.transferred = 15;
870 })
871 .then(() => subtasks.updateProgress(TASK_NAME, subtask2, progressUpdate))
872 .then(() => subtasks.getProgress(TASK_NAME, subtask2))
873 .then((progress) => {
874 expect(progress.tick).to.be.equals(5);
875 expect(progress.total).to.be.equals(20);
876 expect(progress.transferred).to.be.equals(15);
877 expect(progress.lastModified).to.not.be.undefined;
878 })
879 .then(() => tasks.getProgress(TASK_NAME))
880 .then((overallProgress) => {
881 expect(overallProgress.length).to.be.equals(2);
882
883 const predicate = {
884 subtask: {
885 transfer: {
886 documents: {
887 index: 'myindex1'
888 }
889 }
890 }
891 };
892 let target = _.find(overallProgress, predicate);
893 expect(target.progress.tick).to.be.equals(10);
894
895 predicate.subtask.transfer.documents.index = 'myindex3';
896
897 target = _.find(overallProgress, predicate);
898 expect(target.progress.tick).to.be.equals(5);
899 })
900 .then(() => done())
901 .catch(done);
902 });
903
904 it('should keep track of progress for multiple bucketed tasks', (done) => {
905 const subtask1 = {
906 source: TestConfig.elasticsearch.source,
907 destination: TestConfig.elasticsearch.destination,
908 transfer: {
909 documents: {
910 index: 'myindex1',
911 type: 'mytype1',
912 minSize: 0,
913 maxSize: 200,
914 }
915 },
916 count: 10
917 };
918
919 const subtask2 = {
920 source: TestConfig.elasticsearch.source,
921 destination: TestConfig.elasticsearch.destination,
922 transfer: {
923 documents: {
924 index: 'myindex1',
925 type: 'mytype1',
926 minSize: 200,
927 maxSize: 1000000,
928 }
929 },
930 count: 25
931 };
932
933 const progressUpdate = {
934 tick: 10,
935 total: 20,
936 transferred: 10
937 };
938
939 subtasks.updateProgress(TASK_NAME, subtask1, progressUpdate)
940 .then(() => subtasks.getProgress(TASK_NAME, subtask1))
941 .then((progress) => {
942 expect(progress.tick).to.be.equals(10);
943 expect(progress.total).to.be.equals(20);
944 expect(progress.transferred).to.be.equals(10);
945 expect(progress.lastModified).to.not.be.undefined;
946
947 progressUpdate.tick = 5;
948 progressUpdate.transferred = 15;
949 })
950 .then(() => subtasks.updateProgress(TASK_NAME, subtask2, progressUpdate))
951 .then(() => subtasks.getProgress(TASK_NAME, subtask2))
952 .then((progress) => {
953 expect(progress.tick).to.be.equals(5);
954 expect(progress.total).to.be.equals(20);
955 expect(progress.transferred).to.be.equals(15);
956 expect(progress.lastModified).to.not.be.undefined;
957 })
958 .then(() => tasks.getProgress(TASK_NAME))
959 .then((overallProgress) => {
960 expect(overallProgress.length).to.be.equals(2);
961
962 let target = _.find(overallProgress, {
963 subtask: {
964 transfer: {
965 documents: {
966 index: 'myindex1',
967 type: 'mytype1',
968 minSize: 0,
969 maxSize: 200,
970 }
971 }
972 }
973 });
974 expect(target.progress.tick).to.be.equals(10);
975
976 target = _.find(overallProgress, {
977 subtask: {
978 transfer: {
979 documents: {
980 index: 'myindex1',
981 type: 'mytype1',
982 minSize: 200,
983 maxSize: 1000000,
984 }
985 }
986 }
987 });
988 expect(target.progress.tick).to.be.equals(5);
989 })
990 .then(() => done())
991 .catch(done);
992 });
993
994 it('should delete progress of specific subtask', (done) => {
995 const subtask1 = {
996 source: TestConfig.elasticsearch.source,
997 destination: TestConfig.elasticsearch.destination,
998 transfer: {
999 documents: {
1000 index: 'myindex1',
1001 type: 'mytype1'
1002 }
1003 },
1004 count: 10
1005 };
1006
1007 const subtask2 = {
1008 source: TestConfig.elasticsearch.source,
1009 destination: TestConfig.elasticsearch.destination,
1010 transfer: {
1011 documents: {
1012 index: 'myindex3',
1013 type: 'mytype1'
1014 }
1015 },
1016 count: 25
1017 };
1018
1019 const progressUpdate = {
1020 tick: 10,
1021 total: 20,
1022 transferred: 10
1023 };
1024
1025 subtasks.updateProgress(TASK_NAME, subtask1, progressUpdate)
1026 .then(() => subtasks.getProgress(TASK_NAME, subtask1))
1027 .then(() => {
1028 progressUpdate.tick = 5;
1029 progressUpdate.transferred = 15;
1030 })
1031 .then(() => subtasks.updateProgress(TASK_NAME, subtask2, progressUpdate))
1032 .then(() => subtasks.getProgress(TASK_NAME, subtask2))
1033 .then(() => tasks.getProgress(TASK_NAME))
1034 .then((overallProgress) => expect(overallProgress.length).to.be.equals(2))
1035 .then(() => subtasks.removeProgress(TASK_NAME, subtask1))
1036 .then(() => tasks.getProgress(TASK_NAME))
1037 .then((overallProgress) => {
1038 expect(overallProgress.length).to.be.equals(1);
1039 expect(overallProgress[0].subtask.transfer.documents.index).to.be.equals('myindex3');
1040 expect(overallProgress[0].progress.tick).to.be.equals(5);
1041 })
1042 .then(() => done())
1043 .catch(done);
1044 });
1045});