import { FileSystem } from "@effect/platform";
import type { SystemError } from "@effect/platform/Error";
import { Cache, Effect, Ref } from "effect";
import * as Schema from "effect/Schema";
import { Json } from "../layers/json.js";
import { DatabaseError } from "../core/errors.js";

export const BTreeNodeSchema = <K>(key_schema: Schema.Schema<any, K>) =>
  Schema.mutable(
    Schema.Struct({
      id: Schema.String,
      is_leaf: Schema.Boolean,
      keys: Schema.mutable(Schema.Array(key_schema)),
      values: Schema.mutable(Schema.Array(Schema.String)),
      children: Schema.mutable(Schema.Array(Schema.String))
    })
  );

export type BTreeNode<K> = Schema.Schema.Type<
  ReturnType<typeof BTreeNodeSchema<K>>
>;

export const RootPointerSchema = Schema.mutable(
  Schema.Struct({ root_id: Schema.String })
);
export type RootPointer = Schema.Schema.Type<typeof RootPointerSchema>;

export const makeBtreeService = <K>(
  three_path: string,
  key_schema: Schema.Schema<any, K>,
  order: number,
  cacheCapacity: number = 1000
) =>
  Effect.gen(function* () {
    const fs = yield* FileSystem.FileSystem;
    const jsonService = yield* Json;
    const node_schema = BTreeNodeSchema(key_schema);
    const root_pointer_path = `${three_path}/_root.json`;

    const semaphore = yield* Effect.makeSemaphore(1);

    yield* fs.makeDirectory(three_path, { recursive: true });

    const cache = yield* Cache.make({
      capacity: cacheCapacity,
      timeToLive: "24 hours",
      lookup: (id: string) =>
        fs.readFileString(`${three_path}/${id}.json`).pipe(
          Effect.flatMap(jsonService.parse),
          Effect.flatMap((data) => Schema.decode(node_schema)(data)),
          Effect.mapError(
            (e) =>
              new DatabaseError({
                message: `Failed to read node ${id}`,
                cause: e
              })
          )
        )
    });

    const readNode = (id: string) => cache.get(id);

    const writeNode = (node: BTreeNode<K>) =>
      Schema.encode(node_schema)(node).pipe(
        Effect.flatMap(jsonService.stringify),
        Effect.flatMap((content) =>
          fs.writeFileString(`${three_path}/${node.id}.json`, content)
        ),
        Effect.zipLeft(cache.invalidate(node.id)),
        Effect.mapError(
          (e) =>
            new DatabaseError({
              message: `Failed to write node ${node.id}`,
              cause: e
            })
        )
      );

    const createNode = (is_leaf: boolean) =>
      Effect.sync(
        () =>
          ({
            id: crypto.randomUUID() as string,
            is_leaf,
            keys: [],
            values: [],
            children: []
          }) as BTreeNode<K>
      ).pipe(Effect.tap(writeNode));

    const rootIdRef = yield* Effect.gen(function* () {
      const content = yield* fs.readFileString(root_pointer_path);
      const json = yield* jsonService.parse(content);
      const pointer = yield* Schema.decode(RootPointerSchema)(json);
      return yield* Ref.make(pointer.root_id);
    }).pipe(
      Effect.catchTag("SystemError", (e) =>
        e.reason === "NotFound"
          ? Effect.gen(function* () {
              const root_node = yield* createNode(true);
              const pointer: RootPointer = { root_id: root_node.id };
              const content = yield* jsonService.stringify(pointer);
              yield* fs.writeFileString(root_pointer_path, content);
              return yield* Ref.make(root_node.id);
            })
          : Effect.fail(e)
      )
    );

    const updateRootId = (new_root_id: string) =>
      Ref.set(rootIdRef, new_root_id).pipe(
        Effect.flatMap(() => jsonService.stringify({ root_id: new_root_id })),
        Effect.flatMap((content) =>
          fs.writeFileString(root_pointer_path, content)
        )
      );

    const splitChild = (
      parent: BTreeNode<K>,
      child_index: number,
      full_child: BTreeNode<K>
    ) =>
      Effect.gen(function* () {
        const new_sibling = yield* createNode(full_child.is_leaf);
        const median_index = order - 1;

        parent.keys.splice(child_index, 0, full_child.keys[median_index]);
        parent.values.splice(child_index, 0, full_child.values[median_index]);
        parent.children.splice(child_index + 1, 0, new_sibling.id);

        new_sibling.keys = full_child.keys.slice(median_index + 1);
        new_sibling.values = full_child.values.slice(median_index + 1);
        if (!full_child.is_leaf) {
          new_sibling.children = full_child.children.slice(median_index + 1);
        }

        full_child.keys = full_child.keys.slice(0, median_index);
        full_child.values = full_child.values.slice(0, median_index);
        if (!full_child.is_leaf) {
          full_child.children = full_child.children.slice(0, median_index + 1);
        }

        yield* Effect.all(
          [writeNode(parent), writeNode(full_child), writeNode(new_sibling)],
          { concurrency: "inherit" }
        );
      });

    const insertNonFull = (
      node: BTreeNode<K>,
      key: K,
      value: string
    ): Effect.Effect<void, DatabaseError | SystemError> =>
      Effect.gen(function* () {
        let i = node.keys.length - 1;
        if (node.is_leaf) {
          while (i >= 0 && key < node.keys[i]) {
            i--;
          }
          (node.keys as K[]).splice(i + 1, 0, key);
          (node.values as string[]).splice(i + 1, 0, value);

          yield* writeNode(node);
        } else {
          while (i >= 0 && key < node.keys[i]) {
            i--;
          }

          i++;

          let child = yield* readNode(node.children[i]);
          if (child.keys.length === 2 * order - 1) {
            yield* splitChild(node, i, child);

            if (key > node.keys[i]) {
              i++;
            }
          }

          const next_child = yield* readNode(node.children[i]);
          yield* insertNonFull(next_child, key, value);
        }
      });

    const findInNode = (
      node_id: string,
      key: K
    ): Effect.Effect<string | undefined, DatabaseError | SystemError> =>
      Effect.gen(function* () {
        const node = yield* readNode(node_id);
        let i = 0;
        while (i < node.keys.length && key > node.keys[i]) {
          i++;
        }

        if (i < node.keys.length && key === node.keys[i]) {
          return node.values[i];
        }

        if (node.is_leaf) {
          return undefined;
        }

        return yield* findInNode(node.children[i], key);
      });

    const findAllInNode = (
      node_id: string,
      key: K
    ): Effect.Effect<string[], DatabaseError | SystemError> =>
      Effect.gen(function* () {
        const node = yield* readNode(node_id);
        const results: string[] = [];
        let i = 0;

        while (i < node.keys.length) {
          if (key === node.keys[i]) {
            results.push(node.values[i]);
            // Even if we find it in an internal node, we must check children
            // because duplicates could be spread across children and siblings.
            if (!node.is_leaf) {
              const childResults = yield* findAllInNode(node.children[i], key);
              results.push(...childResults);
            }
          } else if (key < node.keys[i]) {
            // If key is smaller, it can only be in the left child
            if (!node.is_leaf) {
              const childResults = yield* findAllInNode(node.children[i], key);
              results.push(...childResults);
            }
            // Since keys are sorted, we don't need to check further keys or children to the right
            // UNLESS there are more duplicates in this node, but they would have been handled by the first 'if'
            return results;
          }
          i++;
        }

        // If we reached the end, check the last child
        if (!node.is_leaf) {
          const childResults = yield* findAllInNode(node.children[i], key);
          results.push(...childResults);
        }

        return results;
      });

    const findRangeInNode = (
      node_id: string,
      options: {
        min?: K;
        max?: K;
        minInclusive?: boolean;
        maxInclusive?: boolean;
      }
    ): Effect.Effect<
      { key: K; value: string }[],
      DatabaseError | SystemError
    > =>
      Effect.gen(function* () {
        const node = yield* readNode(node_id);
        const results: { key: K; value: string }[] = [];
        const { min, max, minInclusive = true, maxInclusive = true } = options;

        let i = 0;

        // Skip keys strictly smaller than min
        if (min != null) {
          while (i < node.keys.length && node.keys[i] < min) {
            i++;
          }
        }

        while (i < node.keys.length) {
          const currentKey = node.keys[i];

          // Check if we passed the max bound
          if (max != null) {
            if (maxInclusive ? currentKey > max : currentKey >= max) {
              // No more keys in this node can match.
              // But we might need to check child[i] if its keys could be within range.
              if (!node.is_leaf) {
                const childResults = yield* findRangeInNode(
                  node.children[i],
                  options
                );
                results.push(...childResults);
              }
              return results;
            }
          }

          // At this point, currentKey could be within range.
          // Before adding currentKey, check child[i]
          if (!node.is_leaf) {
            const childResults = yield* findRangeInNode(
              node.children[i],
              options
            );
            results.push(...childResults);
          }

          // Add currentKey if it satisfies min bound (max bound already checked)
          let inMin = true;
          if (min != null) {
            inMin = minInclusive ? currentKey >= min : currentKey > min;
          }

          if (inMin) {
            results.push({ key: currentKey, value: node.values[i] });
          }

          i++;
        }

        // Check the last child
        if (!node.is_leaf) {
          const childResults = yield* findRangeInNode(
            node.children[i],
            options
          );
          results.push(...childResults);
        }

        return results;
      });

    const getPredecessor = (
      node: BTreeNode<K>,
      i: number
    ): Effect.Effect<{ key: K; value: string }, DatabaseError | SystemError> =>
      Effect.gen(function* () {
        let current = yield* readNode(node.children[i]);
        while (!current.is_leaf) {
          current = yield* readNode(
            current.children[current.children.length - 1]
          );
        }
        return {
          key: current.keys[current.keys.length - 1],
          value: current.values[current.values.length - 1]
        };
      });

    const getSuccessor = (
      node: BTreeNode<K>,
      i: number
    ): Effect.Effect<{ key: K; value: string }, DatabaseError | SystemError> =>
      Effect.gen(function* () {
        let current = yield* readNode(node.children[i + 1]);
        while (!current.is_leaf) {
          current = yield* readNode(current.children[0]);
        }
        return {
          key: current.keys[0],
          value: current.values[0]
        };
      });

    const borrowFromLeft = (
      parent: BTreeNode<K>,
      i: number,
      child: BTreeNode<K>,
      leftSibling: BTreeNode<K>
    ) =>
      Effect.gen(function* () {
        child.keys.unshift(parent.keys[i - 1]);
        child.values.unshift(parent.values[i - 1]);
        if (!child.is_leaf) {
          child.children.unshift(leftSibling.children.pop()!);
        }

        parent.keys[i - 1] = leftSibling.keys.pop()!;
        parent.values[i - 1] = leftSibling.values.pop()!;

        yield* Effect.all(
          [writeNode(parent), writeNode(child), writeNode(leftSibling)],
          { concurrency: "inherit" }
        );
      });

    const borrowFromRight = (
      parent: BTreeNode<K>,
      i: number,
      child: BTreeNode<K>,
      rightSibling: BTreeNode<K>
    ) =>
      Effect.gen(function* () {
        child.keys.push(parent.keys[i]);
        child.values.push(parent.values[i]);
        if (!child.is_leaf) {
          child.children.push(rightSibling.children.shift()!);
        }

        parent.keys[i] = rightSibling.keys.shift()!;
        parent.values[i] = rightSibling.values.shift()!;

        yield* Effect.all(
          [writeNode(parent), writeNode(child), writeNode(rightSibling)],
          { concurrency: "inherit" }
        );
      });

    const merge = (
      parent: BTreeNode<K>,
      i: number,
      leftChild: BTreeNode<K>,
      rightChild: BTreeNode<K>
    ) =>
      Effect.gen(function* () {
        leftChild.keys.push(parent.keys[i]);
        leftChild.values.push(parent.values[i]);

        leftChild.keys.push(...rightChild.keys);
        leftChild.values.push(...rightChild.values);
        if (!leftChild.is_leaf) {
          leftChild.children.push(...rightChild.children);
        }

        parent.keys.splice(i, 1);
        parent.values.splice(i, 1);
        parent.children.splice(i + 1, 1);

        yield* Effect.all([writeNode(parent), writeNode(leftChild)], {
          concurrency: "inherit"
        });
        // NOTE: rightChild is now abandoned/garbage. In a real system we'd free its disk space.
      });

    const deleteFromNode = (
      node_id: string,
      key: K,
      value?: string
    ): Effect.Effect<boolean, DatabaseError | SystemError> =>
      Effect.gen(function* () {
        const node = yield* readNode(node_id);
        let i = 0;
        while (i < node.keys.length && key > node.keys[i]) {
          i++;
        }

        // 1. If key is in this node
        if (i < node.keys.length && key === node.keys[i]) {
          // Find if the specific value is in this node
          let foundIndex = -1;
          if (value !== undefined) {
            for (let j = i; j < node.keys.length && node.keys[j] === key; j++) {
              if (node.values[j] === value) {
                foundIndex = j;
                break;
              }
            }
          } else {
            foundIndex = i;
          }

          if (foundIndex !== -1) {
            if (node.is_leaf) {
              (node.keys as K[]).splice(foundIndex, 1);
              (node.values as string[]).splice(foundIndex, 1);
              yield* writeNode(node);
              return true;
            } else {
              // Internal node deletion
              const leftChild = yield* readNode(node.children[foundIndex]);
              if (leftChild.keys.length >= order) {
                const pred = yield* getPredecessor(node, foundIndex);
                node.keys[foundIndex] = pred.key;
                node.values[foundIndex] = pred.value;
                yield* writeNode(node);
                return yield* deleteFromNode(
                  leftChild.id,
                  pred.key,
                  pred.value
                );
              }

              const rightChild = yield* readNode(node.children[foundIndex + 1]);
              if (rightChild.keys.length >= order) {
                const succ = yield* getSuccessor(node, foundIndex);
                node.keys[foundIndex] = succ.key;
                node.values[foundIndex] = succ.value;
                yield* writeNode(node);
                return yield* deleteFromNode(
                  rightChild.id,
                  succ.key,
                  succ.value
                );
              }

              yield* merge(node, foundIndex, leftChild, rightChild);
              return yield* deleteFromNode(leftChild.id, key, value);
            }
          }
        }

        // 2. Not in this node or value mismatch in this node.
        if (node.is_leaf) return false;

        // If we have a specific value, it could be in ANY child where key could exist.
        // For standard B-Tree, this is child i, and if node.keys[i] == key, potentially child i+1 etc.

        const tryDelete = (
          idx: number
        ): Effect.Effect<boolean, DatabaseError | SystemError> =>
          Effect.gen(function* () {
            const currentNode = yield* readNode(node_id);
            if (idx >= currentNode.children.length) return false;

            let child = yield* readNode(currentNode.children[idx]);
            if (child.keys.length < order) {
              const leftSibling =
                idx > 0
                  ? yield* readNode(currentNode.children[idx - 1])
                  : undefined;
              const rightSibling =
                idx < currentNode.keys.length
                  ? yield* readNode(currentNode.children[idx + 1])
                  : undefined;

              if (leftSibling && leftSibling.keys.length >= order) {
                yield* borrowFromLeft(currentNode, idx, child, leftSibling);
              } else if (rightSibling && rightSibling.keys.length >= order) {
                yield* borrowFromRight(currentNode, idx, child, rightSibling);
              } else {
                if (leftSibling) {
                  yield* merge(currentNode, idx - 1, leftSibling, child);
                  child = leftSibling;
                  return yield* deleteFromNode(child.id, key, value);
                } else if (rightSibling) {
                  yield* merge(currentNode, idx, child, rightSibling);
                  // child is still child
                }
              }
            }
            return yield* deleteFromNode(child.id, key, value);
          });

        // Try child i
        const deleted = yield* tryDelete(i);
        if (deleted) return true;

        // If value is specified, we might need to check subsequent children if they also match the key
        if (value !== undefined) {
          let nextIdx = i + 1;
          while (
            nextIdx <= node.keys.length &&
            node.keys[nextIdx - 1] === key
          ) {
            const deletedNext = yield* tryDelete(nextIdx);
            if (deletedNext) return true;
            nextIdx++;
          }
        }

        return false;
      });

    return {
      /**
       * Inserts a key-value pair into the B-tree.
       *
       * @param key The key to insert.
       * @param value The value associated with the key.
       * @returns An effect that completes when the insertion is done.
       */
      insert: (key: K, value?: string) => {
        const insert_effect = Effect.gen(function* () {
          const root_id = yield* Ref.get(rootIdRef);
          let root = yield* readNode(root_id);

          if (root.keys.length === 2 * order - 1) {
            const new_root = yield* createNode(false);
            new_root.children.push(root.id);

            yield* splitChild(new_root, 0, root);
            yield* updateRootId(new_root.id);
            yield* insertNonFull(new_root, key, value ?? "");
          } else {
            yield* insertNonFull(root, key, value ?? "");
          }
        });

        return semaphore.withPermits(1)(insert_effect);
      },
      /**
       * Finds the value associated with a given key in the B-tree.
       * @param key The key to search for.
       * @returns - The value associated with the key, or undefined if not found.
       */
      find: (key: K) => {
        const find_effect = Ref.get(rootIdRef).pipe(
          Effect.flatMap((rootId) => findInNode(rootId, key))
        );

        return semaphore.withPermits(1)(find_effect);
      },
      /**
       * Finds all values associated with a given key in the B-tree.
       * @param key The key to search for.
       * @returns - An array of values associated with the key.
       */
      findAll: (key: K) => {
        const find_all_effect = Ref.get(rootIdRef).pipe(
          Effect.flatMap((rootId) => findAllInNode(rootId, key))
        );

        return semaphore.withPermits(1)(find_all_effect);
      },
      /**
       * Finds all key-value pairs within a specified range in the B-tree.
       * @param options Range options.
       * @returns - An array of key-value pairs within the range.
       */
      findRange: (options: {
        min?: K;
        max?: K;
        minInclusive?: boolean;
        maxInclusive?: boolean;
      }) => {
        const find_range_effect = Ref.get(rootIdRef).pipe(
          Effect.flatMap((rootId) => findRangeInNode(rootId, options))
        );

        return semaphore.withPermits(1)(find_range_effect);
      },
      /**
       * Deletes a key from the B-tree.
       * @param key The key to delete.
       * @param value Optional specific value to delete.
       * @returns true if the key was found and deleted, false otherwise.
       */
      delete: (key: K, value?: string) => {
        const delete_effect = Effect.gen(function* () {
          const root_id = yield* Ref.get(rootIdRef);
          const deleted = yield* deleteFromNode(root_id, key, value);

          // Root height reduction
          const root = yield* readNode(root_id);
          if (!root.is_leaf && root.keys.length === 0) {
            yield* updateRootId(root.children[0]);
          }

          return deleted;
        });
        return semaphore.withPermits(1)(delete_effect);
      }
    };
  });
