Files
LLM-Powered-Monitoring-Agent/monitor_agent.py

299 lines
12 KiB
Python

# LLM-Powered Monitoring Agent
import time
import json
import subprocess
import ollama
from discord_webhook import DiscordWebhook
import requests
import data_storage
import re
import os
from datetime import datetime, timezone
import pingparsing
# Load configuration
import config
from syslog_rfc5424_parser import parser
LOG_POSITION_FILE = 'log_position.txt'
# --- Data Ingestion & Parsing Functions ---
def get_system_logs():
"""Gets new lines from /var/log/syslog since the last check."""
try:
last_position = 0
if os.path.exists(LOG_POSITION_FILE):
with open(LOG_POSITION_FILE, 'r') as f:
last_position = int(f.read())
with open("/var/log/syslog", "r") as f:
f.seek(last_position)
log_lines = f.readlines()
current_position = f.tell()
with open(LOG_POSITION_FILE, 'w') as f:
f.write(str(current_position))
parsed_logs = []
for line in log_lines:
try:
parsed_logs.append(parser.parse(line).as_dict())
except Exception:
# If parsing fails, just append the raw line
parsed_logs.append({"raw_log": line.strip()})
return {"syslog": parsed_logs}
except FileNotFoundError:
print("Error: /var/log/syslog not found.")
return {"syslog": []}
except Exception as e:
print(f"Error reading syslog: {e}")
return {"syslog": []}
import pingparsing
def get_network_metrics():
"""Gets network metrics by pinging 8.8.8.8."""
try:
ping_parser = pingparsing.PingParsing()
transmitter = pingparsing.PingTransmitter()
transmitter.destination = "8.8.8.8"
transmitter.count = 3
result = transmitter.ping()
return ping_parser.parse(result).as_dict()
except Exception as e:
print(f"Error getting network metrics: {e}")
return {"error": "ping command failed"}
def get_cpu_temperature():
"""Gets the CPU temperature using the sensors command."""
try:
sensors_output = subprocess.check_output(["sensors"], text=True)
# Use regex to find the CPU temperature
match = re.search(r"Package id 0:\s+\+([\d\.]+)", sensors_output)
if match:
return {"cpu_temperature": float(match.group(1))}
else:
return {"cpu_temperature": "N/A"}
except (subprocess.CalledProcessError, FileNotFoundError):
print("Error: 'sensors' command not found. Please install lm-sensors.")
return {"cpu_temperature": "N/A"}
def get_gpu_temperature():
"""Gets the GPU temperature using the sensors command."""
try:
sensors_output = subprocess.check_output(["sensors"], text=True)
# Use regex to find the GPU temperature for amdgpu
match = re.search(r"edge:\s+\+([\d\.]+)", sensors_output)
if match:
return {"gpu_temperature": float(match.group(1))}
else:
# if amdgpu not found, try radeon
match = re.search(r"temp1:\s+\+([\d\.]+)", sensors_output)
if match:
return {"gpu_temperature": float(match.group(1))}
else:
return {"gpu_temperature": "N/A"}
except (subprocess.CalledProcessError, FileNotFoundError):
print("Error: 'sensors' command not found. Please install lm-sensors.")
return {"gpu_temperature": "N/A"}
def get_login_attempts():
"""Gets system login attempts from /var/log/auth.log."""
try:
with open("/var/log/auth.log", "r") as f:
log_lines = f.readlines()
failed_logins = []
for line in log_lines:
if "Failed password" in line:
failed_logins.append(line.strip())
return {"failed_login_attempts": failed_logins}
except FileNotFoundError:
print("Error: /var/log/auth.log not found.")
return {"failed_login_attempts": []}
except Exception as e:
print(f"Error reading login attempts: {e}")
return {"failed_logins": []}
# --- LLM Interaction Function ---
def analyze_data_with_llm(data, baselines):
"""Analyzes data with the local LLM."""
with open("CONSTRAINTS.md", "r") as f:
constraints = f.read()
with open("known_issues.json", "r") as f:
known_issues = json.load(f)
prompt = f"""
**Role:** You are a dedicated and expert system administrator. Your primary role is to identify anomalies and provide concise, actionable reports.
**Instruction:** Analyze the following system and network data for any activity that appears out of place or different. Consider unusual values, errors, or unexpected patterns as anomalies. Compare the current data with the historical baseline data to identify significant deviations. Consult the known issues feed to avoid flagging resolved or expected issues.
**Context:**
Here is the system data in JSON format for your analysis: {json.dumps(data, indent=2)}
**Historical Baseline Data:**
{json.dumps(baselines, indent=2)}
**Known Issues Feed:**
{json.dumps(known_issues, indent=2)}
**Constraints and Guidelines:**
{constraints}
**Output Request:** If you find an anomaly, provide a report as a single JSON object with two keys: "severity" and "reason". The "severity" must be one of "high", "medium", "low", or "none". The "reason" must be a natural language explanation of the anomaly. If no anomaly is found, return a single JSON object with "severity" set to "none" and "reason" as an empty string. Do not wrap the JSON in markdown or any other formatting.
**Reasoning Hint:** Think step by step to come to your conclusion. This is very important.
"""
try:
response = ollama.generate(model="llama3.1:8b", prompt=prompt)
# Sanitize the response to ensure it's valid JSON
sanitized_response = response['response'].strip()
# Find the first '{' and the last '}' to extract the JSON object
start_index = sanitized_response.find('{')
end_index = sanitized_response.rfind('}')
if start_index != -1 and end_index != -1:
json_string = sanitized_response[start_index:end_index+1]
try:
return json.loads(json_string)
except json.JSONDecodeError:
# If parsing a single object fails, try parsing as a list
try:
json_list = json.loads(json_string)
if isinstance(json_list, list) and json_list:
return json_list[0] # Return the first object in the list
except json.JSONDecodeError as e:
print(f"Error decoding LLM response: {e}")
# Fallback for invalid JSON
return {"severity": "low", "reason": response['response'].strip()}
else:
# Handle cases where the response is not valid JSON
print(f"LLM returned a non-JSON response: {sanitized_response}")
return {"severity": "low", "reason": sanitized_response}
except Exception as e:
print(f"Error interacting with LLM: {e}")
return None
# --- Alerting Functions ---
def send_discord_alert(message):
"""Sends an alert to Discord."""
webhook = DiscordWebhook(url=config.DISCORD_WEBHOOK_URL, content=message)
try:
response = webhook.execute()
if response.status_code == 200:
print("Discord alert sent successfully.")
else:
print(f"Error sending Discord alert: {response.status_code} - {response.content}")
except Exception as e:
print(f"Error sending Discord alert: {e}")
def send_google_home_alert(message):
"""Sends an alert to a Google Home speaker via Home Assistant."""
# Simplify the message for better TTS delivery
try:
response = ollama.generate(model="llama3.1:8b", prompt=f"Summarize the following message in a single sentence: {message}")
simplified_message = response['response'].strip()
except Exception as e:
print(f"Error summarizing message: {e}")
simplified_message = message.split('.')[0] # Take the first sentence as a fallback
url = f"{config.HOME_ASSISTANT_URL}/api/services/tts/speak"
headers = {
"Authorization": f"Bearer {config.HOME_ASSISTANT_TOKEN}",
"Content-Type": "application/json",
}
data = {
"entity_id": "all",
"media_player_entity_id": config.GOOGLE_HOME_SPEAKER_ID,
"message": simplified_message,
}
try:
response = requests.post(url, headers=headers, json=data)
if response.status_code == 200:
print("Google Home alert sent successfully.")
else:
print(f"Error sending Google Home alert: {response.status_code} - {response.text}")
except Exception as e:
print(f"Error sending Google Home alert: {e}")
# --- Main Script Logic ---
daily_events = []
if __name__ == "__main__":
if config.TEST_MODE:
print("Running in test mode...")
system_logs = get_system_logs()
network_metrics = get_network_metrics()
cpu_temp = get_cpu_temperature()
gpu_temp = get_gpu_temperature()
login_attempts = get_login_attempts()
if system_logs and network_metrics:
combined_data = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"system_logs": system_logs,
"network_metrics": network_metrics,
"cpu_temperature": cpu_temp,
"gpu_temperature": gpu_temp,
"login_attempts": login_attempts
}
data_storage.store_data(combined_data)
llm_response = analyze_data_with_llm(combined_data, data_storage.calculate_baselines())
if llm_response and llm_response.get('severity') != "none":
print(f"Anomaly detected: {llm_response.get('reason')}")
if llm_response.get('severity') == "high":
send_discord_alert(llm_response.get('reason'))
send_google_home_alert(llm_response.get('reason'))
else:
print("No anomaly detected.")
else:
while True:
print("Running monitoring cycle...")
system_logs = get_system_logs()
network_metrics = get_network_metrics()
cpu_temp = get_cpu_temperature()
gpu_temp = get_gpu_temperature()
login_attempts = get_login_attempts()
if system_logs and network_metrics:
combined_data = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"system_logs": system_logs,
"network_metrics": network_metrics,
"cpu_temperature": cpu_temp,
"gpu_temperature": gpu_temp,
"login_attempts": login_attempts
}
data_storage.store_data(combined_data)
llm_response = analyze_data_with_llm(combined_data, data_storage.calculate_baselines())
if llm_response and llm_response.get('severity') != "none":
daily_events.append(llm_response.get('reason'))
if llm_response.get('severity') == "high":
send_discord_alert(llm_response.get('reason'))
send_google_home_alert(llm_response.get('reason'))
# Daily Recap Logic
current_time = time.strftime("%H:%M")
if current_time == config.DAILY_RECAP_TIME and daily_events:
recap_message = "\n".join(daily_events)
send_discord_alert(f"**Daily Recap:**\n{recap_message}")
daily_events = [] # Reset for the next day
time.sleep(300) # Run every 5 minutes