/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.coprocessor;

import com.google.protobuf.HBaseZeroCopyByteString;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint;
import org.apache.hadoop.hbase.coprocessor.ProtobufCoprocessorService;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={MediumTests.class})
public class TestCoprocessorEndpoint {
    private static final Log LOG = LogFactory.getLog(TestCoprocessorEndpoint.class);
    private static final TableName TEST_TABLE = TableName.valueOf("TestTable");
    private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
    private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
    private static byte[] ROW = Bytes.toBytes("testRow");
    private static final int ROWSIZE = 20;
    private static final int rowSeperator1 = 5;
    private static final int rowSeperator2 = 12;
    private static byte[][] ROWS = TestCoprocessorEndpoint.makeN(ROW, 20);
    private static HBaseTestingUtility util = new HBaseTestingUtility();

    @BeforeClass
    public static void setupBeforeClass() throws Exception {
        Configuration conf = util.getConfiguration();
        conf.setStrings("hbase.coprocessor.region.classes", ColumnAggregationEndpoint.class.getName(), ProtobufCoprocessorService.class.getName());
        conf.setStrings("hbase.coprocessor.master.classes", ProtobufCoprocessorService.class.getName());
        util.startMiniCluster(2);
        HBaseAdmin admin = new HBaseAdmin(conf);
        HTableDescriptor desc = new HTableDescriptor(TEST_TABLE);
        desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
        admin.createTable(desc, new byte[][]{ROWS[5], ROWS[12]});
        util.waitUntilAllRegionsAssigned(TEST_TABLE);
        admin.close();
        HTable table = new HTable(conf, TEST_TABLE);
        for (int i = 0; i < 20; ++i) {
            Put put = new Put(ROWS[i]);
            put.add(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i));
            table.put(put);
        }
        table.close();
    }

    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        util.shutdownMiniCluster();
    }

    private Map<byte[], Long> sum(HTable table, final byte[] family, final byte[] qualifier, byte[] start, byte[] end) throws ServiceException, Throwable {
        return table.coprocessorService(ColumnAggregationProtos.ColumnAggregationService.class, start, end, new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>(){

            @Override
            public Long call(ColumnAggregationProtos.ColumnAggregationService instance) throws IOException {
                BlockingRpcCallback<ColumnAggregationProtos.SumResponse> rpcCallback = new BlockingRpcCallback<ColumnAggregationProtos.SumResponse>();
                ColumnAggregationProtos.SumRequest.Builder builder = ColumnAggregationProtos.SumRequest.newBuilder();
                builder.setFamily(HBaseZeroCopyByteString.wrap(family));
                if (qualifier != null && qualifier.length > 0) {
                    builder.setQualifier(HBaseZeroCopyByteString.wrap(qualifier));
                }
                instance.sum(null, builder.build(), rpcCallback);
                return rpcCallback.get().getSum();
            }
        });
    }

    @Test
    public void testAggregation() throws Throwable {
        HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
        Map<byte[], Long> results = this.sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[0], ROWS[ROWS.length - 1]);
        int sumResult = 0;
        int expectedResult = 0;
        for (Map.Entry<byte[], Long> e : results.entrySet()) {
            LOG.info("Got value " + e.getValue() + " for region " + Bytes.toStringBinary(e.getKey()));
            sumResult = (int)((long)sumResult + e.getValue());
        }
        for (int i = 0; i < 20; ++i) {
            expectedResult += i;
        }
        Assert.assertEquals((String)"Invalid result", (long)expectedResult, (long)sumResult);
        results.clear();
        results = this.sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[5], ROWS[ROWS.length - 1]);
        sumResult = 0;
        expectedResult = 0;
        for (Map.Entry<byte[], Long> e : results.entrySet()) {
            LOG.info("Got value " + e.getValue() + " for region " + Bytes.toStringBinary(e.getKey()));
            sumResult = (int)((long)sumResult + e.getValue());
        }
        for (int i = 5; i < 20; ++i) {
            expectedResult += i;
        }
        Assert.assertEquals((String)"Invalid result", (long)expectedResult, (long)sumResult);
        table.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testCoprocessorService() throws Throwable {
        HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
        NavigableMap<HRegionInfo, ServerName> regions = table.getRegionLocations();
        final TestProtos.EchoRequestProto request = TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
        final Map results = Collections.synchronizedMap(new TreeMap(Bytes.BYTES_COMPARATOR));
        try {
            final ServerRpcController controller = new ServerRpcController();
            table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class, ROWS[0], ROWS[ROWS.length - 1], new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>(){

                @Override
                public TestProtos.EchoResponseProto call(TestRpcServiceProtos.TestProtobufRpcProto instance) throws IOException {
                    LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance());
                    BlockingRpcCallback<TestProtos.EchoResponseProto> callback = new BlockingRpcCallback<TestProtos.EchoResponseProto>();
                    instance.echo(controller, request, callback);
                    TestProtos.EchoResponseProto response = callback.get();
                    LOG.debug("Batch.Call returning result " + response);
                    return response;
                }
            }, new Batch.Callback<TestProtos.EchoResponseProto>(){

                @Override
                public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) {
                    Assert.assertNotNull((Object)result);
                    Assert.assertEquals((Object)"hello", (Object)result.getMessage());
                    results.put(region, result.getMessage());
                }
            });
            for (Map.Entry entry : results.entrySet()) {
                LOG.info("Got value " + (String)entry.getValue() + " for region " + Bytes.toStringBinary(entry.getKey()));
            }
            Assert.assertEquals((long)3L, (long)results.size());
            for (HRegionInfo hRegionInfo : regions.navigableKeySet()) {
                LOG.info("Region info is " + hRegionInfo.getRegionNameAsString());
                Assert.assertTrue((boolean)results.containsKey(hRegionInfo.getRegionName()));
            }
            results.clear();
            table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class, ROWS[5], ROWS[ROWS.length - 1], new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>(){

                @Override
                public TestProtos.EchoResponseProto call(TestRpcServiceProtos.TestProtobufRpcProto instance) throws IOException {
                    LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance());
                    BlockingRpcCallback<TestProtos.EchoResponseProto> callback = new BlockingRpcCallback<TestProtos.EchoResponseProto>();
                    instance.echo(controller, request, callback);
                    TestProtos.EchoResponseProto response = callback.get();
                    LOG.debug("Batch.Call returning result " + response);
                    return response;
                }
            }, new Batch.Callback<TestProtos.EchoResponseProto>(){

                @Override
                public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) {
                    Assert.assertNotNull((Object)result);
                    Assert.assertEquals((Object)"hello", (Object)result.getMessage());
                    results.put(region, result.getMessage());
                }
            });
            for (Map.Entry entry : results.entrySet()) {
                LOG.info("Got value " + (String)entry.getValue() + " for region " + Bytes.toStringBinary((byte[])entry.getKey()));
            }
            Assert.assertEquals((long)2L, (long)results.size());
        }
        finally {
            table.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testCoprocessorServiceNullResponse() throws Throwable {
        HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
        NavigableMap<HRegionInfo, ServerName> regions = table.getRegionLocations();
        final TestProtos.EchoRequestProto request = TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
        try {
            final ServerRpcController controller = new ServerRpcController();
            Map<byte[], String> results = table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class, ROWS[0], ROWS[ROWS.length - 1], new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, String>(){

                @Override
                public String call(TestRpcServiceProtos.TestProtobufRpcProto instance) throws IOException {
                    BlockingRpcCallback<TestProtos.EchoResponseProto> callback = new BlockingRpcCallback<TestProtos.EchoResponseProto>();
                    instance.echo(controller, request, callback);
                    TestProtos.EchoResponseProto response = callback.get();
                    LOG.debug("Batch.Call got result " + response);
                    return null;
                }
            });
            for (Map.Entry<byte[], String> e : results.entrySet()) {
                LOG.info("Got value " + e.getValue() + " for region " + Bytes.toStringBinary(e.getKey()));
            }
            Assert.assertEquals((long)3L, (long)results.size());
            for (HRegionInfo info : regions.navigableKeySet()) {
                LOG.info("Region info is " + info.getRegionNameAsString());
                Assert.assertTrue((boolean)results.containsKey(info.getRegionName()));
                Assert.assertNull((Object)results.get(info.getRegionName()));
            }
        }
        finally {
            table.close();
        }
    }

    @Test
    public void testMasterCoprocessorService() throws Throwable {
        HBaseAdmin admin = util.getHBaseAdmin();
        TestProtos.EchoRequestProto request = TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
        TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service = TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(admin.coprocessorService());
        Assert.assertEquals((Object)"hello", (Object)service.echo(null, request).getMessage());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testCoprocessorError() throws Exception {
        Configuration configuration = new Configuration(util.getConfiguration());
        configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
        HTable table = new HTable(configuration, TEST_TABLE);
        try {
            CoprocessorRpcChannel protocol = table.coprocessorService(ROWS[0]);
            TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service = TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(protocol);
            service.error(null, TestProtos.EmptyRequestProto.getDefaultInstance());
            Assert.fail((String)"Should have thrown an exception");
        }
        catch (ServiceException serviceException) {
        }
        finally {
            table.close();
        }
    }

    @Test
    public void testMasterCoprocessorError() throws Throwable {
        HBaseAdmin admin = util.getHBaseAdmin();
        TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service = TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(admin.coprocessorService());
        try {
            service.error(null, TestProtos.EmptyRequestProto.getDefaultInstance());
            Assert.fail((String)"Should have thrown an exception");
        }
        catch (ServiceException serviceException) {
            // empty catch block
        }
    }

    private static byte[][] makeN(byte[] base, int n) {
        byte[][] ret = new byte[n][];
        for (int i = 0; i < n; ++i) {
            ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%02d", i)));
        }
        return ret;
    }
}

