- Update the LLM prompt to instruct it to ignore RTT fluctuations below 10 seconds. - Update PROGRESS.md to reflect the completion of the task.
395 lines
15 KiB
Python
395 lines
15 KiB
Python
# LLM-Powered Monitoring Agent
|
|
|
|
import time
|
|
import json
|
|
import subprocess
|
|
import ollama
|
|
from discord_webhook import DiscordWebhook
|
|
import requests
|
|
import data_storage
|
|
import re
|
|
import os
|
|
from datetime import datetime, timezone
|
|
import pingparsing
|
|
import nmap
|
|
import logging
|
|
from logging.handlers import TimedRotatingFileHandler
|
|
|
|
# Load configuration
|
|
import config
|
|
|
|
from syslog_rfc5424_parser import parser
|
|
|
|
# --- Logging Configuration ---
|
|
LOG_FILE = "monitoring_agent.log"
|
|
logger = logging.getLogger(__name__)
|
|
logger.setLevel(logging.INFO)
|
|
|
|
# Create a handler that rotates logs daily, keeping 1 backup
|
|
file_handler = TimedRotatingFileHandler(LOG_FILE, when="midnight", interval=1, backupCount=1)
|
|
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
|
|
|
|
# Create a handler for console output
|
|
console_handler = logging.StreamHandler()
|
|
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
|
|
|
|
logger.addHandler(file_handler)
|
|
logger.addHandler(console_handler)
|
|
|
|
|
|
LOG_POSITION_FILE = 'log_position.txt'
|
|
AUTH_LOG_POSITION_FILE = 'auth_log_position.txt'
|
|
|
|
# --- Data Ingestion & Parsing Functions ---
|
|
|
|
def get_system_logs():
|
|
"""Gets new lines from /var/log/syslog since the last check."""
|
|
try:
|
|
last_position = 0
|
|
if os.path.exists(LOG_POSITION_FILE):
|
|
with open(LOG_POSITION_FILE, 'r') as f:
|
|
last_position = int(f.read())
|
|
|
|
with open("/var/log/syslog", "r") as f:
|
|
f.seek(last_position)
|
|
log_lines = f.readlines()
|
|
current_position = f.tell()
|
|
|
|
with open(LOG_POSITION_FILE, 'w') as f:
|
|
f.write(str(current_position))
|
|
|
|
parsed_logs = []
|
|
for line in log_lines:
|
|
try:
|
|
parsed_logs.append(parser.parse(line).as_dict()) # type: ignore
|
|
except Exception:
|
|
# If parsing fails, just append the raw line
|
|
parsed_logs.append({"raw_log": line.strip()})
|
|
|
|
return {"syslog": parsed_logs}
|
|
except FileNotFoundError:
|
|
logger.error("/var/log/syslog not found.")
|
|
return {"syslog": []}
|
|
except Exception as e:
|
|
logger.error(f"Error reading syslog: {e}")
|
|
return {"syslog": []}
|
|
|
|
|
|
def get_network_metrics():
|
|
"""Gets network metrics by pinging 8.8.8.8."""
|
|
try:
|
|
ping_parser = pingparsing.PingParsing()
|
|
transmitter = pingparsing.PingTransmitter()
|
|
transmitter.destination = "8.8.8.8"
|
|
transmitter.count = 3
|
|
result = transmitter.ping()
|
|
return ping_parser.parse(result).as_dict()
|
|
except Exception as e:
|
|
logger.error(f"Error getting network metrics: {e}")
|
|
return {"error": "ping command failed"}
|
|
|
|
def get_sensor_data():
|
|
"""Gets all sensor data at once."""
|
|
try:
|
|
return subprocess.check_output(["sensors"], text=True)
|
|
except (subprocess.CalledProcessError, FileNotFoundError):
|
|
logger.error("'sensors' command not found. Please install lm-sensors.")
|
|
return None
|
|
|
|
def get_cpu_temperature(sensors_output):
|
|
"""Gets the CPU temperature from the sensors output."""
|
|
if not sensors_output:
|
|
return {"cpu_temperature": "N/A"}
|
|
# Use regex to find the CPU temperature
|
|
match = re.search(r"Package id 0:\s+\+([\d\.]+)", sensors_output)
|
|
if match:
|
|
return {"cpu_temperature": float(match.group(1))}
|
|
else:
|
|
return {"cpu_temperature": "N/A"}
|
|
|
|
def get_gpu_temperature(sensors_output):
|
|
"""Gets the GPU temperature from the sensors output."""
|
|
if not sensors_output:
|
|
return {"gpu_temperature": "N/A"}
|
|
# Use regex to find the GPU temperature for amdgpu
|
|
match = re.search(r"edge:\s+\+([\d\.]+)", sensors_output)
|
|
if match:
|
|
return {"gpu_temperature": float(match.group(1))}
|
|
else:
|
|
# if amdgpu not found, try radeon
|
|
match = re.search(r"temp1:\s+\+([\d\.]+)", sensors_output)
|
|
if match:
|
|
return {"gpu_temperature": float(match.group(1))}
|
|
else:
|
|
return {"gpu_temperature": "N/A"}
|
|
|
|
|
|
def get_login_attempts():
|
|
"""Gets system login attempts from /var/log/auth.log since the last check."""
|
|
try:
|
|
last_position = 0
|
|
if os.path.exists(AUTH_LOG_POSITION_FILE):
|
|
with open(AUTH_LOG_POSITION_FILE, 'r') as f:
|
|
last_position = int(f.read())
|
|
|
|
with open("/var/log/auth.log", "r") as f:
|
|
f.seek(last_position)
|
|
log_lines = f.readlines()
|
|
current_position = f.tell()
|
|
|
|
with open(AUTH_LOG_POSITION_FILE, 'w') as f:
|
|
f.write(str(current_position))
|
|
|
|
failed_logins = []
|
|
for line in log_lines:
|
|
if "Failed password" in line:
|
|
failed_logins.append(line.strip())
|
|
|
|
return {"failed_login_attempts": failed_logins}
|
|
except FileNotFoundError:
|
|
logger.error("/var/log/auth.log not found.")
|
|
return {"failed_login_attempts": []}
|
|
except Exception as e:
|
|
logger.error(f"Error reading login attempts: {e}")
|
|
return {"failed_logins": []}
|
|
|
|
def get_nmap_scan_results():
|
|
"""Performs an Nmap scan and returns a structured summary."""
|
|
try:
|
|
nm = nmap.PortScanner()
|
|
scan_options = config.NMAP_SCAN_OPTIONS
|
|
if os.geteuid() != 0 and "-sS" in scan_options:
|
|
logger.warning("Nmap -sS scan requires root privileges. Falling back to -sT.")
|
|
scan_options = scan_options.replace("-sS", "-sT")
|
|
|
|
scan_results = nm.scan(hosts=config.NMAP_TARGETS, arguments=scan_options)
|
|
|
|
# Process the results into a more structured format
|
|
processed_results = {"hosts": []}
|
|
if "scan" in scan_results:
|
|
for host, scan_data in scan_results["scan"].items():
|
|
host_info = {
|
|
"ip": host,
|
|
"status": scan_data.get("status", {}).get("state", "unknown"),
|
|
"hostname": scan_data.get("hostnames", [{}])[0].get("name", ""),
|
|
"open_ports": []
|
|
}
|
|
if "tcp" in scan_data:
|
|
for port, port_data in scan_data["tcp"].items():
|
|
if port_data.get("state") == "open":
|
|
host_info["open_ports"].append({
|
|
"port": port,
|
|
"service": port_data.get("name", ""),
|
|
"product": port_data.get("product", ""),
|
|
"version": port_data.get("version", "")
|
|
})
|
|
processed_results["hosts"].append(host_info)
|
|
|
|
return processed_results
|
|
except Exception as e:
|
|
logger.error(f"Error performing Nmap scan: {e}")
|
|
return {"error": "Nmap scan failed"}
|
|
|
|
# --- LLM Interaction Function ---
|
|
|
|
def build_llm_prompt(data, baselines, nmap_changes, constraints, known_issues):
|
|
"""Builds the prompt for the LLM analysis."""
|
|
return f"""
|
|
**Role:** You are a dedicated and expert system administrator. Your primary role is to identify anomalies and provide concise, actionable reports.
|
|
|
|
**Instruction:** Analyze the following system and network data for any activity that appears out of place or different. Consider unusual values, errors, or unexpected patterns as anomalies. Compare the current data with the historical baseline data to identify significant deviations. Consult the known issues feed to avoid flagging resolved or expected issues. Pay special attention to the Nmap scan results for any new or unexpected open ports. Pay special attention to network RTT fluctuations, but only report them as an anomaly if the fluctuation is greater than 10 seconds.
|
|
|
|
**Context:**
|
|
Here is the system data in JSON format for your analysis: {json.dumps(data, indent=2)}
|
|
|
|
**Historical Baseline Data:**
|
|
{json.dumps(baselines, indent=2)}
|
|
|
|
**Nmap Scan Changes:**
|
|
{json.dumps(nmap_changes, indent=2)}
|
|
|
|
**Known Issues Feed:**
|
|
{json.dumps(known_issues, indent=2)}
|
|
|
|
**Constraints and Guidelines:**
|
|
{constraints}
|
|
|
|
**Output Request:** If you find an anomaly, provide a report as a single JSON object with two keys: "severity" and "reason". The "severity" must be one of "high", "medium", "low", or "none". The "reason" must be a natural language explanation of the anomaly. Please include specific values if the anomoly has them. If no anomaly is found, return a single JSON object with "severity" set to "none" and "reason" as an empty string. Do not wrap the JSON in markdown or any other formatting. Only return the JSON, and nothing else.
|
|
|
|
|
|
**Reasoning Hint:** Think step by step to come to your conclusion. This is very important.
|
|
"""
|
|
|
|
def analyze_data_with_llm(data, baselines):
|
|
"""Analyzes data with the local LLM."""
|
|
with open("CONSTRAINTS.md", "r") as f:
|
|
constraints = f.read()
|
|
|
|
with open("known_issues.json", "r") as f:
|
|
known_issues = json.load(f)
|
|
|
|
# Compare current nmap results with baseline
|
|
nmap_changes = {"new_hosts": [], "changed_ports": {}}
|
|
if "nmap_results" in data and "host_ports" in baselines:
|
|
current_hosts_info = {host['ip']: host for host in data["nmap_results"].get("hosts", [])}
|
|
current_hosts = set(current_hosts_info.keys())
|
|
baseline_hosts = set(baselines["host_ports"].keys())
|
|
|
|
# New hosts
|
|
nmap_changes["new_hosts"] = sorted(list(current_hosts - baseline_hosts))
|
|
|
|
# Changed ports on existing hosts
|
|
for host_ip in current_hosts.intersection(baseline_hosts):
|
|
current_ports = set(p['port'] for p in current_hosts_info[host_ip].get("open_ports", []))
|
|
|
|
baseline_ports = set(baselines["host_ports"].get(host_ip, []))
|
|
|
|
newly_opened = sorted(list(current_ports - baseline_ports))
|
|
newly_closed = sorted(list(baseline_ports - current_ports))
|
|
|
|
if newly_opened or newly_closed:
|
|
nmap_changes["changed_ports"][host_ip] = {"opened": newly_opened, "closed": newly_closed}
|
|
|
|
prompt = build_llm_prompt(data, baselines, nmap_changes, constraints, known_issues)
|
|
|
|
try:
|
|
response = ollama.generate(model="llama3.1:8b", prompt=prompt)
|
|
sanitized_response = response['response'].strip()
|
|
|
|
# Extract JSON from the response
|
|
try:
|
|
# Find the first '{' and the last '}' to extract the JSON object
|
|
start_index = sanitized_response.find('{')
|
|
end_index = sanitized_response.rfind('}')
|
|
if start_index != -1 and end_index != -1:
|
|
json_string = sanitized_response[start_index:end_index+1]
|
|
llm_response = json.loads(json_string)
|
|
logger.info(f"LLM Response: {llm_response}")
|
|
return llm_response
|
|
else:
|
|
# Handle cases where the response is not valid JSON
|
|
logger.warning(f"LLM returned a non-JSON response: {sanitized_response}")
|
|
return {"severity": "low", "reason": sanitized_response}
|
|
except json.JSONDecodeError as e:
|
|
logger.error(f"Error decoding LLM response: {e}")
|
|
# Fallback for invalid JSON
|
|
return {"severity": "low", "reason": sanitized_response}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error interacting with LLM: {e}")
|
|
return None
|
|
|
|
|
|
# --- Alerting Functions ---
|
|
|
|
def send_discord_alert(message):
|
|
"""Sends an alert to Discord."""
|
|
webhook = DiscordWebhook(url=config.DISCORD_WEBHOOK_URL, content=message)
|
|
try:
|
|
response = webhook.execute()
|
|
if response.status_code == 200:
|
|
logger.info("Discord alert sent successfully.")
|
|
else:
|
|
logger.error(f"Error sending Discord alert: {response.status_code} - {response.content}")
|
|
except Exception as e:
|
|
logger.error(f"Error sending Discord alert: {e}")
|
|
|
|
def send_google_home_alert(message):
|
|
"""Sends an alert to a Google Home speaker via Home Assistant."""
|
|
# Simplify the message for better TTS delivery
|
|
try:
|
|
response = ollama.generate(model="llama3.1:8b", prompt=f"Summarize the following message in a single sentence: {message}")
|
|
simplified_message = response['response'].strip()
|
|
except Exception as e:
|
|
logger.error(f"Error summarizing message: {e}")
|
|
simplified_.message = message.split('.')[0] # Take the first sentence as a fallback
|
|
|
|
url = f"{config.HOME_ASSISTANT_URL}/api/services/tts/speak"
|
|
headers = {
|
|
"Authorization": f"Bearer {config.HOME_ASSISTANT_TOKEN}",
|
|
"Content-Type": "application/json",
|
|
}
|
|
data = {
|
|
"entity_id": "all",
|
|
"media_player_entity_id": config.GOOGLE_HOME_SPEAKER_ID,
|
|
"message": simplified_message,
|
|
}
|
|
try:
|
|
response = requests.post(url, headers=headers, json=data)
|
|
if response.status_code == 200:
|
|
logger.info("Google Home alert sent successfully.")
|
|
else:
|
|
logger.error(f"Error sending Google Home alert: {response.status_code} - {response.text}")
|
|
except Exception as e:
|
|
logger.error(f"Error sending Google Home alert: {e}")
|
|
|
|
# --- Main Script Logic ---
|
|
|
|
def is_alerting_time():
|
|
"""Checks if the current time is within the alerting window (9am - 12am)."""
|
|
current_hour = datetime.now().hour
|
|
return 9 <= current_hour < 24
|
|
|
|
daily_events = []
|
|
|
|
def run_monitoring_cycle(nmap_scan_counter):
|
|
"""Runs a single monitoring cycle."""
|
|
logger.info("Running monitoring cycle...")
|
|
system_logs = get_system_logs()
|
|
network_metrics = get_network_metrics()
|
|
sensors_output = get_sensor_data()
|
|
cpu_temp = get_cpu_temperature(sensors_output)
|
|
gpu_temp = get_gpu_temperature(sensors_output)
|
|
login_attempts = get_login_attempts()
|
|
|
|
nmap_results = None
|
|
if nmap_scan_counter == 0:
|
|
nmap_results = get_nmap_scan_results()
|
|
|
|
nmap_scan_counter = (nmap_scan_counter + 1) % 4 # Run nmap scan every 4th cycle (20 minutes)
|
|
|
|
if system_logs and network_metrics:
|
|
combined_data = {
|
|
"timestamp": datetime.now(timezone.utc).isoformat(),
|
|
"system_logs": system_logs,
|
|
"network_metrics": network_metrics,
|
|
"cpu_temperature": cpu_temp,
|
|
"gpu_temperature": gpu_temp,
|
|
"login_attempts": login_attempts
|
|
}
|
|
|
|
if nmap_results:
|
|
combined_data["nmap_results"] = nmap_results
|
|
|
|
data_storage.store_data(combined_data)
|
|
|
|
llm_response = analyze_data_with_llm(combined_data, data_storage.calculate_baselines())
|
|
|
|
if llm_response and llm_response.get('severity') != "none":
|
|
daily_events.append(llm_response.get('reason'))
|
|
if llm_response.get('severity') == "high" and is_alerting_time():
|
|
send_discord_alert(llm_response.get('reason'))
|
|
send_google_home_alert(llm_response.get('reason'))
|
|
return nmap_scan_counter
|
|
|
|
def main():
|
|
"""Main function to run the monitoring agent."""
|
|
if config.TEST_MODE:
|
|
logger.info("Running in test mode...")
|
|
run_monitoring_cycle(0)
|
|
else:
|
|
nmap_scan_counter = 0
|
|
while True:
|
|
nmap_scan_counter = run_monitoring_cycle(nmap_scan_counter)
|
|
|
|
# Daily Recap Logic
|
|
current_time = time.strftime("%H:%M")
|
|
if current_time == config.DAILY_RECAP_TIME and daily_events: # type: ignore
|
|
recap_message = "\n".join(daily_events)
|
|
send_discord_alert(f"**Daily Recap:**\n{recap_message}")
|
|
daily_events = [] # Reset for the next day
|
|
|
|
time.sleep(300) # Run every 5 minutes
|
|
|
|
if __name__ == "__main__":
|
|
main() |