import { MutationRequest, MutationId, MutationResponse } from "./protocol.js";
import { JSONValue, convexToJson, jsonToConvex } from "../../values/index.js";
import { createError, logToConsole } from "../logging.js";
import { Long } from "../long.js";

/**
 * State of a mutation that has been requested and has not yet had a
 * successful response recorded. Carries both halves of the caller's promise.
 */
type RequestedMutationStatus = {
  status: "Requested";
  /** Settles the caller's promise with the mutation's result. */
  onResult: (result: any) => void;
  /** Rejects the caller's promise. */
  onFailure: (reason: any) => void;
};

/**
 * State of a mutation whose successful response has been recorded. The
 * caller's promise is settled later, via `onResult`, once the client has
 * moved past `ts`.
 */
type CommittedMutationStatus = {
  status: "Committed";
  /** Deferred, argument-free callback that settles the caller's promise. */
  onResult: () => void;
  /** Timestamp reported in the successful mutation response. */
  ts: Long;
};

/** Lifecycle state of a tracked mutation, discriminated on `status`. */
type MutationStatus = RequestedMutationStatus | CommittedMutationStatus;

/**
 * Tracks mutations from the moment they are requested until they are
 * complete.
 *
 * Each tracked mutation is either "Requested" (no successful response recorded
 * yet) or "Committed" (a successful response was recorded, but the caller's
 * promise is held back until the client has transitioned past the commit
 * timestamp).
 */
export class MutationManager {
  /** Every mutation that has been requested and is not yet complete. */
  private readonly inflightMutations = new Map<
    MutationId,
    {
      message: MutationRequest;
      status: MutationStatus;
    }
  >();

  /**
   * Start tracking a new mutation.
   *
   * NOTE: reusing a `mutationId` that is still tracked replaces the earlier
   * entry, so the earlier promise would never settle.
   *
   * @param udfPath - Path of the mutation function to run.
   * @param args - Arguments for the mutation; serialized with `convexToJson`.
   * @param mutationId - Identifier used to match the response to this request.
   * @returns The protocol message for this mutation and a promise that settles
   * with the mutation's outcome.
   */
  request(
    udfPath: string,
    args: any[],
    mutationId: MutationId
  ): {
    message: MutationRequest;
    result: Promise<any>;
  } {
    const message: MutationRequest = {
      type: "Mutation",
      mutationId,
      udfPath,
      args: <JSONValue[]>convexToJson(args),
    };

    // Park the promise's settle functions in the map; `onResponse` and
    // `removeCompletedMutations` invoke them later.
    const result = new Promise((resolve, reject) => {
      this.inflightMutations.set(mutationId, {
        message,
        status: { status: "Requested", onResult: resolve, onFailure: reject },
      });
    });

    return { message, result };
  }

  /**
   * Update the state after receiving a mutation response.
   *
   * @returns A MutationId if the mutation is complete and its optimistic update
   * can be dropped, null otherwise.
   */
  onResponse(response: MutationResponse): MutationId | null {
    const mutationInfo = this.inflightMutations.get(response.mutationId);
    if (mutationInfo === undefined) {
      // Annoyingly we can occasionally get responses to mutations that we're no
      // longer tracking. One flow where this happens is:
      // 1. Client sends mutation 1
      // 2. Client gets response for mutation 1. The server says that it was committed at ts=10.
      // 3. Client is disconnected
      // 4. Client reconnects and re-issues queries and this mutation.
      // 5. Server sends transition message to ts=20
      // 6. Client drops mutation because it's already been observed.
      // 7. Client receives a second response for mutation 1 but doesn't know about it anymore.

      // The right fix for this is probably to add a reconciliation phase on
      // reconnection where we receive responses to all the mutations before
      // the transition message so this flow could never happen (CX-1513).

      // For now though, we can just ignore this message.
      return null;
    }

    // Because `.restart()` re-requests committed mutations, we may get some
    // responses for mutations that are already in the "Committed" state.
    // We can safely ignore those because we've already notified the UI about
    // their results.
    const status = mutationInfo.status;
    if (status.status !== "Requested") {
      return null;
    }

    const udfPath = mutationInfo.message.udfPath;
    for (const line of response.logLines) {
      logToConsole("info", "mutation", udfPath, line);
    }

    if (!response.success) {
      this.inflightMutations.delete(response.mutationId);
      logToConsole("error", "mutation", udfPath, response.result);
      status.onFailure(createError("mutation", udfPath, response.result));
      // If the mutation is a failure, then we consider it complete and should
      // drop the optimistic update.
      return response.mutationId;
    }

    // Even though we have the result at this point, wait to settle the
    // mutation promise until after we transition past this timestamp.
    //
    // The result is decoded here rather than inside the deferred callback: a
    // decoding error thrown from within `removeCompletedMutations` would abort
    // its loop, leave this entry in the map forever, and leave the caller's
    // promise unsettled. A decoding failure instead becomes a rejection that
    // is delivered at the usual completion time.
    const { onResult, onFailure } = status;
    let settle: () => void;
    try {
      const value = jsonToConvex(response.result);
      settle = () => onResult(value);
    } catch (error) {
      settle = () => onFailure(error);
    }

    mutationInfo.status = {
      status: "Committed",
      ts: response.ts,
      onResult: settle,
    };
    // Need to wait until we transition past this timestamp to consider the
    // mutation complete.
    return null;
  }

  /**
   * Complete every committed mutation whose commit timestamp is at or before
   * `ts`: settle its promise and stop tracking it.
   *
   * @param ts - Timestamp the client has transitioned to.
   * @returns The ids of the mutations that were completed by this call.
   */
  removeCompletedMutations(ts: Long): Set<MutationId> {
    const completeMutations: Set<MutationId> = new Set();
    // Deleting the current entry while iterating a Map is well-defined.
    for (const [mutationId, mutationInfo] of this.inflightMutations.entries()) {
      const status = mutationInfo.status;
      if (status.status === "Committed" && status.ts.lessThanOrEqual(ts)) {
        status.onResult();
        completeMutations.add(mutationId);
        this.inflightMutations.delete(mutationId);
      }
    }
    return completeMutations;
  }

  /**
   * @returns true if at least one tracked mutation is still in the "Requested"
   * state, i.e. no successful response has been recorded for it.
   */
  hasUncommittedMutations(): boolean {
    for (const mutationInfo of this.inflightMutations.values()) {
      if (mutationInfo.status.status === "Requested") {
        return true;
      }
    }
    return false;
  }

  /**
   * Collect the messages to re-send after reconnecting to the backend.
   *
   * This includes mutations that have already been committed because we still
   * want to tell the backend to transition the client past the mutation's
   * committed timestamp. This is safe because mutations are idempotent.
   *
   * @returns The request message of every tracked mutation, in insertion
   * order.
   */
  restart(): MutationRequest[] {
    return Array.from(
      this.inflightMutations.values(),
      (mutationInfo) => mutationInfo.message
    );
  }

  /** @returns true if any mutation is currently tracked, in either state. */
  hasInflightMutation(): boolean {
    return this.inflightMutations.size > 0;
  }
}
