import {
  MimeType,
  OpenInferenceSpanKind,
  SemanticConventions,
} from "@arizeai/openinference-semantic-conventions";
import type { NodeTracerProvider, Tracer } from "@arizeai/phoenix-otel";
import {
  type DiagLogLevel,
  objectAsAttributes,
  register,
  SpanStatusCode,
} from "@arizeai/phoenix-otel";
import invariant from "tiny-invariant";

import type { components } from "../__generated__/api/v1";
import { createClient, type PhoenixClient } from "../client";
import { createLogger, type Logger } from "../logger";
import type { ClientFn } from "../types/core";
import type {
  EvaluationResult,
  Evaluator,
  ExperimentEvaluatorLike,
  IncompleteEvaluation,
  TaskOutput,
} from "../types/experiments";
import { Channel, ChannelError } from "../utils/channel";
import { ensureString } from "../utils/ensureString";
import { toObjectHeaders } from "../utils/toObjectHeaders";
import { getExperimentInfo } from "./getExperimentInfo";
import { getExperimentEvaluators } from "./helpers";
import { logEvalResumeSummary, PROGRESS_PREFIX } from "./logging";

/**
 * Error thrown when evaluation is aborted due to a failure in stopOnFirstError mode.
 * This provides semantic context that the abort was intentional, not an infrastructure failure.
 * @internal - Not exported to minimize API surface area
 */
class EvaluationAbortedError extends Error {
  constructor(message: string, cause?: Error) {
    super(message);
    this.name = "EvaluationAbortedError";
    this.cause = cause;
  }
}

/**
 * Error thrown when the producer fails to fetch incomplete evaluations from the server.
 * This is a critical error that should always be surfaced, even in stopOnFirstError=false mode.
 * @internal - Not exported to minimize API surface area
 */
class EvaluationFetchError extends Error {
  constructor(message: string, cause?: Error) {
    super(message);
    this.name = "EvaluationFetchError";
    this.cause = cause;
  }
}

export type ResumeEvaluationParams = ClientFn & {
  /**
   * The ID of the experiment to resume evaluations for
   */
  readonly experimentId: string;
  /**
   * A single evaluator or list of evaluators to run on incomplete evaluations
   */
  readonly evaluators:
    | ExperimentEvaluatorLike
    | readonly ExperimentEvaluatorLike[];
  /**
   * The logger to use
   * @default createLogger()
   */
  readonly logger?: Logger;
  /**
   * The number of concurrent evaluation executions
   * @default 5
   */
  readonly concurrency?: number;
  /**
   * Whether to set the global tracer provider when running evaluators.
   * @default true
   */
  readonly setGlobalTracerProvider?: boolean;
  /**
   * Whether to use a batch span processor for tracing.
   * @default true
   */
  readonly useBatchSpanProcessor?: boolean;
  /**
   * Log level to set for the default DiagConsoleLogger when tracing.
   */
  readonly diagLogLevel?: DiagLogLevel;
  /**
   * Stop processing and exit as soon as any evaluation fails.
   * @default false
   */
  readonly stopOnFirstError?: boolean;
};

const DEFAULT_PAGE_SIZE = 50 as const;
/**
 * Channel capacity multiplier for producer-consumer buffering.
 * A value of 2 enables pipeline efficiency: workers process batch N while
 * the producer fetches batch N+1, eliminating idle time without excessive
 * memory usage. The channel blocks when full, providing natural backpressure.
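 * With the default page size of 50, the channel buffers at most 100 queued
 * evaluation tasks.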
 */
const CHANNEL_CAPACITY_MULTIPLIER = 2 as const;

/**
 * Evaluation item for the producer-consumer channel
 */
type EvalItem = {
  readonly incompleteEval: IncompleteEvaluation;
  readonly evaluator: Evaluator;
};

/**
 * Transforms API incomplete evaluation response to IncompleteEvaluation
 */
function buildIncompleteEvaluation(
  apiResponse: components["schemas"]["IncompleteExperimentEvaluation"]
): IncompleteEvaluation {
  return {
    experimentRun: {
      id: apiResponse.experiment_run.id,
      experimentId: apiResponse.experiment_run.experiment_id,
      datasetExampleId: apiResponse.experiment_run.dataset_example_id,
      output: apiResponse.experiment_run.output ?? null,
      startTime: new Date(apiResponse.experiment_run.start_time),
      endTime: new Date(apiResponse.experiment_run.end_time),
      error: apiResponse.experiment_run.error ?? null,
      traceId: apiResponse.experiment_run.trace_id ?? null,
    },
    datasetExample: {
      id: apiResponse.dataset_example.id,
      input: apiResponse.dataset_example.input,
      output: apiResponse.dataset_example.output ?? null,
      metadata: apiResponse.dataset_example.metadata || {},
      updatedAt: new Date(apiResponse.dataset_example.updated_at),
    },
    evaluationNames: apiResponse.evaluation_names,
  };
}

/**
 * Determines if an evaluator should run for an incomplete evaluation
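 *
 * For example, an evaluator named "accuracy" runs only for runs whose list of
 * incomplete evaluation names includes "accuracy".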
 */
function shouldRunEvaluator(
  evaluator: Evaluator,
  incompleteEval: IncompleteEvaluation
): boolean {
  // Match evaluator name directly
  return incompleteEval.evaluationNames.includes(evaluator.name);
}

/**
 * Handles fetch errors with helpful version information for unsupported features
 */
async function handleEvaluationFetchError(
  error: unknown,
  client: PhoenixClient,
  featureName: string
): Promise<never> {
  // Check if this is a JSON parse error (likely an HTML 404 response from an older server)
  const isJsonError =
    error instanceof SyntaxError &&
    error.message.toLowerCase().includes("json");

  if (isJsonError) {
    // Fetch server version to provide helpful context
    let versionInfo = "";
    try {
      const baseUrl = client.config.baseUrl || "";
      const versionRes = await fetch(`${baseUrl}/arize_phoenix_version`);
      if (versionRes.ok) {
        const version = await versionRes.text();
        versionInfo = ` Your current server version is ${version}.`;
      }
    } catch {
      // Ignore errors fetching version
    }

    throw new Error(
      `The ${featureName} feature is not available on this Phoenix server. ` +
        "Please upgrade your Phoenix server to use this feature." +
        versionInfo
    );
  }
  throw error;
}

/**
 * Sets up OpenTelemetry tracer for evaluation tracing
 */
function setupEvaluationTracer({
  projectName,
  baseUrl,
  headers,
  useBatchSpanProcessor,
  diagLogLevel,
  setGlobalTracerProvider,
}: {
  projectName: string | null;
  baseUrl: string;
  headers?: Record<string, string>;
  useBatchSpanProcessor: boolean;
  diagLogLevel?: DiagLogLevel;
  setGlobalTracerProvider: boolean;
}): { provider: NodeTracerProvider; tracer: Tracer } | null {
  if (!projectName) {
    return null;
  }

  const provider = register({
    projectName,
    url: baseUrl,
    headers,
    batch: useBatchSpanProcessor,
    diagLogLevel,
    global: setGlobalTracerProvider,
  });

  const tracer = provider.getTracer(projectName);
  return { provider, tracer };
}

/**
 * Resume incomplete evaluations for an experiment.
 *
 * This function identifies which evaluations have not been completed (either missing or failed)
 * and runs the evaluators only for those runs. This is useful for:
 * - Recovering from transient evaluator failures
 * - Adding new evaluators to completed experiments
 * - Completing partially evaluated experiments
 *
 * The function processes incomplete evaluations in batches using pagination to minimize memory usage.
 *
 * Evaluation names are matched to evaluator names. For example, if you pass
 * an evaluator with name "accuracy", it will check for and resume any runs missing the "accuracy" evaluation.
 *
 * **Note:** Multi-output evaluators (evaluators that return an array of results) are not
 * supported for resume operations. Each evaluator should produce a single evaluation
 * result with a name matching the evaluator's name.
 *
 * @throws {Error} Throws different error types based on failure:
 *   - "EvaluationFetchError": Unable to fetch incomplete evaluations from the server.
 *     Always thrown regardless of stopOnFirstError, as it indicates a critical infrastructure failure.
 *   - "EvaluationAbortedError": stopOnFirstError=true and an evaluator failed.
 *     Original error preserved in `cause` property.
 *   - Generic Error: Other evaluator execution errors or unexpected failures.
 *
 * @example
 * ```ts
 * import { resumeEvaluation } from "@arizeai/phoenix-client/experiments";
 *
 * // Standard usage: evaluation name matches evaluator name
 * try {
 *   await resumeEvaluation({
 *     experimentId: "exp_123",
 *     evaluators: [{
 *       name: "correctness",
 *       kind: "CODE",
 *       evaluate: async ({ output, expected }) => ({
 *         score: output === expected ? 1 : 0
 *       })
 *     }],
 *   });
 * } catch (error) {
 *   // Handle by error name (no instanceof needed)
 *   if (error.name === "EvaluationFetchError") {
 *     console.error("Failed to connect to server:", error.cause);
 *   } else if (error.name === "EvaluationAbortedError") {
 *     console.error("Evaluation stopped due to error:", error.cause);
 *   } else {
 *     console.error("Unexpected error:", error);
 *   }
 * }
 *
 * // Stop on first error (useful for debugging)
 * await resumeEvaluation({
 *   experimentId: "exp_123",
 *   evaluators: [myEvaluator],
 *   stopOnFirstError: true, // Exit immediately on first failure
 * });
 * ```
 */
export async function resumeEvaluation({
  client: _client,
  experimentId,
  evaluators: _evaluators,
  logger = createLogger(),
  concurrency = 5,
  setGlobalTracerProvider = true,
  useBatchSpanProcessor = true,
  diagLogLevel,
  stopOnFirstError = false,
}: ResumeEvaluationParams): Promise<void> {
  const client = _client ?? createClient();
  const pageSize = DEFAULT_PAGE_SIZE;

  // Normalize evaluators to array
  const evaluators = getExperimentEvaluators(
    Array.isArray(_evaluators) ? _evaluators : [_evaluators]
  );
  // Validate inputs
  invariant(evaluators.length > 0, "Must specify at least one evaluator");

  // Get experiment info
  logger.info(`${PROGRESS_PREFIX.start}Checking for incomplete evaluations.`);
  const experiment = await getExperimentInfo({ client, experimentId });

  // Initialize tracer (only if experiment has a project_name)
  const baseUrl = client.config.baseUrl;
  invariant(
    baseUrl,
    "Phoenix base URL not found. Please set PHOENIX_HOST or set baseUrl on the client."
  );

  const tracerSetup = setupEvaluationTracer({
    projectName: experiment.projectName,
    baseUrl,
    headers: client.config.headers
      ? toObjectHeaders(client.config.headers)
      : undefined,
    useBatchSpanProcessor,
    diagLogLevel,
    setGlobalTracerProvider,
  });

  const provider = tracerSetup?.provider ?? null;
  const evalTracer = tracerSetup?.tracer ?? null;

  // Build evaluation names list for query - derive from evaluator names
  const evaluationNamesList = evaluators.map((e) => e.name);

  // Create a CSP-style bounded buffer for evaluation distribution
  const evalChannel = new Channel<EvalItem>(
    pageSize * CHANNEL_CAPACITY_MULTIPLIER
  );

  // Abort controller for stopOnFirstError coordination
  const abortController = new AbortController();
  const { signal } = abortController;
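
  // On abort, the producer stops fetching and closes the channel; if the producer
  // is blocked in send() when the channel closes, the resulting ChannelError is
  // mapped to an EvaluationAbortedError below.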

  let totalProcessed = 0;
  let totalCompleted = 0;
  let totalFailed = 0;

  // Producer: Fetch incomplete evaluations and send to channel
  async function fetchIncompleteEvaluations(): Promise<void> {
    let cursor: string | null = null;

    try {
      do {
        // Stop fetching if abort signal received
        if (signal.aborted) {
          logger.debug(`${PROGRESS_PREFIX.progress}Stopping fetch.`);
          break;
        }

        let res: {
          data?: components["schemas"]["GetIncompleteEvaluationsResponseBody"];
          error?: unknown;
        };

        try {
          res = await client.GET(
            "/v1/experiments/{experiment_id}/incomplete-evaluations",
            {
              params: {
                path: {
                  experiment_id: experimentId,
                },
                query: {
                  cursor,
                  limit: pageSize,
                  evaluation_name: evaluationNamesList,
                },
              },
            }
          );
        } catch (error: unknown) {
          // Check for version compatibility issues and throw helpful error
          try {
            await handleEvaluationFetchError(
              error,
              client,
              "resume_evaluation"
            );
            // handleEvaluationFetchError always throws; this throw only satisfies TypeScript's control-flow analysis
            throw new Error("handleEvaluationFetchError should never return");
          } catch (handledError) {
            // Wrap the error (from handleEvaluationFetchError or original) in semantic error type
            throw new EvaluationFetchError(
              "Failed to fetch incomplete evaluations from server",
              handledError instanceof Error ? handledError : undefined
            );
          }
        }

        // Check for API errors
        if (res.error) {
          throw new EvaluationFetchError(
            `Failed to fetch incomplete evaluations: ${ensureString(res.error)}`
          );
        }

        cursor = res.data?.next_cursor ?? null;
        const batchIncomplete = res.data?.data;
        invariant(batchIncomplete, "Failed to fetch incomplete evaluations");

        if (batchIncomplete.length === 0) {
          if (totalProcessed === 0) {
            logger.info(
              `${PROGRESS_PREFIX.completed}No incomplete evaluations found.`
            );
          }
          break;
        }

        if (totalProcessed === 0) {
          logger.info(`${PROGRESS_PREFIX.start}Resuming evaluations.`);
        }

        // Build evaluation tasks and send to channel
        let batchCount = 0;
        for (const incomplete of batchIncomplete) {
          // Stop sending items if abort signal received
          if (signal.aborted) {
            break;
          }

          const incompleteEval = buildIncompleteEvaluation(incomplete);

          const evaluatorsToRun = evaluators.filter((evaluator) =>
            shouldRunEvaluator(evaluator, incompleteEval)
          );

          // Flatten: Send one channel item per evaluator
          for (const evaluator of evaluatorsToRun) {
            // Stop sending items if abort signal received
            if (signal.aborted) {
              break;
            }

            await evalChannel.send({ incompleteEval, evaluator });
            batchCount++;
            totalProcessed++;
          }
        }

        logger.debug(
          `${PROGRESS_PREFIX.progress}Fetched batch of ${batchCount} evaluation tasks.`
        );
      } while (cursor !== null && !signal.aborted);
    } catch (error) {
      // Re-throw with context preservation
      if (error instanceof EvaluationFetchError) {
        throw error;
      }
      // ChannelError from blocked send() should bubble up naturally
      // (happens when channel closes while producer is blocked)
      if (error instanceof ChannelError) {
        throw error;
      }
      // Wrap any unexpected errors from channel operations
      throw new EvaluationFetchError(
        "Unexpected error during evaluation fetch",
        error instanceof Error ? error : undefined
      );
    } finally {
      evalChannel.close(); // Signal workers we're done
    }
  }

  // Worker: Process evaluations from channel
  async function processEvaluationsFromChannel(): Promise<void> {
    for await (const item of evalChannel) {
      // Stop processing if abort signal received
      if (signal.aborted) {
        break;
      }

      try {
        await runSingleEvaluation({
          client,
          experimentId,
          evaluator: item.evaluator,
          experimentRun: item.incompleteEval.experimentRun,
          datasetExample: item.incompleteEval.datasetExample,
          tracer: evalTracer,
        });
        totalCompleted++;
      } catch (error) {
        totalFailed++;
        logger.error(
          `Failed to run evaluator "${item.evaluator.name}" for run ${item.incompleteEval.experimentRun.id}: ${error}`
        );

        // If stopOnFirstError is enabled, abort and re-throw
        if (stopOnFirstError) {
          logger.warn("Stopping on first error");
          abortController.abort();
          throw error;
        }
      }
    }
  }

  // Start concurrent execution
  // Wrap in try/catch/finally to ensure the channel is always closed, even if Promise.all throws
  let executionError: Error | null = null;
  try {
    const producerTask = fetchIncompleteEvaluations();
    const workerTasks = Array.from({ length: concurrency }, () =>
      processEvaluationsFromChannel()
    );

    // Wait for producer and all workers to finish
    await Promise.all([producerTask, ...workerTasks]);
  } catch (error) {
    // Classify and handle errors based on their nature
    const err = error instanceof Error ? error : new Error(String(error));

    // Always surface producer/infrastructure errors
    if (error instanceof EvaluationFetchError) {
      // Producer failed - this is ALWAYS critical regardless of stopOnFirstError
      logger.error(`Critical: Failed to fetch evaluations from server`);
      executionError = err;
    } else if (error instanceof ChannelError && signal.aborted) {
      // Channel closed due to intentional abort - wrap in semantic error
      executionError = new EvaluationAbortedError(
        "Evaluation stopped due to error in concurrent evaluator",
        err
      );
    } else if (stopOnFirstError) {
      // Worker error in stopOnFirstError mode - already logged by worker
      executionError = err;
    } else {
      // Unexpected error (not from worker, not from producer fetch)
      // This could be a bug in our code or infrastructure failure
      logger.error(`Unexpected error during evaluation: ${err.message}`);
      executionError = err;
    }
  } finally {
    // Ensure channel is closed even if there are unexpected errors
    // This is a safety net in case producer's finally block didn't execute
    if (!evalChannel.isClosed) {
      evalChannel.close();
    }
  }

  // Only show the completion message if execution finished without a critical error
  if (!executionError) {
    logger.info(`${PROGRESS_PREFIX.completed}Evaluations completed.`);
  }

  if (totalFailed > 0 && !executionError) {
    logger.warn(`${totalFailed} out of ${totalProcessed} evaluations failed.`);
  }

  logEvalResumeSummary(logger, {
    experimentId: experiment.id,
    processed: totalProcessed,
    completed: totalCompleted,
    failed: totalFailed,
  });

  // Flush spans (if tracer was initialized)
  if (provider) {
    await provider.forceFlush();
  }

  // Re-throw error if evaluation failed
  if (executionError) {
    throw executionError;
  }
}

/**
 * Record evaluation results (or a failure) to the Phoenix API.
 * Each result is posted as a separate experiment evaluation under the evaluator's
 * name; when the evaluator failed, a single record with a null result and the
 * error message is posted instead.
 */
async function recordEvaluationResults({
  client,
  evaluator,
  experimentRun,
  results,
  error,
  startTime,
  endTime,
  traceId = null,
}: {
  readonly client: PhoenixClient;
  readonly evaluator: Evaluator;
  readonly experimentRun: IncompleteEvaluation["experimentRun"];
  readonly results?: readonly EvaluationResult[];
  readonly error?: string;
  readonly startTime: Date;
  readonly endTime: Date;
  readonly traceId?: string | null;
}): Promise<void> {
  if (results) {
    // Success case: record each evaluation result
    for (const singleResult of results) {
      await client.POST("/v1/experiment_evaluations", {
        body: {
          experiment_run_id: experimentRun.id,
          name: evaluator.name,
          annotator_kind: evaluator.kind,
          result: {
            score: singleResult.score ?? null,
            label: singleResult.label ?? null,
            explanation: singleResult.explanation ?? null,
            metadata: singleResult.metadata ?? {},
          },
          start_time: startTime.toISOString(),
          end_time: endTime.toISOString(),
          error: null,
          trace_id: traceId,
        },
      });
    }
  } else if (error) {
    // Error case: record failed evaluation with evaluator name
    await client.POST("/v1/experiment_evaluations", {
      body: {
        experiment_run_id: experimentRun.id,
        name: evaluator.name,
        annotator_kind: evaluator.kind,
        result: null,
        start_time: startTime.toISOString(),
        end_time: endTime.toISOString(),
        error,
        trace_id: traceId,
      },
    });
  }
}

/**
 * Run a single evaluation and record the outcome (result or error) to the API.
 * Recording happens in a finally block so that a failed evaluation is persisted
 * before its error is re-thrown to the caller.
 */
async function runSingleEvaluation({
  client,
  experimentId,
  evaluator,
  experimentRun,
  datasetExample,
  tracer,
}: {
  readonly client: PhoenixClient;
  readonly experimentId: string;
  readonly evaluator: Evaluator;
  readonly experimentRun: IncompleteEvaluation["experimentRun"];
  readonly datasetExample: IncompleteEvaluation["datasetExample"];
  readonly tracer: Tracer | null;
}): Promise<void> {
  const startTime = new Date();

  // Prepare evaluator inputs
  const taskOutput: TaskOutput = experimentRun.output ?? null;
  const expectedOutput = datasetExample.output ?? undefined;

  // If no tracer (no project_name), execute without tracing
  if (!tracer) {
    let results: readonly EvaluationResult[] | undefined;
    let error: string | undefined;

    try {
      const result = await Promise.resolve(
        evaluator.evaluate({
          input: datasetExample.input,
          output: taskOutput,
          expected: expectedOutput,
          metadata: datasetExample.metadata,
        })
      );
      results = Array.isArray(result) ? result : [result];
    } catch (err) {
      error = err instanceof Error ? err.message : String(err);
      throw err;
    } finally {
      const endTime = new Date();
      await recordEvaluationResults({
        client,
        evaluator,
        experimentRun,
        results,
        error,
        startTime,
        endTime,
      });
    }
    return;
  }

  // With tracer: wrap execution in a span for observability
  return tracer.startActiveSpan(
    `Evaluation: ${evaluator.name}`,
    async (span) => {
      // Set span attributes for input
      span.setAttributes({
        [SemanticConventions.OPENINFERENCE_SPAN_KIND]:
          OpenInferenceSpanKind.EVALUATOR,
        [SemanticConventions.INPUT_MIME_TYPE]: MimeType.JSON,
        [SemanticConventions.INPUT_VALUE]: ensureString({
          input: datasetExample.input,
          output: experimentRun.output,
          expected: datasetExample.output,
          metadata: datasetExample.metadata,
        }),
        ...objectAsAttributes({
          experiment_id: experimentId,
          experiment_run_id: experimentRun.id,
          dataset_example_id: datasetExample.id,
        }),
      });

      let results: readonly EvaluationResult[] | undefined;
      let error: string | undefined;

      try {
        // Execute the evaluator (only once!)
        const result = await Promise.resolve(
          evaluator.evaluate({
            input: datasetExample.input,
            output: taskOutput,
            expected: expectedOutput,
            metadata: datasetExample.metadata,
          })
        );

        results = Array.isArray(result) ? result : [result];

        // Set output span attributes
        span.setAttributes({
          [SemanticConventions.OUTPUT_MIME_TYPE]: MimeType.JSON,
          [SemanticConventions.OUTPUT_VALUE]: ensureString(result),
        });

        // Set attributes from first result for span metadata
        if (results[0]) {
          span.setAttributes(objectAsAttributes(results[0]));
        }

        span.setStatus({ code: SpanStatusCode.OK });
      } catch (err) {
        error = err instanceof Error ? err.message : String(err);

        span.setStatus({
          code: SpanStatusCode.ERROR,
          message: error,
        });
        span.recordException(err as Error);

        throw err;
      } finally {
        const endTime = new Date();
        span.end();

        // Record results to API
        await recordEvaluationResults({
          client,
          evaluator,
          experimentRun,
          results,
          error,
          startTime,
          endTime,
          traceId: span.spanContext().traceId,
        });
      }
    }
  );
}
