UNPKG

4.5 kBJavaScriptView Raw
1/*eslint no-magic-numbers: "off"*/
2/*eslint no-invalid-this: "off"*/
3const _ = require('lodash');
4const expect = require('chai').expect;
5const Promise = require('bluebird');
6const TestConfig = require('../config');
7const Utils = require('../utils');
8const Manager = require('../../app/services/manager');
9const Subtasks = require('../../app/services/subtasks');
10const Tasks = require('../../app/services/tasks');
11const Worker = require('../../app/services/worker');
12const createEsClient = require('../../config/elasticsearch.js');
13const createRedisClient = require('../../config/redis');
14const config = require('../../config/index');
15
16const log = config.log;
17const utils = new Utils();
18
19Promise.longStackTraces();
20Promise.onPossiblyUnhandledRejection((error) => log.error('Likely error: ', error.stack));
21
22describe('worker', function () {
23 this.timeout(10000);
24
25 const TASK1_NAME = 'task1';
26 let source = null;
27 let dest = null;
28 let redis = null;
29 let manager = null;
30 let tasks = null;
31 let subtasks = null;
32 let worker = null;
33
34 before((done) => {
35 Worker.__overrideCheckInterval(100);
36
37 source = createEsClient(TestConfig.elasticsearch.source);
38 dest = createEsClient(TestConfig.elasticsearch.destination);
39 redis = createRedisClient(TestConfig.redis.host, TestConfig.redis.port);
40 manager = new Manager(redis);
41 tasks = new Tasks(redis);
42 subtasks = new Subtasks(redis);
43 worker = new Worker(redis);
44
45 utils.deleteAllTemplates(source)
46 .finally(() => utils.deleteAllTemplates(dest))
47 .finally(() => utils.deleteAllIndices(source))
48 .finally(() => utils.deleteAllIndices(dest))
49 .finally(() => redis.flushdb())
50 .finally(() => done());
51 });
52
53 it('should perform transfers queued by manager', (done) => {
54 const taskParams = {
55 source: TestConfig.elasticsearch.source,
56 destination: TestConfig.elasticsearch.destination,
57 transfer: {
58 documents: {
59 fromIndices: '*'
60 }
61 }
62 };
63
64 const indexConfigs = [
65 {index: 'first', type: 'type1'},
66 {index: 'second', type: 'mytype1'}
67 ];
68
69 const data = [];
70 _.times(10, (n) => {
71 data.push({index: {_index: indexConfigs[0].index, _type: indexConfigs[0].type}});
72 data.push({something: `data${n}`});
73 });
74
75 _.times(5, (n) => {
76 data.push({index: {_index: indexConfigs[1].index, _type: indexConfigs[1].type}});
77 data.push({something: `data${n}`});
78 });
79
80 let totalTransferred = 0;
81 const progressUpdates = (taskName, subtask, update) => totalTransferred += update.tick;
82
83 const completedSubtasks = [];
84 const completeCallback = (taskName, subtask) => {
85 expect(taskName).to.be.equals(TASK1_NAME);
86 completedSubtasks.push(subtask);
87
88 if (completedSubtasks.length >= 2) {
89 expect(totalTransferred).to.be.equals(15);
90 manager.setRunning(false);
91 worker.killStopped();
92 done();
93 }
94 };
95
96 worker.setUpdateCallback(progressUpdates);
97 worker.setCompletedCallback(completeCallback);
98
99 source.indices.create({index: 'first'})
100 .then(() => source.indices.create({index: 'second'}))
101 .then(() => source.bulk({body: data}))
102 .then((results) => {
103 if (results.errors) {
104 log.error('errors', JSON.stringify(results, null, 2));
105 done('fail');
106 return Promise.reject(`errors: ${results.errors}`);
107 } else {
108 return source.indices.refresh();
109 }
110 })
111 .then(() => utils.deleteAllIndices(dest))
112 .then(() => dest.search())
113 .then((results) => expect(results.hits.total).to.be.equals(0))
114 .then(() => dest.indices.create({index: 'first'}))
115 .then(() => dest.indices.create({index: 'second'}))
116 .then(() => tasks.add(TASK1_NAME, taskParams))
117 .then(() => subtasks.getBacklog(TASK1_NAME))
118 .then((backlogJobs) => {
119 expect(backlogJobs.length).to.be.equals(2);
120
121 let target = _.find(backlogJobs, {
122 transfer: {documents: {index: 'first', type: 'type1'}}
123 });
124 expect(target.count).to.be.equals(10);
125
126 target = _.find(backlogJobs, {
127 transfer: {documents: {index: 'second', type: 'mytype1'}}
128 });
129 expect(target.count).to.be.equals(5);
130 })
131 .then(() => manager.setRunning(true));
132 });
133});
\No newline at end of file