
package dmo.fs.db.wsnext.hib.srv;

import dmo.fs.db.MessageUser;
import dmo.fs.db.openapi.GroupHibernateService;
import dmo.fs.entities.Messages;
import dmo.fs.entities.Undelivered;
import dmo.fs.entities.Users;
import dmo.fs.entities.Users_;
import dmo.fs.quarkus.Server;
import dmo.fs.utils.Err;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.quarkus.websockets.next.WebSocketConnection;
import io.smallrye.common.vertx.VertxContext;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.MultiSubscribe;
import io.vertx.core.Context;
import io.vertx.core.json.JsonArray;
import io.vertx.mutiny.core.Promise;
import jakarta.persistence.criteria.*;
import jakarta.transaction.Transactional;
import org.hibernate.reactive.mutiny.Mutiny;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

public abstract class DodexService {
    private final static Logger logger = LoggerFactory.getLogger(DodexService.class.getName());
    protected Mutiny.SessionFactory sessionFactory;

    protected Uni<List<Map<String, String>>> getAllUsers(MessageUser messageUser) {
        Promise<List<Map<String, String>>> userListPromise = Promise.promise();
        CriteriaBuilder builder = sessionFactory.getCriteriaBuilder();
        CriteriaQuery<Users> userQuery = builder.createQuery(Users.class);
        Root<Users> root = userQuery.from(Users.class);

        ParameterExpression<String> value = builder.parameter(String.class, Users_.NAME);
        userQuery.select(root).where(builder.notEqual(root.<String>get(Users_.NAME), value));

        sessionFactory.openSession().onItemOrFailure().invoke((session, err) -> {
            if (err != null) {
                logger.error("{}{}", Err.displayErr(39), err.getMessage());
            }
            Mutiny.SelectionQuery<Users> query = session.createQuery(userQuery);
            query.setParameter(Users_.NAME, messageUser.getName());

            query.getResultList().onItemOrFailure().invoke((results, err2) -> {
                if (err2 != null) {
                    logger.error("{}{}", Err.displayErr(1), err2.getMessage());
                }
                List<Map<String, String>> userList = new ArrayList<>();

                results.forEach(user -> {
                    Map<String, String> userMap = new HashMap<>();
                    userMap.put(Users_.NAME, user.getName());
                    userList.add(userMap);
                });

                userListPromise.complete(userList);
                session.close().onItemOrFailure().invoke((c, err3) -> {
                    if (err3 != null) {
                        logger.error("{}{}", Err.displayErr(2), err3.getMessage());
                    }
                }).subscribe().asCompletionStage();
            }).onFailure().invoke(err4 -> {
                List<Map<String, String>> userList = new ArrayList<>();
                Map<String, String> errorMap = new HashMap<>();
                errorMap.put("status", err4.getMessage());
                userList.add(errorMap);
                userListPromise.complete(userList);
                session.close().onItemOrFailure().invoke((c, err5) -> {
                    if (err5 != null) {
                        logger.error("{}{}", Err.displayErr(3), err5.getMessage());
                    }
                }).subscribe().asCompletionStage();
            }).subscribe().asCompletionStage();
        }).subscribe().asCompletionStage();

        return userListPromise.future();
    }

    public Uni<List<Users>> getUserByName(List<String> userNames, Mutiny.Session session) {
        final Promise<List<Users>> userPromise = Promise.promise();
        CriteriaBuilder builder = session.getCriteriaBuilder();
        CriteriaQuery<Users> userQuery = builder.createQuery(Users.class);
        Root<Users> root = userQuery.from(Users.class);

        userQuery.select(root).where(builder.and(root.get(Users_.NAME).in(userNames)));

        Mutiny.SelectionQuery<Users> query = session.createQuery(userQuery);
        query.getResultList().onItemOrFailure().invoke((users, err) -> {
            if (err != null) {
                logger.error("{}{}", Err.displayErr(4), err.getMessage());
            }
            userPromise.complete(users);
        }).subscribe().asCompletionStage();

        return userPromise.future();
    }

    public Uni<Users> getUserById(MessageUser messageUser, Mutiny.Session session) {
        Promise<Users> userPromise = Promise.promise();

        messageUser.setReactiveSession(session);
        CriteriaBuilder builder = session.getCriteriaBuilder();
        CriteriaQuery<Users> userQuery = builder.createQuery(Users.class);
        Root<Users> root = userQuery.from(Users.class);

        Predicate equalName = builder.equal(root.get(Users_.NAME), messageUser.getName());
        Predicate equalPassword = builder.equal(root.get(Users_.PASSWORD), messageUser.getPassword());
        userQuery.select(root).where(builder.and(equalName, equalPassword));

        Mutiny.SelectionQuery<Users> query = session.createQuery(userQuery);

        query.getSingleResult().onItemOrFailure().invoke((user, err) -> {
            if (err != null) {
                if (!err.getMessage().startsWith("No result found")) {
                    logger.error("{}{}", Err.displayErr(5), err.getMessage());
                }
                messageUser.setStatusMessage(err.getMessage());
            }
            userPromise.complete(user);

        }).subscribe().asCompletionStage();

        return userPromise.future();
    }

    public Uni<List<Messages>> getUserUndelivered(MessageUser messageUser, Mutiny.Session session) {
        Promise<List<Messages>> messagesPromise = Promise.promise();
        CriteriaBuilder builder = session.getCriteriaBuilder();
        CriteriaQuery<Messages> criteriaQuery = builder.createQuery(Messages.class);

        Root<Users> userRoot = criteriaQuery.from(Users.class);
        Join<Users, Messages> users = userRoot.join("messages");
        criteriaQuery.where(builder.equal(userRoot.get("id"), messageUser.getId()));

        session.createQuery(criteriaQuery.select(users)).getResultList()
          .onItemOrFailure().invoke((list, err) -> {
              if (err != null) {
                  logger.error("{}{}", Err.displayErr(6), err.getMessage());
              }
          }).subscribe().asCompletionStage().whenCompleteAsync((list, err) -> {
              if (err != null) {
                  logger.error("{}{}", Err.displayErr(7), err.getMessage());
                  messageUser.setStatusMessage(err.getMessage());
              }
              messagesPromise.complete(list);
          });
        return messagesPromise.future();
    }

    public Uni<Long> addMessage(MessageUser messageUser, String mess) {
        Promise<Long> returnId = Promise.promise();
        Messages message = new Messages();

        message.setPostDate(LocalDateTime.now());
        message.setMessage(mess);
        message.setFromHandle(messageUser.getName());
        io.vertx.core.Context duplicatedContext = getVertxContext();
        duplicatedContext.runOnContext(v -> {
            sessionFactory.openSession().onItemOrFailure().invoke((session, err) -> {
                messageUser.setReactiveSession(session);
                session.persist(message).onItemOrFailure().invoke((v2, err2) -> {
                    if (err2 != null) {
                        logger.error(Err.displayErr(8), err2.getMessage(), message.getId());
                        returnId.complete(0L);
                    } else {
                        returnId.complete(message.getId());
                    }
                }).subscribe().asCompletionStage();
            }).subscribe().asCompletionStage();
        });
        return returnId.future();
    }

    private Uni<String> persistUndelivered(List<Users> users, Long messageId, Mutiny.Session session) {
        Promise<String> persisted = Promise.promise();
        List<Undelivered> undeliveredList = new ArrayList<>();
        for (Users user : users) {
            Undelivered undelivered = new Undelivered();
            Undelivered.UndeliveredId undeliveredId = new Undelivered.UndeliveredId(user.getId(), messageId.longValue());
            undelivered.setUndeliveredId(undeliveredId);
            undeliveredList.add(undelivered);
        }
        for (Undelivered undelivered : undeliveredList) {
            session.persist(undelivered).onItemOrFailure().invoke((v, err) -> {
                if (err != null) {
                    logger.error("{}{}", Err.displayErr(9), err.getMessage());
                }
            }).subscribe().with(v -> {});
        }
        persisted.complete("");

        return persisted.future();
    }

    public Uni<Long> addUndelivered(MessageUser messageUser, List<String> destination, Long messageId)
      throws ExecutionException, InterruptedException {
        Promise<Long> finishedPromise = Promise.promise();
        Mutiny.Session session = messageUser.getReactiveSession();
        Map<String, Long> userIds = new HashMap<>();

        getUserByName(destination, session).onItemOrFailure().invoke((users, err) -> {
            persistUndelivered(users, messageId, session).onItemOrFailure().invoke((s, err1) -> {
                if (err1 != null) {
                    logger.error("{}{}", Err.displayErr(10), err1.getMessage());
                    session.close().onItemOrFailure().invoke((v, err3) -> {
                        if (err3 != null) {
                            logger.error("{}{}", Err.displayErr(11), err3.getMessage());
                        }
                        finishedPromise.complete(messageId);
                    }).subscribe().asCompletionStage();
                } else {
                    session.flush().onItemOrFailure().invoke((v2, err2) -> {
                        if (err2 != null) {
                            logger.error("{}{}", Err.displayErr(12), err2.getMessage());
                        }
                        session.clear();

                        session.close().onItemOrFailure().invoke((v, err3) -> {
                            if (err3 != null) {
                                logger.error("{}{}", Err.displayErr(13), err3.getMessage());
                            }
                            finishedPromise.complete(messageId);
                        }).subscribe().asCompletionStage();
                    }).subscribe().asCompletionStage();
                }
            }).subscribe().asCompletionStage();
        }).subscribe().asCompletionStage();

        return finishedPromise.future();
    }

    public Uni<MessageUser> selectUser(MessageUser messageUser)
      throws IOException {
        Promise<MessageUser> promiseMessageUser = Promise.promise();
        /*
            The context code is required to switch to the vertx context
            Because websocket is not ApplicationScoped
            Apparently secret stuff
         */
        Context duplicatedContext = getVertxContext();

        if(sessionFactory == null) {
            sessionFactory = GroupHibernateService.getSessionFactory();
        }
        duplicatedContext.runOnContext(v -> {
            sessionFactory.openSession().onItem().invoke(messageUser::setReactiveSession)
              .subscribe().with(session -> {

                  Uni<Users> uniUser = getUserById(messageUser, session);

                  CompletableFuture<Users> users = uniUser.onItem().invoke(foundUser -> {
                      if (messageUser.getStatusMessage() == null) { // update user if found
                          foundUser.setLastLogin(LocalDateTime.now());

                          foundUser.setLastLogin(LocalDateTime.now());

                          session.merge(foundUser).onItem().invoke(u -> {
                              session.flush().onItem().invoke(() -> {
                                  messageUser.setId(foundUser.getId());
                                  messageUser.setLastLogin(foundUser.getLastLogin());

                                  session.close().onItemOrFailure().invoke((c, err) -> {
                                      if (err != null) {
                                          logger.error("{}{}", Err.displayErr(14), err.getMessage());
                                      }
                                  }).subscribe().asCompletionStage();
                                  promiseMessageUser.complete(messageUser);
                              }).subscribe().asCompletionStage();
                          }).onItemOrFailure().invoke((m, err) -> {
                              if (err != null) {
                                  logger.error("{}{}", Err.displayErr(15), err.getMessage());
                              }
                          }).subscribe().asCompletionStage();
                      } else { // add user when not found
                          messageUser.setLastLogin(LocalDateTime.now());
                          if (messageUser.getIp() == null) {
                              messageUser.setIp("Unknown");
                          }


                          Users user = new Users();
                          if (messageUser.getLastLogin() == null) {
                              messageUser.setLastLogin(LocalDateTime.now());
                          }
                          user.setLastLogin(LocalDateTime.now());
                          user.setName(messageUser.getName());
                          user.setPassword(messageUser.getPassword());
                          user.setIp(messageUser.getIp());

                          session.persist(user).onItemOrFailure().invoke((v0, err0) -> {
                              if (err0 != null) {
                                  logger.error("{}{}", Err.displayErr(16), err0.getMessage());
                                  session.close().onItemOrFailure().invoke((v1, err1) -> {
                                        if (err1 != null) {
                                            logger.error("{}{}", Err.displayErr(17), err1.getMessage());
                                        }
                                    }).subscribe().asCompletionStage();
                                  promiseMessageUser.complete(messageUser);
                              } else {
                                  session.flush().onItemOrFailure().invoke((v1, err) -> {
                                      if (err != null) {
                                          logger.error("{}{}", Err.displayErr(18), err.getMessage());
                                      } else {
                                          messageUser.setId(user.getId());
                                          messageUser.setLastLogin(user.getLastLogin());
                                      }
                                      session.close().onItemOrFailure().invoke((v2, err3) -> {
                                          if (err3 != null) {
                                              logger.error("{}{}", Err.displayErr(17), err3.getMessage());
                                          }
                                          promiseMessageUser.complete(messageUser);
                                      }).subscribe().asCompletionStage();
                                  }).subscribe().asCompletionStage();
                              }
                          }).subscribe().asCompletionStage();
                      }
                  }).subscribeAsCompletionStage();
              });
        });
        return promiseMessageUser.future();
    }

    public Uni<String> buildUsersJson(MessageUser messageUser) {
        Promise<String> returnedJson = Promise.promise();
        Context duplicatedContext = getVertxContext();

        duplicatedContext.runOnContext(v -> {
            Uni<List<Map<String, String>>> userList = getAllUsers(messageUser);
            userList.onItemOrFailure().invoke((list, err) -> {
                if (err != null) {
                    logger.error("{}{}", Err.displayErr(1), err.getMessage());
                }
                JsonArray jsonArray = JsonArray.of(list);

                returnedJson.complete(jsonArray.toString().replace("[[", "[").replace("]]", "]"));
            }).subscribe().asCompletionStage();
        });

        return returnedJson.future();
    }

    private Uni<Messages> removeMessage(Messages message) {
        Promise<Messages> messagePromise = Promise.promise();

        sessionFactory.openSession().onItemOrFailure().invoke((session, err) -> {
            session.find(Messages.class, message.getId()).onItemOrFailure()
              .invoke((currentMessage, err1) -> {
                  if (err1 != null) {
                      logger.error("{}{}", Err.displayErr(19), err1.getMessage());
                      throw new RuntimeException(err1.getMessage());
                  }

                  if(currentMessage.getUndelivered().isEmpty()) {
                      session.remove(currentMessage).onItemOrFailure().invoke((v, err2) -> {
                          if (err2 != null) {
                              logger.error("{}{}", Err.displayErr(20), err2.getMessage());
                          }
                          session.flush().onItemOrFailure().invoke((v4, err4) -> {
                              if (err4 != null) {
                                  logger.error("{}{}", Err.displayErr(21), err4.getMessage());
                              }

                              session.close().onItemOrFailure().invoke((v5, err5) -> {
                                  if (err5 != null) {
                                      logger.error("{}{}", Err.displayErr(22), err5.getMessage());
                                  }
                                  messagePromise.complete(message);
                              }).subscribe().asCompletionStage();
                          }).subscribe().asCompletionStage();
                      }).subscribe().asCompletionStage();
                  } else {
                       session.close().onItemOrFailure().invoke((v5, err5) -> {
                          if (err5 != null) {
                              logger.error("{}{}", Err.displayErr(22), err5.getMessage());
                          }
                          messagePromise.complete(message);
                      }).subscribe().asCompletionStage();
                  }
              }).subscribe().asCompletionStage();

        }).subscribe().asCompletionStage();

        return messagePromise.future();
    }

    protected Uni<String> removeUndelivered(Long userId, Long messageId) {
        Promise<String> removePromise = Promise.promise();

        sessionFactory.openSession().onItemOrFailure().invoke((session, err0) -> {
            session.find(Undelivered.class, new Undelivered.UndeliveredId(userId, messageId))
              .onItemOrFailure().invoke((undelivered, err) -> {
                  if (err != null) {
                      logger.error("{}{}", Err.displayErr(23), err.getMessage());
                      session.close().subscribe().with(v2 -> {
                          removePromise.complete("");
                      });
                  } else {
                      session.remove(undelivered).onItemOrFailure().invoke((v, err2) -> {
                          if (err2 != null) {
                              logger.error("{}{}", Err.displayErr(24), err2.getMessage());
                          }
                          session.flush().onItemOrFailure().invoke((v2, err3) -> {
                              if (err3 != null) {
                                  logger.error("{}{}", Err.displayErr(25), err3.getMessage());
                              }

                              session.close().onItemOrFailure().invoke((v3, err4) -> {
                                  if (err4 != null) {
                                      logger.error("{}{}", Err.displayErr(26), err4.getMessage());
                                  }
                                  removePromise.complete("");
                              }).subscribe().asCompletionStage();
                          }).subscribe().asCompletionStage();
                      }).subscribe().asCompletionStage();
                  }
              }).subscribe().asCompletionStage();
        }).subscribe().asCompletionStage();
        return removePromise.future();
    }

    public Uni<Map<String, Integer>> processUserMessages(WebSocketConnection ws, MessageUser messageUser)
      throws ExecutionException, InterruptedException {
        Promise<Map<String, Integer>> messagesPromise = Promise.promise();
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("MM-dd@HH");
        Map<String, Integer> counts = new ConcurrentHashMap<>();
        Context duplicatedContext = getVertxContext();

        duplicatedContext.runOnContext(v -> {
            sessionFactory.openSession().onItemOrFailure().invoke((session, err) -> {
                if (err != null) {
                    logger.error("{}{}", Err.displayErr(27), err.getMessage());
                    throw new RuntimeException(err.getMessage());
                }

                Uni<List<Messages>> uniList = getUserUndelivered(messageUser, session);
                uniList.onItemOrFailure().invoke((messageList, err1) -> {
                    if (messageUser.getStatusMessage() != null) {
                        counts.put("messages", 0);
                        if(!messageUser.getStatusMessage().startsWith(Err.displayErr(66))) {
                            logger.error("{}{}", Err.displayErr(28), messageUser.getStatusMessage());
                        }
                        session.close().subscribe().asCompletionStage();
                        throw new RuntimeException(messageUser.getStatusMessage());
                    }

                    MultiSubscribe<Messages> messages = Multi.createFrom().iterable(messageList)
                      .onItem().call(message -> {
                          ws.sendText(
                            message.getFromHandle() +
                              message.getPostDate().format(formatter) + " " + message.getMessage()
                          ).onItemOrFailure().invoke((v2, err2) -> {
                              if (err2 != null) {
                                  logger.error("{}{}", Err.displayErr(29), err2.getMessage());
                              }
                              removeUndelivered(messageUser.getId(),
                                message.getId()).onItemOrFailure()
                                .invoke((remove, removeErr) -> {
                                    if (removeErr != null) {
                                        logger.error("{}{}", Err.displayErr(30), removeErr.getMessage());
                                    } else {
                                        if(logger.isDebugEnabled()) {
                                            logger.debug("Removed Undelivered");
                                        }
                                        removeMessage(message)
                                          .onItemOrFailure().invoke((mess, err3) -> {
                                              if (err3 != null) {
                                                  logger.error("{}{}", Err.displayErr(20), err3.getMessage());
                                              }
                                              if(logger.isDebugEnabled()) {
                                                  logger.debug("Message Removed: {}", message.getId());
                                              }
                                          }).subscribe().asCompletionStage();
                                    }
                                }).subscribe().asCompletionStage();
                          }).subscribe().asCompletionStage();
                          /*
                            If remove message fails, increase the delay time.
                           */
                          return Uni.createFrom().nullItem().onItem().delayIt().by(Duration.ofMillis(25));
                      }).subscribe();


                    AtomicInteger counter = new AtomicInteger();
                    messages.asIterable().stream().iterator().forEachRemaining(m -> {
                        if (counter.incrementAndGet() == messageList.size()) {
                            counts.put("messages", messageList.size());
                            messagesPromise.complete(counts);
                        }
                    });
                }).subscribe().asCompletionStage();
            }).subscribe().asCompletionStage();
        });
        return messagesPromise.future();
    }

    public Uni<Long> deleteUser(MessageUser messageUser) {
        Promise<Long> userId = Promise.promise();
        Context duplicatedContext = getVertxContext();

        duplicatedContext.runOnContext(v -> {
            sessionFactory.openSession().onItemOrFailure().invoke((session, err) -> {
                messageUser.setReactiveSession(session);

                Uni<Users> uniUser = getUserById(messageUser, session);

                uniUser.onItemOrFailure().invoke((user, err1) -> {
                    if (err1 != null) {
                        logger.error("{}{}", Err.displayErr(31), err1.getMessage());
                    }
                    session.remove(user).onItemOrFailure().invoke((v2, err2) -> {
                        if (err2 != null) {
                            logger.error("{}{}", Err.displayErr(32), err2.getMessage());
                        }
                        session.flush().onItemOrFailure().invoke((v3, err3) -> {
                            userId.complete(user.getId());
                            session.close().subscribe().with(v4 -> {
                            });
                        }).subscribe().asCompletionStage();
                    }).subscribe().asCompletionStage();
                }).subscribe().asCompletionStage();
            }).subscribe().with(sess -> {
            });
        });
        return userId.future();
    }

    protected Context getVertxContext() {
        Context context = Server.getVertxMutiny().getDelegate().getOrCreateContext();
        Context duplicatedContext = VertxContext.getOrCreateDuplicatedContext(context);
        VertxContextSafetyToggle.setContextSafe(duplicatedContext, Boolean.TRUE);
        return duplicatedContext;
    }

    public void setSessionFactory(Mutiny.SessionFactory sessionFactory) {
        this.sessionFactory = sessionFactory;
    }
}
