LLM-Powered-Monitoring-Agent/monitor_agent.py
Spencer e64b880c97 feat: Improve daily recap functionality
- Changed the daily recap time to 22:00.
- Modified the `send_daily_recap` function to split the recap message into multiple messages if it exceeds 2000 characters to avoid hitting the Discord message length limit.
- Added a 1-second delay between each message chunk to avoid rate limiting.
2025-09-15 13:27:40 -05:00

# LLM-Powered Monitoring Agent
import time
import json
import subprocess
import ollama
from discord_webhook import DiscordWebhook
import requests
import database as data_storage
import re
import os
from datetime import datetime, timezone
import pingparsing
import nmap
import logging
from logging.handlers import TimedRotatingFileHandler
import docker
import schedule
# Load configuration
import config
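# `config` is a local settings module (not shown here). A minimal sketch of the
# attributes this script reads (names taken from the usages below; all values
# are placeholders, except DAILY_RECAP_TIME, which the commit message puts at 22:00):
#   DISCORD_WEBHOOK_URL = "https://discord.com/api/webhooks/<id>/<token>"
#   HOME_ASSISTANT_URL = "http://homeassistant.local:8123"
#   HOME_ASSISTANT_TOKEN = "<long-lived access token>"
#   GOOGLE_HOME_SPEAKER_ID = "media_player.living_room_speaker"
#   NMAP_TARGETS = "192.168.1.0/24"
#   NMAP_SCAN_OPTIONS = "-sS -F"
#   DOCKER_CONTAINERS_TO_MONITOR = ["example-container"]
#   DAILY_RECAP_TIME = "22:00"
#   TEST_MODE = False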
from syslog_rfc5424_parser import SyslogMessage
# --- Logging Configuration ---
LOG_FILE = "./tmp/monitoring_agent.log"
os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)  # Ensure the log directory exists before attaching the file handler
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Create a handler that rotates logs daily, keeping 1 backup
file_handler = TimedRotatingFileHandler(LOG_FILE, when="midnight", interval=1, backupCount=1)
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
# Create a handler for console output
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(file_handler)
logger.addHandler(console_handler)
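# With the formatter above, log lines look like, e.g.:
#   2025-09-15 13:27:40,123 - INFO - Running monitoring cycle...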
LOG_POSITION_FILE = 'log_position.txt'
AUTH_LOG_POSITION_FILE = 'auth_log_position.txt'
UFW_LOG_POSITION_FILE = 'ufw_log_position.txt'
# --- Data Ingestion & Parsing Functions ---
def get_ufw_logs():
"""Gets new lines from /var/log/ufw.log since the last check."""
try:
last_position = 0
if os.path.exists(UFW_LOG_POSITION_FILE):
with open(UFW_LOG_POSITION_FILE, 'r') as f:
last_position = int(f.read())
with open("/var/log/ufw.log", "r") as f:
f.seek(last_position)
log_lines = f.readlines()
current_position = f.tell()
with open(UFW_LOG_POSITION_FILE, 'w') as f:
f.write(str(current_position))
return log_lines
except FileNotFoundError:
logger.error("/var/log/ufw.log not found.")
return []
except Exception as e:
logger.error(f"Error reading ufw.log: {e}")
return []
def get_system_logs():
"""Gets new lines from /var/log/syslog since the last check."""
try:
last_position = 0
if os.path.exists(LOG_POSITION_FILE):
with open(LOG_POSITION_FILE, 'r') as f:
last_position = int(f.read())
with open("/var/log/syslog", "r") as f:
f.seek(last_position)
log_lines = f.readlines()
current_position = f.tell()
with open(LOG_POSITION_FILE, 'w') as f:
f.write(str(current_position))
parsed_logs = []
for line in log_lines:
try:
parsed_logs.append(parser.parse(line).as_dict()) # type: ignore
except Exception:
# If parsing fails, just append the raw line
parsed_logs.append({"raw_log": line.strip()})
return {"syslog": parsed_logs}
except FileNotFoundError:
logger.error("/var/log/syslog not found.")
return {"syslog": []}
except Exception as e:
logger.error(f"Error reading syslog: {e}")
return {"syslog": []}
def get_network_metrics():
"""Gets network metrics by pinging 8.8.8.8."""
try:
ping_parser = pingparsing.PingParsing()
transmitter = pingparsing.PingTransmitter()
transmitter.destination = "8.8.8.8"
transmitter.count = 3
result = transmitter.ping()
return ping_parser.parse(result).as_dict()
except Exception as e:
logger.error(f"Error getting network metrics: {e}")
return {"error": "ping command failed"}
def get_sensor_data():
"""Gets all sensor data at once."""
try:
return subprocess.check_output(["sensors"], text=True)
except (subprocess.CalledProcessError, FileNotFoundError):
logger.error("'sensors' command not found. Please install lm-sensors.")
return None
def get_cpu_temperature(sensors_output):
"""Gets the CPU temperature from the sensors output."""
if not sensors_output:
return {"cpu_temperature": "N/A"}
# Use regex to find the CPU temperature
match = re.search(r"Package id 0:\s+\+([\d\.]+)", sensors_output)
if match:
return {"cpu_temperature": float(match.group(1))}
else:
return {"cpu_temperature": "N/A"}
def get_gpu_temperature(sensors_output):
"""Gets the GPU temperature from the sensors output."""
if not sensors_output:
return {"gpu_temperature": "N/A"}
# Use regex to find the GPU temperature for amdgpu
match = re.search(r"edge:\s+\+([\d\.]+)", sensors_output)
if match:
return {"gpu_temperature": float(match.group(1))}
else:
# if amdgpu not found, try radeon
match = re.search(r"temp1:\s+\+([\d\.]+)", sensors_output)
if match:
return {"gpu_temperature": float(match.group(1))}
else:
return {"gpu_temperature": "N/A"}
def get_login_attempts():
"""Gets system login attempts from /var/log/auth.log since the last check."""
try:
last_position = 0
if os.path.exists(AUTH_LOG_POSITION_FILE):
with open(AUTH_LOG_POSITION_FILE, 'r') as f:
last_position = int(f.read())
with open("/var/log/auth.log", "r") as f:
f.seek(last_position)
log_lines = f.readlines()
current_position = f.tell()
with open(AUTH_LOG_POSITION_FILE, 'w') as f:
f.write(str(current_position))
failed_logins = []
for line in log_lines:
if "Failed password" in line:
failed_logins.append(line.strip())
return {"failed_login_attempts": failed_logins}
except FileNotFoundError:
logger.error("/var/log/auth.log not found.")
return {"failed_login_attempts": []}
except Exception as e:
logger.error(f"Error reading login attempts: {e}")
return {"failed_logins": []}
def get_nmap_scan_results():
"""Performs an Nmap scan and returns a structured summary."""
try:
nm = nmap.PortScanner()
scan_options = config.NMAP_SCAN_OPTIONS
if os.geteuid() != 0 and "-sS" in scan_options:
logger.warning("Nmap -sS scan requires root privileges. Falling back to -sT.")
scan_options = scan_options.replace("-sS", "-sT")
scan_results = nm.scan(hosts=config.NMAP_TARGETS, arguments=scan_options)
# Process the results into a more structured format
processed_results = {"hosts": []}
if "scan" in scan_results:
for host, scan_data in scan_results["scan"].items():
host_info = {
"ip": host,
"status": scan_data.get("status", {}).get("state", "unknown"),
"hostname": scan_data.get("hostnames", [{}])[0].get("name", ""),
"open_ports": []
}
if "tcp" in scan_data:
for port, port_data in scan_data["tcp"].items():
if port_data.get("state") == "open":
host_info["open_ports"].append({
"port": port,
"service": port_data.get("name", ""),
"product": port_data.get("product", ""),
"version": port_data.get("version", "")
})
processed_results["hosts"].append(host_info)
return processed_results
except Exception as e:
logger.error(f"Error performing Nmap scan: {e}")
return {"error": "Nmap scan failed"}
def get_docker_container_status():
"""Gets the status of configured Docker containers."""
if not config.DOCKER_CONTAINERS_TO_MONITOR:
return {"docker_container_status": {}}
try:
client = docker.from_env()
containers = client.containers.list(all=True)
status = {}
for container in containers:
if container.name in config.DOCKER_CONTAINERS_TO_MONITOR:
status[container.name] = container.status
return {"docker_container_status": status}
except Exception as e:
logger.error(f"Error getting Docker container status: {e}")
return {"docker_container_status": {}}
# --- Data Analysis ---
def analyze_data_locally(data, baselines, known_issues, port_applications):
"""Analyzes the collected data to find anomalies without using an LLM."""
anomalies = []
# Temperature checks
cpu_temp = data.get("cpu_temperature", {}).get("cpu_temperature")
gpu_temp = data.get("gpu_temperature", {}).get("gpu_temperature")
baseline_cpu_temp = baselines.get("average_cpu_temperature")
baseline_gpu_temp = baselines.get("average_gpu_temperature")
if isinstance(cpu_temp, (int, float)) and isinstance(baseline_cpu_temp, (int, float)):
if abs(cpu_temp - baseline_cpu_temp) > 5:
anomalies.append({
"severity": "medium",
"reason": f"CPU temperature deviation detected. Current: {cpu_temp}°C, Baseline: {baseline_cpu_temp}°C"
})
if isinstance(gpu_temp, (int, float)) and isinstance(baseline_gpu_temp, (int, float)):
if abs(gpu_temp - baseline_gpu_temp) > 5:
anomalies.append({
"severity": "medium",
"reason": f"GPU temperature deviation detected. Current: {gpu_temp}°C, Baseline: {baseline_gpu_temp}°C"
})
# Network RTT check
current_rtt = data.get("network_metrics", {}).get("rtt_avg")
baseline_rtt = baselines.get("average_rtt_avg")
if isinstance(current_rtt, (int, float)) and isinstance(baseline_rtt, (int, float)):
if abs(current_rtt - baseline_rtt) > 10000:
anomalies.append({
"severity": "high",
"reason": f"High network RTT fluctuation detected. Current: {current_rtt}ms, Baseline: {baseline_rtt}ms"
})
# Failed login attempts check
failed_logins = data.get("login_attempts", {}).get("failed_login_attempts")
if failed_logins:
anomalies.append({
"severity": "high",
"reason": f"{len(failed_logins)} failed login attempts detected."
})
# Nmap scan changes check
if "nmap_results" in data and "host_ports" in baselines:
current_hosts_info = {host['ip']: host for host in data["nmap_results"].get("hosts", [])}
current_hosts = set(current_hosts_info.keys())
baseline_hosts = set(baselines["host_ports"].keys())
# New hosts
new_hosts = sorted(list(current_hosts - baseline_hosts))
for host in new_hosts:
anomalies.append({
"severity": "high",
"reason": f"New host detected on the network: {host}"
})
# Changed ports on existing hosts
for host_ip in current_hosts.intersection(baseline_hosts):
current_ports = set(p['port'] for p in current_hosts_info[host_ip].get("open_ports", []))
baseline_ports = set(baselines["host_ports"].get(host_ip, []))
newly_opened = sorted(list(current_ports - baseline_ports))
for port in newly_opened:
port_info = port_applications.get(str(port), "Unknown")
anomalies.append({
"severity": "medium",
"reason": f"New port opened on {host_ip}: {port} ({port_info})"
})
# Docker container status check
docker_status = data.get("docker_container_status", {}).get("docker_container_status")
if docker_status:
for container_name, status in docker_status.items():
if status != "running":
anomalies.append({
"severity": "high",
"reason": f"Docker container '{container_name}' is not running. Current status: {status}"
})
# UFW log analysis
ufw_logs = data.get("ufw_logs", [])
if ufw_logs:
blocked_ips = {}
for log_line in ufw_logs:
if "[UFW BLOCK]" in log_line:
match = re.search(r"SRC=([\d\.]+)", log_line)
if match:
ip = match.group(1)
blocked_ips[ip] = blocked_ips.get(ip, 0) + 1
for ip, count in blocked_ips.items():
if count > 10:
anomalies.append({
"severity": "medium",
"reason": f"High number of blocked connections ({count}) from IP address: {ip}"
})
return anomalies
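# Illustrative example of the anomaly list this function returns, which is what
# gets handed to the LLM below:
#   [{"severity": "high", "reason": "3 failed login attempts detected."},
#    {"severity": "medium", "reason": "New port opened on 192.168.1.10: 8080 (Unknown)"}]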
# --- LLM Interaction Function ---
def build_llm_prompt(anomalies):
"""Builds the prompt for the LLM to generate a report from anomalies."""
return f"""
**Role:** You are a dedicated and expert system administrator. Your primary role is to provide a concise, actionable report based on a list of pre-identified anomalies.
**Instruction:** Please synthesize the following list of anomalies into a single, human-readable report. The report should be a single JSON object with two keys: "severity" and "reason". The "severity" should be the highest severity from the list of anomalies. The "reason" should be a summary of all the anomalies.
**Anomalies:**
{json.dumps(anomalies, indent=2)}
**Output Request:** Provide a report as a single JSON object with two keys: "severity" and "reason". The "severity" must be one of "high", "medium", "low", or "none". The "reason" must be a natural language explanation of the anomaly. If no anomaly is found, return a single JSON object with "severity" set to "none" and "reason" as an empty string. Do not wrap the JSON in markdown or any other formatting. Only return the JSON, and nothing else.
"""
def generate_llm_report(anomalies):
"""Generates a report from a list of anomalies using the local LLM."""
logger.info("Generating LLM report...")
if not anomalies:
return {"severity": "none", "reason": ""}
prompt = build_llm_prompt(anomalies)
try:
response = ollama.generate(model="phi4-mini", prompt=prompt)
sanitized_response = response['response'].strip()
# Extract JSON from the response
try:
# Find the first '{' and the last '}' to extract the JSON object
start_index = sanitized_response.find('{')
end_index = sanitized_response.rfind('}')
if start_index != -1 and end_index != -1:
json_string = sanitized_response[start_index:end_index+1]
llm_response = json.loads(json_string)
logger.info(f"LLM Response: {llm_response}")
return llm_response
else:
# Handle cases where the response is not valid JSON
logger.warning(f"LLM returned a non-JSON response: {sanitized_response}")
return {"severity": "low", "reason": sanitized_response}
except json.JSONDecodeError as e:
logger.error(f"Error decoding LLM response: {e}")
# Fallback for invalid JSON
return {"severity": "low", "reason": sanitized_response}
except Exception as e:
logger.error(f"Error interacting with LLM: {e}")
return None
# --- Alerting Functions ---
def send_discord_alert(llm_response, combined_data):
"""Sends an alert to Discord."""
reason = llm_response.get('reason', 'No reason provided.')
message = f"""**High Severity Alert:**
> {reason}
**Relevant Data:**
```json
{json.dumps(combined_data, indent=2)}
```"""
webhook = DiscordWebhook(url=config.DISCORD_WEBHOOK_URL, content=message)
try:
response = webhook.execute()
if response.status_code == 200:
logger.info("Discord alert sent successfully.")
else:
logger.error(f"Error sending Discord alert: {response.status_code} - {response.content}")
except Exception as e:
logger.error(f"Error sending Discord alert: {e}")
def send_google_home_alert(message):
"""Sends an alert to a Google Home speaker via Home Assistant."""
# Simplify the message for better TTS delivery
try:
response = ollama.generate(model="llama3.1:8b", prompt=f"Summarize the following message in a single sentence: {message}")
simplified_message = response['response'].strip()
except Exception as e:
logger.error(f"Error summarizing message: {e}")
simplified_.message = message.split('.')[0] # Take the first sentence as a fallback
url = f"{config.HOME_ASSISTANT_URL}/api/services/tts/speak"
headers = {
"Authorization": f"Bearer {config.HOME_ASSISTANT_TOKEN}",
"Content-Type": "application/json",
}
data = {
"entity_id": "all",
"media_player_entity_id": config.GOOGLE_HOME_SPEAKER_ID,
"message": simplified_message, # type: ignore
}
try:
response = requests.post(url, headers=headers, json=data)
if response.status_code == 200:
logger.info("Google Home alert sent successfully.")
else:
logger.error(f"Error sending Google Home alert: {response.status_code} - {response.text}")
except Exception as e:
logger.error(f"Error sending Google Home alert: {e}")
# --- Main Script Logic ---
def is_alerting_time():
"""Checks if the current time is within the alerting window (9am - 12am)."""
current_hour = datetime.now().hour
return 9 <= current_hour < 24
# Events collected over the day, flushed by the daily recap
daily_events = []
def send_daily_recap():
"""Sends a daily recap of events to Discord."""
global daily_events
if daily_events:
recap_message = "**Daily Recap:**\n" + "\n".join(daily_events)
# Split the message into chunks of 2000 characters
message_chunks = [recap_message[i:i+2000] for i in range(0, len(recap_message), 2000)]
for chunk in message_chunks:
webhook = DiscordWebhook(url=config.DISCORD_WEBHOOK_URL, content=chunk)
try:
response = webhook.execute()
if response.status_code == 200:
logger.info("Daily recap chunk sent successfully.")
else:
logger.error(f"Error sending daily recap chunk: {response.status_code} - {response.content}")
except Exception as e:
logger.error(f"Error sending daily recap chunk: {e}")
time.sleep(1) # Wait 1 second between chunks to avoid rate limiting
daily_events = [] # Reset for the next day
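# Chunking example: a 4,500-character recap is sent as three messages of
# 2,000, 2,000 and 500 characters, each within Discord's 2,000-character limit.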
def run_monitoring_cycle(nmap_scan_counter):
"""Runs a single monitoring cycle."""
logger.info("Running monitoring cycle...")
system_logs = get_system_logs()
network_metrics = get_network_metrics()
sensors_output = get_sensor_data()
cpu_temp = get_cpu_temperature(sensors_output)
gpu_temp = get_gpu_temperature(sensors_output)
login_attempts = get_login_attempts()
docker_container_status = get_docker_container_status()
ufw_logs = get_ufw_logs()
nmap_results = None
if nmap_scan_counter == 0:
nmap_results = get_nmap_scan_results()
nmap_scan_counter = (nmap_scan_counter + 1) % 4 # Run nmap scan every 4th cycle (20 minutes)
if system_logs and network_metrics:
combined_data = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"system_logs": system_logs,
"network_metrics": network_metrics,
"cpu_temperature": cpu_temp,
"gpu_temperature": gpu_temp,
"login_attempts": login_attempts,
"docker_container_status": docker_container_status,
"ufw_logs": ufw_logs
}
if nmap_results:
combined_data["nmap_results"] = nmap_results
data_storage.store_data(combined_data)
data_storage.enforce_retention_policy()
with open("known_issues.json", "r") as f:
known_issues = json.load(f)
with open("port_applications.json", "r") as f:
port_applications = json.load(f)
baselines = data_storage.calculate_baselines()
anomalies = analyze_data_locally(combined_data, baselines, known_issues, port_applications)
if anomalies:
logger.info(f"Detected {len(anomalies)} anomalies: {anomalies}")
llm_response = generate_llm_report(anomalies)
if llm_response and llm_response.get('severity') != "none":
daily_events.append(llm_response.get('reason'))
if llm_response.get('severity') == "high" and is_alerting_time():
send_discord_alert(llm_response, combined_data)
send_google_home_alert(llm_response.get('reason'))
return nmap_scan_counter
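# port_applications.json maps port numbers (as strings, matching the
# port_applications.get(str(port), ...) lookup in analyze_data_locally) to
# application names; an illustrative example:
#   {"22": "SSH", "80": "HTTP", "443": "HTTPS"}
# The expected shape of known_issues.json is not evident from this file.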
def main():
"""Main function to run the monitoring agent."""
data_storage.initialize_database()
if config.TEST_MODE:
logger.info("Running in test mode...")
run_monitoring_cycle(0)
else:
schedule.every().day.at(config.DAILY_RECAP_TIME).do(send_daily_recap)
nmap_scan_counter = 0
while True:
nmap_scan_counter = run_monitoring_cycle(nmap_scan_counter)
schedule.run_pending()
time.sleep(300) # Run every 5 minutes
if __name__ == "__main__":
main()
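# Note: the agent reads /var/log/syslog, /var/log/auth.log and /var/log/ufw.log,
# which are typically root-readable only, and falls back from an Nmap -sS to a
# -sT scan when not run as root (see get_nmap_scan_results), so it is normally
# run with elevated privileges.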