/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *   Copyright 2020-2021 Couchbase, Inc.
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */

#pragma once

#include <couchbase/build_config.hxx>

#include "core/app_telemetry_meter.hxx"
#include "core/config_listener.hxx"
#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
#include "core/columnar/bootstrap_notification_subscriber.hxx"
#endif
#include "core/logger/logger.hxx"
#include "core/metrics/meter_wrapper.hxx"
#include "core/operations/http_noop.hxx"
#include "core/service_type.hxx"
#include "core/tls_context_provider.hxx"
#include "core/tracing/noop_tracer.hxx"
#include "core/tracing/tracer_wrapper.hxx"
#include "http_command.hxx"
#include "http_context.hxx"
#include "http_session.hxx"
#include "http_traits.hxx"

#include <gsl/narrow>

#include <chrono>
#include <optional>
#include <queue>
#include <random>

namespace couchbase::core::io
{

#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
class http_session_manager
  : public std::enable_shared_from_this<http_session_manager>
  , public config_listener
  , public columnar::bootstrap_notification_subscriber
{
#else
class http_session_manager
  : public std::enable_shared_from_this<http_session_manager>
  , public config_listener
{
#endif
public:
  http_session_manager(std::string client_id,
                       asio::io_context& ctx,
                       tls_context_provider& tls,
                       origin& origin)
    : client_id_(std::move(client_id))
    , ctx_(ctx)
    , tls_(tls)
    , origin_(origin)
  {
  }

  void set_tracer(std::shared_ptr<tracing::tracer_wrapper> tracer)
  {
    tracer_ = std::move(tracer);
  }

  void set_meter(std::shared_ptr<metrics::meter_wrapper> meter)
  {
    meter_ = std::move(meter);
  }

  void set_app_telemetry_meter(std::shared_ptr<core::app_telemetry_meter> app_telemetry_meter)
  {
    app_telemetry_meter_ = std::move(app_telemetry_meter);
  }

  auto configuration_capabilities() const -> configuration_capabilities
  {
    std::scoped_lock config_lock(config_mutex_);
    return config_.capabilities;
  }

#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
  void notify_bootstrap_error(const impl::bootstrap_error& error) override
  {
    CB_LOG_DEBUG("Received bootstrap error notification. code={}, ec_message={}, message={}, "
                 "allow_fast_fail={}.",
                 error.ec.value(),
                 error.ec.message(),
                 error.error_message,
                 allow_fast_fail_ ? "true" : "false");
    if (allow_fast_fail_) {
      std::scoped_lock bootstrap_error_lock(last_bootstrap_error_mutex_);
      last_bootstrap_error_ = error;
      drain_deferred_queue(last_bootstrap_error_.value());
    }
  }

  void notify_bootstrap_success(const std::string& session_id) override
  {
    CB_LOG_DEBUG("Received successful bootstrap notification.  Session={}.", session_id);
    std::scoped_lock bootstrap_error_lock(last_bootstrap_error_mutex_);
    allow_fast_fail_ = false;
    last_bootstrap_error_.reset();
  }
#endif

  void update_config(topology::configuration config) override
  {
    {
      std::scoped_lock config_lock(config_mutex_, sessions_mutex_, next_index_mutex_);
      config_ = std::move(config);
      if (!config_.nodes.empty() && next_index_ >= config_.nodes.size()) {
        next_index_ = 0;
      }
      for (auto& [type, sessions] : idle_sessions_) {
        sessions.remove_if([&opts = options_, &cfg = config_](const auto& session) {
          return session && !cfg.has_node(opts.network,
                                          session->type(),
                                          opts.enable_tls,
                                          session->hostname(),
                                          session->port());
        });
      }
    }
#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
    drain_deferred_queue({});
#endif
  }

#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
  void set_dispatch_timeout(const std::chrono::milliseconds timeout)
  {
    dispatch_timeout_ = timeout;
  }
#endif

  void set_configuration(const topology::configuration& config, const cluster_options& options)
  {
    std::size_t next_index = 0;
    if (config.nodes.size() > 1) {
      std::random_device rd;
      std::mt19937 gen(rd());
      std::uniform_int_distribution<std::size_t> dis(0, config.nodes.size() - 1);
      next_index = dis(gen);
    }
    {
      std::scoped_lock lock(config_mutex_, next_index_mutex_);
      options_ = options;
      next_index_ = next_index;
      config_ = config;
#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
      if (!configured_) {
        configured_ = true;
      }
      if (allow_fast_fail_) {
        allow_fast_fail_ = false;
      }
#endif
    }
#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
    drain_deferred_queue({});
#endif
  }

  void export_diag_info(diag::diagnostics_result& res)
  {
    std::scoped_lock lock(sessions_mutex_);

    for (const auto& [type, sessions] : busy_sessions_) {
      for (const auto& session : sessions) {
        if (session) {
          res.services[type].emplace_back(session->diag_info());
        }
      }
    }
    for (const auto& [type, sessions] : idle_sessions_) {
      for (const auto& session : sessions) {
        if (session) {
          res.services[type].emplace_back(session->diag_info());
        }
      }
    }
  }

  template<typename Collector>
  void ping(std::set<service_type> services,
            std::optional<std::chrono::milliseconds> timeout,
            std::shared_ptr<Collector> collector)
  {
    std::array known_types{
      service_type::query, service_type::analytics, service_type::search,
      service_type::view,  service_type::eventing,  service_type::management,
    };
    std::vector<topology::configuration::node> nodes{};
    {
      std::scoped_lock lock(config_mutex_);
      nodes = config_.nodes;
    }
    for (const auto& node : nodes) {
      for (auto type : known_types) {
        if (services.find(type) == services.end()) {
          continue;
        }
        std::uint16_t port = node.port_or(options_.network, type, options_.enable_tls, 0);
        if (port != 0) {
          const auto& hostname = node.hostname_for(options_.network);
          const auto& canonical_hostname = node.hostname;
          std::uint16_t canonical_port = node.port_or(type, options_.enable_tls, 0);
          auto session = create_session(type,
                                        node_details{
                                          hostname,
                                          port,
                                          node.node_uuid,
                                          canonical_hostname,
                                          canonical_port,
                                        });
          if (session->is_connected()) {
            std::scoped_lock lock(sessions_mutex_);
            busy_sessions_[type].push_back(session);
          }
          operations::http_noop_request request{};
          request.type = type;
          request.timeout = timeout;
#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
          auto cmd = std::make_shared<operations::http_command<operations::http_noop_request>>(
            ctx_,
            request,
            tracer_,
            meter_,
            app_telemetry_meter_,
            options_.default_timeout_for(request.type),
            dispatch_timeout_);
#else
          auto cmd = std::make_shared<operations::http_command<operations::http_noop_request>>(
            ctx_,
            request,
            tracer_,
            meter_,
            app_telemetry_meter_,
            options_.default_timeout_for(request.type));
#endif
          cmd->start([start = std::chrono::steady_clock::now(),
                      self = shared_from_this(),
                      type,
                      cmd,
                      handler =
                        collector->build_reporter()](operations::http_noop_response&& resp) {
            diag::ping_state state = diag::ping_state::ok;
            std::optional<std::string> error{};
            if (auto ec = resp.ctx.ec; ec) {
              if (ec == errc::common::unambiguous_timeout ||
                  ec == errc::common::ambiguous_timeout) {
                state = diag::ping_state::timeout;
              } else {
                state = diag::ping_state::error;
              }
              error.emplace(fmt::format("code={}, message={}, http_code={}",
                                        ec.value(),
                                        ec.message(),
                                        resp.ctx.http_status));
            }
            auto remote_address = cmd->session_->remote_address();
            // If not connected, the remote address will be empty.  Better to
            // give the user some context on the "attempted" remote address.
            if (remote_address.empty()) {
              remote_address =
                fmt::format("{}:{}", cmd->session_->hostname(), cmd->session_->port());
            }
            handler->report(
              diag::endpoint_ping_info{ type,
                                        cmd->session_->id(),
                                        std::chrono::duration_cast<std::chrono::microseconds>(
                                          std::chrono::steady_clock::now() - start),
                                        remote_address,
                                        cmd->session_->local_address(),
                                        state,
                                        {},
                                        error });
            self->check_in(type, cmd->session_);
          });

          cmd->set_command_session(session);
          if (!session->is_connected()) {
            connect_then_send(session, cmd, {}, true);
          } else {
            cmd->send_to();
          }
        }
      }
    }
  }

  struct node_details {
    std::string hostname{};
    std::uint16_t port{};
    std::string node_uuid{};
    std::string canonical_hostname{};
    std::uint16_t canonical_port{};
  };

  auto check_out(service_type type,
                 std::string preferred_node_address,
                 const std::string& undesired_node_address = {})
    -> std::pair<std::error_code, std::shared_ptr<http_session>>
  {
    node_details preferred_node{};

    if (preferred_node_address.empty() && !undesired_node_address.empty()) {
      // This is currently only used by Columnar to avoid the node
      // that was last used and to satisfy the requirement for retrying on a different randomly
      // selected node (as opposed to any node we have an idle session to).
      if (auto n = pick_random_node(type, undesired_node_address); n.port != 0) {
        preferred_node_address = fmt::format("{}:{}", n.hostname, n.port);
        preferred_node = std::move(n);
      }
    } else if (!preferred_node_address.empty()) {
      // A 'sticky' node was specified. We should populate the node details.
      preferred_node = lookup_node(type, preferred_node_address);
      if (preferred_node.port == 0) {
        return { errc::common::service_not_available, nullptr };
      }
    }

    const std::scoped_lock lock(sessions_mutex_);
    idle_sessions_[type].remove_if([](const auto& s) {
      return !s;
    });
    busy_sessions_[type].remove_if([](const auto& s) {
      return !s;
    });
    pending_sessions_[type].remove_if([](const auto& s) {
      return !s;
    });
    std::shared_ptr<http_session> session{};
    while (!idle_sessions_[type].empty()) {
      if (preferred_node_address.empty()) {
        session = idle_sessions_[type].front();
        idle_sessions_[type].pop_front();
        if (session->reset_idle()) {
          break;
        }
      } else {
        auto ptr = std::find_if(idle_sessions_[type].begin(),
                                idle_sessions_[type].end(),
                                [&preferred_node_address,
                                 &h = preferred_node.hostname,
                                 &p = preferred_node.port](const auto& s) {
                                  // Check for a match using both the unresolved hostname & IP
                                  // address
                                  return (s->remote_address() == preferred_node_address ||
                                          (s->hostname() == h && s->port() == std::to_string(p)));
                                });
        if (ptr != idle_sessions_[type].end()) {
          session = *ptr;
          idle_sessions_[type].erase(ptr);
          if (session->reset_idle()) {
            break;
          }
        } else {
          session = create_session(type, preferred_node);
          break;
        }
      }
      CB_LOG_TRACE(
        "{} Idle timer has expired for \"{}:{}\".  Attempting to select another session.",
        session->log_prefix(),
        session->hostname(),
        session->port());
      session.reset();
    }
    if (!session) {
      auto node = preferred_node_address.empty() ? next_node(type)
                                                 : lookup_node(type, preferred_node_address);
      if (node.port == 0) {
        return { errc::common::service_not_available, nullptr };
      }
      session = create_session(type, node);
    }
    if (session->is_connected()) {
      busy_sessions_[type].push_back(session);
    } else {
      pending_sessions_[type].push_back(session);
    }
    return { {}, session };
  }

  void check_in(service_type type, std::shared_ptr<http_session> session)
  {
    if (!session) {
      return;
    }
    if (!session->is_connected()) {
      CB_LOG_DEBUG("{} HTTP session never connected.  Skipping check-in", session->log_prefix());
      return session.reset();
    }
    {
      std::scoped_lock lock(config_mutex_);
      if (!session->keep_alive() || !config_.has_node(options_.network,
                                                      session->type(),
                                                      options_.enable_tls,
                                                      session->hostname(),
                                                      session->port())) {
        return asio::post(session->get_executor(), [session]() {
          session->stop();
        });
      }
    }
    if (!session->is_stopped()) {
      session->set_idle(options_.idle_http_connection_timeout);
      CB_LOG_DEBUG("{} put HTTP session back to idle connections", session->log_prefix());
      std::scoped_lock lock(sessions_mutex_);
      idle_sessions_[type].push_back(session);
      busy_sessions_[type].remove_if([id = session->id()](const auto& s) -> bool {
        return !s || s->id() == id;
      });
      pending_sessions_[type].remove_if([id = session->id()](const auto& s) -> bool {
        return !s || s->id() == id;
      });
    }
  }

  void close()
  {
#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
    drain_deferred_queue(errc::common::request_canceled);
#endif
    std::map<service_type, std::list<std::shared_ptr<http_session>>> busy_sessions, idle_sessions,
      pending_sessions;
    {
      const std::scoped_lock lock(sessions_mutex_);
      busy_sessions = std::move(busy_sessions_);
      idle_sessions = std::move(idle_sessions_);
      pending_sessions = std::move(pending_sessions_);
    }
    for (auto& [type, sessions] : idle_sessions) {
      for (auto& s : sessions) {
        if (s) {
          s->reset_idle();
          s.reset();
        }
      }
    }
    for (auto& [type, sessions] : busy_sessions) {
      for (auto& s : sessions) {
        if (s) {
          s->stop();
        }
      }
    }
    for (auto& [type, sessions] : pending_sessions) {
      for (auto& s : sessions) {
        if (s) {
          s->stop();
        }
      }
    }
  }

  template<typename Request, typename Handler>
  void execute(Request request, Handler&& handler)
  {
#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
    if (!configured_) {
      return defer_command(request, std::move(handler));
    }
#endif
    std::string preferred_node;
    if constexpr (http_traits::supports_sticky_node_v<Request>) {
      if (request.send_to_node) {
        preferred_node = *request.send_to_node;
      }
    }
    auto [error, session] = check_out(request.type, preferred_node);
    if (error) {
      typename Request::error_context_type ctx{};
      ctx.ec = error;
      using encoded_response_type = typename Request::encoded_response_type;
      return handler(request.make_response(std::move(ctx), encoded_response_type{}));
    }

#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
    auto cmd = std::make_shared<operations::http_command<Request>>(
      ctx_,
      request,
      tracer_,
      meter_,
      app_telemetry_meter_,
      options_.default_timeout_for(request.type),
      dispatch_timeout_);
#else
    auto cmd = std::make_shared<operations::http_command<Request>>(
      ctx_,
      request,
      tracer_,
      meter_,
      app_telemetry_meter_,
      options_.default_timeout_for(request.type));
#endif

    using response_type = typename Request::response_type;
    cmd->start([self = shared_from_this(), cmd, handler = std::forward<Handler>(handler)](
                 response_type&& resp) mutable {
      handler(std::move(resp));
      self->check_in(cmd->request.type, cmd->session_);
    });
    cmd->set_command_session(session);
    if (!session->is_connected()) {
      connect_then_send(session, cmd, preferred_node);
    } else {
      cmd->send_to();
    }
  }

#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
  void connect_then_send_pending_op(
    std::shared_ptr<http_session> session,
    const std::string& preferred_node,
    std::chrono::time_point<std::chrono::steady_clock> dispatch_deadline,
    std::chrono::time_point<std::chrono::steady_clock> deadline,
    utils::movable_function<void(std::error_code, std::shared_ptr<http_session>)> callback)
#else
  void connect_then_send_pending_op(
    std::shared_ptr<http_session> session,
    const std::string& preferred_node,
    std::chrono::time_point<std::chrono::steady_clock> deadline,
    utils::movable_function<void(std::error_code, std::shared_ptr<http_session>)> callback)
#endif
  {
    session->connect([self = shared_from_this(),
                      session,
                      preferred_node,
#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
                      dispatch_deadline,
#endif
                      deadline,
                      cb = std::move(callback)]() mutable {
      if (!session->is_connected()) {
#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
        auto now = std::chrono::steady_clock::now();
        if (session->is_stopped()) {
          // Session was forced to stop (e.g. due to cluster being closed or cancellation)
          cb(errc::common::request_canceled, {});
          return;
        }
        if (dispatch_deadline < now || deadline < now) {
          session->stop();
          cb(errc::common::unambiguous_timeout, {});
          return;
        }
#else
        if (deadline < std::chrono::steady_clock::now()) {
          session->stop();
          return cb(errc::common::unambiguous_timeout, {});
          return;
        }
#endif

        // stop this session and create a new one w/ new hostname + port
        session->stop();
        const auto node = preferred_node.empty()
                            ? self->next_node(session->type())
                            : self->lookup_node(session->type(), preferred_node);
        if (node.port == 0) {
          cb(errc::common::service_not_available, {});
          return;
        }
        auto new_session = self->create_session(session->type(), node);
        if (new_session->is_connected()) {
          {
            const std::scoped_lock inner_lock(self->sessions_mutex_);
            self->busy_sessions_[new_session->type()].push_back(new_session);
          }
          cb({}, new_session);
        } else {
#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
          {
            const std::scoped_lock inner_lock(self->sessions_mutex_);
            self->pending_sessions_[new_session->type()].push_back(new_session);
          }
          self->connect_then_send_pending_op(
            new_session, preferred_node, dispatch_deadline, deadline, cb);
#else
          self->connect_then_send_pending_op(new_session, preferred_node, deadline, cb);
#endif
        }
      } else {
#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
        auto now = std::chrono::steady_clock::now();
        if (dispatch_deadline < now || deadline < now) {
          session->stop();
          cb(errc::common::unambiguous_timeout, {});
          return;
        }
#else
        if (deadline < std::chrono::steady_clock::now()) {
          session->stop();
          cb(errc::common::unambiguous_timeout, {});
          return;
        }
#endif
        {
          const std::scoped_lock inner_lock(self->sessions_mutex_);
          self->busy_sessions_[session->type()].push_back(session);
          self->pending_sessions_[session->type()].remove_if(
            [id = session->id()](const auto& s) -> bool {
              return !s || s->id() == id;
            });
        }
        cb({}, session);
      }
    });
  }

#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
  void add_to_deferred_queue(utils::movable_function<void(error_union)> command)
  {
    const std::scoped_lock lock_for_deferred_commands(deferred_commands_mutex_);
    deferred_commands_.emplace(std::move(command));
  }

  [[nodiscard]] auto dispatch_timeout() const -> std::chrono::milliseconds
  {
    return dispatch_timeout_;
  }

  [[nodiscard]] auto is_configured() const -> bool
  {
    return configured_;
  }

  [[nodiscard]] auto last_bootstrap_error() const -> std::optional<impl::bootstrap_error>
  {
    return last_bootstrap_error_;
  }
#endif

private:
  template<typename Request>
  void connect_then_send(std::shared_ptr<http_session> session,
                         std::shared_ptr<operations::http_command<Request>> cmd,
                         const std::string& preferred_node,
                         bool reuse_session = false)
  {
    session->connect([self = shared_from_this(),
                      session,
                      cmd,
                      preferred_node = std::move(preferred_node),
                      reuse_session]() mutable {
      if (!session->is_connected()) {
#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
        auto now = std::chrono::steady_clock::now();
        if (cmd->dispatch_deadline_expiry() < now || cmd->deadline_expiry() < now) {
          // The http command will stop its session when the deadline expires.
          return;
        }
#else
        if (cmd->deadline_expiry() < std::chrono::steady_clock::now()) {
          // The http command will stop its session when the deadline expires.
          return;
        }
#endif
        if (reuse_session) {
          return self->connect_then_send(session, cmd, preferred_node, reuse_session);
        }
        // stop this session and create a new one w/ new hostname + port
        session->stop();
        const auto node = preferred_node.empty()
                            ? self->next_node(session->type())
                            : self->lookup_node(session->type(), preferred_node);
        if (node.port == 0) {
          cmd->invoke_handler(errc::common::service_not_available, {});
          return;
        }
        auto new_session = self->create_session(session->type(), node);
        cmd->set_command_session(new_session);
        if (new_session->is_connected()) {
          std::scoped_lock inner_lock(self->sessions_mutex_);
          self->busy_sessions_[new_session->type()].push_back(new_session);
          cmd->send_to();
        } else {
          self->connect_then_send(new_session, cmd, preferred_node);
        }
      } else {
        std::scoped_lock inner_lock(self->sessions_mutex_);
        self->busy_sessions_[session->type()].push_back(session);
        cmd->send_to();
      }
    });
  }

  auto create_session(service_type type, const node_details& node) -> std::shared_ptr<http_session>
  {
    std::shared_ptr<http_session> session;
    if (options_.enable_tls) {
      session = std::make_shared<http_session>(type,
                                               client_id_,
                                               node.node_uuid,
                                               ctx_,
                                               tls_,
                                               origin_,
                                               node.hostname,
                                               std::to_string(node.port),
                                               http_context{
                                                 config_,
                                                 options_,
                                                 query_cache_,
                                                 node.hostname,
                                                 node.port,
                                                 node.canonical_hostname,
                                                 node.canonical_port,
                                               });
    } else {
      session = std::make_shared<http_session>(type,
                                               client_id_,
                                               node.node_uuid,
                                               ctx_,
                                               origin_,
                                               node.hostname,
                                               std::to_string(node.port),
                                               http_context{
                                                 config_,
                                                 options_,
                                                 query_cache_,
                                                 node.hostname,
                                                 node.port,
                                                 node.canonical_hostname,
                                                 node.canonical_port,
                                               });
    }

    session->on_stop([type, id = session->id(), self = this->shared_from_this()]() {
      const std::scoped_lock inner_lock(self->sessions_mutex_);
      self->busy_sessions_[type].remove_if([&id](const auto& s) {
        return !s || s->id() == id;
      });
      self->idle_sessions_[type].remove_if([&id](const auto& s) {
        return !s || s->id() == id;
      });
    });
    return session;
  }

#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
  template<typename Request, typename Handler>
  void defer_command(Request request, Handler&& handler)
  {
    {
      std::scoped_lock bootstrap_error_lock(last_bootstrap_error_mutex_);
      if (last_bootstrap_error_.has_value()) {
        typename Request::error_context_type ctx{};
        ctx.ec = last_bootstrap_error_.value().ec;
        using response_type = typename Request::encoded_response_type;
        return handler(request.make_response(std::move(ctx), response_type{}));
      }
    }
    auto cmd = std::make_shared<operations::http_command<Request>>(
      ctx_,
      request,
      tracer_,
      meter_,
      app_telemetry_meter_,
      options_.default_timeout_for(request.type),
      dispatch_timeout_);
    using response_type = typename Request::response_type;
    cmd->start([self = shared_from_this(), cmd, handler = std::forward<Handler>(handler)](
                 response_type&& resp) mutable {
      handler(std::move(resp));
      self->check_in(cmd->request.type, cmd->session_);
    });
    CB_LOG_DEBUG(R"(Adding HTTP request to deferred queue: {}, client_context_id="{}")",
                 cmd->request.type,
                 cmd->client_context_id_);
    add_to_deferred_queue([self = shared_from_this(), cmd, request](error_union err) mutable {
      if (!std::holds_alternative<std::monostate>(err)) {
        using response_type = typename Request::encoded_response_type;
        return cmd->invoke_handler(err, response_type{});
      }

      // don't do anything if the command wasn't dispatched or has already timed out
      auto now = std::chrono::steady_clock::now();
      if (cmd->dispatch_deadline_expiry() < now || cmd->deadline_expiry() < now) {
        return;
      }
      std::string preferred_node;
      if constexpr (http_traits::supports_sticky_node_v<Request>) {
        if (request.send_to_node) {
          preferred_node = *request.send_to_node;
        }
      }
      auto [error, session] = self->check_out(request.type, preferred_node);
      if (error) {
        using response_type = typename Request::encoded_response_type;
        return cmd->invoke_handler(error, response_type{});
      }
      cmd->set_command_session(session);
      if (!session->is_connected()) {
        self->connect_then_send(session, cmd, preferred_node);
      } else {
        cmd->send_to();
      }
    });
  }

  void drain_deferred_queue(error_union err)
  {
    std::queue<utils::movable_function<void(error_union)>> commands{};
    {
      const std::scoped_lock lock(deferred_commands_mutex_);
      std::swap(deferred_commands_, commands);
    }
    if (!commands.empty()) {
      CB_LOG_TRACE("Draining deferred operation queue, size={}", commands.size());
    }
    while (!commands.empty()) {
      commands.front()(err);
      commands.pop();
    }
  }
#endif

  auto next_node(service_type type) -> node_details
  {
    std::scoped_lock lock(config_mutex_);
    auto candidates = config_.nodes.size();
    while (candidates > 0) {
      --candidates;
      const std::scoped_lock index_lock(next_index_mutex_);
      const auto& node = config_.nodes[next_index_];
      next_index_ = (next_index_ + 1) % config_.nodes.size();
      const std::uint16_t port = node.port_or(options_.network, type, options_.enable_tls, 0);
      if (port != 0) {
        return {
          node.hostname_for(options_.network),        port, node.node_uuid, node.hostname,
          node.port_or(type, options_.enable_tls, 0),
        };
      }
    }
    return {};
  }

  auto split_host_port(const std::string& address) -> std::pair<std::string, std::uint16_t>
  {
    const auto last_colon = address.find_last_of(':');
    if (last_colon == std::string::npos || address.size() - 1 == last_colon) {
      return { "", static_cast<std::uint16_t>(0U) };
    }
    auto hostname = address.substr(0, last_colon);
    auto port = gsl::narrow_cast<std::uint16_t>(std::stoul(address.substr(last_colon + 1)));
    return { hostname, port };
  }

  auto lookup_node(service_type type, const std::string& preferred_node_address) -> node_details
  {
    const std::scoped_lock lock(config_mutex_);
    auto [hostname, port] = split_host_port(preferred_node_address);
    for (const auto& node : config_.nodes) {
      if (node.hostname_for(options_.network) == hostname &&
          node.port_or(options_.network, type, options_.enable_tls, 0) == port) {
        return {
          hostname, port, node.node_uuid, node.hostname, node.port_or(type, options_.enable_tls, 0),
        };
      }
    }
    return {};
  }

  auto pick_random_node(service_type type, const std::string& undesired_node) -> node_details
  {
    std::vector<topology::configuration::node> candidate_nodes{};
    {
      const std::scoped_lock lock(config_mutex_);
      std::copy_if(config_.nodes.begin(),
                   config_.nodes.end(),
                   std::back_inserter(candidate_nodes),
                   [this, type, &undesired_node](const topology::configuration::node& node) {
                     auto endpoint = node.endpoint(options_.network, type, options_.enable_tls);
                     return endpoint.has_value() && (endpoint.value() != undesired_node);
                   });
    }

    if (candidate_nodes.empty()) {
      // Could not find any other nodes
      return {};
    }

    std::vector<topology::configuration::node> selected{};
    std::sample(candidate_nodes.begin(),
                candidate_nodes.end(),
                std::back_inserter(selected),
                1,
                std::mt19937{ std::random_device{}() });
    const auto& first_selected = selected.at(0);
    return {
      first_selected.hostname_for(options_.network),
      first_selected.port_or(options_.network, type, options_.enable_tls, 0),
      first_selected.node_uuid,
      first_selected.hostname,
      first_selected.port_or(type, options_.enable_tls, 0),
    };
  }

  std::string client_id_;
  asio::io_context& ctx_;
  tls_context_provider& tls_;
  origin& origin_;
  std::shared_ptr<tracing::tracer_wrapper> tracer_{ nullptr };
  std::shared_ptr<metrics::meter_wrapper> meter_{ nullptr };
  std::shared_ptr<core::app_telemetry_meter> app_telemetry_meter_{ nullptr };
  cluster_options options_{};

  topology::configuration config_{};
  mutable std::mutex config_mutex_{};
  std::map<service_type, std::list<std::shared_ptr<http_session>>> busy_sessions_{};
  std::map<service_type, std::list<std::shared_ptr<http_session>>> idle_sessions_{};
  std::map<service_type, std::list<std::shared_ptr<http_session>>> pending_sessions_{};
  std::size_t next_index_{ 0 };
  std::mutex next_index_mutex_{};
  std::mutex sessions_mutex_{};
  query_cache query_cache_{};
#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
  std::atomic_bool configured_{ false };
  std::chrono::milliseconds dispatch_timeout_{};
  std::atomic_bool allow_fast_fail_{ true };
  std::queue<utils::movable_function<void(error_union)>> deferred_commands_{};
  std::mutex deferred_commands_mutex_{};
  std::optional<impl::bootstrap_error> last_bootstrap_error_;
  std::mutex last_bootstrap_error_mutex_{};
#endif
};
} // namespace couchbase::core::io
