1 |
|
2 |
|
3 | const _ = require('lodash');
|
4 | const expect = require('chai').expect;
|
5 | const Promise = require('bluebird');
|
6 | const TestConfig = require('../config');
|
7 | const Utils = require('../utils');
|
8 | const Manager = require('../../app/services/manager');
|
9 | const Subtasks = require('../../app/services/subtasks');
|
10 | const Tasks = require('../../app/services/tasks');
|
11 | const Worker = require('../../app/services/worker');
|
12 | const createEsClient = require('../../config/elasticsearch.js');
|
13 | const createRedisClient = require('../../config/redis');
|
14 | const config = require('../../config/index');
|
15 |
|
// Logger exposed by the application config; used below for error reporting.
const { log } = config;

// Shared helper instance from ../utils (the suite calls its
// deleteAllTemplates / deleteAllIndices methods during setup).
const utils = new Utils();

// Bluebird diagnostics: capture long stack traces and log any rejection
// that appears to have no handler attached.
Promise.longStackTraces();
Promise.onPossiblyUnhandledRejection((error) => {
  log.error('Likely error: ', error.stack);
});
|
21 |
|
/**
 * Integration suite for the Worker service.
 *
 * Seeds documents into the source Elasticsearch cluster, queues a transfer
 * task through Tasks, checks the backlog reported by Subtasks, then starts
 * the Manager and waits for the Worker to report both subtasks as completed.
 */
describe('worker', function () {
  // Regular function (not an arrow) so `this` is the Mocha suite context.
  this.timeout(10000);

  const TASK1_NAME = 'task1';
  let source = null;
  let dest = null;
  let redis = null;
  let manager = null;
  let tasks = null;
  let subtasks = null;
  let worker = null;

  before((done) => {
    // Override the worker's check interval for the test run
    // (presumably milliseconds -- confirm against Worker).
    Worker.__overrideCheckInterval(100);

    source = createEsClient(TestConfig.elasticsearch.source);
    dest = createEsClient(TestConfig.elasticsearch.destination);
    redis = createRedisClient(TestConfig.redis.host, TestConfig.redis.port);
    manager = new Manager(redis);
    tasks = new Tasks(redis);
    subtasks = new Subtasks(redis);
    worker = new Worker(redis);

    // Best-effort reset: every step is chained with `finally`, so it runs
    // whether or not the previous step succeeded, and `done` is always called.
    utils.deleteAllTemplates(source)
      .finally(() => utils.deleteAllTemplates(dest))
      .finally(() => utils.deleteAllIndices(source))
      .finally(() => utils.deleteAllIndices(dest))
      .finally(() => redis.flushdb())
      .finally(() => done());
  });

  it('should perform transfers queued by manager', (done) => {
    const taskParams = {
      source: TestConfig.elasticsearch.source,
      destination: TestConfig.elasticsearch.destination,
      transfer: {
        documents: {
          fromIndices: '*'
        }
      }
    };

    const indexConfigs = [
      {index: 'first', type: 'type1'},
      {index: 'second', type: 'mytype1'}
    ];

    // Bulk request body: each document is an action line followed by its
    // source line. 10 documents go to the first index and 5 to the second.
    const data = [];
    _.times(10, (n) => {
      data.push({index: {_index: indexConfigs[0].index, _type: indexConfigs[0].type}});
      data.push({something: `data${n}`});
    });

    _.times(5, (n) => {
      data.push({index: {_index: indexConfigs[1].index, _type: indexConfigs[1].type}});
      data.push({something: `data${n}`});
    });

    // Report the outcome to Mocha exactly once, even if several failure
    // paths (or extra completion callbacks) fire.
    let finished = false;
    const finish = (err) => {
      if (finished) {
        return;
      }
      finished = true;
      done(err);
    };

    // Stop the manager and worker, then report the outcome.
    const stopAndFinish = (err) => {
      manager.setRunning(false);
      worker.killStopped();
      finish(err);
    };

    // Running total of the `tick` values reported by the worker.
    let totalTransferred = 0;
    const progressUpdates = (taskName, subtask, update) => totalTransferred += update.tick;

    const completedSubtasks = [];
    const completeCallback = (taskName, subtask) => {
      try {
        expect(taskName).to.be.equals(TASK1_NAME);
        completedSubtasks.push(subtask);

        // Wait until both subtasks (one per seeded index/type) are done.
        if (completedSubtasks.length < 2) {
          return;
        }

        expect(totalTransferred).to.be.equals(15);
      } catch (err) {
        // Route assertion failures to Mocha instead of throwing into the
        // worker's callback invocation, and still shut everything down.
        stopAndFinish(err);
        return;
      }

      stopAndFinish();
    };

    worker.setUpdateCallback(progressUpdates);
    worker.setCompletedCallback(completeCallback);

    source.indices.create({index: 'first'})
      .then(() => source.indices.create({index: 'second'}))
      .then(() => source.bulk({body: data}))
      .then((results) => {
        if (results.errors) {
          log.error('errors', JSON.stringify(results, null, 2));
          // Reject with a real Error; the terminal catch reports it.
          throw new Error(`errors: ${results.errors}`);
        }
        return source.indices.refresh();
      })
      .then(() => utils.deleteAllIndices(dest))
      .then(() => dest.search())
      .then((results) => expect(results.hits.total).to.be.equals(0))
      .then(() => dest.indices.create({index: 'first'}))
      .then(() => dest.indices.create({index: 'second'}))
      .then(() => tasks.add(TASK1_NAME, taskParams))
      .then(() => subtasks.getBacklog(TASK1_NAME))
      .then((backlogJobs) => {
        expect(backlogJobs.length).to.be.equals(2);

        const firstJob = _.find(backlogJobs, {
          transfer: {documents: {index: 'first', type: 'type1'}}
        });
        expect(firstJob.count).to.be.equals(10);

        const secondJob = _.find(backlogJobs, {
          transfer: {documents: {index: 'second', type: 'mytype1'}}
        });
        expect(secondJob.count).to.be.equals(5);
      })
      // Starting the manager lets the worker pick up the queued subtasks;
      // the test then completes from completeCallback.
      .then(() => manager.setRunning(true))
      // Without this, a failed setup step or assertion would only surface
      // as a timeout plus an unhandled-rejection log line.
      .catch(finish);
  });
});
\ | No newline at end of file |