/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.ipc;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ipc.CallRunner;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
import org.apache.hadoop.hbase.ipc.RpcServer;

/**
 * An {@link RpcScheduler} that routes each incoming call to one of up to three
 * bounded FIFO queues (general, priority, replication) and drains every queue
 * with its own fixed set of daemon handler threads.
 *
 * <p>The priority and replication queues are optional: each is only created
 * when its handler count is greater than zero. Calls that would have gone to a
 * missing queue fall through to the general queue.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class SimpleRpcScheduler
implements RpcScheduler {
    /**
     * Priority level whose calls are routed to the replication queue.
     * NOTE(review): presumably mirrors HConstants.REPLICATION_QOS - confirm.
     */
    private static final int REPLICATION_PRIORITY_LEVEL = 5;

    /** Listener port, set by {@link #init}; only used to name handler threads. */
    private int port;
    private final int handlerCount;
    private final int priorityHandlerCount;
    private final int replicationHandlerCount;
    /** Computes the priority level of a call from its header and parameter. */
    private final PriorityFunction priority;
    /** General-purpose queue; always present. */
    final BlockingQueue<CallRunner> callQueue;
    /** High-priority queue; {@code null} when {@code priorityHandlerCount <= 0}. */
    final BlockingQueue<CallRunner> priorityCallQueue;
    /** Replication queue; {@code null} when {@code replicationHandlerCount <= 0}. */
    final BlockingQueue<CallRunner> replicationQueue;
    /** Handler threads keep looping while this is true; cleared by {@link #stop()}. */
    private volatile boolean running = false;
    /** Every handler thread started so far, kept so {@link #stop()} can interrupt them. */
    private final List<Thread> handlers = Lists.newArrayList();
    /** Calls with a level strictly above this value go to the priority queue. */
    private final int highPriorityLevel;

    /**
     * Creates the scheduler and its queues. No threads are started here.
     *
     * @param conf configuration; {@code ipc.server.max.callqueue.length} sets the
     *        capacity of each queue and defaults to {@code handlerCount * 10}
     * @param handlerCount number of handler threads for the general queue
     * @param priorityHandlerCount number of handler threads for the priority queue;
     *        the queue is not created unless this is greater than zero
     * @param replicationHandlerCount number of handler threads for the replication
     *        queue; the queue is not created unless this is greater than zero
     * @param priority function used to compute the priority level of each call
     * @param highPriorityLevel calls with a level strictly above this value are
     *        routed to the priority queue
     */
    public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount, int replicationHandlerCount, PriorityFunction priority, int highPriorityLevel) {
        int maxQueueLength = conf.getInt("ipc.server.max.callqueue.length", handlerCount * 10);
        this.handlerCount = handlerCount;
        this.priorityHandlerCount = priorityHandlerCount;
        this.replicationHandlerCount = replicationHandlerCount;
        this.priority = priority;
        this.highPriorityLevel = highPriorityLevel;
        this.callQueue = new LinkedBlockingQueue<CallRunner>(maxQueueLength);
        // Parameterized (not raw) queues, so no unchecked conversion is needed.
        this.priorityCallQueue = priorityHandlerCount > 0 ? new LinkedBlockingQueue<CallRunner>(maxQueueLength) : null;
        this.replicationQueue = replicationHandlerCount > 0 ? new LinkedBlockingQueue<CallRunner>(maxQueueLength) : null;
    }

    /**
     * Records the listener port from the given context. The port is embedded in
     * handler thread names, so threads started before this call are named with
     * port 0.
     *
     * @param context supplies the listener address
     */
    @Override
    public void init(RpcScheduler.Context context) {
        this.port = context.getListenerAddress().getPort();
    }

    /**
     * Marks the scheduler as running and starts the handler threads for the
     * general queue and for whichever optional queues exist.
     */
    @Override
    public void start() {
        this.running = true;
        this.startHandlers(this.handlerCount, this.callQueue, null);
        if (this.priorityCallQueue != null) {
            this.startHandlers(this.priorityHandlerCount, this.priorityCallQueue, "Priority.");
        }
        if (this.replicationQueue != null) {
            this.startHandlers(this.replicationHandlerCount, this.replicationQueue, "Replication.");
        }
    }

    /**
     * Starts {@code handlerCount} daemon threads that each drain {@code callQueue}.
     *
     * @param handlerCount number of threads to start
     * @param callQueue queue the new threads consume from
     * @param threadNamePrefix prefix for the thread names; {@code null} means none
     */
    private void startHandlers(int handlerCount, final BlockingQueue<CallRunner> callQueue, String threadNamePrefix) {
        for (int i = 0; i < handlerCount; ++i) {
            Thread t = new Thread(new Runnable(){

                @Override
                public void run() {
                    SimpleRpcScheduler.this.consumerLoop(callQueue);
                }
            });
            t.setDaemon(true);
            t.setName(Strings.nullToEmpty(threadNamePrefix) + "RpcServer.handler=" + i + ",port=" + this.port);
            t.start();
            this.handlers.add(t);
        }
    }

    /**
     * Clears the running flag and interrupts every handler thread so that
     * handlers blocked waiting on an empty queue wake up and exit.
     */
    @Override
    public void stop() {
        // Order matters: the flag must be cleared before the interrupt so that a
        // woken handler sees running == false and leaves its loop.
        this.running = false;
        for (Thread handler : this.handlers) {
            handler.interrupt();
        }
    }

    /**
     * Enqueues a call on the queue that matches its priority level. The calling
     * thread blocks while the chosen queue is full.
     *
     * @param callTask the call to schedule
     * @throws InterruptedException if interrupted while waiting for queue space
     */
    @Override
    public void dispatch(CallRunner callTask) throws InterruptedException {
        RpcServer.Call call = callTask.getCall();
        int level = this.priority.getPriority(call.header, call.param);
        if (this.priorityCallQueue != null && level > this.highPriorityLevel) {
            this.priorityCallQueue.put(callTask);
        } else if (this.replicationQueue != null && level == REPLICATION_PRIORITY_LEVEL) {
            this.replicationQueue.put(callTask);
        } else {
            this.callQueue.put(callTask);
        }
    }

    /** @return the number of calls currently waiting in the general queue */
    @Override
    public int getGeneralQueueLength() {
        return this.callQueue.size();
    }

    /** @return the number of calls waiting in the priority queue, or 0 if it does not exist */
    @Override
    public int getPriorityQueueLength() {
        return this.priorityCallQueue == null ? 0 : this.priorityCallQueue.size();
    }

    /** @return the number of calls waiting in the replication queue, or 0 if it does not exist */
    @Override
    public int getReplicationQueueLength() {
        return this.replicationQueue == null ? 0 : this.replicationQueue.size();
    }

    /**
     * Body of every handler thread: takes calls from {@code myQueue} and runs
     * them until the scheduler is stopped.
     *
     * <p>An interrupt received while the scheduler is still running is remembered
     * rather than re-asserted immediately. Re-setting the interrupt flag inside
     * the loop would make every following {@code take()} throw at once, turning
     * the handler into a busy spin that never processes another call. The
     * interrupt status is restored once, when the loop exits.
     *
     * @param myQueue queue this handler consumes from
     */
    private void consumerLoop(BlockingQueue<CallRunner> myQueue) {
        boolean interrupted = false;
        try {
            while (this.running) {
                try {
                    CallRunner task = myQueue.take();
                    task.run();
                }
                catch (InterruptedException e) {
                    // Defer re-interrupting until the loop ends; see method doc.
                    interrupted = true;
                }
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

