package dmo.fs.db.wsnext.hib;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import dmo.fs.db.MessageUser;
import dmo.fs.db.MessageUserImpl;
import dmo.fs.db.router.wsnext.DodexRouter;
import dmo.fs.db.wsnext.admin.CleanOrphanedUsers;
import dmo.fs.db.wsnext.hib.srv.DodexService;
import dmo.fs.kafka.KafkaEmitterDodex;
import dmo.fs.utils.ColorUtilConstants;
import dmo.fs.utils.DodexUtil;
import dmo.fs.utils.Err;
import io.quarkus.websockets.next.WebSocketConnection;
import io.smallrye.mutiny.Uni;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;
import jakarta.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.Serial;
import java.io.Serializable;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public abstract class DodexHibernateReactiveBase extends DodexService implements DodexHibernateReactive, Serializable {
    @Serial
    private static final long serialVersionUID = 1L;
    private final static Logger logger = LoggerFactory.getLogger(DodexHibernateReactiveBase.class.getName());

    static protected String dbName;
    protected Map<String, String> queryParams;
    protected String remoteAddress = null;
    protected Map<String, Map<String, String>> sessionsNext = new ConcurrentHashMap<>();
    protected Map<String, WebSocketConnection> sessions = new ConcurrentHashMap<>();
    protected final KafkaEmitterDodex ke = DodexRouter.getKafkaEmitterDodex();
    protected boolean isSetupDone;

    @Inject
    Vertx vertx;
    @Inject
    protected WebSocketConnection connection;

    public DodexHibernateReactiveBase() {
    }

    public void databaseSetup() throws InterruptedException {
    }

    protected long broadcast(WebSocketConnection connection, String message, Map<String, String> queryParams) {
        return connection.getOpenConnections().stream().filter(session -> {
            if (connection.id().equals(session.id())) {
                return false;
            }
            CompletableFuture<Void> complete = session.sendText(message).subscribe().asCompletionStage();
            if (complete.isCompletedExceptionally()) {
                logger.info(Err.displayErr(49), ColorUtilConstants.BLUE_BOLD_BRIGHT,
                  queryParams.get("handle"), ": Exception in broadcast",
                  ColorUtilConstants.RESET);
            }
            return true;
        }).count();
    }

    protected WebSocketConnection getThisWebSocket(WebSocketConnection connection) {
        return connection.getOpenConnections().stream()
          .filter(s -> s.id().equals(connection.id())).findFirst().orElse(connection);
    }

    protected void doConnection(WebSocketConnection session) {
        final MessageUser messageUser = setMessageUser(session);
        WebSocketConnection ws = getThisWebSocket(session);

        isSetupDone = true;
//                cleanupPromise.complete(pool);

        try {
            Uni<MessageUser> future = selectUser(messageUser);

            future.subscribeAsCompletionStage().thenComposeAsync(resultUser -> {
                /*
                 * Send list of registered users with connected notification
                 */
                buildUsersJson(resultUser).onItemOrFailure().invoke((userJson, err) -> {
                    if (err != null) {
                        logger.error("{}{}", Err.displayErr(40), err.getMessage());
                    }

                    ws.sendText("connected:" + userJson) // Users for private messages
                      .subscribe().asCompletionStage();
                }).subscribe().asCompletionStage();

                /*
                 * Send undelivered messages and remove user related messages.
                 */
                try {
                    processUserMessages(ws, resultUser).onItemOrFailure().invoke((map, err) -> {
                        if (err != null) {
                            logger.error("{}{}", Err.displayErr(41), err.getMessage());
                        }

                        int messageCount = map.get("messages");
                        if (messageCount > 0) {
                            logger.info(Err.displayErr(48),
                              ColorUtilConstants.BLUE_BOLD_BRIGHT, messageCount, resultUser.getName(),
                              ColorUtilConstants.RESET);
                            if (ke != null) {
                                ke.setValue("delivered", messageCount);
                            }
                        }
                    }).subscribe().asCompletionStage();
                } catch (ExecutionException | InterruptedException e) {
                    logger.error("{}{}", Err.displayErr(33), e.getMessage());
                }
                return CompletableFuture.completedFuture(resultUser);
            });
        } catch (IOException e) {
            logger.error("{}{}", Err.displayErr(38), e.getMessage());
        }
    }

    public void doMessage(WebSocketConnection session, Map<String, WebSocketConnection> sessions, String message) {
        final MessageUser messageUser = setMessageUser(session);
        final List<String> onlineUsers = new ArrayList<>();
        // Checking if message or command
        final Map<String, String> returnObject = DodexUtil.commandMessage(message);
        final String selectedUsers = returnObject.get("selectedUsers");
        // message with command stripped out
        final String computedMessage = returnObject.get("message");
        final String command = returnObject.get("command");

        if (";removeuser".equals(command)) {
            deleteUser(messageUser).onItemOrFailure().invoke((id, err) -> {
                if (err != null) {
                    logger.error("{}{}", Err.displayErr(34), err.getMessage());
                    session.sendText(err.getMessage()).subscribe().asCompletionStage();
                }
            }).subscribe().asCompletionStage();
        }

        sessions = session.getOpenConnections()
          .stream()
          .collect(Collectors.toConcurrentMap(WebSocketConnection::id, v -> v));

        if (!computedMessage.isEmpty()) {
            // broadcast
            if ("".equals(selectedUsers) && "".equals(command)) {
                long count = broadcast(session, messageUser.getName() + ": " + computedMessage, queryParams);
                String handles = "handle";
                handles = count == 1 ? handles : handles + "s";

                session.sendText(String.format(Err.displayErr(47), count, handles)).subscribe().asCompletionStage();

                if (ke != null) {
                    ke.setValue(1);
                }
            }

            sessions.values().stream().filter(s -> !s.id().equals(getThisWebSocket(session).id()) /*&& getThisWebSocket(session).isOpen()*/)
              .forEach(s -> {
                  final String handle = sessionsNext.get(s.id()).get("handle");
                  // private message
                  if (Arrays.stream(selectedUsers.split(",")).anyMatch(h -> h.contains(handle))) {
                      CompletableFuture<Void> complete = s.sendText(messageUser.getName() + ": " + computedMessage)
                        .subscribe().asCompletionStage();
                      if (complete.isCompletedExceptionally()) {
                          if (logger.isInfoEnabled()) {
                              logger.info(Err.displayErr(46),
                                ColorUtilConstants.BLUE_BOLD_BRIGHT, sessionsNext.get(s.id()).get("handle"),
                                ColorUtilConstants.RESET);
                          }
                      }
                      // keep track of delivered messages
                      onlineUsers.add(handle);
                  }
              });

            if ("".equals(selectedUsers) && !"".equals(command)) {
                session.sendText("Private user not selected").subscribeAsCompletionStage().isDone();
            } else {
                session.sendText("ok").subscribeAsCompletionStage().isDone();
                if (!onlineUsers.isEmpty()) {
                    if (ke != null) {
                        ke.setValue("private", onlineUsers.size());
                    }
                }
            }
        }

        // calculate difference between selected and online users
        if (!selectedUsers.isEmpty()) {
            final List<String> selected = Arrays.asList(selectedUsers.split(","));
            final List<String> disconnectedUsers = selected.stream().filter(user -> !onlineUsers.contains(user))
              .collect(Collectors.toList());
            // Save protected message to send when to-user logs in
            if (!disconnectedUsers.isEmpty()) {
                Uni<Long> future = addMessage(messageUser, computedMessage);

                future.onItemOrFailure().invoke((key, err) -> {
                    if (key.equals(0L)) {
                        if (err != null) {
                            logger.error("{}{}", Err.displayErr(35), err.getMessage());
                        } else {
                            logger.error("{}{}", Err.displayErr(36), "");
                        }
                    } else {
                        try {
                            addUndelivered(messageUser, disconnectedUsers, key).onItemOrFailure()
                              .invoke((v2, err2) -> {
                                  if (err2 != null) {
                                      logger.error("{}{}", Err.displayErr(37), err2.getMessage());
                                  }
                                  messageUser.getReactiveSession().close();
                              }).subscribe().asCompletionStage();
                        } catch (ExecutionException | InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                        if (ke != null) {
                            ke.setValue("undelivered", disconnectedUsers.size());
                        }
                    }
                }).subscribe().asCompletionStage();
            }
            if (!onlineUsers.isEmpty()) {
                if (ke != null) {
                    ke.setValue("protected", onlineUsers.size());
                }
            }
        }
    }

    protected MessageUser setMessageUser(WebSocketConnection session) {
        final MessageUser messageUser = createMessageUser();
        if (session == null) {
            return messageUser;
        }

        int i = 0;
        while (!session.isOpen()) {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                break;
            }
            if (i++ == 2) {
                break;
            }
        }

        final Map<String, String> queryParams = sessionsNext.get(session.id());

        String handle = "";
        String id = "";

        handle = queryParams.get("handle");
        id = queryParams.get("id");

        messageUser.setName(handle);
        messageUser.setPassword(id);
        String thisRemoteAddress = sessionsNext.get(session.id()).get("remoteAddress");

        messageUser.setIp(thisRemoteAddress == null && remoteAddress == null ? "Unknown" :
          remoteAddress == null ? thisRemoteAddress : remoteAddress);

        if (remoteAddress == null) {
            remoteAddress = messageUser.getIp();
        }
        return messageUser;
    }

    protected void setup() throws InterruptedException, IOException, SQLException {
//        dodexDatabase = DbConfiguration.getDefaultDb();
//        dbPromise = dodexDatabase.databaseSetup();

        /*
         * Optional auto user cleanup - config in "application-conf.json". When client
         * changes handle when server is down, old users and undelivered messages will
         * be orphaned.
         *
         * Defaults: off - when turned on 1. execute on start up and every 7 days
         * thereafter. 2. remove users who have not logged in for 90 days.
         */

        final Optional<Context> context = Optional.ofNullable(vertx.getOrCreateContext());
        if (context.isPresent()) {
            final Optional<JsonObject> jsonObject = Optional.ofNullable(vertx.getOrCreateContext().config());
            try {
                JsonObject config = jsonObject.orElseGet(JsonObject::new);
                if (config.isEmpty()) {
                    ObjectMapper jsonMapper = new ObjectMapper();
                    JsonNode node;

                    try (InputStream in = getClass().getResourceAsStream("/application-conf.json")) {
                        node = jsonMapper.readTree(in);
                    }
                    config = JsonObject.mapFrom(node);
                }
                final Optional<Boolean> runClean = Optional.ofNullable(config.getBoolean("clean.run"));
                if (runClean.isPresent() && runClean.get().equals(true)) {
                    final CleanOrphanedUsers clean = new CleanOrphanedUsers();
//                    clean.setDatabase(dodexDatabase);
//                    clean.setPromise(cleanupPromise);
                    clean.startClean(config);
                }
            } catch (final Exception exception) {
                logger.error(Err.displayErr(42),
                  ColorUtilConstants.RED_BOLD_BRIGHT, exception.getMessage(), ColorUtilConstants.RESET);
            }
        }

        String defaultDb = new DodexUtil().getDefaultDb();
        String startupMessage = Err.displayErr(44) + defaultDb;

        startupMessage = "dev".equals(DodexUtil.getEnv()) ? Err.displayErr(45) + defaultDb + " "
          : startupMessage;
        logger.info(Err.displayErr(43),
          ColorUtilConstants.BLUE_BOLD_BRIGHT, startupMessage, ColorUtilConstants.RESET);
    }

    @Override
    public MessageUser createMessageUser() {
        return new MessageUserImpl();
    }

    public void setRemoteAddress(String remoteAddress) {
        this.remoteAddress = remoteAddress;
    }
}
