#!/usr/bin/env ruby

#  Copyright 2020-2021 Couchbase, Inc.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

require "set"
require "timeout"
require "optparse"
require "openssl"

class Object
  def to_b
    ![nil, false, 0, "", "0", "f", "F", "false", "FALSE", "off", "OFF", "no", "NO"].include?(self)
  end
end

options = {
  verbose: ENV.fetch("CB_VERBOSE", true).to_b,
  strict_encryption: ENV.fetch("CB_STRICT_ENCRYPTION", false).to_b,
  host: ENV.fetch("CB_HOST", "127.0.0.1"),
  username: ENV.fetch("CB_USERNAME", "Administrator"),
  password: ENV.fetch("CB_PASSWORD", "password"),
  bucket: ENV.fetch("CB_BUCKET", "default"),
  sec_bucket: ENV.fetch("CB_SEC_BUCKET", ""),
  server_quota: ENV.fetch("CB_SERVER_QUOTA", 3072).to_i,
  index_quota: ENV.fetch("CB_INDEX_QUOTA", 256).to_i,
  fts_quota: ENV.fetch("CB_FTS_QUOTA", 512).to_i,
  bucket_quota: ENV.fetch("CB_BUCKET_QUOTA", 256).to_i,
  enable_developer_preview: ENV.fetch("CB_DEVELOPER_PREVIEW", false).to_b,
  cluster_run_nodes: ENV.fetch("CB_CLUSTER_RUN_NODES", 0).to_i,
  sample_buckets: Set.new,
}
default_port = options[:strict_encryption] ? 18091 : 8091
if (options[:cluster_run_nodes]).positive?
  default_port = options[:strict_encryption] ? 19000 : 9000
end
options[:port] = ENV.fetch("CB_PORT", default_port).to_i
options[:sample_buckets] << "beer-sample" if ENV.fetch("CB_BEER_SAMPLE", false).to_b
options[:sample_buckets] << "travel-sample" if ENV.fetch("CB_TRAVEL_SAMPLE", false).to_b

require "net/http"
require "json"

class API
  def initialize(options)
    @options = options
    connect
  end

  def connect
    @client = Net::HTTP.start(@options[:host], @options[:port],
                              use_ssl: @options[:strict_encryption],
                              verify_mode: OpenSSL::SSL::VERIFY_NONE)
  end

  def url(path)
    "#{@client.use_ssl? ? 'https' : 'http'}://#{@options[:username]}@#{@options[:host]}:#{@options[:port]}#{path}"
  end

  def decode_response(response)
    payload =
      if /application\/json/.match?(response['content-type'])
        JSON.parse(response.body)
      else
        response.body
      end
    p payload if @options[:verbose]
    payload
  end

  def setup_request(request)
    request.basic_auth(@options[:username], @options[:password])
    request['accept'] = "application/json"
  end

  def get(path)
    puts "# GET #{url(path)}"
    req = Net::HTTP::Get.new(path)
    setup_request(req)
    res = @client.request(req)
    decode_response(res)
  rescue EOFError
    connect
    retry
  rescue => ex
    puts "#{__method__}: #{ex}, sleep for 1 second and retry"
    sleep(1)
    retry
  end

  def post_form(path, fields = {})
    puts "# POST #{url(path)} #{fields}"
    req = Net::HTTP::Post.new(path)
    setup_request(req)
    req.form_data = fields
    res = @client.request(req)
    decode_response(res)
  rescue EOFError
    connect
    retry
  rescue => ex
    puts "#{__method__}: #{ex}, sleep for 1 second and retry"
    sleep(1)
    retry
  end

  def post_json(path, object)
    data = JSON.generate(object)
    puts "# POST #{url(path)} #{data}"
    req = Net::HTTP::Post.new(path)
    req['content-type'] = "application/json"
    setup_request(req)
    res = @client.request(req, data)
    decode_response(res)
  rescue EOFError
    connect
    retry
  rescue => ex
    puts "#{__method__}: #{ex}, sleep for 1 second and retry"
    sleep(1)
    retry
  end

  def put_json(path, object)
    data = JSON.generate(object)
    puts "# PUT #{url(path)} #{data}"
    req = Net::HTTP::Put.new(path)
    req['content-type'] = "application/json"
    setup_request(req)
    res = @client.request(req, data)
    decode_response(res)
  rescue EOFError
    connect
    retry
  rescue => ex
    puts "#{__method__}: #{ex}, sleep for 1 second and retry"
    sleep(1)
    retry
  end
end

api = 
  begin
    API.new(options)
  rescue => ex
    puts "#{ex}, sleep for 1 second and retry"
    sleep(1)
    retry
  end

resp = api.get("/pools")
services = resp['allowedServices']

api.post_form("/pools/default",
              memoryQuota: options[:server_quota],
              indexMemoryQuota: options[:index_quota],
              ftsMemoryQuota: options[:fts_quota])
api.post_form("/node/controller/setupServices", services: services.join(","))

if options[:strict_encryption]
  api.post_form("/settings/autoFailover", enabled: false)
  api.post_form("/node/controller/enableExternalListener", nodeEncryption: "on")
  api.post_form("/node/controller/setupNetConfig", nodeEncryption: "on")
  api.post_form("/node/controller/disableUnusedExternalListeners")
  api.post_form("/settings/security", clusterEncryptionLevel: "strict")
end

api.post_form("/settings/web",
              password: options[:password],
              username: options[:username],
              port: "SAME")
api.post_form("/settings/indexes", storageMode: "plasma")

if options[:cluster_run_nodes] > 1
  known_nodes = []
  (1...options[:cluster_run_nodes]).each do |index|
    port = options[:port] + index
    port += 10_000 unless options[:strict_encryption]
    res = api.post_form("/pools/default/serverGroups/0/addNode",
                        hostname: "#{options[:host]}:#{port}",
                        services: services.join(","),
                        user: options[:username],
                        password: options[:password])
    known_nodes << res["otpNode"]
  end
  config = api.get("/pools/default")
  known_nodes << config["nodes"][0]["otpNode"]
  api.post_form("/controller/rebalance", knownNodes: known_nodes.join(","))

  rebalance_running = true
  while rebalance_running
    res = api.get("/pools/default/tasks")
    res.each do |task|
      if task["type"] == "rebalance" && task["status"] == "notRunning"
        rebalance_running = false
        break
      end
      sleep 1
    end
  end
end

number_of_replicas = 0
number_of_replicas = [options[:cluster_run_nodes] - 1, 3].min if options[:cluster_run_nodes] > 1
api.post_form("/pools/default/buckets",
              flushEnabled: 1,
              threadsNumber: 3,
              replicaIndex: 0,
              replicaNumber: number_of_replicas,
              evictionPolicy: "valueOnly",
              ramQuotaMB: options[:bucket_quota],
              bucketType: "membase",
              name: options[:bucket])

if options[:sec_bucket].to_b 
    api.post_form("/pools/default/buckets",
              flushEnabled: 1,
              threadsNumber: 3,
              replicaIndex: 0,
              replicaNumber: number_of_replicas,
              evictionPolicy: "valueOnly",
              ramQuotaMB: options[:bucket_quota],
              bucketType: "membase",
              name: options[:sec_bucket])
end

api.post_json("/sampleBuckets/install", options[:sample_buckets].to_a) if options[:sample_buckets].any?

api.post_form("/settings/developerPreview", enabled: true) if options[:enable_developer_preview]

def service_port_for_bucket(api, service, bucket, options)
  port_key = options[:strict_encryption] ? "#{service}SSL" : service
  port = 0
  while port.zero?
    config = api.get("/pools/default/b/#{bucket}")
    port = config["nodesExt"].find{|n| n["services"].key?(port_key)}["services"][port_key].to_i rescue 0
    sleep 1
  end
  port
end

query_api = API.new(options.merge(port: service_port_for_bucket(api, "n1ql", options[:bucket], options)))
query_api.post_form("/query/service", statement: "CREATE PRIMARY INDEX ON `#{options[:bucket]}` USING GSI")
options[:sample_buckets].each do |bucket|
  service_port_for_bucket(api, "n1ql", bucket, options)
  query_api.post_form("/query/service", statement: "CREATE PRIMARY INDEX ON `#{bucket}` USING GSI")
end

puts service_port_for_bucket(api, "fts", options[:bucket], options)

if options[:sample_buckets].include?("travel-sample") && services.include?("fts")
  search_api = API.new(options.merge(port: service_port_for_bucket(api, "fts", options[:bucket], options)))

  has_v6_nodes = 
    api.get("/pools/default")["nodes"]
    .any? { |node| node["version"] =~ /^6\./ && node["services"].include?("fts") }

  index_definition_path = File.join(__dir__, "travel-sample-index#{"-v6" if has_v6_nodes}.json")
  puts "using index definition: #{File.basename(index_definition_path)}"
  index_definition = JSON.load(File.read(index_definition_path))
  search_api.put_json("/api/index/travel-sample-index", index_definition)

  expected_number_of_the_documents = 1000
  indexed_documents = 0
  Timeout.timeout(10 * 60) do # give it 10 minutes to index
    while indexed_documents < expected_number_of_the_documents
      resp = search_api.get("/api/index/travel-sample-index/count")
      indexed_documents = resp['count'].to_i
      sleep 1
    end
  end
end

