/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.regionserver;

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;

@Category(value={MediumTests.class})
public class TestSplitLogWorker {
    private static final Log LOG = LogFactory.getLog(TestSplitLogWorker.class);
    private static final int WAIT_TIME = 15000;
    private final ServerName MANAGER = ServerName.valueOf("manager,1,1");
    private static final HBaseTestingUtility TEST_UTIL;
    private ZooKeeperWatcher zkw;
    private SplitLogWorker slw;
    private ExecutorService executorService;
    SplitLogWorker.TaskExecutor neverEndingTask = new SplitLogWorker.TaskExecutor(){

        @Override
        public SplitLogWorker.TaskExecutor.Status exec(String name, CancelableProgressable p) {
            do {
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException e) {
                    return SplitLogWorker.TaskExecutor.Status.PREEMPTED;
                }
            } while (p.progress());
            return SplitLogWorker.TaskExecutor.Status.PREEMPTED;
        }
    };

    private void waitForCounter(AtomicLong ctr, long oldval, long newval, long timems) throws Exception {
        Assert.assertTrue((String)("ctr=" + ctr.get() + ", oldval=" + oldval + ", newval=" + newval), (boolean)this.waitForCounterBoolean(ctr, oldval, newval, timems));
    }

    private boolean waitForCounterBoolean(AtomicLong ctr, long oldval, long newval, long timems) throws Exception {
        return this.waitForCounterBoolean(ctr, oldval, newval, timems, true);
    }

    private boolean waitForCounterBoolean(final AtomicLong ctr, long oldval, final long newval, long timems, boolean failIfTimeout) throws Exception {
        long timeWaited = TEST_UTIL.waitFor(timems, 10L, failIfTimeout, new Waiter.Predicate<Exception>(){

            @Override
            public boolean evaluate() throws Exception {
                return ctr.get() >= newval;
            }
        });
        if (timeWaited > 0L) {
            Assert.assertEquals((long)newval, (long)ctr.get());
        }
        return true;
    }

    @Before
    public void setup() throws Exception {
        TEST_UTIL.startMiniZKCluster();
        this.zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), "split-log-worker-tests", null);
        ZKUtil.deleteChildrenRecursively(this.zkw, this.zkw.baseZNode);
        ZKUtil.createAndFailSilent(this.zkw, this.zkw.baseZNode);
        Assert.assertTrue((ZKUtil.checkExists(this.zkw, this.zkw.baseZNode) != -1 ? 1 : 0) != 0);
        LOG.debug(this.zkw.baseZNode + " created");
        ZKUtil.createAndFailSilent(this.zkw, this.zkw.splitLogZNode);
        Assert.assertTrue((ZKUtil.checkExists(this.zkw, this.zkw.splitLogZNode) != -1 ? 1 : 0) != 0);
        LOG.debug(this.zkw.splitLogZNode + " created");
        ZKUtil.createAndFailSilent(this.zkw, this.zkw.rsZNode);
        Assert.assertTrue((ZKUtil.checkExists(this.zkw, this.zkw.rsZNode) != -1 ? 1 : 0) != 0);
        SplitLogCounters.resetCounters();
        this.executorService = new ExecutorService("TestSplitLogWorker");
        this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 10);
    }

    @After
    public void teardown() throws Exception {
        if (this.executorService != null) {
            this.executorService.shutdown();
        }
        TEST_UTIL.shutdownMiniZKCluster();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=60000L)
    public void testAcquireTaskAtStartup() throws Exception {
        LOG.info("testAcquireTaskAtStartup");
        SplitLogCounters.resetCounters();
        String TATAS = "tatas";
        ServerName RS = ServerName.valueOf("rs,1,1");
        RegionServerServices mockedRS = this.getRegionServer(RS);
        this.zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(this.zkw, "tatas"), new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        SplitLogWorker slw = new SplitLogWorker(this.zkw, TEST_UTIL.getConfiguration(), mockedRS, this.neverEndingTask);
        slw.start();
        try {
            this.waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0L, 1L, 15000L);
            byte[] bytes = ZKUtil.getData(this.zkw, ZKSplitLog.getEncodedNodeName(this.zkw, "tatas"));
            SplitLogTask slt = SplitLogTask.parseFrom(bytes);
            Assert.assertTrue((boolean)slt.isOwned(RS));
        }
        finally {
            this.stopSplitLogWorker(slw);
        }
    }

    private void stopSplitLogWorker(SplitLogWorker slw) throws InterruptedException {
        if (slw != null) {
            slw.stop();
            slw.worker.join(15000L);
            if (slw.worker.isAlive()) {
                Assert.assertTrue(("Could not stop the worker thread slw=" + slw == null ? 1 : 0) != 0);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=60000L)
    public void testRaceForTask() throws Exception {
        LOG.info("testRaceForTask");
        SplitLogCounters.resetCounters();
        String TRFT = "trft";
        ServerName SVR1 = ServerName.valueOf("svr1,1,1");
        ServerName SVR2 = ServerName.valueOf("svr2,1,1");
        this.zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(this.zkw, "trft"), new SplitLogTask.Unassigned(this.MANAGER).toByteArray(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        RegionServerServices mockedRS1 = this.getRegionServer(SVR1);
        RegionServerServices mockedRS2 = this.getRegionServer(SVR2);
        SplitLogWorker slw1 = new SplitLogWorker(this.zkw, TEST_UTIL.getConfiguration(), mockedRS1, this.neverEndingTask);
        SplitLogWorker slw2 = new SplitLogWorker(this.zkw, TEST_UTIL.getConfiguration(), mockedRS2, this.neverEndingTask);
        slw1.start();
        slw2.start();
        try {
            this.waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0L, 1L, 15000L);
            Assert.assertTrue((this.waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0L, 1L, 15000L, false) || SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.get() == 1L ? 1 : 0) != 0);
            byte[] bytes = ZKUtil.getData(this.zkw, ZKSplitLog.getEncodedNodeName(this.zkw, "trft"));
            SplitLogTask slt = SplitLogTask.parseFrom(bytes);
            Assert.assertTrue((slt.isOwned(SVR1) || slt.isOwned(SVR2) ? 1 : 0) != 0);
        }
        finally {
            this.stopSplitLogWorker(slw1);
            this.stopSplitLogWorker(slw2);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=60000L)
    public void testPreemptTask() throws Exception {
        LOG.info("testPreemptTask");
        SplitLogCounters.resetCounters();
        ServerName SRV = ServerName.valueOf("tpt_svr,1,1");
        String PATH = ZKSplitLog.getEncodedNodeName(this.zkw, "tpt_task");
        RegionServerServices mockedRS = this.getRegionServer(SRV);
        SplitLogWorker slw = new SplitLogWorker(this.zkw, TEST_UTIL.getConfiguration(), mockedRS, this.neverEndingTask);
        slw.start();
        try {
            Thread.yield();
            Thread.sleep(1000L);
            this.waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0L, 1L, 15000L);
            this.zkw.getRecoverableZooKeeper().create(PATH, new SplitLogTask.Unassigned(this.MANAGER).toByteArray(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            this.waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0L, 1L, 15000L);
            Assert.assertEquals((long)1L, (long)slw.taskReadySeq);
            byte[] bytes = ZKUtil.getData(this.zkw, PATH);
            SplitLogTask slt = SplitLogTask.parseFrom(bytes);
            Assert.assertTrue((boolean)slt.isOwned(SRV));
            slt = new SplitLogTask.Owned(this.MANAGER);
            ZKUtil.setData(this.zkw, PATH, slt.toByteArray());
            this.waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0L, 1L, 15000L);
        }
        finally {
            this.stopSplitLogWorker(slw);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=60000L)
    public void testMultipleTasks() throws Exception {
        LOG.info("testMultipleTasks");
        SplitLogCounters.resetCounters();
        ServerName SRV = ServerName.valueOf("tmt_svr,1,1");
        String PATH1 = ZKSplitLog.getEncodedNodeName(this.zkw, "tmt_task");
        RegionServerServices mockedRS = this.getRegionServer(SRV);
        SplitLogWorker slw = new SplitLogWorker(this.zkw, TEST_UTIL.getConfiguration(), mockedRS, this.neverEndingTask);
        slw.start();
        try {
            Thread.yield();
            Thread.sleep(100L);
            this.waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0L, 1L, 15000L);
            SplitLogTask.Unassigned unassignedManager = new SplitLogTask.Unassigned(this.MANAGER);
            this.zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            this.waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0L, 1L, 15000L);
            String PATH2 = ZKSplitLog.getEncodedNodeName(this.zkw, "tmt_task_2");
            this.zkw.getRecoverableZooKeeper().create(PATH2, unassignedManager.toByteArray(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            ServerName anotherWorker = ServerName.valueOf("another-worker,1,1");
            SplitLogTask slt = new SplitLogTask.Owned(anotherWorker);
            ZKUtil.setData(this.zkw, PATH1, slt.toByteArray());
            this.waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0L, 1L, 15000L);
            this.waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1L, 2L, 15000L);
            Assert.assertEquals((long)2L, (long)slw.taskReadySeq);
            byte[] bytes = ZKUtil.getData(this.zkw, PATH2);
            slt = SplitLogTask.parseFrom(bytes);
            Assert.assertTrue((boolean)slt.isOwned(SRV));
        }
        finally {
            this.stopSplitLogWorker(slw);
        }
    }

    @Test(timeout=60000L)
    public void testRescan() throws Exception {
        LOG.info("testRescan");
        SplitLogCounters.resetCounters();
        ServerName SRV = ServerName.valueOf("svr,1,1");
        RegionServerServices mockedRS = this.getRegionServer(SRV);
        this.slw = new SplitLogWorker(this.zkw, TEST_UTIL.getConfiguration(), mockedRS, this.neverEndingTask);
        this.slw.start();
        Thread.yield();
        Thread.sleep(100L);
        String task = ZKSplitLog.getEncodedNodeName(this.zkw, "task");
        SplitLogTask slt = new SplitLogTask.Unassigned(this.MANAGER);
        this.zkw.getRecoverableZooKeeper().create(task, slt.toByteArray(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        this.waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0L, 1L, 15000L);
        ZKUtil.setData(this.zkw, task, slt.toByteArray());
        this.waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0L, 1L, 15000L);
        String rescan = ZKSplitLog.getEncodedNodeName(this.zkw, "RESCAN");
        rescan = this.zkw.getRecoverableZooKeeper().create(rescan, slt.toByteArray(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        this.waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1L, 2L, 15000L);
        ZKUtil.setData(this.zkw, task, slt.toByteArray());
        this.waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 1L, 2L, 15000L);
        this.waitForCounter(SplitLogCounters.tot_wkr_task_acquired_rescan, 0L, 1L, 15000L);
        List<String> nodes = ZKUtil.listChildrenNoWatch(this.zkw, this.zkw.splitLogZNode);
        LOG.debug(nodes);
        int num = 0;
        for (String node : nodes) {
            ++num;
            if (!node.startsWith("RESCAN")) continue;
            String name = ZKSplitLog.getEncodedNodeName(this.zkw, node);
            String fn = ZKSplitLog.getFileName(name);
            byte[] data = ZKUtil.getData(this.zkw, ZKUtil.joinZNode(this.zkw.splitLogZNode, fn));
            slt = SplitLogTask.parseFrom(data);
            Assert.assertTrue((String)slt.toString(), (boolean)slt.isDone(SRV));
        }
        Assert.assertEquals((long)2L, (long)num);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=60000L)
    public void testAcquireMultiTasks() throws Exception {
        LOG.info("testAcquireMultiTasks");
        SplitLogCounters.resetCounters();
        String TATAS = "tatas";
        ServerName RS = ServerName.valueOf("rs,1,1");
        int maxTasks = 3;
        Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
        testConf.setInt("hbase.regionserver.wal.max.splitters", 3);
        RegionServerServices mockedRS = this.getRegionServer(RS);
        for (int i = 0; i < 3; ++i) {
            this.zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(this.zkw, "tatas" + i), new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        SplitLogWorker slw = new SplitLogWorker(this.zkw, testConf, mockedRS, this.neverEndingTask);
        slw.start();
        try {
            this.waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0L, 3L, 15000L);
            for (int i = 0; i < 3; ++i) {
                byte[] bytes = ZKUtil.getData(this.zkw, ZKSplitLog.getEncodedNodeName(this.zkw, "tatas" + i));
                SplitLogTask slt = SplitLogTask.parseFrom(bytes);
                Assert.assertTrue((boolean)slt.isOwned(RS));
            }
        }
        finally {
            this.stopSplitLogWorker(slw);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=60000L)
    public void testAcquireMultiTasksByAvgTasksPerRS() throws Exception {
        LOG.info("testAcquireMultiTasks");
        SplitLogCounters.resetCounters();
        String TATAS = "tatas";
        ServerName RS = ServerName.valueOf("rs,1,1");
        ServerName RS2 = ServerName.valueOf("rs,1,2");
        int maxTasks = 3;
        Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
        testConf.setInt("hbase.regionserver.wal.max.splitters", 3);
        RegionServerServices mockedRS = this.getRegionServer(RS);
        String rsPath = ZKUtil.joinZNode(this.zkw.rsZNode, RS.getServerName());
        this.zkw.getRecoverableZooKeeper().create(rsPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        rsPath = ZKUtil.joinZNode(this.zkw.rsZNode, RS2.getServerName());
        this.zkw.getRecoverableZooKeeper().create(rsPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        for (int i = 0; i < 3; ++i) {
            this.zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(this.zkw, "tatas" + i), new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        SplitLogWorker slw = new SplitLogWorker(this.zkw, testConf, mockedRS, this.neverEndingTask);
        slw.start();
        try {
            int acquiredTasks = 0;
            this.waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0L, 2L, 15000L);
            for (int i = 0; i < 3; ++i) {
                byte[] bytes = ZKUtil.getData(this.zkw, ZKSplitLog.getEncodedNodeName(this.zkw, "tatas" + i));
                SplitLogTask slt = SplitLogTask.parseFrom(bytes);
                if (!slt.isOwned(RS)) continue;
                ++acquiredTasks;
            }
            Assert.assertEquals((long)2L, (long)acquiredTasks);
        }
        finally {
            this.stopSplitLogWorker(slw);
        }
    }

    private RegionServerServices getRegionServer(ServerName name) {
        RegionServerServices mockedServer = (RegionServerServices)Mockito.mock(RegionServerServices.class);
        Mockito.when((Object)mockedServer.getConfiguration()).thenReturn((Object)TEST_UTIL.getConfiguration());
        Mockito.when((Object)mockedServer.getServerName()).thenReturn((Object)name);
        Mockito.when((Object)mockedServer.getZooKeeper()).thenReturn((Object)this.zkw);
        Mockito.when((Object)mockedServer.isStopped()).thenReturn((Object)false);
        Mockito.when((Object)mockedServer.getExecutorService()).thenReturn((Object)this.executorService);
        return mockedServer;
    }

    static {
        Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
        TEST_UTIL = new HBaseTestingUtility();
    }
}

