import { Client as PgClient } from "pg";
import {
  Client,
  Connection,
  WorkflowExecutionAlreadyStartedError,
} from "@temporalio/client";
import { config } from "./config";

/**
 * Reads a numeric setting from the environment.
 *
 * Falls back to `fallback` when the variable is unset, empty, not a number,
 * non-finite, or not strictly positive. Rejecting non-positive values matters
 * here: a negative poll interval would make `setTimeout` fire almost
 * immediately (hot loop), and a negative retry limit would turn every first
 * failure into a terminal one.
 */
function positiveNumberFromEnv(name: string, fallback: number): number {
  const parsed = Number(process.env[name]);
  return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback;
}

// Delay between the end of one poll and the start of the next, in ms (default: 10 s).
const POLL_INTERVAL = positiveNumberFromEnv("EVENT_POLL_INTERVAL_MS", 10_000);
// Maximum number of candidate outbox rows fetched per poll.
const BATCH_SIZE = 50;
// Number of failed attempts after which an event is marked as a terminal failure.
const MAX_RETRIES = positiveNumberFromEnv("OUTBOX_MAX_RETRIES", 5);

/**
 * Control handle for the outbox poller. stop() performs a graceful shutdown:
 * - no further poll is scheduled
 * - the currently in-flight poll is awaited
 * - the PG and Temporal connections are released
 */
export interface OutboxPollerHandle {
  stop(): Promise<void>;
}

/**
 * Outbox poller — periodically scans t_workflow_event_outbox for unprocessed
 * events, matches them against event-triggered workflows and starts them.
 *
 * How it works:
 * 1. Outbox rows are expected to be written by AFTER triggers on the business
 *    tables, in the same transaction as the business change (the triggers are
 *    not part of this file).
 * 2. This poller scans unprocessed rows after every POLL_INTERVAL ms.
 * 3. Each row is matched against event_trigger workflows, which are started
 *    on Temporal.
 * 4. The row is then marked as processed.
 *
 * Transaction boundaries (important):
 * - Every outbox row is handled in its own PG transaction:
 *   BEGIN → SELECT FOR UPDATE → start workflows → UPDATE processed → COMMIT.
 * - The Temporal workflowId is derived deterministically from the outbox id
 *   (`wf-outbox-{id}-w{workflowId}`). If the process dies between
 *   workflow.start and COMMIT, the retry hits
 *   WorkflowExecutionAlreadyStartedError, which is treated as an idempotent
 *   success, so the workflow is not started twice for the same event.
 *
 * @returns a handle whose stop() shuts the poller down gracefully.
 * @throws if the PG or Temporal connection cannot be established; in the
 *         Temporal case the already-open PG connection is closed first.
 */
export async function startOutboxPoller(): Promise<OutboxPollerHandle> {
  const pg = new PgClient({ connectionString: config.database.url });
  // A long-lived pg Client emits "error" when its connection drops while idle.
  // Without a listener Node treats that as an unhandled "error" event and
  // terminates the process. Note: this poller does not reconnect; subsequent
  // polls will fail and be logged until the process is restarted.
  pg.on("error", (err: Error) => {
    console.error("[OutboxPoller] PG client error:", err?.message ?? err);
  });
  await pg.connect();

  let temporalConn: Connection;
  try {
    temporalConn = await Connection.connect({
      address: config.temporal.address,
    });
  } catch (err) {
    // Do not leak the PG connection when Temporal is unreachable.
    try {
      await pg.end();
    } catch {
      /* best effort — the original connect error is what the caller needs */
    }
    throw err;
  }
  const temporalClient = new Client({
    connection: temporalConn,
    namespace: config.temporal.namespace,
  });

  console.log(
    `[OutboxPoller] Started. Polling every ${POLL_INTERVAL}ms, max retries: ${MAX_RETRIES}`,
  );

  // One poll cycle. Never rejects: all errors are logged and swallowed so the
  // scheduling loop below keeps running.
  const poll = async (): Promise<void> => {
    try {
      // 1. Select candidate ids without locking, to keep lock time short.
      const { rows: candidates } = await pg.query(
        `SELECT id FROM t_workflow_event_outbox
          WHERE processed = FALSE
          ORDER BY created_at
          LIMIT $1`,
        [BATCH_SIZE],
      );

      if (candidates.length === 0) return;
      console.log(`[OutboxPoller] Found ${candidates.length} candidate events`);

      // 2. Handle rows one by one, each in its own transaction. Sequential on
      //    purpose: all rows share the single `pg` connection.
      for (const { id } of candidates) {
        await processOneEvent(pg, temporalClient, Number(id));
      }
    } catch (err: unknown) {
      console.error(
        "[OutboxPoller] Poll error:",
        err instanceof Error ? err.message : err,
      );
    }
  };

  // Self-scheduling loop: the next poll is scheduled only after the current
  // one finished, so polls never overlap even when one takes longer than
  // POLL_INTERVAL (which setInterval would not guarantee).
  let stopped = false;
  let timer: NodeJS.Timeout | null = null;
  let inFlight: Promise<void> | null = null;

  const loop = async (): Promise<void> => {
    if (stopped) return;
    inFlight = poll();
    try {
      await inFlight;
    } finally {
      inFlight = null;
      if (!stopped) {
        timer = setTimeout(() => void loop(), POLL_INTERVAL);
      }
    }
  };
  void loop();

  // Actual shutdown sequence; runs at most once (see stop()).
  const shutdown = async (): Promise<void> => {
    stopped = true;
    if (timer) {
      clearTimeout(timer);
      timer = null;
    }
    // Wait for the poll that is currently running (including every event it
    // is still processing).
    if (inFlight) {
      try {
        await inFlight;
      } catch {
        // poll() already logs its own errors; nothing more to do here.
      }
    }
    try {
      await pg.end();
    } catch (err: any) {
      console.error(
        "[OutboxPoller] PG client close error:",
        err?.message ?? err,
      );
    }
    try {
      await temporalConn.close();
    } catch (err: any) {
      console.error(
        "[OutboxPoller] Temporal connection close error:",
        err?.message ?? err,
      );
    }
    console.log("[OutboxPoller] Stopped, resources released");
  };

  let stopping: Promise<void> | null = null;

  return {
    // Idempotent: repeated or concurrent calls share one shutdown, so the
    // connections are never closed twice.
    stop(): Promise<void> {
      if (!stopping) {
        stopping = shutdown();
      }
      return stopping;
    },
  };
}

/**
 * Processes a single outbox event.
 *
 * Flow:
 *   BEGIN
 *   SELECT ... FOR UPDATE SKIP LOCKED   (row gone/locked → ROLLBACK, return)
 *     ↓
 *   matchAndTrigger (instance upsert + workflow.start, workflowId derived
 *   from the outbox id)
 *     ↓
 *   ┌─ success:          UPDATE processed=TRUE → COMMIT
 *   └─ failure:          ROLLBACK, then in a separate autocommit statement:
 *        ├─ retries left:    UPDATE retry_count / last_error (stays unprocessed)
 *        └─ terminal:        UPDATE processed=TRUE + retry_count + last_error
 *
 * The retry bookkeeping runs after the row lock has been released, so both
 * UPDATEs are guarded with `processed = FALSE` to avoid overwriting a row that
 * another poller instance completed in the meantime.
 *
 * NOTE(review): matchAndTrigger writes through the same `pg` client, so the
 * ROLLBACK on failure also discards any t_workflow_instance row created for a
 * workflow that Temporal has already started. The retry may then create a new
 * instance row whose id differs from the one passed to the running workflow —
 * confirm whether dslWorkflow tolerates this.
 *
 * @param pg             connected PG client; must not be inside a transaction
 * @param temporalClient Temporal client used to start workflows
 * @param outboxId       id of the t_workflow_event_outbox row to process
 * @throws only if the initial BEGIN fails; every later error is logged here.
 */
async function processOneEvent(
  pg: PgClient,
  temporalClient: Client,
  outboxId: number,
): Promise<void> {
  await pg.query("BEGIN");
  let inTransaction = true;
  try {
    // Row lock: SKIP LOCKED keeps several poller instances from handling the
    // same row at the same time.
    const { rows } = await pg.query(
      `SELECT id, event_name, payload, retry_count
         FROM t_workflow_event_outbox
         WHERE id = $1 AND processed = FALSE
         FOR UPDATE SKIP LOCKED`,
      [outboxId],
    );

    if (rows.length === 0) {
      // Locked by another instance, or already processed.
      await pg.query("ROLLBACK");
      inTransaction = false;
      return;
    }

    const row = rows[0];
    try {
      const matched = await matchAndTrigger(
        pg,
        temporalClient,
        Number(row.id),
        row.event_name,
        row.payload,
      );
      if (matched === 0) {
        console.warn(
          `[OutboxPoller] No workflow matched event "${row.event_name}" (id=${row.id}). Event consumed without action.`,
        );
      }

      // Every workflow.start succeeded (idempotent successes included):
      // commit "processed" atomically with the instance rows.
      await pg.query(
        `UPDATE t_workflow_event_outbox
            SET processed = TRUE, processed_at = NOW()
            WHERE id = $1`,
        [row.id],
      );
      await pg.query("COMMIT");
      inTransaction = false;
    } catch (err: unknown) {
      // workflow.start or a DB statement failed: roll back, then record the
      // retry state outside the aborted transaction.
      await pg.query("ROLLBACK");
      inTransaction = false;

      // Number(): depending on the column type the driver may hand back a
      // string, and "3" + 1 would otherwise become "31".
      const nextRetry = Number(row.retry_count ?? 0) + 1;
      const errMsg = err instanceof Error ? err.message : String(err);

      if (nextRetry >= MAX_RETRIES) {
        console.error(
          `[OutboxPoller] Event ${row.id} (${row.event_name}) failed ${nextRetry} times. Marking as terminal failure: ${errMsg}`,
        );
        await pg.query(
          `UPDATE t_workflow_event_outbox
              SET processed = TRUE, processed_at = NOW(), retry_count = $1, last_error = $2
              WHERE id = $3 AND processed = FALSE`,
          [nextRetry, errMsg, row.id],
        );
      } else {
        console.error(
          `[OutboxPoller] Event ${row.id} (${row.event_name}) failed (retry ${nextRetry}/${MAX_RETRIES}): ${errMsg}`,
        );
        await pg.query(
          `UPDATE t_workflow_event_outbox
              SET retry_count = $1, last_error = $2
              WHERE id = $3 AND processed = FALSE`,
          [nextRetry, errMsg, row.id],
        );
      }
    }
  } catch (txErr: unknown) {
    // Transaction-level failure (lost PG connection, failed ROLLBACK or
    // bookkeeping UPDATE, ...) — roll back as a last resort if still open.
    if (inTransaction) {
      try {
        await pg.query("ROLLBACK");
      } catch {
        /* ignore: the connection is most likely already unusable */
      }
    }
    console.error(
      `[OutboxPoller] Tx error for outbox ${outboxId}:`,
      txErr instanceof Error ? txErr.message : txErr,
    );
  }
}

/**
 * Finds the workflows listening for `eventName` and starts one execution of
 * each on Temporal.
 *
 * A workflow is triggered when ANY of its `event_trigger` nodes matches the
 * event name and — if that node has a `filterExpression` — the filter accepts
 * the payload. (Checking only the first name-matching node would let a
 * rejecting filter on it hide later trigger nodes.)
 *
 * Idempotency: the Temporal workflowId is derived deterministically from
 * outboxId + workflow_id. If an earlier attempt already started the workflow
 * but its transaction never committed, Temporal throws
 * WorkflowExecutionAlreadyStartedError on retry, which is treated as success.
 *
 * All statements run on the caller's `pg` client and therefore inside the
 * caller's transaction, if one is open.
 *
 * @param pg             PG client (normally inside the per-event transaction)
 * @param temporalClient Temporal client used to start workflows
 * @param outboxId       id of the originating outbox row
 * @param eventName      event name of the outbox row, e.g. "t_order.insert"
 * @param payload        event payload; stored as instance context and passed
 *                       to the workflow
 * @returns number of workflows matched and started (or confirmed as already
 *          started); 0 means nothing subscribes to this event.
 * @throws any DB error, or any Temporal error other than
 *         WorkflowExecutionAlreadyStartedError.
 */
async function matchAndTrigger(
  pg: PgClient,
  temporalClient: Client,
  outboxId: number,
  eventName: string,
  payload: Record<string, any>,
): Promise<number> {
  // Latest version's DSL of every published and enabled workflow.
  const result = await pg.query(`
    SELECT w.id AS workflow_id, wv.id AS version_id, wv.dsl
    FROM t_workflow w
    JOIN t_workflow_version wv ON wv.workflow_id = w.id
    WHERE w.status = 'PUBLISHED'
      AND w.is_enabled = true
      AND wv.version = (
        SELECT MAX(wv2.version)
        FROM t_workflow_version wv2
        WHERE wv2.workflow_id = w.id
      )
  `);

  let matchedCount = 0;

  for (const row of result.rows) {
    const dsl = row.dsl;
    // A malformed DSL (nodes missing or not an array) must not fail the whole
    // event; such workflows are simply skipped.
    const nodes: unknown = dsl?.nodes;
    if (!Array.isArray(nodes)) continue;

    // Triggered if any event_trigger node matches the name and passes its
    // optional filterExpression.
    const isTriggered = nodes.some(
      (n: any) =>
        n?.type === "event_trigger" &&
        matchEventName(n.config?.eventName, eventName) &&
        (!n.config?.filterExpression ||
          evaluateFilter(n.config.filterExpression, payload)),
    );
    if (!isTriggered) continue;

    // Deterministic workflowId (idempotency guard on the Temporal side).
    const deterministicWorkflowId = `wf-outbox-${outboxId}-w${row.workflow_id}`;

    // Create (or reuse) the WorkflowInstance via an idempotent upsert on the
    // (workflow_id, source_outbox_id) unique key. One outbox event may be
    // subscribed to by several workflows; each gets its own instance row.
    // `xmax = 0` is true only for a freshly inserted row.
    // NOTE(review): reuse only happens if an earlier instance row was actually
    // committed; a row written inside a rolled-back transaction is gone, so a
    // retry then gets a new id — confirm against the caller's transaction use.
    const instanceUpsert = await pg.query(
      `INSERT INTO t_workflow_instance
         (workflow_id, version_id, status, context, creator, source_outbox_id, created_at, updated_at)
       VALUES ($1, $2, 'PENDING', $3, 'event_trigger', $4, NOW(), NOW())
       ON CONFLICT (workflow_id, source_outbox_id) DO UPDATE
         SET updated_at = NOW()
       RETURNING id, (xmax = 0) AS inserted`,
      [row.workflow_id, row.version_id, JSON.stringify(payload), outboxId],
    );
    const instanceId = Number(instanceUpsert.rows[0].id);
    const wasInserted = instanceUpsert.rows[0].inserted as boolean;

    // Start the Temporal workflow. Starting the same workflowId again throws
    // WorkflowExecutionAlreadyStartedError, which is our idempotency signal.
    // Note: despite the column name, the value stored below is the workflow
    // id, not Temporal's run id.
    let temporalRunId = deterministicWorkflowId;
    try {
      const handle = await temporalClient.workflow.start("dslWorkflow", {
        taskQueue: config.temporal.taskQueue,
        workflowId: deterministicWorkflowId,
        args: [
          {
            instanceId,
            workflowId: Number(row.workflow_id),
            versionId: Number(row.version_id),
            dsl,
            context: payload,
          },
        ],
      });
      temporalRunId = handle.workflowId;
      console.log(
        `[OutboxPoller] Triggered workflow ${row.workflow_id} for event "${eventName}", instance=${instanceId}${wasInserted ? "" : " (recovered)"}, wfId=${deterministicWorkflowId}`,
      );
    } catch (err: unknown) {
      if (err instanceof WorkflowExecutionAlreadyStartedError) {
        // A previous attempt started the workflow but never committed the
        // outbox row — treat this replay as an idempotent success.
        console.warn(
          `[OutboxPoller] Workflow ${deterministicWorkflowId} already exists (recovery). outboxId=${outboxId}, instance=${instanceId}`,
        );
      } else {
        throw err; // anything else bubbles up → caller rolls back and retries
      }
    }

    // Write back temporal_run_id (also on the recovery path, so the column is
    // never left empty).
    await pg.query(
      `UPDATE t_workflow_instance SET temporal_run_id = $1 WHERE id = $2`,
      [temporalRunId, instanceId],
    );

    matchedCount++;
  }

  return matchedCount;
}

/**
 * Matches an event name against a trigger pattern. Names have the form
 * `<table>.<operation>`; the split happens at the FIRST dot and everything
 * after it is compared as the operation part, so extra segments are never
 * silently ignored.
 *
 * - "t_order.insert"  exact match
 * - "t_order.*"       every operation on that table
 * - "*.insert"        insert on every table
 * - "*"               every event
 * - "t_order"         (no operation part) every operation on that table
 *
 * @param pattern pattern from the trigger config; undefined/empty never matches
 * @param actual  event name taken from the outbox row
 * @returns true when `actual` is covered by `pattern`
 */
function matchEventName(pattern: string | undefined, actual: string): boolean {
  if (!pattern) return false;
  if (pattern === actual || pattern === "*") return true;

  // Split into [table, rest]; `rest` is "" when the name has no dot.
  const splitAtFirstDot = (name: string): [string, string] => {
    const dot = name.indexOf(".");
    return dot === -1 ? [name, ""] : [name.slice(0, dot), name.slice(dot + 1)];
  };

  const [patTable, patOp] = splitAtFirstDot(pattern);
  const [actTable, actOp] = splitAtFirstDot(actual);

  const tableMatch = patTable === "*" || patTable === actTable;
  // An absent operation part in the pattern means "any operation".
  const opMatch = !patOp || patOp === "*" || patOp === actOp;

  return tableMatch && opMatch;
}

/**
 * Evaluates a trigger's filter expression against the event payload.
 *
 * The expression is JavaScript source in which the payload is available as
 * `data`, e.g. `data.amount > 100`; its result is coerced to boolean.
 *
 * SECURITY: the expression is compiled with `new Function`, i.e. it runs as
 * arbitrary code inside this process. It must only ever come from trusted
 * workflow definitions — NOTE(review): confirm who is allowed to author DSLs,
 * or replace this with a sandboxed expression evaluator.
 *
 * Fail-open: when the expression cannot be compiled or throws while running,
 * the event is treated as matching (and a warning is logged) so a broken
 * filter does not silently drop events.
 *
 * @param expression JavaScript expression from the trigger config
 * @param payload    event payload, exposed to the expression as `data`
 * @returns the truthiness of the expression; true if evaluation fails
 */
function evaluateFilter(
  expression: string,
  payload: Record<string, any>,
): boolean {
  try {
    // trim(): a line break directly after `return` triggers automatic
    // semicolon insertion, which would make an expression that starts on a new
    // line evaluate to undefined (→ false) without any error.
    const source = String(expression).trim();
    const fn = new Function("data", `return ${source}`);
    return !!fn(payload);
  } catch (err: unknown) {
    console.warn(
      `[OutboxPoller] Filter expression failed to evaluate, treating event as matched: ${
        err instanceof Error ? err.message : String(err)
      } (expression: ${String(expression)})`,
    );
    return true;
  }
}
