/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.flume.sink;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.phoenix.flume.FlumeConstants;
import org.apache.phoenix.flume.serializer.EventSerializer;
import org.apache.phoenix.flume.serializer.EventSerializers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class PhoenixSink
extends AbstractSink
implements Configurable {
    private static final Logger logger = LoggerFactory.getLogger(PhoenixSink.class);
    private static AtomicInteger counter = new AtomicInteger();
    private static final String NAME = "Phoenix Sink__";
    private SinkCounter sinkCounter;
    private Integer batchSize;
    private EventSerializer serializer;

    public void configure(Context context) {
        this.setName(NAME + counter.incrementAndGet());
        this.batchSize = context.getInteger("batchSize", FlumeConstants.DEFAULT_BATCH_SIZE);
        String eventSerializerType = context.getString("serializer");
        Preconditions.checkNotNull(eventSerializerType, "Event serializer cannot be empty, please specify in the configuration file");
        this.initializeSerializer(context, eventSerializerType);
        this.sinkCounter = new SinkCounter(this.getName());
    }

    private void initializeSerializer(Context context, String eventSerializerType) {
        EventSerializers eventSerializer = null;
        try {
            eventSerializer = EventSerializers.valueOf(eventSerializerType.toUpperCase());
        }
        catch (IllegalArgumentException iae) {
            logger.error("An invalid eventSerializer {} was passed. Please specify one of {} ", (Object)eventSerializerType, (Object)Joiner.on(",").skipNulls().join((Object[])EventSerializers.values()));
            Throwables.propagate(iae);
        }
        Context serializerContext = new Context();
        serializerContext.putAll((Map)context.getSubProperties("serializer."));
        this.copyPropertiesToSerializerContext(context, serializerContext);
        try {
            Class<?> clazz = Class.forName(eventSerializer.getClassName());
            this.serializer = (EventSerializer)clazz.newInstance();
            this.serializer.configure(serializerContext);
        }
        catch (Exception e) {
            logger.error("Could not instantiate event serializer.", e);
            Throwables.propagate(e);
        }
    }

    private void copyPropertiesToSerializerContext(Context context, Context serializerContext) {
        serializerContext.put("ddl", context.getString("ddl"));
        serializerContext.put("table", context.getString("table"));
        serializerContext.put("zookeeperQuorum", context.getString("zookeeperQuorum"));
        serializerContext.put("jdbcUrl", context.getString("jdbcUrl"));
        serializerContext.put("batchSize", context.getString("batchSize"));
    }

    public void start() {
        logger.info("Starting sink {} ", (Object)this.getName());
        this.sinkCounter.start();
        try {
            this.serializer.initialize();
            this.sinkCounter.incrementConnectionCreatedCount();
        }
        catch (Exception ex) {
            this.sinkCounter.incrementConnectionFailedCount();
            logger.error("Error {} in initializing the serializer.", (Object)ex.getMessage());
            Throwables.propagate(ex);
        }
        super.start();
    }

    public void stop() {
        super.stop();
        try {
            this.serializer.close();
        }
        catch (SQLException e) {
            logger.error(" Error while closing connection {} for sink {} ", (Object)e.getMessage(), (Object)this.getName());
        }
        this.sinkCounter.incrementConnectionClosedCount();
        this.sinkCounter.stop();
    }

    /*
     * Loose catch block
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public Sink.Status process() throws EventDeliveryException {
        Sink.Status status = Sink.Status.READY;
        Channel channel = this.getChannel();
        Transaction transaction = null;
        ArrayList<Event> events = Lists.newArrayListWithExpectedSize(this.batchSize);
        Stopwatch watch = new Stopwatch().start();
        try {
            transaction = channel.getTransaction();
            transaction.begin();
            for (long i = 0L; i < (long)this.batchSize.intValue(); ++i) {
                Event event = channel.take();
                if (event == null) {
                    status = Sink.Status.BACKOFF;
                    if (i == 0L) {
                        this.sinkCounter.incrementBatchEmptyCount();
                        break;
                    }
                    this.sinkCounter.incrementBatchUnderflowCount();
                    break;
                }
                events.add(event);
            }
            if (!events.isEmpty()) {
                if (events.size() == this.batchSize.intValue()) {
                    this.sinkCounter.incrementBatchCompleteCount();
                } else {
                    this.sinkCounter.incrementBatchUnderflowCount();
                    status = Sink.Status.BACKOFF;
                }
                this.serializer.upsertEvents(events);
                this.sinkCounter.addToEventDrainSuccessCount((long)events.size());
            } else {
                logger.debug("no events to process ");
                this.sinkCounter.incrementBatchEmptyCount();
                status = Sink.Status.BACKOFF;
            }
            transaction.commit();
        }
        catch (ChannelException e) {
            transaction.rollback();
            status = Sink.Status.BACKOFF;
            this.sinkCounter.incrementConnectionFailedCount();
            {
                catch (Throwable throwable) {
                    logger.error(String.format("Time taken to process [%s] events was [%s] seconds", events.size(), watch.stop().elapsedTime(TimeUnit.SECONDS)));
                    if (transaction == null) throw throwable;
                    transaction.close();
                    throw throwable;
                }
            }
            logger.error(String.format("Time taken to process [%s] events was [%s] seconds", events.size(), watch.stop().elapsedTime(TimeUnit.SECONDS)));
            if (transaction == null) return status;
            transaction.close();
            return status;
            catch (SQLException e2) {
                this.sinkCounter.incrementConnectionFailedCount();
                transaction.rollback();
                logger.error("exception while persisting to Hbase ", e2);
                throw new EventDeliveryException("Failed to persist message to Hbase", (Throwable)e2);
                catch (Throwable e3) {
                    transaction.rollback();
                    logger.error("exception while processing in Phoenix Sink", e3);
                    throw new EventDeliveryException("Failed to persist message", e3);
                }
            }
        }
        logger.error(String.format("Time taken to process [%s] events was [%s] seconds", events.size(), watch.stop().elapsedTime(TimeUnit.SECONDS)));
        if (transaction == null) return status;
        transaction.close();
        return status;
    }
}

