/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.client;

import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

@InterfaceAudience.Public
@InterfaceStability.Evolving
public class HTableMultiplexer {
    private static final Log LOG = LogFactory.getLog(HTableMultiplexer.class.getName());
    private static int poolID = 0;
    static final String TABLE_MULTIPLEXER_FLUSH_FREQ_MS = "hbase.tablemultiplexer.flush.frequency.ms";
    private Map<TableName, HTable> tableNameToHTableMap;
    private Map<HRegionLocation, LinkedBlockingQueue<PutStatus>> serverToBufferQueueMap;
    private Map<HRegionLocation, HTableFlushWorker> serverToFlushWorkerMap;
    private Configuration conf;
    private int retryNum;
    private int perRegionServerBufferQueueSize;

    public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize) throws ZooKeeperConnectionException {
        this.conf = conf;
        this.serverToBufferQueueMap = new ConcurrentHashMap<HRegionLocation, LinkedBlockingQueue<PutStatus>>();
        this.serverToFlushWorkerMap = new ConcurrentHashMap<HRegionLocation, HTableFlushWorker>();
        this.tableNameToHTableMap = new ConcurrentSkipListMap<TableName, HTable>();
        this.retryNum = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
        this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
    }

    public boolean put(TableName tableName, Put put) throws IOException {
        return this.put(tableName, put, this.retryNum);
    }

    public boolean put(byte[] tableName, Put put) throws IOException {
        return this.put(TableName.valueOf(tableName), put);
    }

    public List<Put> put(TableName tableName, List<Put> puts) throws IOException {
        if (puts == null) {
            return null;
        }
        ArrayList<Put> failedPuts = null;
        for (Put put : puts) {
            boolean result = this.put(tableName, put, this.retryNum);
            if (result) continue;
            if (failedPuts == null) {
                failedPuts = new ArrayList<Put>();
            }
            failedPuts.add(put);
        }
        return failedPuts;
    }

    public List<Put> put(byte[] tableName, List<Put> puts) throws IOException {
        return this.put(TableName.valueOf(tableName), puts);
    }

    public boolean put(TableName tableName, Put put, int retry) throws IOException {
        if (retry <= 0) {
            return false;
        }
        HTable htable = this.getHTable(tableName);
        try {
            htable.validatePut(put);
            HRegionLocation loc = htable.getRegionLocation(put.getRow(), false);
            if (loc != null) {
                LinkedBlockingQueue<PutStatus> queue = this.addNewRegionServer(loc, htable);
                PutStatus s = new PutStatus(loc.getRegionInfo(), put, retry);
                return queue.offer(s);
            }
        }
        catch (Exception e) {
            LOG.debug("Cannot process the put " + put + " because of " + e);
        }
        return false;
    }

    public boolean put(byte[] tableName, Put put, int retry) throws IOException {
        return this.put(TableName.valueOf(tableName), put, retry);
    }

    public HTableMultiplexerStatus getHTableMultiplexerStatus() {
        return new HTableMultiplexerStatus(this.serverToFlushWorkerMap);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private HTable getHTable(TableName tableName) throws IOException {
        HTable htable = this.tableNameToHTableMap.get(tableName);
        if (htable == null) {
            Map<TableName, HTable> map = this.tableNameToHTableMap;
            synchronized (map) {
                htable = this.tableNameToHTableMap.get(tableName);
                if (htable == null) {
                    htable = new HTable(this.conf, tableName);
                    this.tableNameToHTableMap.put(tableName, htable);
                }
            }
        }
        return htable;
    }

    private synchronized LinkedBlockingQueue<PutStatus> addNewRegionServer(HRegionLocation addr, HTable htable) {
        LinkedBlockingQueue<PutStatus> queue = this.serverToBufferQueueMap.get(addr);
        if (queue == null) {
            queue = new LinkedBlockingQueue(this.perRegionServerBufferQueueSize);
            this.serverToBufferQueueMap.put(addr, queue);
            HTableFlushWorker worker = new HTableFlushWorker(this.conf, addr, this, queue, htable);
            this.serverToFlushWorkerMap.put(addr, worker);
            String name = "HTableFlushWorker-" + addr.getHostnamePort() + "-" + poolID++;
            Thread t = new Thread((Runnable)worker, name);
            t.setDaemon(true);
            t.start();
        }
        return queue;
    }

    private static class HTableFlushWorker
    implements Runnable {
        private HRegionLocation addr;
        private Configuration conf;
        private LinkedBlockingQueue<PutStatus> queue;
        private HTableMultiplexer htableMultiplexer;
        private AtomicLong totalFailedPutCount;
        private AtomicInteger currentProcessingPutCount;
        private AtomicAverageCounter averageLatency;
        private AtomicLong maxLatency;
        private HTable htable;

        public HTableFlushWorker(Configuration conf, HRegionLocation addr, HTableMultiplexer htableMultiplexer, LinkedBlockingQueue<PutStatus> queue, HTable htable) {
            this.addr = addr;
            this.conf = conf;
            this.htableMultiplexer = htableMultiplexer;
            this.queue = queue;
            this.totalFailedPutCount = new AtomicLong(0L);
            this.currentProcessingPutCount = new AtomicInteger(0);
            this.averageLatency = new AtomicAverageCounter();
            this.maxLatency = new AtomicLong(0L);
            this.htable = htable;
        }

        public long getTotalFailedCount() {
            return this.totalFailedPutCount.get();
        }

        public long getTotalBufferedCount() {
            return this.queue.size() + this.currentProcessingPutCount.get();
        }

        public AtomicAverageCounter getAverageLatencyCounter() {
            return this.averageLatency;
        }

        public long getMaxLatency() {
            return this.maxLatency.getAndSet(0L);
        }

        private boolean resubmitFailedPut(PutStatus failedPutStatus, HRegionLocation oldLoc) throws IOException {
            Put failedPut = failedPutStatus.getPut();
            TableName tableName = failedPutStatus.getRegionInfo().getTable();
            int retryCount = failedPutStatus.getRetryCount() - 1;
            if (retryCount <= 0) {
                return false;
            }
            return this.htableMultiplexer.put(tableName, failedPut, retryCount);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        @SuppressWarnings(value={"REC_CATCH_EXCEPTION"}, justification="na")
        public void run() {
            ArrayList processingList = new ArrayList();
            long frequency = this.conf.getLong(HTableMultiplexer.TABLE_MULTIPLEXER_FLUSH_FREQ_MS, 100L);
            try {
                Thread.sleep(frequency);
            }
            catch (InterruptedException e) {
                // empty catch block
            }
            int failedCount = 0;
            while (true) {
                try {
                    while (true) {
                        long elapsed;
                        long start = elapsed = EnvironmentEdgeManager.currentTimeMillis();
                        processingList.clear();
                        failedCount = 0;
                        this.queue.drainTo(processingList);
                        this.currentProcessingPutCount.set(processingList.size());
                        if (processingList.size() > 0) {
                            ArrayList<Put> list = new ArrayList<Put>(processingList.size());
                            for (PutStatus putStatus : processingList) {
                                list.add(putStatus.getPut());
                            }
                            ArrayList<Put> failed = null;
                            Object[] results = new Object[list.size()];
                            try {
                                this.htable.batch(list, results);
                            }
                            catch (IOException e) {
                                LOG.debug("Caught some exceptions " + e + " when flushing puts to region server " + this.addr.getHostnamePort());
                            }
                            finally {
                                for (int i = results.length - 1; i >= 0; --i) {
                                    if (!(results[i] instanceof Result)) continue;
                                    list.remove(i);
                                }
                                failed = list;
                            }
                            if (failed != null) {
                                if (failed.size() == processingList.size()) {
                                    for (PutStatus putStatus : processingList) {
                                        if (this.resubmitFailedPut(putStatus, this.addr)) continue;
                                        ++failedCount;
                                    }
                                } else {
                                    HashSet failedPutSet = new HashSet(failed);
                                    for (PutStatus putStatus : processingList) {
                                        if (!failedPutSet.contains(putStatus.getPut()) || this.resubmitFailedPut(putStatus, this.addr)) continue;
                                        ++failedCount;
                                    }
                                }
                            }
                            this.totalFailedPutCount.addAndGet(failedCount);
                            elapsed = EnvironmentEdgeManager.currentTimeMillis() - start;
                            this.averageLatency.add(elapsed);
                            if (elapsed > this.maxLatency.get()) {
                                this.maxLatency.set(elapsed);
                            }
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Processed " + this.currentProcessingPutCount + " put requests for " + this.addr.getHostnamePort() + " and " + failedCount + " failed" + ", latency for this send: " + elapsed);
                            }
                            this.currentProcessingPutCount.set(0);
                        }
                        if (elapsed == start) {
                            elapsed = EnvironmentEdgeManager.currentTimeMillis() - start;
                        }
                        if (elapsed >= frequency) continue;
                        Thread.sleep(frequency - elapsed);
                    }
                }
                catch (Exception e) {
                    LOG.debug("Caught some exceptions " + e + " when flushing puts to region server " + this.addr.getHostnamePort());
                    continue;
                }
                break;
            }
        }
    }

    private static class AtomicAverageCounter {
        private long sum = 0L;
        private int count = 0;

        public synchronized long getAndReset() {
            long result = this.get();
            this.reset();
            return result;
        }

        public synchronized long get() {
            if (this.count == 0) {
                return 0L;
            }
            return this.sum / (long)this.count;
        }

        public synchronized AbstractMap.SimpleEntry<Long, Integer> getComponents() {
            return new AbstractMap.SimpleEntry<Long, Integer>(this.sum, this.count);
        }

        public synchronized void reset() {
            this.sum = 0L;
            this.count = 0;
        }

        public synchronized void add(long value) {
            this.sum += value;
            ++this.count;
        }
    }

    private static class PutStatus {
        private final HRegionInfo regionInfo;
        private final Put put;
        private final int retryCount;

        public PutStatus(HRegionInfo regionInfo, Put put, int retryCount) {
            this.regionInfo = regionInfo;
            this.put = put;
            this.retryCount = retryCount;
        }

        public HRegionInfo getRegionInfo() {
            return this.regionInfo;
        }

        public Put getPut() {
            return this.put;
        }

        public int getRetryCount() {
            return this.retryCount;
        }
    }

    static class HTableMultiplexerStatus {
        private long totalFailedPutCounter = 0L;
        private long totalBufferedPutCounter = 0L;
        private long maxLatency = 0L;
        private long overallAverageLatency = 0L;
        private Map<String, Long> serverToFailedCounterMap;
        private Map<String, Long> serverToBufferedCounterMap = new HashMap<String, Long>();
        private Map<String, Long> serverToAverageLatencyMap;
        private Map<String, Long> serverToMaxLatencyMap;

        public HTableMultiplexerStatus(Map<HRegionLocation, HTableFlushWorker> serverToFlushWorkerMap) {
            this.serverToFailedCounterMap = new HashMap<String, Long>();
            this.serverToAverageLatencyMap = new HashMap<String, Long>();
            this.serverToMaxLatencyMap = new HashMap<String, Long>();
            this.initialize(serverToFlushWorkerMap);
        }

        private void initialize(Map<HRegionLocation, HTableFlushWorker> serverToFlushWorkerMap) {
            if (serverToFlushWorkerMap == null) {
                return;
            }
            long averageCalcSum = 0L;
            int averageCalcCount = 0;
            for (Map.Entry<HRegionLocation, HTableFlushWorker> entry : serverToFlushWorkerMap.entrySet()) {
                HRegionLocation addr = entry.getKey();
                HTableFlushWorker worker = entry.getValue();
                long bufferedCounter = worker.getTotalBufferedCount();
                long failedCounter = worker.getTotalFailedCount();
                long serverMaxLatency = worker.getMaxLatency();
                AtomicAverageCounter averageCounter = worker.getAverageLatencyCounter();
                AbstractMap.SimpleEntry<Long, Integer> averageComponents = averageCounter.getComponents();
                long serverAvgLatency = averageCounter.getAndReset();
                this.totalBufferedPutCounter += bufferedCounter;
                this.totalFailedPutCounter += failedCounter;
                if (serverMaxLatency > this.maxLatency) {
                    this.maxLatency = serverMaxLatency;
                }
                averageCalcSum += averageComponents.getKey().longValue();
                averageCalcCount += averageComponents.getValue().intValue();
                this.serverToBufferedCounterMap.put(addr.getHostnamePort(), bufferedCounter);
                this.serverToFailedCounterMap.put(addr.getHostnamePort(), failedCounter);
                this.serverToAverageLatencyMap.put(addr.getHostnamePort(), serverAvgLatency);
                this.serverToMaxLatencyMap.put(addr.getHostnamePort(), serverMaxLatency);
            }
            this.overallAverageLatency = averageCalcCount != 0 ? averageCalcSum / (long)averageCalcCount : 0L;
        }

        public long getTotalBufferedCounter() {
            return this.totalBufferedPutCounter;
        }

        public long getTotalFailedCounter() {
            return this.totalFailedPutCounter;
        }

        public long getMaxLatency() {
            return this.maxLatency;
        }

        public long getOverallAverageLatency() {
            return this.overallAverageLatency;
        }

        public Map<String, Long> getBufferedCounterForEachRegionServer() {
            return this.serverToBufferedCounterMap;
        }

        public Map<String, Long> getFailedCounterForEachRegionServer() {
            return this.serverToFailedCounterMap;
        }

        public Map<String, Long> getMaxLatencyForEachRegionServer() {
            return this.serverToMaxLatencyMap;
        }

        public Map<String, Long> getAverageLatencyForEachRegionServer() {
            return this.serverToAverageLatencyMap;
        }
    }
}

