/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.cache;

import com.google.common.collect.ImmutableSet;
import com.google.protobuf.HBaseZeroCopyByteString;
import java.io.Closeable;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.coprocessor.ServerCachingProtocol;
import org.apache.phoenix.coprocessor.generated.ServerCacheFactoryProtos;
import org.apache.phoenix.coprocessor.generated.ServerCachingProtos;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.job.JobManager;
import org.apache.phoenix.memory.MemoryManager;
import org.apache.phoenix.protobuf.ProtobufUtil;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.SQLCloseable;
import org.apache.phoenix.util.SQLCloseables;

public class ServerCacheClient {
    public static final int UUID_LENGTH = 8;
    private static final Log LOG = LogFactory.getLog(ServerCacheClient.class);
    private static final Random RANDOM = new Random();
    private final PhoenixConnection connection;
    private final Map<Integer, TableRef> cacheUsingTableRefMap = new HashMap<Integer, TableRef>();

    public ServerCacheClient(PhoenixConnection connection) {
        this.connection = connection;
    }

    public PhoenixConnection getConnection() {
        return this.connection;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final ServerCachingProtocol.ServerCacheFactory cacheFactory, TableRef cacheUsingTableRef) throws SQLException {
        byte[] cacheId;
        ServerCache hashCacheSpec;
        block56: {
            ConnectionQueryServices services = this.connection.getQueryServices();
            MemoryManager.MemoryChunk chunk = services.getMemoryManager().allocate(cachePtr.getLength());
            ArrayList<Closeable> closeables = new ArrayList<Closeable>();
            closeables.add(chunk);
            hashCacheSpec = null;
            SQLException firstException = null;
            cacheId = ServerCacheClient.generateId();
            boolean success = false;
            ExecutorService executor = services.getExecutor();
            List<Future> futures = Collections.emptyList();
            try {
                List<HRegionLocation> locations = services.getAllTableRegions(cacheUsingTableRef.getTable().getPhysicalName().getBytes());
                int nRegions = locations.size();
                futures = new ArrayList<Future>(nRegions);
                HashSet<HRegionLocation> servers = new HashSet<HRegionLocation>(nRegions);
                for (HRegionLocation entry : locations) {
                    if (!servers.contains(entry) && keyRanges.intersect(entry.getRegionInfo().getStartKey(), entry.getRegionInfo().getEndKey())) {
                        servers.add(entry);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Adding cache entry to be sent for " + entry);
                        }
                        final byte[] key = entry.getRegionInfo().getStartKey();
                        final HTableInterface htable = services.getTable(cacheUsingTableRef.getTable().getPhysicalName().getBytes());
                        closeables.add(htable);
                        futures.add(executor.submit(new JobManager.JobCallable<Boolean>(){

                            @Override
                            public Boolean call() throws Exception {
                                Map<byte[], ServerCachingProtos.AddServerCacheResponse> results;
                                try {
                                    results = htable.coprocessorService(ServerCachingProtos.ServerCachingService.class, key, key, new Batch.Call<ServerCachingProtos.ServerCachingService, ServerCachingProtos.AddServerCacheResponse>(){

                                        @Override
                                        public ServerCachingProtos.AddServerCacheResponse call(ServerCachingProtos.ServerCachingService instance) throws IOException {
                                            ServerRpcController controller = new ServerRpcController();
                                            BlockingRpcCallback<ServerCachingProtos.AddServerCacheResponse> rpcCallback = new BlockingRpcCallback<ServerCachingProtos.AddServerCacheResponse>();
                                            ServerCachingProtos.AddServerCacheRequest.Builder builder = ServerCachingProtos.AddServerCacheRequest.newBuilder();
                                            if (ServerCacheClient.this.connection.getTenantId() != null) {
                                                builder.setTenantId(HBaseZeroCopyByteString.wrap(ServerCacheClient.this.connection.getTenantId().getBytes()));
                                            }
                                            builder.setCacheId(HBaseZeroCopyByteString.wrap(cacheId));
                                            builder.setCachePtr(ProtobufUtil.toProto(cachePtr));
                                            ServerCacheFactoryProtos.ServerCacheFactory.Builder svrCacheFactoryBuider = ServerCacheFactoryProtos.ServerCacheFactory.newBuilder();
                                            svrCacheFactoryBuider.setClassName(cacheFactory.getClass().getName());
                                            builder.setCacheFactory(svrCacheFactoryBuider.build());
                                            instance.addServerCache(controller, builder.build(), rpcCallback);
                                            if (controller.getFailedOn() != null) {
                                                throw controller.getFailedOn();
                                            }
                                            return rpcCallback.get();
                                        }
                                    });
                                }
                                catch (Throwable t) {
                                    throw new Exception(t);
                                }
                                if (results != null && results.size() == 1) {
                                    return results.values().iterator().next().getReturn();
                                }
                                return false;
                            }

                            @Override
                            public Object getJobId() {
                                return ServerCacheClient.this;
                            }
                        }));
                        continue;
                    }
                    if (!LOG.isDebugEnabled()) continue;
                    LOG.debug("NOT adding cache entry to be sent for " + entry + " since one already exists for that entry");
                }
                hashCacheSpec = new ServerCache(cacheId, servers, cachePtr.getLength());
                int timeoutMs = services.getProps().getInt("phoenix.query.timeoutMs", 600000);
                for (Future future : futures) {
                    future.get(timeoutMs, TimeUnit.MILLISECONDS);
                }
                this.cacheUsingTableRefMap.put(Bytes.mapKey(cacheId), cacheUsingTableRef);
                success = true;
            }
            catch (SQLException e) {
                firstException = e;
                return firstException;
            }
            catch (Exception e) {
                firstException = new SQLException(e);
                return firstException;
            }
            finally {
                try {
                    if (success) {
                    }
                    SQLCloseables.closeAllQuietly(Collections.singletonList(hashCacheSpec));
                    for (Future future : futures) {
                        future.cancel(true);
                    }
                }
                finally {
                    try {
                        Closeables.closeAll(closeables);
                    }
                    catch (IOException e) {
                        if (firstException == null) {
                            firstException = new SQLException(e);
                            return firstException;
                        }
                    }
                    finally {
                        if (firstException == null) break block56;
                        throw firstException;
                    }
                }
            }
        }
        if (!LOG.isDebugEnabled()) return hashCacheSpec;
        LOG.debug("Cache " + cacheId + " successfully added to servers.");
        return hashCacheSpec;
    }

    private void removeServerCache(final byte[] cacheId, Set<HRegionLocation> servers) throws SQLException {
        ConnectionQueryServices services = this.connection.getQueryServices();
        Throwable lastThrowable = null;
        TableRef cacheUsingTableRef = this.cacheUsingTableRefMap.get(Bytes.mapKey(cacheId));
        byte[] tableName = cacheUsingTableRef.getTable().getPhysicalName().getBytes();
        HTableInterface iterateOverTable = services.getTable(tableName);
        List<HRegionLocation> locations = services.getAllTableRegions(tableName);
        HashSet<HRegionLocation> remainingOnServers = new HashSet<HRegionLocation>(servers);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Removing Cache " + cacheId + " from servers.");
        }
        for (HRegionLocation entry : locations) {
            if (!remainingOnServers.contains(entry)) continue;
            try {
                byte[] key = entry.getRegionInfo().getStartKey();
                iterateOverTable.coprocessorService(ServerCachingProtos.ServerCachingService.class, key, key, new Batch.Call<ServerCachingProtos.ServerCachingService, ServerCachingProtos.RemoveServerCacheResponse>(){

                    @Override
                    public ServerCachingProtos.RemoveServerCacheResponse call(ServerCachingProtos.ServerCachingService instance) throws IOException {
                        ServerRpcController controller = new ServerRpcController();
                        BlockingRpcCallback<ServerCachingProtos.RemoveServerCacheResponse> rpcCallback = new BlockingRpcCallback<ServerCachingProtos.RemoveServerCacheResponse>();
                        ServerCachingProtos.RemoveServerCacheRequest.Builder builder = ServerCachingProtos.RemoveServerCacheRequest.newBuilder();
                        if (ServerCacheClient.this.connection.getTenantId() != null) {
                            builder.setTenantId(HBaseZeroCopyByteString.wrap(ServerCacheClient.this.connection.getTenantId().getBytes()));
                        }
                        builder.setCacheId(HBaseZeroCopyByteString.wrap(cacheId));
                        instance.removeServerCache(controller, builder.build(), rpcCallback);
                        if (controller.getFailedOn() != null) {
                            throw controller.getFailedOn();
                        }
                        return rpcCallback.get();
                    }
                });
                remainingOnServers.remove(entry);
            }
            catch (Throwable t) {
                lastThrowable = t;
                LOG.error("Error trying to remove hash cache for " + entry, t);
            }
        }
        if (!remainingOnServers.isEmpty()) {
            LOG.warn("Unable to remove hash cache for " + remainingOnServers, lastThrowable);
        }
    }

    public static byte[] generateId() {
        long rand = RANDOM.nextLong();
        return Bytes.toBytes(rand);
    }

    public static String idToString(byte[] uuid) {
        assert (uuid.length == 8);
        return Long.toString(Bytes.toLong(uuid));
    }

    public class ServerCache
    implements SQLCloseable {
        private final int size;
        private final byte[] id;
        private final ImmutableSet<HRegionLocation> servers;

        public ServerCache(byte[] id, Set<HRegionLocation> servers, int size) {
            this.id = id;
            this.servers = ImmutableSet.copyOf(servers);
            this.size = size;
        }

        public int getSize() {
            return this.size;
        }

        public byte[] getId() {
            return this.id;
        }

        @Override
        public void close() throws SQLException {
            ServerCacheClient.this.removeServerCache(this.id, this.servers);
        }
    }
}

