Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 | 6x 6x 6x 6x 6x 3x 3x 3x 3x 3x 3x 3x 3x 2x 2x 2x 2x 2x 2x 2x 2x 2x 3x 3x 3x 3x 1x 1x 1x 6x 6x 6x 6x 6x 6x 6x 6x 6x 12x 12x 12x 12x 113x 113x 113x 224x 224x 223x 1x 1x 1x 112x 112x 112x 112x 112x 10x 1x 1x 1x 2x | import { Worker, isMainThread, parentPort, workerData } from 'worker_threads';
import { SeraphConfig } from './config';
import { AlerterClient } from './alerter';
import { createLLMProvider } from './llm';
import { metrics } from './metrics';
// Worker-thread entry point. Everything in this branch runs only when the
// module is loaded inside a worker (i.e. not on the main thread).
if (!isMainThread) {
  const { config } = workerData;
  const alerterClient = new AlerterClient(config);
  const provider = createLLMProvider(config);

  /**
   * Asks the configured LLM provider whether a log entry warrants an alert.
   *
   * @param log Raw log entry to analyze.
   * @returns The JSON object parsed from the provider's reply (expected to
   *   carry `decision` and `reason`), or
   *   `{ decision: 'error', reason: 'Error analyzing log' }` when the request
   *   or the parsing fails. Failures are logged, counted in
   *   `metrics.analysisErrors` and reported through the alerter; they are
   *   never rethrown.
   */
  const analyzeLog = async (log: string) => {
    const end = metrics.llmAnalysisLatency.startTimer({ provider: config.llm?.provider, model: config.llm?.model });
    const prompt = `
Analyze the following log entry and determine if it requires an alert.
Respond with a JSON object with two fields: "decision" and "reason".
The "decision" field should be either "alert" or "ok".
The "reason" field should be a short explanation of the decision.
Log entry:
${log}
`;
    try {
      let text: string;
      try {
        text = await provider.generate(prompt);
      } finally {
        // Stop the latency timer exactly once, whether the provider call
        // succeeded or threw. Previously a JSON parsing failure stopped it a
        // second time from the catch block.
        end();
      }

      // Clean the response to ensure it's valid JSON: when the reply is
      // wrapped in a ```json fenced block, keep only the fenced payload.
      const jsonMatch = text.match(/```json\n([\s\S]*?)\n```/);
      if (jsonMatch && jsonMatch[1]) {
        text = jsonMatch[1];
      }

      const parsed = JSON.parse(text);
      // Valid JSON that is not an object (e.g. `null`) would make the caller's
      // `analysis.decision` lookup throw outside any try/catch; treat it as an
      // analysis failure instead.
      if (parsed === null || typeof parsed !== 'object') {
        throw new Error('LLM response is not a JSON object');
      }
      return parsed;
    } catch (error: unknown) {
      const message = error instanceof Error ? error.message : String(error);
      metrics.analysisErrors.inc();
      console.error(`[Worker ${process.pid}] Error analyzing log:`, message);
      // Record the analysis error as a mitigation event.
      // NOTE(review): sendAlert is not awaited; if it returns a promise, a
      // rejection would go unhandled here — confirm against AlerterClient.
      alerterClient.sendAlert({
        source: 'log_analysis_error',
        type: 'analysis_failed',
        details: message,
        log: log,
      });
      return { decision: 'error', reason: 'Error analyzing log' };
    }
  };

  // Each message posted by the main thread is one raw log entry.
  parentPort?.on('message', async (log: string) => {
    console.log(`[Worker ${process.pid}] Received log:`, log.substring(0, 100) + '...');
    const analysis = await analyzeLog(log);
    if (analysis.decision === 'alert') {
      metrics.alertsTriggered.inc({ provider: config.llm?.provider, model: config.llm?.model });
      console.log(`[Worker ${process.pid}] Anomaly detected! Reason: ${analysis.reason}`);
      alerterClient.sendAlert({
        source: 'log_analysis',
        type: 'anomaly_detected',
        details: analysis.reason,
        log: log,
      });
    }
  });
}
/**
 * Main-thread manager for a pool of analysis workers.
 *
 * Spawns `config.workers` worker threads running this same file, applies the
 * configured pre-filters to incoming logs, and hands the remaining logs to the
 * workers in round-robin order.
 */
export class AgentManager {
  private workers: Worker[] = [];
  private nextWorker = 0;
  private recentLogs: string[] = [];
  // Compiled pre-filter patterns keyed by their source string. `null` marks a
  // pattern that failed to compile, so it is reported once and then ignored.
  private readonly compiledFilters = new Map<string, RegExp | null>();

  /**
   * @param config Configuration shared with every worker through `workerData`.
   */
  constructor(private config: SeraphConfig) {
    // Workers also load this module; only the main thread may spawn a pool.
    if (isMainThread) {
      this.initWorkers();
    }
  }

  /** Spawns the worker pool and wires up error/exit reporting. */
  private initWorkers() {
    metrics.activeWorkers.set(this.config.workers);
    for (let i = 0; i < this.config.workers; i++) {
      const worker = new Worker(__filename, {
        workerData: { config: this.config },
      });
      worker.on('error', (err) => console.error(`Worker error:`, err));
      worker.on('exit', (code) => {
        if (code !== 0) console.error(`Worker stopped with exit code ${code}`);
        // Drop the exited worker from the pool so dispatch() stops routing
        // logs to it, and keep the gauge in step with the live pool size.
        this.workers = this.workers.filter((candidate) => candidate !== worker);
        metrics.activeWorkers.set(this.workers.length);
      });
      this.workers.push(worker);
    }
  }

  /**
   * Returns the compiled form of a pre-filter pattern, compiling it on first
   * use. An invalid pattern is logged once and yields `null` afterwards.
   */
  private compileFilter(filter: string): RegExp | null {
    const cached = this.compiledFilters.get(filter);
    if (cached !== undefined) {
      return cached;
    }
    let compiled: RegExp | null = null;
    try {
      compiled = new RegExp(filter);
    } catch (error: unknown) {
      const message = error instanceof Error ? error.message : String(error);
      console.error(`[AgentManager] Invalid regex in preFilters: ${filter}`, message);
    }
    this.compiledFilters.set(filter, compiled);
    return compiled;
  }

  /**
   * Routes one log entry to a worker unless a pre-filter matches it.
   *
   * Every call increments `metrics.logsProcessed`. A log matched by any
   * pre-filter increments `metrics.logsSkipped` and is dropped. Otherwise the
   * log is posted to the next worker in round-robin order and appended to the
   * recent-log buffer, which keeps at most the last 100 dispatched entries.
   *
   * @param log Raw log entry.
   */
  public dispatch(log: string) {
    metrics.logsProcessed.inc();

    // Pre-filtering logic
    for (const filter of this.config.preFilters ?? []) {
      const regex = this.compileFilter(filter);
      if (regex !== null && regex.test(log)) {
        metrics.logsSkipped.inc();
        return; // Skip sending this log to a worker
      }
    }

    if (this.workers.length === 0) {
      console.error("No workers available to process logs.");
      return;
    }

    // The pool can shrink when a worker exits, so bring the cursor back into
    // range before using it.
    const index = this.nextWorker % this.workers.length;
    this.workers[index].postMessage(log);
    this.nextWorker = (index + 1) % this.workers.length;

    // Store the log
    this.recentLogs.push(log);
    if (this.recentLogs.length > 100) {
      this.recentLogs.shift();
    }
  }

  /**
   * @returns The internal buffer of recently dispatched logs (oldest first).
   *   This is the live array, not a copy.
   */
  public getRecentLogs(): string[] {
    return this.recentLogs;
  }

  /**
   * Requests termination of every worker in the pool. The calls are not
   * awaited, so workers may still be shutting down when this returns.
   */
  public shutdown() {
    console.log("Shutting down all workers...");
    for (const worker of this.workers) {
      worker.terminate();
    }
  }
}
|