/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication;

import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;

public class ReplicationPeersZKImpl
extends ReplicationStateZKBase
implements ReplicationPeers {
    /** Peers this instance is currently connected to, keyed by peer id (populated by connectToPeer). */
    private Map<String, ReplicationPeer> peerClusters;
    /**
     * Name of the per-peer child znode that stores the tableCFs string. Read from the
     * configuration key "zookeeper.znode.replication.peers.tableCFs", defaulting to "tableCFs".
     */
    private final String tableCFsNodeName;
    /** Logger shared by this class and its nested listener. */
    private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class);

    /**
     * Builds a peer registry on top of the given ZooKeeper watcher.
     *
     * @param zk watcher handed to the base class
     * @param conf configuration handed to the base class; also supplies the tableCFs znode name
     * @param abortable handed to the base class
     */
    public ReplicationPeersZKImpl(ZooKeeperWatcher zk, Configuration conf, Abortable abortable) {
        super(zk, conf, abortable);
        // Start with no connected peers; init()/connectToPeer() fill this in later.
        this.peerClusters = new HashMap<String, ReplicationPeer>();
        this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
    }

    /**
     * Ensures the peers znode exists and then connects to every peer already registered under it.
     *
     * @throws ReplicationException if checking or creating the peers znode fails, or if
     *         connecting to an existing peer fails
     */
    @Override
    public void init() throws ReplicationException {
        try {
            boolean peersZNodeMissing = ZKUtil.checkExists(this.zookeeper, this.peersZNode) < 0;
            if (peersZNodeMissing) {
                ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
            }
        }
        catch (KeeperException e) {
            throw new ReplicationException("Could not initialize replication peers", e);
        }
        this.connectExistingPeers();
    }

    /**
     * Registers a peer without any tableCFs restriction by delegating to the three-argument
     * overload with a null tableCFs value.
     *
     * @param id peer id
     * @param clusterKey cluster key stored in the peer znode
     * @throws ReplicationException if the peer cannot be written to ZooKeeper
     */
    @Override
    public void addPeer(String id, String clusterKey) throws ReplicationException {
        final String noTableCFs = null;
        this.addPeer(id, clusterKey, noTableCFs);
    }

    /**
     * Registers a new peer: creates the peer znode holding the serialized cluster key, then the
     * peer-state child (initialised to enabled) and the tableCFs child.
     *
     * @param id peer id; must not already exist
     * @param clusterKey cluster key stored in the peer znode
     * @param tableCFs tableCFs string to store; null is stored as the empty string
     * @throws IllegalArgumentException if a peer with this id already exists
     * @throws ReplicationException if any ZooKeeper operation fails
     */
    @Override
    public void addPeer(String id, String clusterKey, String tableCFs) throws ReplicationException {
        try {
            if (this.peerExists(id)) {
                throw new IllegalArgumentException("Cannot add a peer with id=" + id + " because that id already exists.");
            }
            ZKUtil.createWithParents(this.zookeeper, this.peersZNode);

            // Peer znode itself, carrying the serialized cluster key.
            String peerZNode = ZKUtil.joinZNode(this.peersZNode, id);
            byte[] peerData = ReplicationPeersZKImpl.toByteArray(clusterKey);
            ZKUtil.createAndWatch(this.zookeeper, peerZNode, peerData);

            // New peers start out enabled.
            ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, this.getPeerStateNode(id), ENABLED_ZNODE_BYTES);

            String tableCFsStr;
            if (tableCFs != null) {
                tableCFsStr = tableCFs;
            } else {
                tableCFsStr = "";
            }
            ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, this.getTableCFsNode(id), Bytes.toBytes(tableCFsStr));
        }
        catch (KeeperException e) {
            throw new ReplicationException("Could not add peer with id=" + id + ", clusterKey=" + clusterKey, e);
        }
    }

    /**
     * Deletes the peer znode and everything beneath it.
     *
     * @param id peer id; must exist
     * @throws IllegalArgumentException if no peer with this id exists
     * @throws ReplicationException if the existence check or the delete fails in ZooKeeper
     */
    @Override
    public void removePeer(String id) throws ReplicationException {
        try {
            boolean known = this.peerExists(id);
            if (!known) {
                throw new IllegalArgumentException("Cannot remove peer with id=" + id + " because that id does not exist.");
            }
            String peerZNode = ZKUtil.joinZNode(this.peersZNode, id);
            ZKUtil.deleteNodeRecursively(this.zookeeper, peerZNode);
        }
        catch (KeeperException e) {
            throw new ReplicationException("Could not remove peer with id=" + id, e);
        }
    }

    /**
     * Marks the peer as enabled in ZooKeeper and logs the change.
     *
     * @param id peer id
     * @throws ReplicationException if the state cannot be written
     */
    @Override
    public void enablePeer(String id) throws ReplicationException {
        ZooKeeperProtos.ReplicationState.State enabled = ZooKeeperProtos.ReplicationState.State.ENABLED;
        this.changePeerState(id, enabled);
        LOG.info("peer " + id + " is enabled");
    }

    /**
     * Marks the peer as disabled in ZooKeeper and logs the change.
     *
     * @param id peer id
     * @throws ReplicationException if the state cannot be written
     */
    @Override
    public void disablePeer(String id) throws ReplicationException {
        ZooKeeperProtos.ReplicationState.State disabled = ZooKeeperProtos.ReplicationState.State.DISABLED;
        this.changePeerState(id, disabled);
        LOG.info("peer " + id + " is disabled");
    }

    /**
     * Reads the tableCFs string stored for a peer.
     *
     * @param id peer id; must exist
     * @return the content of the peer's tableCFs znode decoded as a string
     * @throws IllegalArgumentException if no peer with this id exists
     * @throws ReplicationException if the existence check fails, or if reading the znode fails
     *         for any reason (the underlying exception is wrapped without a message)
     */
    @Override
    public String getPeerTableCFsConfig(String id) throws ReplicationException {
        boolean exists;
        try {
            exists = this.peerExists(id);
        }
        catch (KeeperException e) {
            throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id, e);
        }
        if (!exists) {
            throw new IllegalArgumentException("peer " + id + " doesn't exist");
        }
        try {
            byte[] raw = ZKUtil.getData(this.zookeeper, this.getTableCFsNode(id));
            return Bytes.toString(raw);
        }
        catch (Exception e) {
            // Any failure while reading is reported as a bare wrapped ReplicationException.
            throw new ReplicationException(e);
        }
    }

    /**
     * Stores a new tableCFs string for a peer, creating the tableCFs znode if it is missing.
     *
     * <p>A null {@code tableCFsStr} is stored as the empty string, matching what
     * {@code addPeer} does for a null tableCFs value, instead of being handed to
     * {@code Bytes.toBytes} as null.
     *
     * @param id peer id; must exist
     * @param tableCFsStr tableCFs string to store; null is treated as ""
     * @throws IllegalArgumentException if no peer with this id exists
     * @throws ReplicationException if any ZooKeeper operation fails
     */
    @Override
    public void setPeerTableCFsConfig(String id, String tableCFsStr) throws ReplicationException {
        try {
            if (!this.peerExists(id)) {
                throw new IllegalArgumentException("Cannot set peer tableCFs because id=" + id + " does not exist.");
            }
            // Normalise null the same way addPeer does so both write paths agree.
            String effectiveTableCFs = tableCFsStr == null ? "" : tableCFsStr;
            String tableCFsZKNode = this.getTableCFsNode(id);
            byte[] tableCFs = Bytes.toBytes(effectiveTableCFs);
            if (ZKUtil.checkExists(this.zookeeper, tableCFsZKNode) != -1) {
                ZKUtil.setData(this.zookeeper, tableCFsZKNode, tableCFs);
            } else {
                ZKUtil.createAndWatch(this.zookeeper, tableCFsZKNode, tableCFs);
            }
            LOG.info("Peer tableCFs with id= " + id + " is now " + effectiveTableCFs);
        }
        catch (KeeperException e) {
            throw new ReplicationException("Unable to change tableCFs of the peer with id=" + id, e);
        }
    }

    /**
     * Returns the table/column-family map tracked by a connected peer.
     *
     * @param id peer id; must be a connected peer
     * @return whatever the connected peer's {@code getTableCFs()} returns
     * @throws IllegalArgumentException if the peer is not connected
     */
    @Override
    public Map<String, List<String>> getTableCFs(String id) throws IllegalArgumentException {
        Map<String, ReplicationPeer> connected = this.peerClusters;
        if (connected.containsKey(id)) {
            return connected.get(id).getTableCFs();
        }
        throw new IllegalArgumentException("Peer with id= " + id + " is not connected");
    }

    /**
     * Returns the enabled flag held by a connected peer object (no ZooKeeper read is made here).
     *
     * @param id peer id; must be a connected peer
     * @return the current value of the peer's enabled flag
     * @throws IllegalArgumentException if the peer is not connected
     */
    @Override
    public boolean getStatusOfConnectedPeer(String id) {
        Map<String, ReplicationPeer> connected = this.peerClusters;
        if (connected.containsKey(id)) {
            return connected.get(id).getPeerEnabled().get();
        }
        throw new IllegalArgumentException("Peer with id= " + id + " is not connected");
    }

    /**
     * Reads the peer's enabled/disabled state directly from its peer-state znode.
     *
     * @param id peer id; must exist
     * @return the result of {@code ReplicationPeer.isStateEnabled} on the znode content
     * @throws IllegalArgumentException if no peer with this id exists
     * @throws ReplicationException if the existence check fails, or if reading or deserializing
     *         the state fails (the latter two are wrapped without a message)
     */
    @Override
    public boolean getStatusOfPeerFromBackingStore(String id) throws ReplicationException {
        boolean exists;
        try {
            exists = this.peerExists(id);
        }
        catch (KeeperException e) {
            throw new ReplicationException("Unable to get status of the peer with id=" + id + " from backing store", e);
        }
        if (!exists) {
            throw new IllegalArgumentException("peer " + id + " doesn't exist");
        }
        String peerStateZNode = this.getPeerStateNode(id);
        try {
            byte[] stateData = ZKUtil.getData(this.zookeeper, peerStateZNode);
            return ReplicationPeer.isStateEnabled(stateData);
        }
        catch (KeeperException e) {
            throw new ReplicationException(e);
        }
        catch (DeserializationException e) {
            throw new ReplicationException(e);
        }
    }

    /**
     * Connects to a peer and records it in the connected-peer map.
     *
     * @param peerId peer id
     * @return true if a new peer was connected; false if the map is null, the peer is already
     *         connected, or {@code getPeer} returned null for it
     * @throws ReplicationException if building the peer fails for any reason
     */
    @Override
    public boolean connectToPeer(String peerId) throws ReplicationException {
        if (this.peerClusters == null || this.peerClusters.containsKey(peerId)) {
            return false;
        }
        ReplicationPeer peer;
        try {
            peer = this.getPeer(peerId);
        }
        catch (Exception e) {
            throw new ReplicationException("Error connecting to peer with id=" + peerId, e);
        }
        if (peer != null) {
            this.peerClusters.put(peerId, peer);
            LOG.info("Added new peer cluster " + peer.getClusterKey());
            return true;
        }
        return false;
    }

    /**
     * Closes the ZooKeeper watcher of a connected peer and forgets the peer. Does nothing if
     * the peer is not connected.
     *
     * @param peerId peer id
     */
    @Override
    public void disconnectFromPeer(String peerId) {
        ReplicationPeer connectedPeer = this.peerClusters.get(peerId);
        if (connectedPeer == null) {
            return;
        }
        connectedPeer.getZkw().close();
        this.peerClusters.remove(peerId);
    }

    /**
     * Lists every peer registered under the peers znode together with its cluster key.
     *
     * <p>Peers whose znode content is missing or cannot be parsed are logged and skipped. A
     * null child list (which {@code connectExistingPeers} also guards against for the same
     * call) yields an empty map rather than a NullPointerException. If ZooKeeper fails, the
     * abortable is asked to abort and whatever was collected so far is returned.
     *
     * @return a sorted map from peer id to cluster key; never null
     */
    @Override
    public Map<String, String> getAllPeerClusterKeys() {
        Map<String, String> peers = new TreeMap<String, String>();
        try {
            List<String> ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
            if (ids == null) {
                // No child list available for the peers znode: nothing to report.
                return peers;
            }
            for (String id : ids) {
                byte[] bytes = ZKUtil.getData(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
                if (bytes == null) {
                    // getPeerConf treats null data as "peer doesn't exist"; do the same here
                    // instead of passing null on to the parser.
                    LOG.warn("No data found for clusterid=" + id + " znode, continuing.");
                    continue;
                }
                String clusterKey;
                try {
                    clusterKey = ReplicationPeersZKImpl.parsePeerFrom(bytes);
                }
                catch (DeserializationException de) {
                    // Keep the cause so the bad content can be diagnosed from the log.
                    LOG.warn("Failed parse of clusterid=" + id + " znode content, continuing.", de);
                    continue;
                }
                peers.put(id, clusterKey);
            }
        }
        catch (KeeperException e) {
            this.abortable.abort("Cannot get the list of peers ", e);
        }
        return peers;
    }

    /**
     * Refreshes and returns the region server list of a connected peer.
     *
     * <p>On a ZooKeeper failure the peer's list is reset to empty and a reconnect is attempted
     * via {@code reconnectPeer}.
     *
     * @param peerId peer id
     * @return the peer's region servers after the refresh, or an empty list if the peer is not
     *         connected
     */
    @Override
    public List<ServerName> getRegionServersOfConnectedPeer(String peerId) {
        ReplicationPeer peer = this.peerClusters.isEmpty() ? null : this.peerClusters.get(peerId);
        if (peer == null) {
            return Collections.emptyList();
        }
        List<ServerName> fetched;
        try {
            fetched = ReplicationPeersZKImpl.fetchSlavesAddresses(peer.getZkw());
        }
        catch (KeeperException ke) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Fetch salves addresses failed.", ke);
            }
            this.reconnectPeer(ke, peer);
            fetched = Collections.emptyList();
        }
        peer.setRegionServers(fetched);
        return peer.getRegionServers();
    }

    /**
     * Looks up the cluster UUID of a connected peer through the peer's ZooKeeper watcher.
     *
     * @param peerId peer id
     * @return the UUID, or null if the peer is not connected or the lookup failed in ZooKeeper
     *         (in which case a reconnect is attempted via {@code reconnectPeer})
     */
    @Override
    public UUID getPeerUUID(String peerId) {
        ReplicationPeer peer = this.peerClusters.get(peerId);
        if (peer == null) {
            return null;
        }
        try {
            return ZKClusterId.getUUIDForCluster(peer.getZkw());
        }
        catch (KeeperException ke) {
            this.reconnectPeer(ke, peer);
            return null;
        }
    }

    /**
     * Returns the ids of the currently connected peers.
     *
     * @return the key set of the connected-peer map itself (not a copy)
     */
    @Override
    public Set<String> getConnectedPeers() {
        Set<String> connectedIds = this.peerClusters.keySet();
        return connectedIds;
    }

    /**
     * Builds a configuration for talking to a peer: a copy of this instance's configuration
     * with the peer's cluster key applied.
     *
     * @param peerId peer id
     * @return the peer configuration, or null if the peer znode has no data, its content cannot
     *         be parsed, or the cluster key cannot be applied (each case is logged)
     * @throws ReplicationException if reading the peer znode fails in ZooKeeper
     */
    @Override
    public Configuration getPeerConf(String peerId) throws ReplicationException {
        String znode = ZKUtil.joinZNode(this.peersZNode, peerId);

        byte[] rawPeerData;
        try {
            rawPeerData = ZKUtil.getData(this.zookeeper, znode);
        }
        catch (KeeperException e) {
            throw new ReplicationException("Error getting configuration for peer with id=" + peerId, e);
        }
        if (rawPeerData == null) {
            LOG.error("Could not get configuration for peer because it doesn't exist. peerId=" + peerId);
            return null;
        }

        String peerClusterKey;
        try {
            peerClusterKey = ReplicationPeersZKImpl.parsePeerFrom(rawPeerData);
        }
        catch (DeserializationException e) {
            LOG.warn("Failed to parse cluster key from peerId=" + peerId + ", specifically the content from the following znode: " + znode);
            return null;
        }

        // Start from our own configuration and overlay the peer's cluster key.
        Configuration peerConf = new Configuration(this.conf);
        try {
            ZKUtil.applyClusterKeyToConf(peerConf, peerClusterKey);
            return peerConf;
        }
        catch (IOException e) {
            LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e);
            return null;
        }
    }

    /**
     * Lists the ids of all peers registered under the peers znode, setting watches on them.
     *
     * @return the child names of the peers znode, or null if the ZooKeeper call failed (the
     *         abortable is asked to abort in that case)
     */
    @Override
    public List<String> getAllPeerIds() {
        try {
            return ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
        }
        catch (KeeperException e) {
            this.abortable.abort("Cannot get the list of peers ", e);
            return null;
        }
    }

    /**
     * Returns the connected peer's last region server update timestamp.
     *
     * @param peerId peer id; must be a connected peer
     * @return the value of the peer's {@code getLastRegionserverUpdate()}
     * @throws IllegalArgumentException if the peer is not connected
     */
    @Override
    public long getTimestampOfLastChangeToPeer(String peerId) {
        Map<String, ReplicationPeer> connected = this.peerClusters;
        if (connected.containsKey(peerId)) {
            return connected.get(peerId).getLastRegionserverUpdate();
        }
        throw new IllegalArgumentException("Unknown peer id: " + peerId);
    }

    /**
     * Connects to every peer currently listed under the peers znode. A null child list is
     * treated as "no peers".
     *
     * @throws ReplicationException if listing the peers fails, or if connecting to one fails
     */
    private void connectExistingPeers() throws ReplicationException {
        List<String> peerIds;
        try {
            peerIds = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
        }
        catch (KeeperException e) {
            throw new ReplicationException("Error getting the list of peer clusters.", e);
        }
        if (peerIds == null) {
            return;
        }
        for (String peerId : peerIds) {
            this.connectToPeer(peerId);
        }
    }

    /**
     * Rebuilds a peer's ZooKeeper watcher when the given exception indicates connection loss,
     * session expiry or authentication failure. Other exception types are ignored.
     *
     * @param ke the ZooKeeper failure that triggered the call
     * @param peer the peer whose watcher should be reloaded
     */
    private void reconnectPeer(KeeperException ke, ReplicationPeer peer) {
        boolean connectionUnusable = ke instanceof KeeperException.ConnectionLossException
                || ke instanceof KeeperException.SessionExpiredException
                || ke instanceof KeeperException.AuthFailedException;
        if (!connectionUnusable) {
            return;
        }
        LOG.warn("Lost the ZooKeeper connection for peer " + peer.getClusterKey(), ke);
        try {
            peer.reloadZkWatcher();
            // Register a region server listener on the watcher returned after the reload.
            peer.getZkw().registerListener(new PeerRegionServerListener(peer));
        }
        catch (IOException io) {
            LOG.warn("Creation of ZookeeperWatcher failed for peer " + peer.getClusterKey(), io);
        }
    }

    /**
     * Lists the children of the watcher's region server znode (setting a watch for new
     * children) and parses each child name into a {@code ServerName}.
     *
     * @param zkw watcher for the cluster whose region servers should be listed
     * @return the parsed server names, or an empty list if the child list is null
     * @throws KeeperException if the ZooKeeper listing fails
     */
    private static List<ServerName> fetchSlavesAddresses(ZooKeeperWatcher zkw) throws KeeperException {
        List<String> rsNames = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.rsZNode);
        if (rsNames == null) {
            return Collections.emptyList();
        }
        List<ServerName> servers = new ArrayList<ServerName>(rsNames.size());
        for (String rsName : rsNames) {
            ServerName parsed = ServerName.parseServerName(rsName);
            servers.add(parsed);
        }
        return servers;
    }

    /**
     * Builds the path of a peer's tableCFs znode: peers znode / peer id / tableCFs node name.
     *
     * @param id peer id
     * @return the joined znode path
     */
    private String getTableCFsNode(String id) {
        String relativePath = ZKUtil.joinZNode(id, this.tableCFsNodeName);
        return ZKUtil.joinZNode(this.peersZNode, relativePath);
    }

    /**
     * Builds the path of a peer's state znode: peers znode / peer id / peer-state node name.
     *
     * @param id peer id
     * @return the joined znode path
     */
    private String getPeerStateNode(String id) {
        String relativePath = ZKUtil.joinZNode(id, this.peerStateNodeName);
        return ZKUtil.joinZNode(this.peersZNode, relativePath);
    }

    /**
     * Writes the requested state to the peer's state znode, creating the znode if it is
     * missing, and logs the new state.
     *
     * @param id peer id; must exist
     * @param state ENABLED writes the enabled bytes; any other value writes the disabled bytes
     * @throws IllegalArgumentException if no peer with this id exists
     * @throws ReplicationException if any ZooKeeper operation fails
     */
    private void changePeerState(String id, ZooKeeperProtos.ReplicationState.State state) throws ReplicationException {
        try {
            if (!this.peerExists(id)) {
                throw new IllegalArgumentException("Cannot enable/disable peer because id=" + id + " does not exist.");
            }
            String peerStateZNode = this.getPeerStateNode(id);

            byte[] stateBytes;
            if (state == ZooKeeperProtos.ReplicationState.State.ENABLED) {
                stateBytes = ENABLED_ZNODE_BYTES;
            } else {
                stateBytes = DISABLED_ZNODE_BYTES;
            }

            boolean stateNodePresent = ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1;
            if (stateNodePresent) {
                ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
            } else {
                ZKUtil.createAndWatch(this.zookeeper, peerStateZNode, stateBytes);
            }
            LOG.info("Peer with id= " + id + " is now " + state.name());
        }
        catch (KeeperException e) {
            throw new ReplicationException("Unable to change state of the peer with id=" + id, e);
        }
    }

    /**
     * Builds a fully wired {@code ReplicationPeer} for the given id: resolves its
     * configuration, starts the state and tableCFs trackers against the local ZooKeeper, and
     * registers a region server listener on the peer's own watcher.
     *
     * <p>If either tracker fails to start, the peer's watcher is closed before the exception
     * is rethrown, so a half-initialised peer does not leave a watcher open.
     *
     * @param peerId peer id
     * @return the new peer, or null if no configuration could be obtained for it or if its
     *         cluster key equals this cluster's own key
     * @throws ReplicationException if the configuration lookup or a tracker start fails
     */
    private ReplicationPeer getPeer(String peerId) throws ReplicationException {
        Configuration peerConf = this.getPeerConf(peerId);
        if (peerConf == null) {
            return null;
        }
        // Compute the key once; it is needed for the self-check and for the peer itself.
        String peerClusterKey = ZKUtil.getZooKeeperClusterKey(peerConf);
        if (this.ourClusterKey.equals(peerClusterKey)) {
            LOG.debug("Not connecting to " + peerId + " because it's us");
            return null;
        }
        // From here on the peer exposes a watcher via getZkw() (a listener is registered on it
        // below) - presumably opened by the constructor - so failure paths must release it.
        ReplicationPeer peer = new ReplicationPeer(peerConf, peerId, peerClusterKey);
        try {
            peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
        }
        catch (KeeperException e) {
            ReplicationPeersZKImpl.closePeerWatcher(peer);
            throw new ReplicationException("Error starting the peer state tracker for peerId=" + peerId, e);
        }
        try {
            peer.startTableCFsTracker(this.zookeeper, this.getTableCFsNode(peerId));
        }
        catch (KeeperException e) {
            ReplicationPeersZKImpl.closePeerWatcher(peer);
            throw new ReplicationException("Error starting the peer tableCFs tracker for peerId=" + peerId, e);
        }
        peer.getZkw().registerListener(new PeerRegionServerListener(peer));
        return peer;
    }

    /** Closes the peer's ZooKeeper watcher, if it has one, the same way disconnectFromPeer does. */
    private static void closePeerWatcher(ReplicationPeer peer) {
        ZooKeeperWatcher peerZkw = peer.getZkw();
        if (peerZkw != null) {
            peerZkw.close();
        }
    }

    /**
     * Extracts the cluster key from peer znode content.
     *
     * <p>Content carrying the protobuf magic prefix is decoded as a
     * {@code ZooKeeperProtos.ReplicationPeer} message; anything else is decoded as a plain
     * string, with zero-length content yielding "".
     *
     * @param bytes raw znode content
     * @return the cluster key
     * @throws DeserializationException if the protobuf payload cannot be parsed
     */
    private static String parsePeerFrom(byte[] bytes) throws DeserializationException {
        if (!ProtobufUtil.isPBMagicPrefix(bytes)) {
            // Not protobuf: decode the bytes as a plain string.
            return bytes.length > 0 ? Bytes.toString(bytes) : "";
        }
        int magicLength = ProtobufUtil.lengthOfPBMagic();
        int payloadLength = bytes.length - magicLength;
        ZooKeeperProtos.ReplicationPeer.Builder builder = ZooKeeperProtos.ReplicationPeer.newBuilder();
        ZooKeeperProtos.ReplicationPeer parsed;
        try {
            ZooKeeperProtos.ReplicationPeer.Builder merged =
                    (ZooKeeperProtos.ReplicationPeer.Builder)builder.mergeFrom(bytes, magicLength, payloadLength);
            parsed = merged.build();
        }
        catch (InvalidProtocolBufferException e) {
            throw new DeserializationException(e);
        }
        return parsed.getClusterkey();
    }

    /**
     * Serializes a cluster key as a {@code ZooKeeperProtos.ReplicationPeer} message with the
     * protobuf magic prefix prepended.
     *
     * @param clusterKey cluster key to store
     * @return the bytes to write into the peer znode
     */
    private static byte[] toByteArray(String clusterKey) {
        ZooKeeperProtos.ReplicationPeer peerMessage =
                ZooKeeperProtos.ReplicationPeer.newBuilder().setClusterkey(clusterKey).build();
        byte[] serialized = peerMessage.toByteArray();
        return ProtobufUtil.prependPBMagic(serialized);
    }

    /**
     * ZooKeeper listener that refreshes a peer's region server list whenever the children of
     * the watched region server znode change.
     */
    public static class PeerRegionServerListener
    extends ZooKeeperListener {
        /** Peer to refresh; null when the listener was built without a peer. */
        private final ReplicationPeer peer;
        /** Znode whose child changes trigger a refresh. */
        private final String regionServerListNode;

        /**
         * Creates a listener bound to a peer, watching the region server znode of the peer's
         * own watcher.
         *
         * @param replicationPeer peer whose region server list is kept up to date
         */
        public PeerRegionServerListener(ReplicationPeer replicationPeer) {
            super(replicationPeer.getZkw());
            this.peer = replicationPeer;
            this.regionServerListNode = this.peer.getZkw().rsZNode;
        }

        /**
         * Creates a listener for an explicit znode with no peer attached. Such a listener has
         * nothing to refresh, so child-change events are only logged.
         *
         * @param regionServerListNode znode to react to
         * @param zkw watcher handed to the base class
         */
        public PeerRegionServerListener(String regionServerListNode, ZooKeeperWatcher zkw) {
            super(zkw);
            this.peer = null;
            this.regionServerListNode = regionServerListNode;
        }

        /**
         * Re-fetches the peer's region servers when the watched znode's children change.
         * Events for other paths are ignored; ZooKeeper failures are logged, not rethrown.
         *
         * @param path znode whose children changed
         */
        @Override
        public synchronized void nodeChildrenChanged(String path) {
            if (!path.equals(this.regionServerListNode)) {
                return;
            }
            if (this.peer == null) {
                // Built via the (String, ZooKeeperWatcher) constructor: there is no peer to
                // update, so avoid dereferencing null on the ZooKeeper event path.
                LOG.warn("Detected change to peer regionservers but no peer is attached to this listener, ignoring");
                return;
            }
            try {
                LOG.info("Detected change to peer regionservers, fetching updated list");
                this.peer.setRegionServers(ReplicationPeersZKImpl.fetchSlavesAddresses(this.peer.getZkw()));
            }
            catch (KeeperException e) {
                LOG.fatal("Error reading slave addresses", e);
            }
        }
    }
}

