/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.hbase.index;

import com.google.common.collect.Multimap;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.hbase.index.builder.IndexBuildManager;
import org.apache.phoenix.hbase.index.builder.IndexBuilder;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
import org.apache.phoenix.hbase.index.util.VersionUtil;
import org.apache.phoenix.hbase.index.wal.IndexedKeyValue;
import org.apache.phoenix.hbase.index.write.IndexFailurePolicy;
import org.apache.phoenix.hbase.index.write.IndexWriter;
import org.apache.phoenix.hbase.index.write.recovery.PerRegionIndexWriteCache;
import org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy;
import org.apache.phoenix.hbase.index.write.recovery.TrackingParallelWriterIndexCommitter;

public class Indexer
extends BaseRegionObserver {
    private static final Log LOG = LogFactory.getLog(Indexer.class);
    protected IndexWriter writer;
    protected IndexBuildManager builder;
    public static final String INDEX_BUILDER_CONF_KEY = "index.builder";
    public static final String CHECK_VERSION_CONF_KEY = "com.saleforce.hbase.index.checkversion";
    private static final String INDEX_RECOVERY_FAILURE_POLICY_KEY = "org.apache.hadoop.hbase.index.recovery.failurepolicy";
    private static KeyValue BATCH_MARKER = new KeyValue();
    private PerRegionIndexWriteCache failedIndexEdits = new PerRegionIndexWriteCache();
    private IndexWriter recoveryWriter;
    private boolean stopped;
    private boolean disabled;
    public static final String RecoveryFailurePolicyKeyForTesting = "org.apache.hadoop.hbase.index.recovery.failurepolicy";
    public static final int INDEXING_SUPPORTED_MAJOR_VERSION = VersionUtil.encodeMaxPatchVersion(0, 94);
    public static final int INDEXING_SUPPORTED__MIN_MAJOR_VERSION = VersionUtil.encodeVersion("0.94.0");
    private static final int INDEX_WAL_COMPRESSION_MINIMUM_SUPPORTED_VERSION = VersionUtil.encodeVersion("0.94.9");

    @Override
    public void start(CoprocessorEnvironment e) throws IOException {
        try {
            String errormsg;
            RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment)e;
            String serverName = env.getRegionServerServices().getServerName().getServerName();
            if (env.getConfiguration().getBoolean(CHECK_VERSION_CONF_KEY, true) && (errormsg = Indexer.validateVersion(env.getHBaseVersion(), env.getConfiguration())) != null) {
                IOException ioe = new IOException(errormsg);
                env.getRegionServerServices().abort(errormsg, ioe);
                throw ioe;
            }
            this.builder = new IndexBuildManager(env);
            this.writer = new IndexWriter(env, serverName + "-index-writer");
            TrackingParallelWriterIndexCommitter recoveryCommmiter = new TrackingParallelWriterIndexCommitter();
            try {
                Class<IndexFailurePolicy> policyClass = env.getConfiguration().getClass("org.apache.hadoop.hbase.index.recovery.failurepolicy", StoreFailuresInCachePolicy.class, IndexFailurePolicy.class);
                IndexFailurePolicy policy = policyClass.getConstructor(PerRegionIndexWriteCache.class).newInstance(this.failedIndexEdits);
                LOG.debug("Setting up recovery writter with committer: " + recoveryCommmiter.getClass() + " and failure policy: " + policy.getClass());
                this.recoveryWriter = new IndexWriter(recoveryCommmiter, policy, env, serverName + "-recovery-writer");
            }
            catch (Exception ex) {
                throw new IOException("Could not instantiate recovery failure policy!", ex);
            }
        }
        catch (NoSuchMethodError ex) {
            this.disabled = true;
            super.start(e);
            LOG.error("Must be too early a version of HBase. Disabled coprocessor ", ex);
        }
    }

    @Override
    public void stop(CoprocessorEnvironment e) throws IOException {
        if (this.stopped) {
            return;
        }
        if (this.disabled) {
            super.stop(e);
            return;
        }
        this.stopped = true;
        String msg = "Indexer is being stopped";
        this.builder.stop(msg);
        this.writer.stop(msg);
        this.recoveryWriter.stop(msg);
    }

    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit, Durability durability) throws IOException {
        if (this.disabled) {
            super.prePut(c, put, edit, durability);
            return;
        }
        this.preSingleUpdate(c, put, edit, durability);
    }

    @Override
    public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException {
        if (this.disabled) {
            super.preDelete(e, delete, edit, durability);
            return;
        }
        this.preSingleUpdate(e, delete, edit, durability);
    }

    public void preSingleUpdate(ObserverContext<RegionCoprocessorEnvironment> c, Mutation put, WALEdit edit, Durability durability) throws IOException {
        edit.add(BATCH_MARKER);
    }

    @Override
    public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
        if (this.disabled) {
            super.preBatchMutate(c, miniBatchOp);
            return;
        }
        try {
            this.preBatchMutateWithExceptions(c, miniBatchOp);
            return;
        }
        catch (Throwable t) {
            IndexManagementUtil.rethrowIndexingException(t);
            throw new RuntimeException("Somehow didn't return an index update but also didn't propagate the failure to the client!");
        }
    }

    public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
        HashMap<ImmutableBytesPtr, MultiMutation> mutations = new HashMap<ImmutableBytesPtr, MultiMutation>();
        Durability defaultDurability = Durability.SYNC_WAL;
        if (c.getEnvironment().getRegion() != null) {
            defaultDurability = c.getEnvironment().getRegion().getTableDesc().getDurability();
            defaultDurability = defaultDurability == Durability.USE_DEFAULT ? Durability.SYNC_WAL : defaultDurability;
        }
        Durability durability = Durability.SKIP_WAL;
        for (int i = 0; i < miniBatchOp.size(); ++i) {
            ImmutableBytesPtr row;
            MultiMutation stored;
            Durability effectiveDurablity;
            Mutation m;
            KeyValue kv;
            WALEdit edit = miniBatchOp.getWalEdit(i);
            if (edit != null && (kv = edit.getKeyValues().get(0)) == BATCH_MARKER) {
                edit.getKeyValues().remove(0);
            }
            if (!this.builder.isEnabled(m = miniBatchOp.getOperation(i))) continue;
            Durability durability2 = effectiveDurablity = m.getDurability() == Durability.USE_DEFAULT ? defaultDurability : m.getDurability();
            if (effectiveDurablity.ordinal() > durability.ordinal()) {
                durability = effectiveDurablity;
            }
            if ((stored = (MultiMutation)mutations.get(row = new ImmutableBytesPtr(m.getRow()))) == null) {
                stored = new MultiMutation(row);
                mutations.put(row, stored);
            }
            stored.addAll(m);
        }
        if (mutations.entrySet().size() == 0) {
            return;
        }
        WALEdit edit = miniBatchOp.getWalEdit(0);
        Collection<Pair<Mutation, byte[]>> indexUpdates = this.builder.getIndexUpdate(miniBatchOp, mutations.values());
        this.doPre(indexUpdates, edit, durability);
    }

    private boolean doPre(Collection<Pair<Mutation, byte[]>> indexUpdates, WALEdit edit, Durability durability) throws IOException {
        if (indexUpdates == null || indexUpdates.size() == 0) {
            return false;
        }
        if (durability == Durability.SKIP_WAL) {
            try {
                this.writer.write(indexUpdates);
                return false;
            }
            catch (Throwable e) {
                LOG.error("Failed to update index with entries:" + indexUpdates, e);
                IndexManagementUtil.rethrowIndexingException(e);
            }
        }
        for (Pair<Mutation, byte[]> entry : indexUpdates) {
            edit.add(new IndexedKeyValue(entry.getSecond(), entry.getFirst()));
        }
        return true;
    }

    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        if (this.disabled) {
            super.postPut(e, put, edit, durability);
            return;
        }
        this.doPost(edit, put, durability);
    }

    @Override
    public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException {
        if (this.disabled) {
            super.postDelete(e, delete, edit, durability);
            return;
        }
        this.doPost(edit, delete, durability);
    }

    @Override
    public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
        if (this.disabled) {
            super.postBatchMutate(c, miniBatchOp);
            return;
        }
        this.builder.batchCompleted(miniBatchOp);
        Mutation mutation = miniBatchOp.getOperation(0);
        WALEdit edit = miniBatchOp.getWalEdit(0);
        this.doPost(edit, mutation, mutation.getDurability());
    }

    private void doPost(WALEdit edit, Mutation m, Durability durability) throws IOException {
        try {
            this.doPostWithExceptions(edit, m, durability);
            return;
        }
        catch (Throwable e) {
            IndexManagementUtil.rethrowIndexingException(e);
            throw new RuntimeException("Somehow didn't complete the index update, but didn't return succesfully either!");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doPostWithExceptions(WALEdit edit, Mutation m, Durability durability) throws Exception {
        if (durability == Durability.SKIP_WAL || !this.builder.isEnabled(m)) {
            return;
        }
        IndexedKeyValue ikv = this.getFirstIndexedKeyValue(edit);
        if (ikv == null) {
            return;
        }
        if (!ikv.getBatchFinished()) {
            Collection<Pair<Mutation, byte[]>> indexUpdates = this.extractIndexUpdate(edit);
            try {
                this.writer.writeAndKillYourselfOnFailure(indexUpdates);
            }
            finally {
                ikv.markBatchFinished();
            }
        }
    }

    private IndexedKeyValue getFirstIndexedKeyValue(WALEdit edit) {
        for (KeyValue kv : edit.getKeyValues()) {
            if (!(kv instanceof IndexedKeyValue)) continue;
            return (IndexedKeyValue)kv;
        }
        return null;
    }

    private Collection<Pair<Mutation, byte[]>> extractIndexUpdate(WALEdit edit) {
        ArrayList<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>();
        for (KeyValue kv : edit.getKeyValues()) {
            if (!(kv instanceof IndexedKeyValue)) continue;
            IndexedKeyValue ikv = (IndexedKeyValue)kv;
            indexUpdates.add(new Pair<Mutation, byte[]>(ikv.getMutation(), ikv.getIndexTable()));
        }
        return indexUpdates;
    }

    @Override
    public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
        Multimap<HTableInterfaceReference, Mutation> updates = this.failedIndexEdits.getEdits(c.getEnvironment().getRegion());
        if (this.disabled) {
            super.postOpen(c);
            return;
        }
        LOG.info("Found some outstanding index updates that didn't succeed during WAL replay - attempting to replay now.");
        if (updates == null || updates.size() == 0) {
            return;
        }
        try {
            this.writer.writeAndKillYourselfOnFailure(updates);
        }
        catch (IOException e) {
            LOG.error("Exception thrown instead of killing server during index writing", e);
        }
    }

    @Override
    public void preWALRestore(ObserverContext<RegionCoprocessorEnvironment> env, HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException {
        if (this.disabled) {
            super.preWALRestore(env, info, logKey, logEdit);
            return;
        }
        Collection<Pair<Mutation, byte[]>> indexUpdates = this.extractIndexUpdate(logEdit);
        this.recoveryWriter.writeAndKillYourselfOnFailure(indexUpdates);
    }

    @Override
    public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s) throws IOException {
        return super.preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s);
    }

    public IndexBuilder getBuilderForTesting() {
        return this.builder.getBuilderForTesting();
    }

    public static String validateVersion(String hbaseVersion, Configuration conf) {
        int encodedVersion = VersionUtil.encodeVersion(hbaseVersion);
        if (encodedVersion > INDEXING_SUPPORTED_MAJOR_VERSION) {
            return null;
        }
        if (encodedVersion < INDEXING_SUPPORTED__MIN_MAJOR_VERSION) {
            return "Indexing not supported for versions older than 0.94.X";
        }
        if (encodedVersion < INDEX_WAL_COMPRESSION_MINIMUM_SUPPORTED_VERSION && conf.getBoolean("hbase.regionserver.wal.enablecompression", false)) {
            return "Indexing not supported with WAL Compression for versions of HBase older than 0.94.9 - found version:" + hbaseVersion;
        }
        return null;
    }

    public static void enableIndexing(HTableDescriptor desc, Class<? extends IndexBuilder> builder, Map<String, String> properties) throws IOException {
        if (properties == null) {
            properties = new HashMap<String, String>();
        }
        properties.put(INDEX_BUILDER_CONF_KEY, builder.getName());
        desc.addCoprocessor(Indexer.class.getName(), null, 0x3FFFFFFF, properties);
    }

    private class MultiMutation
    extends Mutation {
        private ImmutableBytesPtr rowKey;

        public MultiMutation(ImmutableBytesPtr rowkey) {
            this.rowKey = rowkey;
        }

        public void addAll(Mutation stored) {
            for (Map.Entry entry : stored.getFamilyCellMap().entrySet()) {
                byte[] family = (byte[])entry.getKey();
                List<Cell> list = this.getKeyValueList(family, ((List)entry.getValue()).size());
                list.addAll((Collection)entry.getValue());
                this.familyMap.put(family, list);
            }
            for (Map.Entry<Object, Object> entry : stored.getAttributesMap().entrySet()) {
                if (this.getAttribute((String)entry.getKey()) != null) continue;
                this.setAttribute((String)entry.getKey(), (byte[])entry.getValue());
            }
        }

        private List<Cell> getKeyValueList(byte[] family, int hint) {
            ArrayList list = (ArrayList)this.familyMap.get(family);
            if (list == null) {
                list = new ArrayList(hint);
            }
            return list;
        }

        @Override
        public byte[] getRow() {
            return this.rowKey.copyBytesIfNecessary();
        }

        public int hashCode() {
            return this.rowKey.hashCode();
        }

        public boolean equals(Object o) {
            return o == null ? false : o.hashCode() == this.hashCode();
        }
    }
}

