Nmap... working?

This commit is contained in:
2025-08-20 12:51:11 -05:00
parent f6cbe1da8f
commit dd673829d2
6 changed files with 110 additions and 21 deletions

View File

@@ -11,6 +11,7 @@ import re
import os
from datetime import datetime, timezone
import pingparsing
import nmap
# Load configuration
import config
@@ -40,7 +41,7 @@ def get_system_logs():
parsed_logs = []
for line in log_lines:
try:
parsed_logs.append(parser.parse(line).as_dict())
parsed_logs.append(parser.parse(line).as_dict()) # type: ignore
except Exception:
# If parsing fails, just append the raw line
parsed_logs.append({"raw_log": line.strip()})
@@ -120,6 +121,21 @@ def get_login_attempts():
print(f"Error reading login attempts: {e}")
return {"failed_logins": []}
def get_nmap_scan_results():
"""Performs an Nmap scan and returns the results."""
try:
nm = nmap.PortScanner()
scan_options = config.NMAP_SCAN_OPTIONS
if os.geteuid() != 0 and "-sS" in scan_options:
print("Warning: Nmap -sS scan requires root privileges. Falling back to -sT.")
scan_options = scan_options.replace("-sS", "-sT")
scan_results = nm.scan(hosts=config.NMAP_TARGETS, arguments=scan_options)
return scan_results
except Exception as e:
print(f"Error performing Nmap scan: {e}")
return {"error": "Nmap scan failed"}
# --- LLM Interaction Function ---
def analyze_data_with_llm(data, baselines):
@@ -130,10 +146,35 @@ def analyze_data_with_llm(data, baselines):
with open("known_issues.json", "r") as f:
known_issues = json.load(f)
# Compare current nmap results with baseline
nmap_changes = {"new_hosts": [], "changed_ports": {}}
if "nmap_results" in data and "host_ports" in baselines:
current_hosts = set(data["nmap_results"].get("scan", {}).keys())
baseline_hosts = set(baselines["host_ports"].keys())
# New hosts
nmap_changes["new_hosts"] = sorted(list(current_hosts - baseline_hosts))
# Changed ports on existing hosts
for host in current_hosts.intersection(baseline_hosts):
current_ports = set()
if "tcp" in data["nmap_results"]["scan"][host]:
for port, port_data in data["nmap_results"]["scan"][host]["tcp"].items():
if port_data["state"] == "open":
current_ports.add(port)
baseline_ports = set(baselines["host_ports"].get(host, []))
newly_opened = sorted(list(current_ports - baseline_ports))
newly_closed = sorted(list(baseline_ports - current_ports))
if newly_opened or newly_closed:
nmap_changes["changed_ports"][host] = {"opened": newly_opened, "closed": newly_closed}
prompt = f"""
**Role:** You are a dedicated and expert system administrator. Your primary role is to identify anomalies and provide concise, actionable reports.
**Instruction:** Analyze the following system and network data for any activity that appears out of place or different. Consider unusual values, errors, or unexpected patterns as anomalies. Compare the current data with the historical baseline data to identify significant deviations. Consult the known issues feed to avoid flagging resolved or expected issues.
**Instruction:** Analyze the following system and network data for any activity that appears out of place or different. Consider unusual values, errors, or unexpected patterns as anomalies. Compare the current data with the historical baseline data to identify significant deviations. Consult the known issues feed to avoid flagging resolved or expected issues. Pay special attention to the Nmap scan results for any new or unexpected open ports.
**Context:**
Here is the system data in JSON format for your analysis: {json.dumps(data, indent=2)}
@@ -141,6 +182,9 @@ def analyze_data_with_llm(data, baselines):
**Historical Baseline Data:**
{json.dumps(baselines, indent=2)}
**Nmap Scan Changes:**
{json.dumps(nmap_changes, indent=2)}
**Known Issues Feed:**
{json.dumps(known_issues, indent=2)}
@@ -171,11 +215,11 @@ def analyze_data_with_llm(data, baselines):
except json.JSONDecodeError as e:
print(f"Error decoding LLM response: {e}")
# Fallback for invalid JSON
return {"severity": "low", "reason": response['response'].strip()}
return {{"severity": "low", "reason": response['response'].strip()}} # type: ignore
else:
# Handle cases where the response is not valid JSON
print(f"LLM returned a non-JSON response: {sanitized_response}")
return {"severity": "low", "reason": sanitized_response}
return {{"severity": "low", "reason": sanitized_response}} # type: ignore
except Exception as e:
print(f"Error interacting with LLM: {e}")
return None
@@ -236,6 +280,7 @@ if __name__ == "__main__":
cpu_temp = get_cpu_temperature()
gpu_temp = get_gpu_temperature()
login_attempts = get_login_attempts()
nmap_results = get_nmap_scan_results()
if system_logs and network_metrics:
combined_data = {
@@ -244,7 +289,8 @@ if __name__ == "__main__":
"network_metrics": network_metrics,
"cpu_temperature": cpu_temp,
"gpu_temperature": gpu_temp,
"login_attempts": login_attempts
"login_attempts": login_attempts,
"nmap_results": nmap_results
}
data_storage.store_data(combined_data)
@@ -258,6 +304,7 @@ if __name__ == "__main__":
else:
print("No anomaly detected.")
else:
nmap_scan_counter = 0
while True:
print("Running monitoring cycle...")
system_logs = get_system_logs()
@@ -266,6 +313,12 @@ if __name__ == "__main__":
gpu_temp = get_gpu_temperature()
login_attempts = get_login_attempts()
nmap_results = None
if nmap_scan_counter == 0:
nmap_results = get_nmap_scan_results()
nmap_scan_counter = (nmap_scan_counter + 1) % 4 # Run nmap scan every 4th cycle (20 minutes)
if system_logs and network_metrics:
combined_data = {
"timestamp": datetime.now(timezone.utc).isoformat(),
@@ -276,6 +329,9 @@ if __name__ == "__main__":
"login_attempts": login_attempts
}
if nmap_results:
combined_data["nmap_results"] = nmap_results
data_storage.store_data(combined_data)
llm_response = analyze_data_with_llm(combined_data, data_storage.calculate_baselines())
@@ -296,3 +352,4 @@ if __name__ == "__main__":
time.sleep(300) # Run every 5 minutes