Attempting to remove the LLM out of processing

This commit is contained in:
2025-08-23 19:03:40 -05:00
parent bebedb1e15
commit 6f7e99639c
4 changed files with 106 additions and 314 deletions

View File

@@ -192,73 +192,102 @@ def get_nmap_scan_results():
logger.error(f"Error performing Nmap scan: {e}")
return {"error": "Nmap scan failed"}
# --- LLM Interaction Function ---
# --- Data Analysis ---
def build_llm_prompt(data, baselines, nmap_changes, constraints, known_issues, port_applications):
"""Builds the prompt for the LLM analysis."""
return f"""
**Role:** You are a dedicated and expert system administrator. Your primary role is to identify anomalies and provide concise, actionable reports.
def analyze_data_locally(data, baselines, known_issues, port_applications):
"""Analyzes the collected data to find anomalies without using an LLM."""
anomalies = []
**Instruction:** Analyze the following system and network data for any activity that appears out of place or different. Consider unusual values, errors, or unexpected patterns as anomalies. Compare the current data with the historical baseline data to identify significant deviations. Consult the known issues feed to avoid flagging resolved or expected issues. Pay special attention to the Nmap scan results for any new or unexpected open ports. Pay special attention to network RTT fluctuations, but only report them as an anomaly if the fluctuation is greater than 10 seconds. Similarly, only report temperature fluctuations if the difference is greater than 5 degrees.
# Temperature checks
cpu_temp = data.get("cpu_temperature", {}).get("cpu_temperature")
gpu_temp = data.get("gpu_temperature", {}).get("gpu_temperature")
baseline_cpu_temp = baselines.get("average_cpu_temperature")
baseline_gpu_temp = baselines.get("average_gpu_temperature")
**Context:**
Here is the system data in JSON format for your analysis: {json.dumps(data, indent=2)}
if isinstance(cpu_temp, (int, float)) and isinstance(baseline_cpu_temp, (int, float)):
if abs(cpu_temp - baseline_cpu_temp) > 5:
anomalies.append({
"severity": "medium",
"reason": f"CPU temperature deviation detected. Current: {cpu_temp}°C, Baseline: {baseline_cpu_temp}°C"
})
**Historical Baseline Data:**
{json.dumps(baselines, indent=2)}
if isinstance(gpu_temp, (int, float)) and isinstance(baseline_gpu_temp, (int, float)):
if abs(gpu_temp - baseline_gpu_temp) > 5:
anomalies.append({
"severity": "medium",
"reason": f"GPU temperature deviation detected. Current: {gpu_temp}°C, Baseline: {baseline_gpu_temp}°C"
})
**Nmap Scan Changes:**
{json.dumps(nmap_changes, indent=2)}
# Network RTT check
current_rtt = data.get("network_metrics", {}).get("rtt_avg")
baseline_rtt = baselines.get("average_rtt_avg")
**Known Issues Feed:**
{json.dumps(known_issues, indent=2)}
if isinstance(current_rtt, (int, float)) and isinstance(baseline_rtt, (int, float)):
if abs(current_rtt - baseline_rtt) > 10000:
anomalies.append({
"severity": "high",
"reason": f"High network RTT fluctuation detected. Current: {current_rtt}ms, Baseline: {baseline_rtt}ms"
})
**Known Port Applications:**
{json.dumps(port_applications, indent=2)}
# Failed login attempts check
failed_logins = data.get("login_attempts", {}).get("failed_login_attempts")
if failed_logins:
anomalies.append({
"severity": "high",
"reason": f"{len(failed_logins)} failed login attempts detected."
})
**Constraints and Guidelines:**
{constraints}
**Output Request:** If you find an anomaly, provide a report as a single JSON object with two keys: "severity" and "reason". The "severity" must be one of "high", "medium", "low", or "none". The "reason" must be a natural language explanation of the anomaly. Please include specific values if the anomoly has them. If no anomaly is found, return a single JSON object with "severity" set to "none" and "reason" as an empty string. Do not wrap the JSON in markdown or any other formatting. Only return the JSON, and nothing else.
**Reasoning Hint:** Think step by step to come to your conclusion. This is very important.
"""
def analyze_data_with_llm(data, baselines):
"""Analyzes data with the local LLM."""
with open("CONSTRAINTS.md", "r") as f:
constraints = f.read()
with open("known_issues.json", "r") as f:
known_issues = json.load(f)
with open("port_applications.json", "r") as f:
port_applications = json.load(f)
# Compare current nmap results with baseline
nmap_changes = {"new_hosts": [], "changed_ports": {}}
# Nmap scan changes check
if "nmap_results" in data and "host_ports" in baselines:
current_hosts_info = {host['ip']: host for host in data["nmap_results"].get("hosts", [])}
current_hosts = set(current_hosts_info.keys())
baseline_hosts = set(baselines["host_ports"].keys())
# New hosts
nmap_changes["new_hosts"] = sorted(list(current_hosts - baseline_hosts))
new_hosts = sorted(list(current_hosts - baseline_hosts))
for host in new_hosts:
anomalies.append({
"severity": "high",
"reason": f"New host detected on the network: {host}"
})
# Changed ports on existing hosts
for host_ip in current_hosts.intersection(baseline_hosts):
current_ports = set(p['port'] for p in current_hosts_info[host_ip].get("open_ports", []))
baseline_ports = set(baselines["host_ports"].get(host_ip, []))
newly_opened = sorted(list(current_ports - baseline_ports))
newly_closed = sorted(list(baseline_ports - current_ports))
for port in newly_opened:
port_info = port_applications.get(str(port), "Unknown")
anomalies.append({
"severity": "medium",
"reason": f"New port opened on {host_ip}: {port} ({port_info})"
})
if newly_opened or newly_closed:
nmap_changes["changed_ports"][host_ip] = {"opened": newly_opened, "closed": newly_closed}
return anomalies
prompt = build_llm_prompt(data, baselines, nmap_changes, constraints, known_issues, port_applications)
# --- LLM Interaction Function ---
def build_llm_prompt(anomalies):
"""Builds the prompt for the LLM to generate a report from anomalies."""
return f"""
**Role:** You are a dedicated and expert system administrator. Your primary role is to provide a concise, actionable report based on a list of pre-identified anomalies.
**Instruction:** Please synthesize the following list of anomalies into a single, human-readable report. The report should be a single JSON object with two keys: "severity" and "reason". The "severity" should be the highest severity from the list of anomalies. The "reason" should be a summary of all the anomalies.
**Anomalies:**
{json.dumps(anomalies, indent=2)}
**Output Request:** Provide a report as a single JSON object with two keys: "severity" and "reason". The "severity" must be one of "high", "medium", "low", or "none". The "reason" must be a natural language explanation of the anomaly. If no anomaly is found, return a single JSON object with "severity" set to "none" and "reason" as an empty string. Do not wrap the JSON in markdown or any other formatting. Only return the JSON, and nothing else.
"""
def generate_llm_report(anomalies):
"""Generates a report from a list of anomalies using the local LLM."""
if not anomalies:
return {"severity": "none", "reason": ""}
prompt = build_llm_prompt(anomalies)
try:
response = ollama.generate(model="llama3.1:8b", prompt=prompt)
@@ -391,13 +420,22 @@ def run_monitoring_cycle(nmap_scan_counter):
data_storage.store_data(combined_data)
llm_response = analyze_data_with_llm(combined_data, data_storage.calculate_baselines())
with open("known_issues.json", "r") as f:
known_issues = json.load(f)
if llm_response and llm_response.get('severity') != "none":
daily_events.append(llm_response.get('reason'))
if llm_response.get('severity') == "high" and is_alerting_time():
send_discord_alert(llm_response, combined_data)
send_google_home_alert(llm_response.get('reason'))
with open("port_applications.json", "r") as f:
port_applications = json.load(f)
baselines = data_storage.calculate_baselines()
anomalies = analyze_data_locally(combined_data, baselines, known_issues, port_applications)
if anomalies:
llm_response = generate_llm_report(anomalies)
if llm_response and llm_response.get('severity') != "none":
daily_events.append(llm_response.get('reason'))
if llm_response.get('severity') == "high" and is_alerting_time():
send_discord_alert(llm_response, combined_data)
send_google_home_alert(llm_response.get('reason'))
return nmap_scan_counter
def main():
@@ -414,4 +452,4 @@ def main():
time.sleep(300) # Run every 5 minutes
if __name__ == "__main__":
main()
main()