/**
 * Task Manager — persistent task queue backed by SQLite.
 * Tasks survive server restarts. Background runner picks up queued tasks.
 */

import { randomUUID } from 'node:crypto';
import { spawn, execSync } from 'node:child_process';
import { realpathSync, existsSync } from 'node:fs';
import { join as pathJoin } from 'node:path';
import { getConfigDir } from './dirs';
import * as pty from 'node-pty';
import { getDb } from '../src/core/db/database';
import { getDbPath } from '../src/config';
import { loadSettings } from './settings';
import { notifyTaskComplete, notifyTaskFailed } from './notify';
import { getInstalledConnector } from './connectors/registry';
import { getAgent } from './agents';
import { recordUsage } from './usage-scanner';
import type { Task, TaskLogEntry, TaskStatus, TaskMode, WatchConfig } from '../src/types';

import { toIsoUTC } from './iso-time';
import { executeTmuxTask, killTmuxTaskSession } from './task-tmux-backend';
import { ownedKill } from './safe-reclaim';
import { tmux, tmuxAllowFail } from './safe-exec';
import { installForgeStopHook } from './workspace/skill-installer';

// Flipped to true the first time executeTmuxBackendTask attempts to install
// the Forge Stop hook, so the install is attempted at most once per module
// instance (the flag is set before the attempt, so a failed install is not retried).
let _tmuxHookInstalled = false;

/**
 * Look up the pipelineTaskIds Set that pipeline.ts publishes on globalThis
 * under the shared Symbol 'mw-pipeline-task-ids'. Going through globalThis
 * sidesteps a circular static import (pipeline.ts → task-manager).
 *
 * @returns the shared Set, or a fresh empty Set when nothing has been
 *          registered yet — callers read that as "not a pipeline task".
 */
function pipelineTaskIdsRef(): Set<string> {
  const registry = globalThis as any;
  const shared = registry[Symbol.for('mw-pipeline-task-ids')];
  if (shared) return shared;
  return new Set<string>();
}

// Runner state is stored on globalThis under a shared Symbol and only created
// when absent, so whichever evaluation of this module runs first owns the
// initial object and later ones reuse it.
const runnerKey = Symbol.for('mw-task-runner');
const gRunner = globalThis as any;
const runnerState: { runner: ReturnType<typeof setInterval> | null; currentTaskId: string | null } =
  gRunner[runnerKey] || (gRunner[runnerKey] = { runner: null, currentTaskId: null });

// Per-project concurrency: names of projects that currently have a task
// running. processNextTask skips queued non-monitor tasks whose project is in
// this set, so the runner starts at most one such task per project at a time.
const runningProjects = new Set<string>();

// Event listeners for real-time updates.
// A listener receives the task id, the event kind ('log' or 'status') and an
// optional event-specific payload.
type TaskListener = (taskId: string, event: 'log' | 'status', data?: any) => void;
// Currently registered listeners: added by onTaskEvent, invoked by emit.
const listeners = new Set<TaskListener>();

/**
 * Subscribe to task events.
 *
 * @param fn listener invoked for every emitted task event
 * @returns an unsubscribe function that removes `fn` from the listener set
 */
export function onTaskEvent(fn: TaskListener): () => void {
  listeners.add(fn);
  const unsubscribe = () => listeners.delete(fn);
  return unsubscribe;
}

/**
 * Deliver an event to every registered listener. A listener that throws is
 * ignored so it cannot prevent the remaining listeners from being called.
 */
function emit(taskId: string, event: 'log' | 'status', data?: any) {
  listeners.forEach((listener) => {
    try {
      listener(taskId, event, data);
    } catch {
      // Best-effort delivery: listener failures are deliberately dropped.
    }
  });
}

/** Return the database handle that getDb provides for the configured DB path. */
function db() {
  const dbPath = getDbPath();
  return getDb(dbPath);
}

// Per-task model overrides, keyed by task id. Consulted first when a task's
// model is resolved, ahead of the agent's configured model.
export const taskModelOverrides: Map<string, string> = new Map();

/**
 * Append-system-prompt text per task, keyed by task id. Forge job/pipeline
 * uses it to steer the agent towards specific skills for a run. The map is
 * in-process only; after a restart the pipeline state re-applies it because
 * the task is re-created there.
 */
export const taskAppendSystemPromptOverrides: Map<string, string> = new Map();

/**
 * Extra CLI flags per task (in-process, keyed by task id). Kanban uses this to
 * pass `--mcp-config <forge-only> --strict-mcp-config` so a card's one-shot
 * agent loads only the Forge MCP instead of the scratch project's full MCP
 * set (chrome etc.), which makes cold start much faster. Best-effort: when the
 * entry is lost on restart the task simply falls back to the project's
 * .mcp.json.
 */
export const taskExtraFlagsOverrides: Map<string, string[]> = new Map();

/**
 * Names of the skills attached to a task, keyed by task id (the companion of
 * the append-system-prompt override). Observability only: the names are
 * written into the task's init log so a pipeline run shows what it loaded.
 */
export const taskSkillsOverrides: Map<string, string[]> = new Map();

/**
 * Build the init-log suffix listing a task's attached skills.
 *
 * @returns ` | Skills: a, b` when skills are attached, otherwise ''.
 */
function skillsLogSuffix(taskId: string): string {
  const skills = taskSkillsOverrides.get(taskId);
  if (!skills || !skills.length) return '';
  return ` | Skills: ${skills.join(', ')}`;
}

// ─── CRUD ────────────────────────────────────────────────────

/**
 * Insert a new task row in 'queued' state, make sure the background runner
 * is started, and return the stored task.
 *
 * Conversation id resolution:
 *  - `conversationId === ''`  → no conversation (null)
 *  - non-empty `conversationId` → used as given
 *  - omitted                  → prompt mode inherits the project's latest
 *                               conversation; other modes get null
 */
export function createTask(opts: {
  projectName: string;
  projectPath: string;
  prompt: string;
  mode?: TaskMode;
  priority?: number;
  conversationId?: string;  // Explicit override; otherwise auto-inherits from project
  scheduledAt?: string;     // ISO timestamp — task won't run until this time
  watchConfig?: WatchConfig;
  agent?: string;           // Agent ID (default: from settings)
  backend?: 'tmux';         // 'tmux' = run in dedicated tmux session; omit = default headless
}): Task {
  const id = randomUUID().slice(0, 8);
  const mode = opts.mode || 'prompt';
  const agent = opts.agent || '';

  let convId: string | null;
  if (opts.conversationId === '') {
    convId = null;
  } else if (opts.conversationId) {
    convId = opts.conversationId;
  } else {
    convId = mode === 'prompt' ? getProjectConversationId(opts.projectName) : null;
  }

  const serializedWatch = opts.watchConfig ? JSON.stringify(opts.watchConfig) : null;
  const insert = db().prepare(`
    INSERT INTO tasks (id, project_name, project_path, prompt, mode, status, priority, conversation_id, log, scheduled_at, watch_config, agent, backend)
    VALUES (?, ?, ?, ?, ?, 'queued', ?, ?, '[]', ?, ?, ?, ?)
  `);
  insert.run(
    id,
    opts.projectName,
    opts.projectPath,
    opts.prompt,
    mode,
    opts.priority || 0,
    convId || null,
    opts.scheduledAt || null,
    serializedWatch,
    agent || null,
    opts.backend || null,
  );

  // Make sure something will pick the new row up.
  ensureRunnerStarted();

  return getTask(id)!;
}

/**
 * Find the conversation_id of the most recently completed ('done') task of a
 * project that has one. Lets all tasks of the same project share one Claude
 * session.
 *
 * @returns the conversation id, or null when no such task exists
 */
export function getProjectConversationId(projectName: string): string | null {
  const latest = db().prepare(`
    SELECT conversation_id FROM tasks
    WHERE project_name = ? AND conversation_id IS NOT NULL AND status = 'done'
    ORDER BY completed_at DESC LIMIT 1
  `).get(projectName) as any;
  if (!latest) return null;
  return latest.conversation_id || null;
}

/** Load one task by id; returns null when no row with that id exists. */
export function getTask(id: string): Task | null {
  const found = db().prepare('SELECT * FROM tasks WHERE id = ?').get(id) as any;
  return found ? rowToTask(found) : null;
}

/**
 * Reconcile orphaned 'running' tasks. Tasks spawn child processes
 * owned by next-server (lib/claude-process); when next-server exits
 * (forge restart / crash / stop), those processes die but the DB row
 * stays at status='running' forever. Result: Activity panel /
 * /api/activity/summary keeps showing zombie tasks the user never
 * started; new dispatches can collide with stuck project locks.
 *
 * Called once at init.ts startup. Any row still showing 'running' is
 * by definition orphaned — its parent next-server process is gone
 * (otherwise we wouldn't be in startup). Mark all as failed with a
 * clear error so the user knows it was a restart, not a real failure.
 *
 * Only 'running' rows are touched. 'queued' rows have no process to lose
 * and must survive the restart so the background runner can pick them up.
 *
 * Idempotent — second run finds zero rows to update.
 *
 * @returns the number of rows that were flipped from 'running' to 'failed'
 */
export function reconcileOrphanedTasks(): number {
  const r = db().prepare(`
    UPDATE tasks
       SET status = 'failed',
           error = COALESCE(NULLIF(error, ''), 'orphaned by server restart — task process did not survive restart'),
           completed_at = datetime('now')
     WHERE status = 'running'
  `).run();
  const n = (r.changes as number) || 0;
  if (n > 0) {
    console.log(`[task-manager] reconciled ${n} orphaned task(s) (running→failed)`);
  }
  return n;
}

/**
 * List tasks with all columns, newest first.
 *
 * @param status when given, only tasks in that status are returned
 */
export function listTasks(status?: TaskStatus): Task[] {
  const params: string[] = [];
  let whereClause = '';
  if (status) {
    whereClause = ' WHERE status = ?';
    params.push(status);
  }
  const sql = 'SELECT * FROM tasks' + whereClause + ' ORDER BY created_at DESC';
  const rows = db().prepare(sql).all(...params) as any[];
  return rows.map(rowToTask);
}

// Lightweight listing for the task list view. The heavy columns are not
// transferred: `log` is reduced to its length, `git_diff` to a presence flag,
// and `result_summary` to its first 1024 characters, so long sessions do not
// drag megabytes of JSON into the list. Callers needing the full body should
// fetch /api/tasks/[id] on demand.
export function listTasksLite(status?: TaskStatus): Task[] {
  const SLIM_COLS = `
    id, project_name, project_path, prompt, mode, status, priority,
    conversation_id, watch_config, git_branch, cost_usd, error, agent, backend,
    created_at, started_at, completed_at, scheduled_at,
    length(log) AS log_size,
    CASE WHEN result_summary IS NULL THEN NULL ELSE substr(result_summary, 1, 1024) END AS result_summary,
    CASE WHEN git_diff IS NULL THEN 0 ELSE 1 END AS has_git_diff
  `;
  const params: string[] = [];
  let whereClause = '';
  if (status) {
    whereClause = ' WHERE status = ?';
    params.push(status);
  }
  const sql = `SELECT ${SLIM_COLS} FROM tasks` + whereClause + ' ORDER BY created_at DESC';
  const rows = db().prepare(sql).all(...params) as any[];
  const tasks: Task[] = [];
  for (const row of rows) tasks.push(rowToLiteTask(row));
  return tasks;
}

// Return a window of a task's log without parsing the whole JSON array in JS:
// sqlite JSON1's json_each walks the array and LIMIT/OFFSET are applied in SQL.
//
//  - limit:    clamped to 1..2000, default 200
//  - offset:   default is the tail of the log (total - limit), never negative
//  - truncate: per-entry cap on string `content` (default 8192, 0 = no cap);
//              a capped entry carries its original length in `_truncated`
//
// Each returned entry also carries `_index`, its position in the stored log.
export function getTaskLogSlice(id: string, opts: { offset?: number; limit?: number; truncate?: number } = {}):
  { entries: (TaskLogEntry & { _truncated?: number; _index?: number })[]; total: number } {
  type SlicedEntry = TaskLogEntry & { _truncated?: number; _index?: number };

  const countRow = db().prepare(
    `SELECT COALESCE(json_array_length(log), 0) AS n FROM tasks WHERE id = ?`
  ).get(id) as { n: number } | undefined;
  const total = countRow?.n ?? 0;
  if (total === 0) return { entries: [], total: 0 };

  const requestedLimit = opts.limit ?? 200;
  const limit = Math.max(1, Math.min(requestedLimit, 2000));
  const tailOffset = Math.max(0, total - limit);
  const offset = Math.max(0, opts.offset ?? tailOffset);
  const cap = opts.truncate ?? 8192;

  const rows = db().prepare(
    `SELECT json_each.key AS idx, value FROM tasks, json_each(tasks.log)
     WHERE tasks.id = ?
     ORDER BY json_each.key
     LIMIT ? OFFSET ?`
  ).all(id, limit, offset) as { idx: number; value: string }[];

  const entries: SlicedEntry[] = [];
  for (const { idx, value } of rows) {
    let entry: SlicedEntry;
    try {
      entry = JSON.parse(value);
    } catch {
      entry = { type: 'system', content: '<unparseable entry>' } as any;
    }
    entry._index = idx;
    const overCap = cap > 0 && typeof entry.content === 'string' && entry.content.length > cap;
    if (overCap) {
      const originalLength = entry.content.length;
      entry.content = entry.content.slice(0, cap);
      entry._truncated = originalLength;
    }
    entries.push(entry);
  }
  return { entries, total };
}

// Fetch one log entry by its array index — used to "show full" an entry that
// was previously returned truncated. Returns null when the task or index does
// not exist, or when the stored value cannot be parsed.
export function getTaskLogEntry(id: string, index: number): TaskLogEntry | null {
  const hit = db().prepare(
    `SELECT value FROM tasks, json_each(tasks.log)
     WHERE tasks.id = ? AND json_each.key = ?`
  ).get(id, index) as { value: string } | undefined;

  if (hit) {
    try {
      return JSON.parse(hit.value);
    } catch {
      return null;
    }
  }
  return null;
}

// Load just the heavy fields of a task by id, for clients that got the lite
// list first and now need the rest. Empty/NULL columns come back as undefined;
// an unknown id yields null.
export function getTaskBody(id: string): { resultSummary?: string; gitDiff?: string; error?: string } | null {
  const heavy = db().prepare(
    `SELECT result_summary, git_diff, error FROM tasks WHERE id = ?`
  ).get(id) as any;
  if (!heavy) return null;

  const { result_summary, git_diff, error } = heavy;
  return {
    resultSummary: result_summary || undefined,
    gitDiff: git_diff || undefined,
    error: error || undefined,
  };
}

/**
 * Map a slim DB row (as selected by listTasksLite) to a Task. The log is left
 * empty and the git diff unset; `logSize` and `hasGitDiff` tell the client
 * what is available on demand. Timestamps go through toIsoUTC.
 */
function rowToLiteTask(row: any): Task {
  const watchConfig = row.watch_config ? JSON.parse(row.watch_config) : undefined;
  const createdAt = toIsoUTC(row.created_at) ?? row.created_at;
  const startedAt = toIsoUTC(row.started_at) ?? undefined;
  const completedAt = toIsoUTC(row.completed_at) ?? undefined;
  const scheduledAt = toIsoUTC(row.scheduled_at) ?? undefined;

  return {
    id: row.id,
    projectName: row.project_name,
    projectPath: row.project_path,
    prompt: row.prompt,
    mode: row.mode || 'prompt',
    status: row.status,
    priority: row.priority,
    conversationId: row.conversation_id || undefined,
    watchConfig,
    log: [],                                         // slim: entries are fetched separately
    resultSummary: row.result_summary || undefined,  // at most the first 1KB
    gitDiff: undefined,                              // never loaded here
    gitBranch: row.git_branch || undefined,
    costUSD: row.cost_usd || undefined,
    error: row.error || undefined,
    createdAt,
    startedAt,
    completedAt,
    scheduledAt,
    agent: row.agent || undefined,
    backend: row.backend || undefined,
    logSize: row.log_size || 0,
    hasGitDiff: !!row.has_git_diff,
  } as Task;
}

/**
 * Cancel a task.
 *
 * @returns false when the task does not exist or is already 'done'/'failed';
 *          true once the cancellation has been applied
 */
export function cancelTask(id: string): boolean {
  const task = getTask(id);
  if (!task) return false;

  const alreadyFinished = task.status === 'done' || task.status === 'failed';
  if (alreadyFinished) return false;

  // An active monitor is cancelled through its own path.
  const isActiveMonitor = task.mode === 'monitor' && activeMonitors.has(id);
  if (isActiveMonitor) {
    cancelMonitor(id);
    return true;
  }

  // tmux-backend tasks own a tmux session that has to be killed.
  const usesTmux = (task as any).backend === 'tmux';
  if (usesTmux) killTmuxTaskSession(id);

  updateTaskStatus(id, 'cancelled');

  // A task that was running held its project's lock; release it.
  const wasRunning = task.status === 'running';
  if (wasRunning) runningProjects.delete(task.projectName);

  return true;
}

/**
 * Delete a task row. A running task is cancelled first.
 *
 * @returns false when no task with that id exists, true after deletion
 */
export function deleteTask(id: string): boolean {
  const task = getTask(id);
  if (!task) return false;

  const usesTmux = (task as any).backend === 'tmux';
  if (task.status === 'running') {
    cancelTask(id);
  }
  // tmux cleanup is attempted regardless of status: the session may have been
  // left alive for debugging.
  if (usesTmux) {
    killTmuxTaskSession(id);
  }

  db().prepare('DELETE FROM tasks WHERE id = ?').run(id);
  return true;
}

/** Selection criteria for bulkDeleteTasks. */
export interface BulkDeleteTasksFilter {
  /** ISO timestamp; rows with created_at < this are eligible. */
  before: string;
  /** Statuses to include. Default ['done','failed','cancelled']. */
  statuses?: TaskStatus[];
}

/**
 * Bulk-delete completed tasks older than the given cutoff. Skips
 * anything currently running so we never yank state from under the
 * task runner: 'running' is removed from a caller-supplied status list
 * before the DELETE is issued.
 *
 * @returns the count of rows removed (0 when no eligible status remains)
 */
export function bulkDeleteTasks(filter: BulkDeleteTasksFilter): number {
  const requested = filter.statuses && filter.statuses.length
    ? filter.statuses
    : (['done', 'failed', 'cancelled'] as TaskStatus[]);
  // Enforce the "never delete a running task" contract even when the caller
  // passes its own list.
  const statuses = requested.filter((s) => s !== 'running');
  if (statuses.length === 0) return 0;

  const placeholders = statuses.map(() => '?').join(',');
  const r = db().prepare(
    `DELETE FROM tasks
       WHERE status IN (${placeholders})
         AND created_at < ?`,
  ).run(...statuses, filter.before);
  // Normalise to a plain number, as reconcileOrphanedTasks does.
  return Number(r.changes) || 0;
}

/**
 * Apply partial updates to a task. A running task is cancelled first.
 * With `restart`, the row is reset to 'queued' (log, result, diff, cost,
 * error and timestamps cleared) and the runner is kicked.
 *
 * @returns null for an unknown id; the task as loaded on entry when nothing
 *          was updated; otherwise the freshly reloaded task
 */
export function updateTask(id: string, updates: { prompt?: string; projectName?: string; projectPath?: string; priority?: number; scheduledAt?: string; restart?: boolean }): Task | null {
  const task = getTask(id);
  if (!task) return null;

  // A running task has to be stopped before it is edited.
  if (task.status === 'running') cancelTask(id);

  // [SET clause, bound value]; an undefined value means "leave column alone".
  const candidates: Array<[string, any]> = [
    ['prompt = ?', updates.prompt],
    ['project_name = ?', updates.projectName],
    ['project_path = ?', updates.projectPath],
    ['priority = ?', updates.priority],
    ['scheduled_at = ?', updates.scheduledAt === undefined ? undefined : (updates.scheduledAt || null)],
  ];

  const fields: string[] = [];
  const values: any[] = [];
  for (const [clause, value] of candidates) {
    if (value === undefined) continue;
    fields.push(clause);
    values.push(value);
  }

  // Restart: put the row back into a pristine queued state.
  if (updates.restart) {
    fields.push("status = 'queued'", 'started_at = NULL', 'completed_at = NULL', 'error = NULL', "log = '[]'", 'result_summary = NULL', 'git_diff = NULL', 'cost_usd = NULL');
  }

  if (fields.length === 0) return task;

  values.push(id);
  const sql = `UPDATE tasks SET ${fields.join(', ')} WHERE id = ?`;
  db().prepare(sql).run(...values);

  if (updates.restart) ensureRunnerStarted();

  return getTask(id);
}

/**
 * Retry a failed or cancelled task by queueing a NEW task with the same
 * parameters. The original row is left untouched.
 *
 * The execution mode and backend are carried over, so a shell task is retried
 * as a shell task (not re-interpreted as an agent prompt) and a tmux task
 * stays on the tmux backend. Monitor tasks also keep their conversation and
 * watch config. Prompt tasks pass no conversation id, so createTask
 * auto-inherits the project's latest one.
 *
 * @returns the newly created task, or null when the id is unknown or the task
 *          is not in 'failed' / 'cancelled' state
 */
export function retryTask(id: string): Task | null {
  const task = getTask(id);
  if (!task) return null;
  if (task.status !== 'failed' && task.status !== 'cancelled') return null;

  const monitorOpts = task.mode === 'monitor'
    ? { conversationId: task.conversationId, watchConfig: task.watchConfig }
    : {};

  // Create a new task with same params (including mode, agent and backend)
  return createTask({
    projectName: task.projectName,
    projectPath: task.projectPath,
    prompt: task.prompt,
    mode: task.mode,
    priority: task.priority,
    agent: (task as any).agent || undefined,
    backend: (task as any).backend || undefined,
    ...monitorOpts,
  });
}

/**
 * Finish a tmux task whose in-memory waiter is gone (e.g. after a server
 * restart). Invoked from the hook endpoint fallback path in
 * task-tmux-backend.
 *
 * The status UPDATE only matches a row that is still 'running'. The project
 * lock release, result log entry, 'done' event and completion notification
 * are performed regardless of whether that UPDATE matched.
 *
 * @param paneCapture captured pane text; logged in full, stored as summary
 *                    truncated to 2048 characters
 */
export function finishTmuxTask(id: string, paneCapture: string): void {
  const summary = paneCapture.slice(0, 2048);
  const markDone = db().prepare("UPDATE tasks SET status = 'done', result_summary = ?, completed_at = datetime('now') WHERE id = ? AND status = 'running'");
  markDone.run(summary, id);

  const projectName = getTask(id)?.projectName || '';
  runningProjects.delete(projectName);

  appendLog(id, { type: 'result', content: paneCapture, timestamp: new Date().toISOString() });
  emit(id, 'status', 'done');

  // Reload so the notification sees the row as it is after the log append.
  const finished = getTask(id);
  if (!finished) return;
  notifyTaskComplete(finished).catch(() => {});
}

// ─── Background Runner ───────────────────────────────────────

/**
 * Start the background runner if it is not already active: schedule
 * processNextTask every 3 seconds and run one pass right away. No-op when a
 * runner timer already exists.
 */
export function ensureRunnerStarted() {
  if (!runnerState.runner) {
    runnerState.runner = setInterval(processNextTask, 3000);
    // Don't wait for the first tick.
    processNextTask();
  }
}

/** Stop the background runner's interval timer, if one is active. */
export function stopRunner() {
  const timer = runnerState.runner;
  if (!timer) return;
  clearInterval(timer);
  runnerState.runner = null;
}

/**
 * One runner tick: load every queued task whose scheduled time has passed
 * (highest priority first, then oldest) and dispatch it.
 *
 *  - Monitor tasks are handed to startMonitorTask and never block the runner.
 *  - Other tasks run at most one per project at a time (runningProjects);
 *    tasks for different projects are started in parallel without awaiting.
 *
 * This function is invoked from setInterval and fire-and-forget calls, so it
 * must never reject: any error raised while loading or dispatching is logged
 * and the tick ends; the next tick retries.
 */
async function processNextTask() {
  try {
    // Find all queued tasks ready to run
    const queued = db().prepare(`
    SELECT * FROM tasks WHERE status = 'queued'
    AND (scheduled_at IS NULL OR replace(replace(scheduled_at, 'T', ' '), 'Z', '') <= datetime('now'))
    ORDER BY priority DESC, created_at ASC
  `).all() as any[];

    for (const next of queued) {
      const task = rowToTask(next);

      if (task.mode === 'monitor') {
        // Monitor tasks run in background, don't block the runner
        startMonitorTask(task);
        continue;
      }

      // Skip if this project already has a running prompt task
      if (runningProjects.has(task.projectName)) continue;

      // Run this task
      runningProjects.add(task.projectName);
      runnerState.currentTaskId = task.id;

      // executeTask can throw synchronously before it returns its promise
      // (e.g. the tmux path performs DB writes first). Calling it inside an
      // async wrapper turns such a throw into a rejection, so the handlers
      // below always run and the project lock is always released.
      const run = async (): Promise<void> => executeTask(task);

      // Execute async — don't await so we can process tasks for other projects in parallel
      run()
        .catch((err: any) => {
          // Verbose diagnostic dump — the stack alone hid an issue where
          // err.stack was undefined (zombie tsx-runner-process throws had
          // no stack); name/typeof/keys helped pin it down. Keep verbose.
          const stack = err?.stack || err?.message || String(err);
          console.error(`[task-runner] executeTask failed for ${task.id}:`, JSON.stringify({ msg: err?.message, name: err?.name, stack: err?.stack, str: String(err), type: typeof err, keys: err && Object.keys(err) }));
          appendLog(task.id, { type: 'system', subtype: 'error', content: stack, timestamp: new Date().toISOString() });
          updateTaskStatus(task.id, 'failed', err?.message || String(err));
        })
        .finally(() => {
          runningProjects.delete(task.projectName);
          if (runnerState.currentTaskId === task.id) runnerState.currentTaskId = null;
        });
    }
  } catch (err: any) {
    // Without this, a DB or row-mapping error would surface as an unhandled
    // rejection from the interval callback.
    console.error('[task-runner] processNextTask tick failed:', err?.stack || err?.message || String(err));
  }
}

/**
 * Build the environment variables that expose installed-connector PATs to
 * child processes, so pipeline shell steps (push-and-mr, gh-issue-fix, ...)
 * use the credential entered in Settings → Connectors rather than depending
 * on a separate `glab auth login` / `gh auth login`.
 *
 * Only well-known connectors are mapped:
 *  - gitlab     → GITLAB_TOKEN, GITLAB_URI (when base_url parses), plus a
 *                 per-process SSH→HTTPS `insteadOf` rewrite via
 *                 GIT_CONFIG_COUNT / GIT_CONFIG_KEY_n / GIT_CONFIG_VALUE_n
 *  - github-api → GITHUB_TOKEN and GH_TOKEN
 *
 * The token never leaves the server — the child inherits the env directly.
 * Best-effort: any error while reading connectors yields an empty object.
 */
export function connectorEnv(): Record<string, string> {
  try {
    const env: Record<string, string> = {};
    const readToken = (raw: unknown): string => (typeof raw === 'string' ? raw.trim() : '');

    const gitlab = getInstalledConnector('gitlab');
    const gitlabToken = gitlab?.enabled ? readToken(gitlab.config?.token) : '';
    if (gitlab && gitlabToken) {
      // glab CLI honours GITLAB_TOKEN; HTTP libs honour CI_JOB_TOKEN-style names too,
      // but GITLAB_TOKEN covers glab + most curl-based scripts in our pipelines.
      env.GITLAB_TOKEN = gitlabToken;

      let origin = '';
      let host = '';
      const baseUrl = gitlab.config?.base_url;
      if (typeof baseUrl === 'string' && baseUrl) {
        try {
          const parsed = new URL(String(baseUrl));
          env.GITLAB_URI = parsed.origin;
          origin = parsed.origin.replace(/\/+$/, '');
          host = parsed.host;
        } catch {}
      }

      // Transparent SSH→HTTPS rewrite for pipeline `git push`. Otherwise a
      // user-cloned `git@host:owner/repo.git` origin pushes over SSH and hits
      // 2fa_verify even though a PAT is available. Done per-process through
      // git's GIT_CONFIG_COUNT/KEY/VALUE env triplet (git ≥ 2.31): nothing is
      // written to ~/.gitconfig or the repo, and nothing is set without a token.
      if (origin && host) {
        const aliases: string[] = [];
        const rawAliases: unknown = gitlab.config?.ssh_aliases;
        if (Array.isArray(rawAliases)) {
          for (const candidate of rawAliases) {
            if (typeof candidate === 'string' && candidate.trim()) aliases.push(candidate.trim());
          }
        }

        const hosts = Array.from(new Set([host, ...aliases]));
        const target = `${origin}/`.replace(/^https?:\/\//, (m) => `${m}oauth2:${encodeURIComponent(gitlabToken)}@`);
        const insteadOfKey = `url.${target}.insteadOf`;

        const rules: Array<[string, string]> = [];
        for (const h of hosts) {
          rules.push([insteadOfKey, `git@${h}:`]);
          rules.push([insteadOfKey, `ssh://git@${h}/`]);
        }

        if (rules.length > 0) {
          env.GIT_CONFIG_COUNT = String(rules.length);
          let i = 0;
          for (const [key, value] of rules) {
            env[`GIT_CONFIG_KEY_${i}`] = key;
            env[`GIT_CONFIG_VALUE_${i}`] = value;
            i += 1;
          }
        }
      }
    }

    const github = getInstalledConnector('github-api');
    const githubToken = github?.enabled ? readToken(github.config?.token) : '';
    if (githubToken) {
      env.GITHUB_TOKEN = githubToken;
      env.GH_TOKEN = githubToken; // gh CLI honours either; set both for safety.
    }

    return env;
  } catch {
    return {};
  }
}

/**
 * Run a task on the tmux backend.
 *
 * Marks the task running, writes the init log line, then delegates to
 * executeTmuxTask with callbacks that persist log entries, report
 * cancellation, and record the final status (result summary capped at 2048
 * characters, cost when reported). Every terminal status releases the
 * project lock and emits a 'status' event. A rejection from executeTmuxTask
 * is recorded as a failed task.
 */
function executeTmuxBackendTask(task: Task): Promise<void> {
  // The Stop hook install is idempotent and the workspace daemon does it too,
  // but tmux tasks can run while that daemon is not active.
  if (!_tmuxHookInstalled) {
    _tmuxHookInstalled = true;
    try {
      const port = Number(process.env.PORT) || 8403;
      installForgeStopHook(port);
    } catch {}
  }

  const taskId = task.id;
  const releaseProject = () => { runningProjects.delete(task.projectName); };

  updateTaskStatus(taskId, 'running');
  db().prepare('UPDATE tasks SET started_at = datetime(\'now\') WHERE id = ?').run(taskId);
  appendLog(taskId, { type: 'system', subtype: 'init', content: `Agent: ${(task as any).agent || 'claude'} | Backend: tmux${skillsLogSuffix(taskId)}`, timestamp: new Date().toISOString() });

  const running = executeTmuxTask(task, {
    appendLog: (entry) => appendLog(taskId, entry),
    isCancelled: () => getTask(taskId)?.status === 'cancelled',
    setStatus: (status, detail) => {
      switch (status) {
        case 'done': {
          updateTaskStatus(taskId, 'done');
          if (detail?.resultSummary) {
            db().prepare('UPDATE tasks SET result_summary = ? WHERE id = ?').run(detail.resultSummary.slice(0, 2048), taskId);
          }
          if (detail?.costUSD) {
            db().prepare('UPDATE tasks SET cost_usd = ? WHERE id = ?').run(detail.costUSD, taskId);
          }
          releaseProject();
          const doneTask = getTask(taskId);
          if (doneTask) notifyTaskComplete(doneTask).catch(() => {});
          break;
        }
        case 'failed': {
          updateTaskStatus(taskId, 'failed', detail?.error);
          if (detail?.costUSD) {
            db().prepare('UPDATE tasks SET cost_usd = ? WHERE id = ?').run(detail.costUSD, taskId);
          }
          releaseProject();
          const failedTask = getTask(taskId);
          if (failedTask) notifyTaskFailed(failedTask).catch(() => {});
          break;
        }
        default: {
          // Any other status is treated as a cancellation.
          updateTaskStatus(taskId, 'cancelled');
          releaseProject();
        }
      }
      emit(taskId, 'status');
    },
  });

  return running.catch((err) => {
    updateTaskStatus(taskId, 'failed', err?.message || String(err));
    releaseProject();
    emit(taskId, 'status');
  });
}

/**
 * Run a shell-mode task: execute `task.prompt` with `bash -c` in the project
 * directory, with connector credentials added to the environment.
 *
 * stdout is streamed into the task log and, on exit code 0, stored (trimmed)
 * as the result summary. Any other outcome marks the task failed with the
 * trimmed stderr, or — when stderr is empty — the exit code, or the signal
 * name when the child was terminated by a signal.
 *
 * The returned promise always resolves (never rejects); the outcome is
 * recorded in the DB and announced through a 'status' event.
 */
function executeShellTask(task: Task): Promise<void> {
  return new Promise((resolve) => {
    updateTaskStatus(task.id, 'running');
    db().prepare('UPDATE tasks SET started_at = datetime(\'now\') WHERE id = ?').run(task.id);
    console.log(`[task:shell] ${task.projectName}: "${task.prompt.slice(0, 80)}"`);

    // Use an absolute path: when the parent's PATH is stripped (which has
    // happened under certain supervisor relaunches), bare 'bash' becomes
    // ENOENT, the 'error' event fires, and without the listener below it
    // bubbles up to the process as an uncaughtException — crashing the
    // entire Next.js worker and triggering a supervisor restart loop.
    const shell = process.env.SHELL && process.env.SHELL.endsWith('bash') ? process.env.SHELL : '/bin/bash';
    const child = spawn(shell, ['-c', task.prompt], {
      cwd: task.projectPath,
      env: {
        ...process.env,
        PATH: process.env.PATH || '/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin',
        ...connectorEnv(),
      },
      stdio: ['ignore', 'pipe', 'pipe'],
    });

    let stdout = '';
    let stderr = '';
    // Both 'error' and 'exit' can fire for one child; only the first outcome counts.
    let resolved = false;
    const finish = (status: 'done' | 'failed', summary: string, error?: string) => {
      if (resolved) return;
      resolved = true;
      if (status === 'done') {
        db().prepare('UPDATE tasks SET status = ?, result_summary = ?, completed_at = datetime(\'now\') WHERE id = ?')
          .run('done', summary, task.id);
        emit(task.id, 'status', 'done');
      } else {
        db().prepare('UPDATE tasks SET status = ?, error = ?, completed_at = datetime(\'now\') WHERE id = ?')
          .run('failed', error || summary || 'unknown error', task.id);
        emit(task.id, 'status', 'failed');
      }
      resolve();
    };

    // CRITICAL: without this listener, spawn errors (ENOENT, EACCES, …)
    // become uncaughtException and crash the worker.
    child.on('error', (err: NodeJS.ErrnoException) => {
      const msg = `spawn failed: ${err.code || ''} ${err.message}`.trim();
      console.error(`[task:shell] ${task.id} ${msg}`);
      appendLog(task.id, { type: 'system', subtype: 'text', content: msg + '\n', timestamp: new Date().toISOString() });
      finish('failed', '', msg);
    });

    child.stdout?.on('data', (chunk: Buffer) => {
      const text = chunk.toString();
      stdout += text;
      appendLog(task.id, { type: 'system', subtype: 'text', content: text, timestamp: new Date().toISOString() });
    });
    child.stderr?.on('data', (chunk: Buffer) => { stderr += chunk.toString(); });

    child.on('exit', (code, signal) => {
      if (code === 0) {
        finish('done', stdout.trim());
        return;
      }
      // When the child is terminated by a signal, `code` is null and `signal`
      // carries the signal name — report that instead of "Exit code null".
      const fallback = code === null && signal ? `Killed by signal ${signal}` : `Exit code ${code}`;
      finish('failed', '', stderr.trim() || fallback);
    });
  });
}

/**
 * Run a single task by spawning its agent CLI and streaming the output into
 * the task log.
 *
 * Dispatch:
 *  - `mode === 'shell'`    → delegated to executeShellTask
 *  - `backend === 'tmux'`  → delegated to executeTmuxBackendTask
 *  - otherwise             → spawn the agent here (node-pty when the agent
 *                            declares `requiresTTY`, plain child process if not)
 *
 * Stdout is read line by line. Lines that parse as JSON are fed through
 * parseStreamJson; anything else is logged as raw text and accumulated as the
 * result. On exit the session id, a git diff, cost and token usage are stored.
 *
 * @param task The task row to execute.
 * @returns A promise that resolves when the process exits with code 0 or the
 *          task was cancelled, and rejects on a spawn error or a non-zero exit.
 */
function executeTask(task: Task): Promise<void> {
  if (task.mode === 'shell') return executeShellTask(task);
  if ((task as any).backend === 'tmux') return executeTmuxBackendTask(task);

  return new Promise((resolve, reject) => {
    const settings = loadSettings();
    const agentId = (task as any).agent || settings.defaultAgent || 'claude';
    const adapter = getAgent(agentId);

    // Model priority: per-task override > agent scene model > 'default'
    // (agent picks its own default). "default" means "no override".
    const agentCfg = settings.agents?.[agentId];
    const agentModel = agentCfg?.models?.task;
    const effectiveAgentModel = agentModel && agentModel !== 'default' ? agentModel : null;
    const model = taskModelOverrides.get(task.id) || effectiveAgentModel || 'default';
    const supportsModel = adapter.config.capabilities?.supportsModel;
    const spawnOpts = adapter.buildTaskSpawn({
      projectPath: task.projectPath,
      prompt: task.prompt,
      model: supportsModel && model && model !== 'default' ? model : undefined,
      conversationId: task.conversationId || undefined,
      skipPermissions: true,
      outputFormat: adapter.config.capabilities?.supportsStreamJson ? 'stream-json' : undefined,
      appendSystemPrompt: taskAppendSystemPromptOverrides.get(task.id),
      extraFlags: taskExtraFlagsOverrides.get(task.id),
    });
    taskExtraFlagsOverrides.delete(task.id); // consumed once at spawn

    // Surface connector PATs (GITLAB_TOKEN, GITHUB_TOKEN, …) so the
    // child's Bash invocations (claude's Bash tool, gh/glab shells)
    // pick them up automatically. Same logic as executeShellTask.
    const env = { ...process.env, ...connectorEnv(), ...(spawnOpts.env || {}) };
    delete env.CLAUDECODE;

    // Corporate SSL: if the server was launched without NODE_EXTRA_CA_CERTS but
    // a CA bundle exists at <configDir>/corporate-ca.pem, wire it through so the
    // spawned CLI doesn't die with "SSL certificate verification failed".
    if (!env.NODE_EXTRA_CA_CERTS) {
      const corpCa = pathJoin(getConfigDir(), 'corporate-ca.pem');
      if (existsSync(corpCa)) env.NODE_EXTRA_CA_CERTS = corpCa;
    }

    updateTaskStatus(task.id, 'running');
    db().prepare('UPDATE tasks SET started_at = datetime(\'now\') WHERE id = ?').run(task.id);

    const agentName = adapter.config.name || agentId;
    console.log(`[task] ${task.projectName} [${agentName}${supportsModel && model ? '/' + model : ''}]: "${task.prompt.slice(0, 60)}..."`);

    // Log agent info as first entry
    appendLog(task.id, { type: 'system', subtype: 'init', content: `Agent: ${agentName}${supportsModel && model && model !== 'default' ? ` | Model: ${model}` : ''}${skillsLogSuffix(task.id)}`, timestamp: new Date().toISOString() });

    const needsTTY = adapter.config.capabilities?.requiresTTY;
    let child: any;
    let ptyProcess: any = null;

    if (needsTTY) {
      // Use node-pty for agents that require a terminal environment.
      // pty is imported at top level (was a runtime require() that
      // hit "require is not defined" under ESM + concurrent loads).
      ptyProcess = pty.spawn(spawnOpts.cmd, spawnOpts.args, {
        name: 'xterm-256color',
        cols: 120,
        rows: 40,
        cwd: task.projectPath,
        env,
      });
      // Strip terminal control codes from PTY output for clean logging
      const stripAnsi = (s: string) => s
        .replace(/\x1b\[[0-9;?]*[a-zA-Z]/g, '')   // CSI sequences
        .replace(/\x1b\][^\x07\x1b]*(?:\x07|\x1b\\)/g, '') // OSC sequences
        .replace(/\x1b[()][0-9A-B]/g, '')           // charset
        .replace(/\x1b[=>]/g, '')                    // keypad
        .replace(/\r/g, '')                          // carriage return
        .replace(/\x07/g, '');                       // bell

      // Auto-kill PTY after idle (interactive agents don't exit on their own)
      let ptyBytes = 0;
      let ptyIdleTimer: any = null;
      const PTY_IDLE_MS = 15000; // 15s idle = done

      // Callbacks registered through the child-like facade below; the PTY
      // event handlers forward to them once they are set.
      let exitCb: Function | null = null;
      let dataCallback: Function | null = null;

      ptyProcess.onData((data: string) => {
        const clean = stripAnsi(data);
        ptyBytes += clean.length;
        if (dataCallback) dataCallback(Buffer.from(clean));
        // Reset idle timer
        if (ptyIdleTimer) clearTimeout(ptyIdleTimer);
        // The idle kill is only armed after more than 500 bytes of cleaned output.
        if (ptyBytes > 500) {
          ptyIdleTimer = setTimeout(() => {
            console.log(`[task] PTY idle timeout — killing process (${ptyBytes} bytes received)`);
            try { ownedKill(ptyProcess); } catch {}
          }, PTY_IDLE_MS);
        }
      });

      ptyProcess.onExit(({ exitCode }: any) => {
        if (ptyIdleTimer) clearTimeout(ptyIdleTimer);
        if (exitCb) exitCb(exitCode, null);
      });

      // Create a child-like interface for pty so the code below can treat
      // both spawn flavours the same way. stderr and 'error' are no-ops here.
      child = {
        stdout: { on: (evt: string, cb: Function) => { if (evt === 'data') dataCallback = cb; } },
        stderr: { on: (_evt: string, _cb: Function) => {} },
        on: (evt: string, cb: Function) => { if (evt === 'exit') exitCb = cb; if (evt === 'error') {} },
        kill: (sig: string) => { if (ptyIdleTimer) clearTimeout(ptyIdleTimer); try { ownedKill(ptyProcess, sig as NodeJS.Signals); } catch {} },
        stdin: null,
        pid: ptyProcess.pid,
      };
    } else {
      child = spawn(spawnOpts.cmd, spawnOpts.args, {
        cwd: task.projectPath,
        env,
        stdio: ['pipe', 'pipe', 'pipe'],
      });
      child.stdin?.end();
    }

    let buffer = '';
    let resultText = '';
    let totalCost = 0;
    let sessionId = '';
    let modelUsed = '';
    let totalInputTokens = 0;
    let totalOutputTokens = 0;

    // Single 'error' listener. It must exist (an unhandled 'error' event would
    // throw), but registering it more than once makes every spawn failure log,
    // write the failed status and reject twice.
    child.on('error', (err: any) => {
      const stack = err?.stack || err?.message || String(err);
      console.error(`[task-runner] Spawn error for ${task.id}:`, stack);
      appendLog(task.id, { type: 'system', subtype: 'error', content: `[spawn-error] ${stack}`, timestamp: new Date().toISOString() });
      updateTaskStatus(task.id, 'failed', err?.message || String(err));
      reject(err);
    });

    child.stdout?.on('data', (data: Buffer) => {
      // stdout chunk processing (silent)

      // Check if cancelled
      if (getTask(task.id)?.status === 'cancelled') {
        ownedKill(child, 'SIGTERM');
        return;
      }

      // Keep the trailing partial line in `buffer` until its newline arrives.
      buffer += data.toString();
      const lines = buffer.split('\n');
      buffer = lines.pop() || '';

      for (const line of lines) {
        if (!line.trim()) continue;
        let jsonParsed = false;
        try {
          const parsed = JSON.parse(line);
          jsonParsed = true;
          const entries = parseStreamJson(parsed);
          for (const entry of entries) {
            // Skip Claude's Model init line for non-claude agents (we already logged our own)
            if (entry.subtype === 'init' && agentId !== 'claude' && entry.content?.startsWith('Model:')) continue;
            appendLog(task.id, entry);
          }

          if (parsed.session_id) sessionId = parsed.session_id;
          if (parsed.type === 'system' && parsed.subtype === 'init' && parsed.model) {
            modelUsed = parsed.model;
          }
          // Accumulate token usage from assistant messages
          if (parsed.type === 'assistant' && parsed.message?.usage) {
            totalInputTokens += parsed.message.usage.input_tokens || 0;
            totalOutputTokens += parsed.message.usage.output_tokens || 0;
          }
          if (parsed.type === 'result') {
            resultText = typeof parsed.result === 'string' ? parsed.result : JSON.stringify(parsed.result);
            totalCost = parsed.total_cost_usd || 0;
            if (parsed.total_input_tokens) totalInputTokens = parsed.total_input_tokens;
            if (parsed.total_output_tokens) totalOutputTokens = parsed.total_output_tokens;
          }
        } catch {
          // Non-JSON output (generic agents) — log as raw text.
          // `jsonParsed` distinguishes "not JSON" from "JSON whose handling threw".
          if (!jsonParsed) {
            resultText += (resultText ? '\n' : '') + line;
            appendLog(task.id, { type: 'system', subtype: 'text', content: line, timestamp: new Date().toISOString() });
          }
        }
      }
    });

    child.stderr?.on('data', (data: Buffer) => {
      const text = data.toString().trim();
      // stderr logged to task log only
      if (text) {
        appendLog(task.id, { type: 'system', subtype: 'error', content: text, timestamp: new Date().toISOString() });
      }
    });

    child.on('exit', (code: any, signal: any) => {
      // Process remaining buffer (a final JSON line without trailing newline)
      if (buffer.trim()) {
        try {
          const parsed = JSON.parse(buffer);
          const entries = parseStreamJson(parsed);
          for (const entry of entries) appendLog(task.id, entry);
          if (parsed.type === 'result') {
            resultText = typeof parsed.result === 'string' ? parsed.result : JSON.stringify(parsed.result);
            totalCost = parsed.total_cost_usd || 0;
          }
        } catch {}
      }

      // Save conversation ID for follow-up
      if (sessionId) {
        db().prepare('UPDATE tasks SET conversation_id = ? WHERE id = ?').run(sessionId, task.id);
      }

      // Capture git diff. Three-way fallback so pipeline nodes (which commit
      // their own work in a worktree) don't end up with an empty diff:
      //   1. `git diff HEAD`        — uncommitted changes (interactive tasks)
      //   2. `git diff @{u}..HEAD`  — commits ahead of upstream (worktree pushed nothing yet)
      //   3. `git show HEAD`        — last commit's diff (fallback when no upstream tracking)
      try {
        const tryDiff = (cmd: string): string => {
          try {
            return execSync(cmd, { cwd: task.projectPath, timeout: 5000, stdio: ['ignore', 'pipe', 'ignore'] }).toString();
          } catch { return ''; }
        };
        let diff = tryDiff('git diff HEAD');
        if (!diff.trim()) diff = tryDiff('git diff @{upstream}..HEAD --no-color');
        if (!diff.trim()) diff = tryDiff('git show HEAD --no-color');
        if (diff.trim()) {
          db().prepare('UPDATE tasks SET git_diff = ? WHERE id = ?').run(diff, task.id);
        }
      } catch {}

      // A cancelled task keeps its 'cancelled' status; just settle the promise.
      const currentStatus = getTask(task.id)?.status;
      if (currentStatus === 'cancelled') {
        resolve();
        return;
      }

      if (code === 0) {
        db().prepare(`
          UPDATE tasks SET status = 'done', result_summary = ?, cost_usd = ?, completed_at = datetime('now')
          WHERE id = ?
        `).run(resultText, totalCost, task.id);
        emit(task.id, 'status', 'done');
        console.log(`[task] Done: ${task.id} ${task.projectName} (cost: $${totalCost?.toFixed(4) || '0'}, ${totalInputTokens}in/${totalOutputTokens}out)`);
        // Record usage (best effort — a failure here must not fail the task)
        try {
          const isPipeline = pipelineTaskIdsRef().has(task.id);
          recordUsage({
            sessionId: sessionId || task.id,
            source: isPipeline ? 'pipeline' : 'task',
            projectPath: task.projectPath,
            projectName: task.projectName,
            model: modelUsed || 'unknown',
            inputTokens: totalInputTokens,
            outputTokens: totalOutputTokens,
            taskId: task.id,
          });
        } catch {}
        const doneTask = getTask(task.id);
        if (doneTask) notifyTaskComplete(doneTask).catch(() => {});
        notifyTerminalSession(task, 'done', sessionId);
        resolve();
      } else {
        const errMsg = `Process exited with code ${code}`;
        console.error(`[task] Failed: ${task.id} ${task.projectName} — ${errMsg}`);
        updateTaskStatus(task.id, 'failed', errMsg);
        const failedTask = getTask(task.id);
        if (failedTask) notifyTaskFailed(failedTask).catch(() => {});
        notifyTerminalSession(task, 'failed', sessionId);
        reject(new Error(errMsg));
      }
    });
  });
}

// ─── Terminal notification ────────────────────────────────────

/**
 * Tell Forge tmux sessions that sit in (or around) the task's project
 * directory that a background task has finished.
 *
 * For a finished task, a pane running something other than a plain shell gets
 * the notice typed in as input; a pane at a zsh/bash/fish prompt gets a tmux
 * status-line message instead. A failed task always gets the status-line
 * message. Pipeline tasks are skipped. Every step is best effort: any tmux
 * failure is swallowed.
 *
 * @param task      The task that finished.
 * @param status    Final outcome of the task.
 * @param sessionId Accepted for call-site symmetry; not read here.
 */
function notifyTerminalSession(task: Task, status: 'done' | 'failed', sessionId?: string) {
  // Pipeline tasks have their own notification system — nothing to do for them.
  try {
    if (pipelineTaskIdsRef().has(task.id)) return;
  } catch {}

  // Pane commands that mean "the user is sitting at a shell prompt".
  const plainShells = ['zsh', 'bash', 'fish'];

  try {
    // Forge runs tmux on its OWN socket, so every call goes through the
    // tmux()/tmuxAllowFail wrappers (argv form, no shell quoting needed). One
    // list-sessions call returns name, pane path and pane command together.
    const listing = tmuxAllowFail(
      ['list-sessions', '-F', '#{session_name}|#{pane_current_path}|#{pane_current_command}'],
      { timeout: 3000 },
    ).stdout.trim();
    if (!listing) return;

    for (const row of listing.split('\n')) {
      // First field is the session name, last is the pane command. Everything
      // in between is the pane path, re-joined because a path may contain '|'.
      const fields = row.split('|');
      const sessionName = fields[0];
      const paneCommand = fields.length > 1 ? fields[fields.length - 1] : '';
      const paneDir = fields.slice(1, -1).join('|');

      // Only Forge sessions: `mw-…` or the instance-prefixed `mw<hex>-…`.
      if (!sessionName || !/^mw[0-9a-f]*-/.test(sessionName)) continue;

      try {
        if (!paneDir) continue;
        // Related means: same dir, pane inside the project, or project inside the pane dir.
        const related =
          paneDir === task.projectPath ||
          paneDir.startsWith(task.projectPath + '/') ||
          task.projectPath.startsWith(paneDir + '/');
        if (!related) continue;

        if (status !== 'done') {
          tmux(['display-message', '-t', sessionName, `❌ Task ${task.id} failed`], { timeout: 2000 });
          continue;
        }

        const summary = task.prompt.slice(0, 80).replace(/"/g, "'");
        const msg = `A background task just completed. Task: "${summary}". Please check git diff and continue.`;

        if (plainShells.includes(paneCommand)) {
          // Idle shell prompt: show a status-line message rather than typing into it.
          tmux(['display-message', '-t', sessionName, `✅ Task ${task.id} done — changes ready`], { timeout: 2000 });
        } else {
          // Something is running in the pane (claude/node): send the notice as input.
          tmux(['send-keys', '-t', sessionName, '--', msg, 'Enter'], { timeout: 2000 });
        }
      } catch {}
    }
  } catch {}
}

// ─── Helpers ─────────────────────────────────────────────────

/**
 * Work out how to launch the claude CLI.
 *
 * `claude` is usually a symlink to a JavaScript file, which cannot be spawned
 * directly without a shell. The path is resolved to its real target; a
 * `.js`/`.mjs` target is run through the current node binary, anything else
 * is run as-is.
 *
 * @param claudePath Command name or path of the claude CLI.
 * @returns `cmd` to spawn plus `prefix` arguments to put before the CLI args.
 *          If nothing can be resolved, falls back to `node <claudePath>`.
 */
function resolveClaudePath(claudePath: string): { cmd: string; prefix: string[] } {
  // Prefer what `which` finds on PATH; otherwise treat the input as a file path.
  const locate = (): string => {
    try {
      const onPath = execSync(`which ${claudePath}`, { encoding: 'utf-8' }).trim();
      return realpathSync(onPath);
    } catch {
      return realpathSync(claudePath);
    }
  };

  let target: string;
  try {
    target = locate();
  } catch {
    // Fallback: use node to run it
    return { cmd: process.execPath, prefix: [claudePath] };
  }

  const isScript = target.endsWith('.js') || target.endsWith('.mjs');
  return isScript
    ? { cmd: process.execPath, prefix: [target] }
    : { cmd: target, prefix: [] };
}

/**
 * Convert one decoded stream-json event into zero or more task log entries.
 *
 * Handled event shapes:
 *  - `system` / `init`   → one "Model: …" init entry
 *  - `assistant`         → one entry per text / tool_use / tool_result block
 *  - `result`            → one result entry
 *  - `rate_limit_event`  → nothing
 *  - anything else       → one catch-all entry carrying the raw JSON
 *
 * The input is whatever `JSON.parse` produced, so it may be `null`, a
 * primitive or an object of unexpected shape; none of those throw.
 *
 * @param parsed Decoded JSON value for a single output line.
 * @returns Log entries sharing one timestamp taken at call time.
 */
function parseStreamJson(parsed: any): TaskLogEntry[] {
  const entries: TaskLogEntry[] = [];
  const ts = new Date().toISOString();
  // Optional chaining: a bare `null` line is valid JSON and must not throw.
  const kind = parsed?.type;

  if (kind === 'system' && parsed.subtype === 'init') {
    entries.push({ type: 'system', subtype: 'init', content: `Model: ${parsed.model || 'unknown'}`, timestamp: ts });
    return entries;
  }

  if (kind === 'assistant' && parsed.message?.content) {
    const content = parsed.message.content;
    if (typeof content === 'string') {
      // String content is shorthand for a single text block.
      entries.push({ type: 'assistant', subtype: 'text', content, timestamp: ts });
      return entries;
    }
    // Anything that is not an array of blocks yields no entries instead of throwing.
    const blocks: any[] = Array.isArray(content) ? content : [];
    for (const block of blocks) {
      if (!block) continue;
      if (block.type === 'text' && block.text) {
        entries.push({ type: 'assistant', subtype: 'text', content: block.text, timestamp: ts });
      } else if (block.type === 'tool_use') {
        entries.push({
          type: 'assistant',
          subtype: 'tool_use',
          content: typeof block.input === 'string' ? block.input : JSON.stringify(block.input || {}),
          tool: block.name,
          timestamp: ts,
        });
      } else if (block.type === 'tool_result') {
        entries.push({
          type: 'assistant',
          subtype: 'tool_result',
          content: typeof block.content === 'string' ? block.content : JSON.stringify(block.content || ''),
          timestamp: ts,
        });
      }
    }
    return entries;
  }

  if (kind === 'result') {
    entries.push({
      type: 'result',
      subtype: parsed.subtype || 'success',
      content: typeof parsed.result === 'string' ? parsed.result : JSON.stringify(parsed.result || ''),
      timestamp: ts,
    });
    return entries;
  }

  if (kind === 'rate_limit_event') return entries;

  // Unknown event: keep the raw payload so nothing is lost from the log.
  entries.push({ type: 'assistant', subtype: kind || 'unknown', content: JSON.stringify(parsed), timestamp: ts });
  return entries;
}

/**
 * Append one entry to a task's JSON log column and broadcast it.
 *
 * The whole log is read, extended and written back on every call. Does
 * nothing when the task row does not exist.
 *
 * @param taskId Id of the task whose log is extended.
 * @param entry  Log entry to append and emit as a 'log' event.
 */
function appendLog(taskId: string, entry: TaskLogEntry) {
  const row = db().prepare('SELECT log FROM tasks WHERE id = ?').get(taskId) as any;
  if (!row) return;
  // A NULL/empty column counts as an empty log (same default rowToTask uses);
  // JSON.parse(null) would yield null and make the push below throw.
  const log: TaskLogEntry[] = JSON.parse(row.log || '[]');
  log.push(entry);
  db().prepare('UPDATE tasks SET log = ? WHERE id = ?').run(JSON.stringify(log), taskId);
  emit(taskId, 'log', entry);
}

/**
 * Persist a task's new status and emit a 'status' event for it.
 *
 * For 'failed' and 'cancelled' the error text (or NULL) and a completion
 * timestamp are written as well; every other status only updates the
 * status column.
 *
 * @param id     Task id.
 * @param status New status.
 * @param error  Optional error text, stored only for 'failed' / 'cancelled'.
 */
function updateTaskStatus(id: string, status: TaskStatus, error?: string) {
  const endsWithError = status === 'failed' || status === 'cancelled';
  if (!endsWithError) {
    const plainUpdate = db().prepare('UPDATE tasks SET status = ? WHERE id = ?');
    plainUpdate.run(status, id);
  } else {
    const terminalUpdate = db().prepare('UPDATE tasks SET status = ?, error = ?, completed_at = datetime(\'now\') WHERE id = ?');
    terminalUpdate.run(status, error || null, id);
  }
  emit(id, 'status', status);
}

/**
 * Map a raw `tasks` table row (snake_case columns) onto a Task object.
 *
 * JSON columns are decoded, falsy optional columns become `undefined`, and
 * timestamp columns are passed through toIsoUTC.
 *
 * @param row Row as returned by the SQLite driver.
 * @returns The corresponding Task.
 */
function rowToTask(row: any): Task {
  // Decode the JSON columns first (watch config, then log).
  const watchConfig = row.watch_config ? JSON.parse(row.watch_config) : undefined;
  const log = JSON.parse(row.log || '[]');

  // created_at falls back to the raw column value; the others to undefined.
  const createdAt = toIsoUTC(row.created_at) ?? row.created_at;
  const startedAt = toIsoUTC(row.started_at) ?? undefined;
  const completedAt = toIsoUTC(row.completed_at) ?? undefined;
  const scheduledAt = toIsoUTC(row.scheduled_at) ?? undefined;

  const mapped = {
    id: row.id,
    projectName: row.project_name,
    projectPath: row.project_path,
    prompt: row.prompt,
    mode: row.mode || 'prompt',
    status: row.status,
    priority: row.priority,
    conversationId: row.conversation_id || undefined,
    watchConfig,
    log,
    resultSummary: row.result_summary || undefined,
    gitDiff: row.git_diff || undefined,
    gitBranch: row.git_branch || undefined,
    costUSD: row.cost_usd || undefined,
    error: row.error || undefined,
    createdAt,
    startedAt,
    completedAt,
    scheduledAt,
    agent: row.agent || undefined,
    backend: row.backend || undefined,
  };
  return mapped as Task;
}

// ─── Monitor task execution ──────────────────────────────────

import { getSessionFilePath, readSessionEntries, tailSessionFile, type SessionEntry } from './claude-sessions';

// Registry of running monitor tasks. startMonitorTask stores each monitor's
// cleanup function here; stopMonitor runs it and removes the entry.
const activeMonitors = new Map<string, () => void>(); // taskId → cleanup fn

/**
 * Start a monitor task: tail a session file and fire the configured action
 * when the watch condition is met.
 *
 * Conditions handled: 'idle', 'change', 'keyword', 'error' and 'complete'.
 * Unless `config.repeat` is set, the monitor stops after its first trigger.
 * The task is marked failed immediately if it has no conversation id, no
 * watch config, or the session file cannot be located.
 *
 * A cleanup function is registered in `activeMonitors`; it stops the tail,
 * clears every timer owned by this monitor and flushes batched notifications.
 *
 * @param task Monitor task; needs `conversationId` and `watchConfig`.
 */
function startMonitorTask(task: Task) {
  if (!task.conversationId || !task.watchConfig) {
    updateTaskStatus(task.id, 'failed', 'Monitor task requires a session and watch config');
    return;
  }

  const config = task.watchConfig;
  const fp = getSessionFilePath(task.projectName, task.conversationId);
  if (!fp) {
    updateTaskStatus(task.id, 'failed', `Session file not found: ${task.conversationId}`);
    return;
  }

  console.log(`[monitor] Starting monitor ${task.id} for ${task.projectName}/${task.conversationId.slice(0, 8)} — condition: ${config.condition}, action: ${config.action}, file: ${fp}`);

  updateTaskStatus(task.id, 'running');
  appendLog(task.id, {
    type: 'system', subtype: 'init',
    content: `Monitoring session ${task.conversationId.slice(0, 12)} — condition: ${config.condition}, action: ${config.action}`,
    timestamp: new Date().toISOString(),
  });

  let lastActivityTime = Date.now();
  // Set by cleanup so a late timer callback cannot act on a stopped monitor.
  let stopped = false;

  // triggerMonitorAction is async and fire-and-forget here; log failures
  // instead of leaving an unhandled rejection.
  const fire = (context: string) => {
    triggerMonitorAction(task, context).catch((err) => {
      console.error(`[monitor] ${task.id} action failed:`, err);
    });
  };

  // Idle check timer
  let idleTimer: ReturnType<typeof setInterval> | null = null;
  if (config.condition === 'idle') {
    const idleMs = (config.idleMinutes || 10) * 60_000;
    idleTimer = setInterval(() => {
      if (Date.now() - lastActivityTime > idleMs) {
        fire(`Session idle for ${config.idleMinutes || 10} minutes`);
        if (!config.repeat) {
          stopMonitor(task.id);
        } else {
          // Restart the idle clock; otherwise a repeating monitor would
          // re-trigger on every 30s tick for as long as the session stays idle.
          lastActivityTime = Date.now();
        }
      }
    }, 30_000);
  }

  // Notification throttling: batch updates and send at most once per interval
  const notifyInterval = (config.notifyIntervalSeconds || 60) * 1000;
  let lastNotifyTime = 0;
  let pendingContext: string[] = [];
  let notifyTimer: ReturnType<typeof setTimeout> | null = null;

  function scheduleNotify(context: string, immediate?: boolean) {
    pendingContext.push(context);
    if (immediate) {
      flushNotify();
      return;
    }
    if (notifyTimer) return; // already scheduled
    const elapsed = Date.now() - lastNotifyTime;
    const delay = Math.max(0, notifyInterval - elapsed);
    notifyTimer = setTimeout(flushNotify, delay);
  }

  function flushNotify() {
    if (notifyTimer) { clearTimeout(notifyTimer); notifyTimer = null; }
    if (pendingContext.length === 0) return;
    // Several batched updates are summarised as a count plus the last five.
    const summary = pendingContext.length === 1
      ? pendingContext[0]
      : `${pendingContext.length} updates:\n\n${pendingContext.slice(-5).join('\n\n')}`;
    pendingContext = [];
    lastNotifyTime = Date.now();
    fire(summary);
  }

  // Pending "session looks complete" confirmation. Only one is kept alive,
  // and cleanup clears it.
  let completeTimer: ReturnType<typeof setTimeout> | null = null;

  // Tail the file for changes (uses fs.watch + 5s polling fallback)
  const stopTail = tailSessionFile(fp, (newEntries) => {
    lastActivityTime = Date.now();

    // Check conditions
    if (config.condition === 'change') {
      scheduleNotify(summarizeNewEntries(newEntries));
      if (!config.repeat) stopMonitor(task.id);
    }

    if (config.condition === 'keyword' && config.keyword) {
      const kw = config.keyword.toLowerCase();
      const matched = newEntries.find(e => e.content.toLowerCase().includes(kw));
      if (matched) {
        scheduleNotify(`Keyword "${config.keyword}" found: ${matched.content.slice(0, 200)}`, true);
        if (!config.repeat) stopMonitor(task.id);
      }
    }

    if (config.condition === 'error') {
      const errors = newEntries.filter(e =>
        e.type === 'system' && e.content.toLowerCase().includes('error')
      );
      if (errors.length > 0) {
        scheduleNotify(`Error detected: ${errors[0].content.slice(0, 200)}`, true);
        if (!config.repeat) stopMonitor(task.id);
      }
    }

    if (config.condition === 'complete') {
      // Check if last assistant entry looks like completion
      const lastAssistant = [...newEntries].reverse().find(e => e.type === 'assistant_text');
      if (lastAssistant) {
        // Heuristic: check if there are no more tool calls after the last text
        const lastIdx = newEntries.lastIndexOf(lastAssistant);
        const afterToolUse = newEntries.slice(lastIdx + 1).some(e => e.type === 'tool_use');
        if (!afterToolUse && newEntries.length > 2) {
          // Wait a bit to see if more entries come. A newer candidate replaces
          // the pending check so the notification cannot fire twice.
          if (completeTimer) clearTimeout(completeTimer);
          completeTimer = setTimeout(() => {
            completeTimer = null;
            // The monitor may have been stopped or cancelled while waiting;
            // acting now would overwrite that status with 'done'.
            if (stopped) return;
            if (Date.now() - lastActivityTime > 30_000) {
              scheduleNotify(`Session appears complete.\n\nLast: ${lastAssistant.content.slice(0, 300)}`, true);
              if (!config.repeat) stopMonitor(task.id);
            }
          }, 35_000);
        }
      }
    }
  }, (err) => {
    console.error(`[monitor] ${task.id} tail error:`, err.message);
    appendLog(task.id, {
      type: 'system', subtype: 'error',
      content: `File watch error: ${err.message}`,
      timestamp: new Date().toISOString(),
    });
  });

  const cleanup = () => {
    stopped = true;
    stopTail();
    if (idleTimer) clearInterval(idleTimer);
    if (completeTimer) { clearTimeout(completeTimer); completeTimer = null; }
    flushNotify(); // send any remaining batched notifications
  };

  activeMonitors.set(task.id, cleanup);
}

/**
 * Stop a monitor and mark its task as done.
 *
 * Runs and unregisters the monitor's cleanup function when one is
 * registered; the status update to 'done' happens either way.
 *
 * @param taskId Id of the monitor task.
 */
function stopMonitor(taskId: string) {
  const teardown = activeMonitors.get(taskId);
  if (teardown !== undefined) {
    teardown();
    activeMonitors.delete(taskId);
  }
  updateTaskStatus(taskId, 'done');
}

/**
 * Cancel a monitor task (exported so task cancellation can reach it).
 *
 * Tears the monitor down and marks the task 'cancelled'. The teardown is done
 * here rather than through stopMonitor, because stopMonitor also writes and
 * emits a 'done' status; listeners would see the task complete just before
 * it is cancelled.
 *
 * @param taskId Id of the monitor task to cancel.
 */
export function cancelMonitor(taskId: string) {
  const cleanup = activeMonitors.get(taskId);
  if (cleanup) {
    cleanup();
    activeMonitors.delete(taskId);
  }
  updateTaskStatus(taskId, 'cancelled');
}

/**
 * Carry out a monitor's configured action after its condition fired.
 *
 * Always logs the trigger to the monitor task first, then:
 *  - 'notify'  → sends a Telegram message, if a bot token and chat id are set
 *  - 'message' → creates a follow-up task in the monitored conversation
 *  - 'task'    → creates a fresh task (optionally for `actionProject`)
 *
 * @param task    The monitor task; `watchConfig` must be set.
 * @param context Human-readable description of what triggered the action.
 */
async function triggerMonitorAction(task: Task, context: string) {
  const config = task.watchConfig!;

  appendLog(task.id, {
    type: 'system', subtype: 'text',
    content: `⚡ Triggered: ${context}`,
    timestamp: new Date().toISOString(),
  });

  if (config.action === 'notify') {
    // Send Telegram notification
    const settings = loadSettings();
    if (settings.telegramBotToken && settings.telegramChatId) {
      // A replacer function inserts the context verbatim. With a plain string,
      // `$&`, `$'`, `$1` … in session text would be expanded as replacement
      // patterns. The global regex fills every placeholder, not just the first.
      const msg = config.actionPrompt
        ? config.actionPrompt.replace(/\{\{context\}\}/g, () => context)
        : `📋 Monitor: ${task.projectName}/${task.conversationId?.slice(0, 8)}\n\n${context}`;
      await sendTelegramDirect(settings.telegramBotToken, settings.telegramChatId, msg);
    }
  } else if (config.action === 'message' && config.actionPrompt && task.conversationId) {
    // Send a message to the session by creating a prompt task (will queue if project is busy)
    const newTask = createTask({
      projectName: task.projectName,
      projectPath: task.projectPath,
      prompt: config.actionPrompt,
      conversationId: task.conversationId,
    });
    const queued = runningProjects.has(task.projectName) ? ' (queued — project busy)' : '';
    appendLog(task.id, {
      type: 'system', subtype: 'text',
      content: `Created follow-up task ${newTask.id}${queued}: ${config.actionPrompt.slice(0, 100)}`,
      timestamp: new Date().toISOString(),
    });
  } else if (config.action === 'task' && config.actionPrompt) {
    const project = config.actionProject || task.projectName;
    // NOTE(review): when actionProject names a different project, the new task
    // still gets this monitor task's projectPath — confirm that is intended.
    createTask({
      projectName: project,
      projectPath: task.projectPath,
      prompt: config.actionPrompt,
    });
    appendLog(task.id, {
      type: 'system', subtype: 'text',
      content: `Created new task for ${project}: ${config.actionPrompt.slice(0, 100)}`,
      timestamp: new Date().toISOString(),
    });
  }
}

/**
 * Post a plain-text message through the Telegram Bot API `sendMessage` method.
 *
 * Never throws: a non-OK HTTP response and a network error are both logged
 * to the console and otherwise ignored.
 *
 * @param token  Bot token.
 * @param chatId Target chat id.
 * @param text   Message body; link previews are disabled.
 */
async function sendTelegramDirect(token: string, chatId: string, text: string) {
  try {
    const endpoint = `https://api.telegram.org/bot${token}/sendMessage`;
    const payload = JSON.stringify({ chat_id: chatId, text, disable_web_page_preview: true });
    const response = await fetch(endpoint, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: payload,
    });
    if (response.ok) return;

    // Log the response body so the reason for the failure is visible.
    const detail = await response.text();
    console.error(`[monitor] Telegram send failed: ${response.status} ${detail}`);
  } catch (err) {
    console.error('[monitor] Telegram send error:', err);
  }
}

/**
 * Build a short, emoji-tagged digest of new session entries.
 *
 * User and assistant text entries are truncated (100 and 150 characters),
 * tool calls are shown by tool name, other entry types are ignored. At most
 * the first five digest lines are kept.
 *
 * @param entries Newly appended session entries.
 * @returns The digest joined by newlines, or 'Activity detected' when empty.
 */
function summarizeNewEntries(entries: SessionEntry[]): string {
  const lines = entries.flatMap((entry): string[] => {
    switch (entry.type) {
      case 'user':
        return [`👤 ${entry.content.slice(0, 100)}`];
      case 'assistant_text':
        return [`🤖 ${entry.content.slice(0, 150)}`];
      case 'tool_use':
        return [`🔧 ${entry.toolName || 'tool'}`];
      default:
        return [];
    }
  });
  const digest = lines.slice(0, 5).join('\n');
  return digest ? digest : 'Activity detected';
}
