All files / src agent.ts

91.17% Statements 62/68
69.23% Branches 9/13
77.77% Functions 7/9
93.84% Lines 61/65

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 1406x   6x 6x 6x     6x 3x 3x 3x   3x 3x 3x                   3x 3x 2x     2x 2x       2x   2x 2x 2x     2x             2x       3x 3x 3x   3x 1x 1x 1x                   6x 6x 6x 6x   6x 6x 6x         6x 6x 12x     12x 12x     12x         113x     113x 113x 224x 224x 223x 1x     1x     1x         112x       112x 112x     112x 112x 10x         1x       1x 1x 2x        
import { Worker, isMainThread, parentPort, workerData } from 'worker_threads';
import { SeraphConfig } from './config';
import { AlerterClient } from './alerter';
import { createLLMProvider } from './llm';
import { metrics } from './metrics';
 
/**
 * Extracts the JSON payload from a raw LLM response.
 *
 * If the response contains a Markdown code fence (tagged `json` in any case,
 * or untagged; LF or CRLF line endings), the text inside the first such fence
 * is returned. Otherwise the response is returned unchanged so that a bare
 * JSON answer still parses.
 *
 * @param text Raw text returned by the LLM provider.
 * @returns The text that should be handed to `JSON.parse`.
 */
function extractJsonPayload(text: string): string {
  const fenced = text.match(/```(?:json)?[ \t]*\r?\n([\s\S]*?)\r?\n[ \t]*```/i);
  const payload = fenced?.[1];
  return payload ? payload : text;
}

/**
 * Parses an LLM response into an analysis object.
 *
 * The return type stays loose (`any` values) because the parsed fields are
 * forwarded to the alerter as-is, exactly like the untyped `JSON.parse`
 * result was before.
 *
 * @param text Raw text returned by the LLM provider.
 * @returns The parsed JSON object (expected to carry "decision" and "reason").
 * @throws SyntaxError if the payload is not valid JSON.
 * @throws Error if the payload is valid JSON but not an object (e.g. `null`,
 *         a string, a number or an array); such a value cannot carry a decision.
 */
function parseAnalysis(text: string): Record<string, any> {
  const parsed: unknown = JSON.parse(extractJsonPayload(text));
  if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
    throw new Error('LLM response is not a JSON object');
  }
  return parsed as Record<string, any>;
}

/** Returns a printable message for anything that can be thrown. */
function describeError(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

// This code runs if we are in a worker thread
if (!isMainThread) {
  const { config } = workerData;
  const alerterClient = new AlerterClient(config);
  const provider = createLLMProvider(config);

  /**
   * Sends the prompt to the LLM provider and stops the latency timer exactly
   * once, whether the call resolves or rejects.
   * NOTE(review): the timer's stop function presumably records one sample per
   * call (prom-client style), so it must not run a second time when parsing
   * fails later - confirm against ./metrics.
   */
  const timedGenerate = async (prompt: string): Promise<string> => {
    const end = metrics.llmAnalysisLatency.startTimer({ provider: config.llm?.provider, model: config.llm?.model });
    try {
      return await provider.generate(prompt);
    } finally {
      end();
    }
  };

  /**
   * Asks the LLM whether a log entry warrants an alert.
   *
   * On any failure (provider error, invalid JSON, non-object JSON) the error
   * counter is incremented, an `analysis_failed` alert is sent, and a
   * `{ decision: 'error' }` result is returned instead of throwing.
   *
   * @param log The raw log entry to analyze.
   * @returns The parsed analysis, or the fallback error result.
   */
  const analyzeLog = async (log: string): Promise<Record<string, any>> => {
    const prompt = `
    Analyze the following log entry and determine if it requires an alert.
    Respond with a JSON object with two fields: "decision" and "reason".
    The "decision" field should be either "alert" or "ok".
    The "reason" field should be a short explanation of the decision.

    Log entry:
    ${log}
    `;

    try {
      const text = await timedGenerate(prompt);
      // Clean the response to ensure it's valid JSON, then validate its shape
      return parseAnalysis(text);
    } catch (error: unknown) {
      const message = describeError(error);
      metrics.analysisErrors.inc();
      console.error(`[Worker ${process.pid}] Error analyzing log:`, message);

      // Record the analysis error as a mitigation event
      alerterClient.sendAlert({
        source: 'log_analysis_error',
        type: 'analysis_failed',
        details: message,
        log: log,
      });

      return { decision: 'error', reason: 'Error analyzing log' };
    }
  };

  parentPort?.on('message', async (log: string) => {
    // An async listener's rejection is not handled by the emitter, so every
    // failure is caught here rather than surfacing as an unhandled rejection.
    try {
      console.log(`[Worker ${process.pid}] Received log:`, log.substring(0, 100) + '...');
      const analysis = await analyzeLog(log);

      if (analysis.decision === 'alert') {
        metrics.alertsTriggered.inc({ provider: config.llm?.provider, model: config.llm?.model });
        console.log(`[Worker ${process.pid}] Anomaly detected! Reason: ${analysis.reason}`);
        alerterClient.sendAlert({
          source: 'log_analysis',
          type: 'anomaly_detected',
          details: analysis.reason,
          log: log,
        });
      }
    } catch (error: unknown) {
      console.error(`[Worker ${process.pid}] Failed to process log:`, describeError(error));
    }
  });
}
 
/**
 * Owns the pool of analysis worker threads and routes log lines to them.
 *
 * Logs are first checked against the configured `preFilters` (regex sources);
 * matching logs are counted as skipped and never reach a worker. Remaining
 * logs are handed to workers in round-robin order, and the most recent
 * dispatched logs are kept in a bounded in-memory buffer.
 */
export class AgentManager {
  /** Maximum number of dispatched logs retained by `getRecentLogs()`. */
  private static readonly MAX_RECENT_LOGS = 100;

  private workers: Worker[] = [];
  private nextWorker = 0;
  private recentLogs: string[] = [];
  /** Compiled pre-filters keyed by pattern source; `null` marks an invalid pattern. */
  private readonly filterCache = new Map<string, RegExp | null>();

  /**
   * @param config Agent configuration; `workers` sets the pool size and
   *               `preFilters` lists regex sources for logs to skip.
   */
  constructor(private config: SeraphConfig) {
    // Worker threads load this same file; only the main thread may spawn workers.
    if (isMainThread) {
      this.initWorkers();
    }
  }

  /** Spawns `config.workers` worker threads running this file and wires their lifecycle events. */
  private initWorkers() {
    metrics.activeWorkers.set(this.config.workers);
    for (let i = 0; i < this.config.workers; i++) {
      const worker = new Worker(__filename, {
        workerData: { config: this.config },
      });
      worker.on('error', (err) => console.error(`Worker error:`, err));
      worker.on('exit', (code) => {
        if (code !== 0) console.error(`Worker stopped with exit code ${code}`);
        // A dead worker must leave the rotation, otherwise logs sent to it are lost.
        this.removeWorker(worker);
      });
      this.workers.push(worker);
    }
  }

  /** Drops an exited worker from the pool and keeps the round-robin cursor and gauge consistent. */
  private removeWorker(worker: Worker) {
    const index = this.workers.indexOf(worker);
    if (index === -1) {
      return; // already removed, e.g. by shutdown()
    }
    this.workers.splice(index, 1);
    if (index < this.nextWorker) {
      this.nextWorker--;
    }
    if (this.nextWorker >= this.workers.length) {
      this.nextWorker = 0;
    }
    metrics.activeWorkers.set(this.workers.length);
  }

  /**
   * Returns the compiled form of a pre-filter pattern, compiling it at most once.
   * An invalid pattern is reported once and then ignored on later calls.
   */
  private compileFilter(filter: string): RegExp | null {
    const cached = this.filterCache.get(filter);
    if (cached !== undefined) {
      return cached;
    }
    let compiled: RegExp | null = null;
    try {
      compiled = new RegExp(filter);
    } catch (error: unknown) {
      const message = error instanceof Error ? error.message : String(error);
      console.error(`[AgentManager] Invalid regex in preFilters: ${filter}`, message);
    }
    this.filterCache.set(filter, compiled);
    return compiled;
  }

  /** Tells whether any valid configured pre-filter matches the log. */
  private matchesPreFilter(log: string): boolean {
    const filters = this.config.preFilters;
    if (!filters || filters.length === 0) {
      return false;
    }
    for (const filter of filters) {
      const regex = this.compileFilter(filter);
      if (regex !== null && regex.test(log)) {
        return true;
      }
    }
    return false;
  }

  /**
   * Routes one log line to the next worker unless a pre-filter matches it.
   *
   * Every call increments the processed counter. Filtered logs increment the
   * skipped counter and are dropped. If the pool is empty the log is dropped
   * with an error message. Only logs actually posted to a worker are stored
   * in the recent-logs buffer.
   *
   * @param log The raw log line.
   */
  public dispatch(log: string) {
    metrics.logsProcessed.inc();

    // Pre-filtering logic
    if (this.matchesPreFilter(log)) {
      metrics.logsSkipped.inc();
      return; // Skip sending this log to a worker
    }

    if (this.workers.length === 0) {
      console.error("No workers available to process logs.");
      return;
    }
    // The modulo guards the cursor in case the pool shrank since the last dispatch.
    const index = this.nextWorker % this.workers.length;
    this.workers[index].postMessage(log);
    this.nextWorker = (index + 1) % this.workers.length;

    // Store the log
    this.recentLogs.push(log);
    if (this.recentLogs.length > AgentManager.MAX_RECENT_LOGS) {
      this.recentLogs.shift();
    }
  }

  /**
   * @returns A snapshot (oldest first) of the most recently dispatched logs.
   *          The array is a copy, so callers cannot alter the internal buffer.
   */
  public getRecentLogs(): string[] {
    return [...this.recentLogs];
  }

  /**
   * Terminates every worker and empties the pool, so later `dispatch` calls
   * report "no workers" instead of posting to terminated threads.
   */
  public shutdown() {
    console.log("Shutting down all workers...");
    const stopping = this.workers;
    this.workers = [];
    this.nextWorker = 0;
    for (const worker of stopping) {
      // terminate() returns a promise; attach a handler so a failure is logged
      // rather than left as an unhandled rejection.
      void Promise.resolve(worker.terminate()).catch((err) =>
        console.error(`Worker termination failed:`, err),
      );
    }
    metrics.activeWorkers.set(0);
  }
}