/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.LastSequenceId;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.handler.HLogSplitterHandler;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;

@InterfaceAudience.Private
public class SplitLogWorker
extends ZooKeeperListener
implements Runnable {
    public static final int DEFAULT_MAX_SPLITTERS = 2;
    private static final Log LOG = LogFactory.getLog(SplitLogWorker.class);
    private static final int checkInterval = 5000;
    private static final int FAILED_TO_OWN_TASK = -1;
    Thread worker;
    private final ServerName serverName;
    private final TaskExecutor splitTaskExecutor;
    private final ExecutorService executorService;
    private final Object taskReadyLock = new Object();
    volatile int taskReadySeq = 0;
    private volatile String currentTask = null;
    private int currentVersion;
    private volatile boolean exitWorker;
    private final Object grabTaskLock = new Object();
    private boolean workerInGrabTask = false;
    private final int report_period;
    private RegionServerServices server = null;
    private Configuration conf = null;
    protected final AtomicInteger tasksInProgress = new AtomicInteger(0);
    private int maxConcurrentTasks = 0;

    public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, RegionServerServices server, TaskExecutor splitTaskExecutor) {
        super(watcher);
        this.server = server;
        this.serverName = server.getServerName();
        this.splitTaskExecutor = splitTaskExecutor;
        this.report_period = conf.getInt("hbase.splitlog.report.period", conf.getInt("hbase.splitlog.manager.timeout", 120000) / 3);
        this.conf = conf;
        this.executorService = this.server.getExecutorService();
        this.maxConcurrentTasks = conf.getInt("hbase.regionserver.wal.max.splitters", 2);
    }

    public SplitLogWorker(final ZooKeeperWatcher watcher, final Configuration conf, RegionServerServices server, final LastSequenceId sequenceIdChecker) {
        this(watcher, conf, server, new TaskExecutor(){

            @Override
            public TaskExecutor.Status exec(String filename, CancelableProgressable p) {
                FileSystem fs;
                Path rootdir;
                try {
                    rootdir = FSUtils.getRootDir(conf);
                    fs = rootdir.getFileSystem(conf);
                }
                catch (IOException e) {
                    LOG.warn("could not find root dir or fs", e);
                    return TaskExecutor.Status.RESIGNED;
                }
                try {
                    if (!HLogSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)), fs, conf, p, sequenceIdChecker, watcher)) {
                        return TaskExecutor.Status.PREEMPTED;
                    }
                }
                catch (InterruptedIOException iioe) {
                    LOG.warn("log splitting of " + filename + " interrupted, resigning", iioe);
                    return TaskExecutor.Status.RESIGNED;
                }
                catch (IOException e) {
                    Throwable cause = e.getCause();
                    if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException || cause instanceof ConnectException || cause instanceof SocketTimeoutException)) {
                        LOG.warn("log replaying of " + filename + " can't connect to the target regionserver, " + "resigning", e);
                        return TaskExecutor.Status.RESIGNED;
                    }
                    if (cause instanceof InterruptedException) {
                        LOG.warn("log splitting of " + filename + " interrupted, resigning", e);
                        return TaskExecutor.Status.RESIGNED;
                    }
                    if (cause instanceof KeeperException) {
                        LOG.warn("log splitting of " + filename + " hit ZooKeeper issue, resigning", e);
                        return TaskExecutor.Status.RESIGNED;
                    }
                    LOG.warn("log splitting of " + filename + " failed, returning error", e);
                    return TaskExecutor.Status.ERR;
                }
                return TaskExecutor.Status.DONE;
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        try {
            LOG.info("SplitLogWorker " + this.serverName + " starting");
            this.watcher.registerListener(this);
            boolean distributedLogReplay = HLogSplitter.isDistributedLogReplay(this.conf);
            if (distributedLogReplay) {
                HConnectionManager.getConnection(this.conf);
            }
            int res = -1;
            while (res == -1 && !this.exitWorker) {
                try {
                    res = ZKUtil.checkExists(this.watcher, this.watcher.splitLogZNode);
                }
                catch (KeeperException e) {
                    LOG.warn("Exception when checking for " + this.watcher.splitLogZNode + " ... retrying", e);
                }
                if (res != -1) continue;
                try {
                    LOG.info(this.watcher.splitLogZNode + " znode does not exist, waiting for master to create");
                    Thread.sleep(1000L);
                }
                catch (InterruptedException e) {
                    LOG.debug("Interrupted while waiting for " + this.watcher.splitLogZNode + (this.exitWorker ? "" : " (ERROR: exitWorker is not set, exiting anyway)"));
                    this.exitWorker = true;
                    break;
                }
            }
            if (!this.exitWorker) {
                this.taskLoop();
            }
        }
        catch (Throwable t) {
            LOG.error("unexpected error ", t);
        }
        finally {
            LOG.info("SplitLogWorker " + this.serverName + " exiting");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private void taskLoop() {
        while (!this.exitWorker) {
            int seq_start = this.taskReadySeq;
            List<String> paths = this.getTaskList();
            if (paths == null) {
                LOG.warn("Could not get tasks, did someone remove " + this.watcher.splitLogZNode + " ... worker thread exiting.");
                return;
            }
            int offset = (int)(Math.random() * (double)paths.size());
            for (int i = 0; i < paths.size(); ++i) {
                if (!HLogUtil.isMetaFile(paths.get(i))) continue;
                offset = i;
                break;
            }
            int numTasks = paths.size();
            for (int i = 0; i < numTasks; ++i) {
                int idx = (i + offset) % paths.size();
                if (this.calculateAvailableSplitters(numTasks) > 0) {
                    this.grabTask(ZKUtil.joinZNode(this.watcher.splitLogZNode, paths.get(idx)));
                    if (!this.exitWorker) continue;
                    return;
                } else {
                    LOG.debug("Current region server " + this.serverName + " has " + this.tasksInProgress.get() + " tasks in progress and can't take more.");
                    break;
                }
            }
            SplitLogCounters.tot_wkr_task_grabing.incrementAndGet();
            Object object = this.taskReadyLock;
            synchronized (object) {
                block9: while (seq_start == this.taskReadySeq) {
                    try {
                        Map<String, HRegion> recoveringRegions;
                        this.taskReadyLock.wait(5000L);
                        if (this.server == null || (recoveringRegions = this.server.getRecoveringRegions()).isEmpty()) continue;
                        ArrayList<String> tmpCopy = new ArrayList<String>(recoveringRegions.keySet());
                        for (String region : tmpCopy) {
                            String nodePath = ZKUtil.joinZNode(this.watcher.recoveringRegionsZNode, region);
                            try {
                                if (ZKUtil.checkExists(this.watcher, nodePath) != -1) continue block9;
                                HRegion r = recoveringRegions.remove(region);
                                if (r != null) {
                                    r.setRecovering(false);
                                }
                                LOG.debug("Mark recovering region:" + region + " up.");
                            }
                            catch (KeeperException e) {
                                LOG.debug("Got a zookeeper when trying to open a recovering region", e);
                                continue block9;
                            }
                        }
                    }
                    catch (InterruptedException e) {
                        LOG.info("SplitLogWorker interrupted while waiting for task, exiting: " + e.toString() + (this.exitWorker ? "" : " (ERROR: exitWorker is not set, exiting anyway)"));
                        this.exitWorker = true;
                        return;
                    }
                }
            }
        }
        return;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private void grabTask(String path) {
        SplitLogTask slt;
        Stat stat = new Stat();
        long t = -1L;
        Object object = this.grabTaskLock;
        synchronized (object) {
            this.currentTask = path;
            this.workerInGrabTask = true;
            if (Thread.interrupted()) {
                return;
            }
        }
        try {
            byte[] data = ZKUtil.getDataNoWatch(this.watcher, path, stat);
            if (data == null) {
                SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet();
                return;
            }
            try {
                slt = SplitLogTask.parseFrom(data);
            }
            catch (DeserializationException e) {
                LOG.warn("Failed parse data for znode " + path, e);
                SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
                Object object2 = this.grabTaskLock;
                synchronized (object2) {
                    this.workerInGrabTask = false;
                    Thread.interrupted();
                    return;
                }
            }
        }
        catch (KeeperException e) {
            LOG.warn("Failed to get data for znode " + path, e);
            SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
            return;
        }
        if (!slt.isUnassigned()) {
            SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet();
            return;
        }
        this.currentVersion = SplitLogWorker.attemptToOwnTask(true, this.watcher, this.serverName, path, stat.getVersion());
        if (this.currentVersion < 0) {
            SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
            return;
        }
        if (ZKSplitLog.isRescanNode(this.watcher, this.currentTask)) {
            HLogSplitterHandler.endTask(this.watcher, new SplitLogTask.Done(this.serverName), SplitLogCounters.tot_wkr_task_acquired_rescan, this.currentTask, this.currentVersion);
            return;
        }
        LOG.info("worker " + this.serverName + " acquired task " + path);
        SplitLogCounters.tot_wkr_task_acquired.incrementAndGet();
        this.getDataSetWatchAsync();
        this.submitTask(path, this.currentVersion, this.report_period);
        try {
            int sleepTime = RandomUtils.nextInt(500) + 500;
            Thread.sleep(sleepTime);
            return;
        }
        catch (InterruptedException e) {
            LOG.warn("Interrupted while yielding for other region servers", e);
            Thread.currentThread().interrupt();
            return;
        }
        finally {
            object = this.grabTaskLock;
            synchronized (object) {
                this.workerInGrabTask = false;
                Thread.interrupted();
            }
        }
    }

    protected static int attemptToOwnTask(boolean isFirstTime, ZooKeeperWatcher zkw, ServerName server, String task, int taskZKVersion) {
        int latestZKVersion = -1;
        try {
            SplitLogTask.Owned slt = new SplitLogTask.Owned(server);
            Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion);
            if (stat == null) {
                LOG.warn("zk.setData() returned null for path " + task);
                SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
                return -1;
            }
            latestZKVersion = stat.getVersion();
            SplitLogCounters.tot_wkr_task_heartbeat.incrementAndGet();
            return latestZKVersion;
        }
        catch (KeeperException e) {
            if (!isFirstTime) {
                if (e.code().equals(KeeperException.Code.NONODE)) {
                    LOG.warn("NONODE failed to assert ownership for " + task, e);
                } else if (e.code().equals(KeeperException.Code.BADVERSION)) {
                    LOG.warn("BADVERSION failed to assert ownership for " + task, e);
                } else {
                    LOG.warn("failed to assert ownership for " + task, e);
                }
            }
        }
        catch (InterruptedException e1) {
            LOG.warn("Interrupted while trying to assert ownership of " + task + " " + StringUtils.stringifyException(e1));
            Thread.currentThread().interrupt();
        }
        SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
        return -1;
    }

    private int calculateAvailableSplitters(int numTasks) {
        int availableRSs = 1;
        try {
            List<String> regionServers = ZKUtil.listChildrenNoWatch(this.watcher, this.watcher.rsZNode);
            availableRSs = Math.max(availableRSs, regionServers == null ? 0 : regionServers.size());
        }
        catch (KeeperException e) {
            LOG.debug("getAvailableRegionServers got ZooKeeper exception", e);
        }
        int expectedTasksPerRS = numTasks / availableRSs + (numTasks % availableRSs == 0 ? 0 : 1);
        expectedTasksPerRS = Math.max(1, expectedTasksPerRS);
        return Math.min(expectedTasksPerRS, this.maxConcurrentTasks) - this.tasksInProgress.get();
    }

    void submitTask(final String curTask, int curTaskZKVersion, final int reportPeriod) {
        final MutableInt zkVersion = new MutableInt(curTaskZKVersion);
        CancelableProgressable reporter = new CancelableProgressable(){
            private long last_report_at = 0L;

            @Override
            public boolean progress() {
                long t = EnvironmentEdgeManager.currentTimeMillis();
                if (t - this.last_report_at > (long)reportPeriod) {
                    this.last_report_at = t;
                    int latestZKVersion = SplitLogWorker.attemptToOwnTask(false, SplitLogWorker.this.watcher, SplitLogWorker.this.serverName, curTask, zkVersion.intValue());
                    if (latestZKVersion < 0) {
                        LOG.warn("Failed to heartbeat the task" + curTask);
                        return false;
                    }
                    zkVersion.setValue(latestZKVersion);
                }
                return true;
            }
        };
        HLogSplitterHandler hsh = new HLogSplitterHandler(this.server, curTask, zkVersion, reporter, this.tasksInProgress, this.splitTaskExecutor);
        this.executorService.submit(hsh);
    }

    void getDataSetWatchAsync() {
        this.watcher.getRecoverableZooKeeper().getZooKeeper().getData(this.currentTask, this.watcher, (AsyncCallback.DataCallback)new GetDataAsyncCallback(), null);
        SplitLogCounters.tot_wkr_get_data_queued.incrementAndGet();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void getDataSetWatchSuccess(String path, byte[] data) {
        SplitLogTask slt;
        try {
            slt = SplitLogTask.parseFrom(data);
        }
        catch (DeserializationException e) {
            LOG.warn("Failed parse", e);
            return;
        }
        Object object = this.grabTaskLock;
        synchronized (object) {
            String taskpath;
            if (this.workerInGrabTask && (taskpath = this.currentTask) != null && taskpath.equals(path) && !slt.isOwned(this.serverName) && !slt.isDone(this.serverName) && !slt.isErr(this.serverName) && !slt.isResigned(this.serverName)) {
                LOG.info("task " + taskpath + " preempted from " + this.serverName + ", current task state and owner=" + slt.toString());
                this.stopTask();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void getDataSetWatchFailure(String path) {
        Object object = this.grabTaskLock;
        synchronized (object) {
            String taskpath;
            if (this.workerInGrabTask && (taskpath = this.currentTask) != null && taskpath.equals(path)) {
                LOG.info("retrying data watch on " + path);
                SplitLogCounters.tot_wkr_get_data_retry.incrementAndGet();
                this.getDataSetWatchAsync();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void nodeDataChanged(String path) {
        Object object = this.grabTaskLock;
        synchronized (object) {
            String taskpath;
            if (this.workerInGrabTask && (taskpath = this.currentTask) != null && taskpath.equals(path)) {
                this.getDataSetWatchAsync();
            }
        }
    }

    private List<String> getTaskList() {
        List<String> childrenPaths = null;
        long sleepTime = 1000L;
        while (!this.exitWorker) {
            try {
                childrenPaths = ZKUtil.listChildrenAndWatchForNewChildren(this.watcher, this.watcher.splitLogZNode);
                if (childrenPaths != null) {
                    return childrenPaths;
                }
            }
            catch (KeeperException e) {
                LOG.warn("Could not get children of znode " + this.watcher.splitLogZNode, e);
            }
            try {
                LOG.debug("Retry listChildren of znode " + this.watcher.splitLogZNode + " after sleep for " + sleepTime + "ms!");
                Thread.sleep(sleepTime);
            }
            catch (InterruptedException e1) {
                LOG.warn("Interrupted while trying to get task list ...", e1);
                Thread.currentThread().interrupt();
            }
        }
        return childrenPaths;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void nodeChildrenChanged(String path) {
        if (path.equals(this.watcher.splitLogZNode)) {
            LOG.debug("tasks arrived or departed");
            Object object = this.taskReadyLock;
            synchronized (object) {
                ++this.taskReadySeq;
                this.taskReadyLock.notify();
            }
        }
    }

    void stopTask() {
        LOG.info("Sending interrupt to stop the worker thread");
        this.worker.interrupt();
    }

    public void start() {
        this.worker = new Thread(null, this, "SplitLogWorker-" + this.serverName);
        this.exitWorker = false;
        this.worker.start();
    }

    public void stop() {
        this.exitWorker = true;
        this.stopTask();
    }

    public static interface TaskExecutor {
        public Status exec(String var1, CancelableProgressable var2);

        public static enum Status {
            DONE,
            ERR,
            RESIGNED,
            PREEMPTED;

        }
    }

    class GetDataAsyncCallback
    implements AsyncCallback.DataCallback {
        private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);

        GetDataAsyncCallback() {
        }

        @Override
        public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
            SplitLogCounters.tot_wkr_get_data_result.incrementAndGet();
            if (rc != 0) {
                this.LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path);
                SplitLogWorker.this.getDataSetWatchFailure(path);
                return;
            }
            data = SplitLogWorker.this.watcher.getRecoverableZooKeeper().removeMetaData(data);
            SplitLogWorker.this.getDataSetWatchSuccess(path, data);
        }
    }
}

