# Source: LLM-Powered-Monitoring-Agent/monitor_agent.py
# (238 lines, 10 KiB, Python)

# LLM-Powered Monitoring Agent
import time
import json
import subprocess
import ollama
from discord_webhook import DiscordWebhook
import requests
import data_storage
import jc
# Load configuration
import config
# --- Data Ingestion & Parsing Functions ---
def get_system_logs():
    """Return one simulated system log record.

    A hard-coded JSON string stands in for real log collection. The decoded
    dict is returned; if decoding fails the error is printed and None is
    returned instead.
    """
    # Canned sample entry used for demonstration
    sample_entry = '{"timestamp": "2025-08-15T12:00:00Z", "log": "Failed login attempt for user \'root\' from 10.0.0.1"}'
    try:
        record = json.loads(sample_entry)
    except json.JSONDecodeError as err:
        print(f"Error parsing system log: {err}")
        return None
    else:
        return record
def get_network_metrics():
    """Return a summary of one simulated ping run.

    A hard-coded JSON ping result stands in for a real measurement. Only the
    transmitted/received packet counts, the loss percentage and the average
    round-trip time are kept. Returns None if the decoded payload is empty
    or (after printing the error) if the JSON cannot be decoded.
    """
    # Canned ping result used for demonstration
    sample_ping = '''{"destination_ip":"8.8.8.8","data_bytes":56,"pattern":null,"destination":"8.8.8.8","duplicates":0,"packets_transmitted":3,"packets_received":3,"packet_loss_percent":0.0,"time_ms":2003.0,"round_trip_ms_min":18.79,"round_trip_ms_avg":21.212,"round_trip_ms_max":22.787,"round_trip_ms_stddev":1.738,"responses":[{"type":"reply","timestamp":null,"bytes":64,"response_ip":"8.8.8.8","icmp_seq":1,"ttl":111,"time_ms":18.8,"duplicate":false},{"type":"reply","timestamp":null,"bytes":64,"response_ip":"8.8.8.8","icmp_seq":2,"ttl":111,"time_ms":22.8,"duplicate":false},{"type":"reply","timestamp":null,"bytes":64,"response_ip":"8.8.8.8","icmp_seq":3,"ttl":111,"time_ms":22.1,"duplicate":false}]}'''
    # Fields copied into the summary, in output order
    summary_fields = (
        "packets_transmitted",
        "packets_received",
        "packet_loss_percent",
        "round_trip_ms_avg",
    )
    try:
        ping_stats = json.loads(sample_ping)
    except json.JSONDecodeError as err:
        print(f"Error parsing network metrics: {err}")
        return None
    if not ping_stats:
        return None
    return {field: ping_stats.get(field) for field in summary_fields}
def get_cpu_temperature():
    """Get the CPU temperature using the ``sensors`` command.

    ``sensors`` is executed exactly once; its text output is passed to
    ``jc.parse('sensors', ...)`` and the value at
    ``[0]['values'][0]['input']`` of the parsed result is reported.

    Returns:
        dict: ``{"cpu_temperature": <value>}`` on success, or
        ``{"cpu_temperature": "N/A"}`` when the command is missing, exits
        with a non-zero status, or the parsed output cannot be indexed as
        expected. Failures are printed, never raised for these cases.
    """
    unavailable = {"cpu_temperature": "N/A"}

    # Run the command once. A missing binary and a failing binary are
    # different problems, so they get different messages.
    try:
        sensors_output = subprocess.check_output(["sensors"], text=True)
    except FileNotFoundError:
        print("Error: 'sensors' command not found. Please install lm-sensors.")
        return unavailable
    except subprocess.CalledProcessError as e:
        print(f"Error getting CPU temperature: {e}")
        return unavailable

    try:
        parsed_sensors = jc.parse('sensors', sensors_output)
        # This is a simplified example, you may need to adjust the parsing logic based on your specific hardware
        # NOTE(review): the indexing below assumes the parsed result is a
        # sequence of entries that each carry a 'values' list - confirm
        # against the jc parser output on the target host.
        cpu_temp = parsed_sensors[0]['values'][0]['input']
    except (KeyError, IndexError, TypeError, jc.exceptions.ParseError) as e:
        # TypeError covers a parsed structure of an unexpected type, so a
        # layout mismatch degrades to "N/A" instead of stopping the monitor.
        print(f"Error getting CPU temperature: {e}")
        return unavailable
    return {"cpu_temperature": cpu_temp}
def get_gpu_temperature():
    """Get the GPU temperature using the ``sensors`` command.

    ``sensors`` is executed exactly once and its text output is passed to
    ``jc.parse('sensors', ...)``. The first parsed entry whose ``'adapter'``
    value contains "amdgpu" or "radeon" (case-insensitive) supplies the
    temperature, read from ``['values'][0]['input']``.

    Returns:
        dict: ``{"gpu_temperature": <value>}`` on success, or
        ``{"gpu_temperature": "N/A"}`` when the command is missing, exits
        with a non-zero status, no matching adapter is present, or the
        parsed output cannot be traversed as expected.
    """
    unavailable = {"gpu_temperature": "N/A"}

    # Run the command once. A missing binary and a failing binary are
    # different problems, so they get different messages.
    try:
        sensors_output = subprocess.check_output(["sensors"], text=True)
    except FileNotFoundError:
        print("Error: 'sensors' command not found. Please install lm-sensors.")
        return unavailable
    except subprocess.CalledProcessError as e:
        print(f"Error getting GPU temperature: {e}")
        return unavailable

    try:
        parsed_sensors = jc.parse('sensors', sensors_output)
        # This is a simplified example, you may need to adjust the parsing logic based on your specific hardware
        # NOTE(review): assumes each parsed entry is a dict with 'adapter'
        # and 'values' keys - confirm against the jc parser output.
        # Look for the adapter that contains "amdgpu" or "radeon"
        for adapter in parsed_sensors:
            adapter_name = adapter.get('adapter', '').lower()
            if 'amdgpu' in adapter_name or 'radeon' in adapter_name:
                return {"gpu_temperature": adapter['values'][0]['input']}
    except (KeyError, IndexError, TypeError, AttributeError, jc.exceptions.ParseError) as e:
        # AttributeError/TypeError cover entries that are not dicts (or an
        # 'adapter' value that is not a string), so a layout mismatch
        # degrades to "N/A" instead of stopping the monitor.
        print(f"Error getting GPU temperature: {e}")
        return unavailable
    # No AMD/Radeon adapter was reported.
    return unavailable
def get_login_attempts(log_path="/var/log/auth.log"):
    """Collect failed password attempts from an authentication log.

    Args:
        log_path: Path of the log file to scan. Defaults to
            ``/var/log/auth.log``; pass another path for systems that keep
            authentication messages elsewhere.

    Returns:
        dict: ``{"failed_login_attempts": [...]}`` holding every line that
        contains "Failed password", stripped of surrounding whitespace and
        in file order. The list is empty when the file is missing or cannot
        be read; the reason is printed rather than raised.
    """
    try:
        # errors="replace": the log can contain arbitrary bytes (e.g. in
        # attempted user names). Without it, one undecodable byte would
        # abort the scan and hide every failed login in the file.
        with open(log_path, "r", encoding="utf-8", errors="replace") as f:
            # Iterate the file lazily instead of loading it all at once.
            failed_logins = [
                line.strip() for line in f if "Failed password" in line
            ]
    except FileNotFoundError:
        print(f"Error: {log_path} not found.")
        return {"failed_login_attempts": []}
    except Exception as e:
        # Deliberate best-effort boundary: an unreadable log (permissions,
        # I/O error) must not stop the monitoring cycle.
        print(f"Error reading login attempts: {e}")
        return {"failed_login_attempts": []}
    return {"failed_login_attempts": failed_logins}
# --- LLM Interaction Function ---
def analyze_data_with_llm(data, baselines):
    """Ask the local LLM to judge the collected data against the baselines.

    Both arguments are rendered as indented JSON and embedded in a fixed
    prompt, which is sent to the ``llama3.1:8b`` model through
    ``ollama.generate``.

    Returns:
        The model's ``'response'`` text with surrounding whitespace removed,
        or None (after printing the error) if the LLM call fails.
    """
    # Serialize in the same order the values appear in the prompt
    data_json = json.dumps(data, indent=2)
    baselines_json = json.dumps(baselines, indent=2)
    prompt = f"""
**Role:** You are a dedicated and expert system administrator. Your primary role is to identify anomalies and provide concise, actionable reports.
**Instruction:** Analyze the following system and network data for any activity that appears out of place or different. Consider unusual values, errors, or unexpected patterns as anomalies. Compare the current data with the historical baseline data to identify significant deviations.
**Context:**
Here is the system data in JSON format for your analysis: {data_json}
**Historical Baseline Data:**
{baselines_json}
**Output Request:** If you find an anomaly, provide a report as a single, coherent, natural language paragraph. The report must clearly state the anomaly, its potential cause, and its severity (e.g., high, medium, low). If no anomaly is found, respond with "OK".
**Reasoning Hint:** Think step by step to come to your conclusion. This is very important.
"""
    try:
        reply = ollama.generate(model="llama3.1:8b", prompt=prompt)
        answer = reply['response']
        return answer.strip()
    except Exception as err:
        print(f"Error interacting with LLM: {err}")
        return None
# --- Alerting Functions ---
def send_discord_alert(message):
    """Send an alert to the Discord webhook configured in ``config``.

    Discord rejects a webhook message whose ``content`` is longer than 2000
    characters, so a longer message (for example a daily recap that joins
    several reports) is delivered as consecutive chunks instead of failing
    as a whole. A message within the limit is sent as a single request.

    Args:
        message: Text to post.

    The outcome of each request is printed; delivery errors are not raised.
    """
    max_content_length = 2000  # Discord's limit for the "content" field
    # "or [message]" keeps the single-request behavior for an empty message.
    chunks = [
        message[start:start + max_content_length]
        for start in range(0, len(message), max_content_length)
    ] or [message]

    for chunk in chunks:
        webhook = DiscordWebhook(url=config.DISCORD_WEBHOOK_URL, content=chunk)
        try:
            response = webhook.execute()
            # Discord answers 200 when it returns the created message and
            # 204 (No Content) when it does not; both mean success.
            if response.status_code in (200, 204):
                print("Discord alert sent successfully.")
            else:
                print(f"Error sending Discord alert: {response.status_code} - {response.content}")
        except Exception as e:
            print(f"Error sending Discord alert: {e}")
def send_google_home_alert(message):
    """Speak an alert on a Google Home speaker via Home Assistant.

    Only the first sentence of *message* is spoken, to keep the
    text-to-speech output short. The request goes to Home Assistant's
    ``tts/speak`` service using the URL, token and speaker id from
    ``config``.

    Args:
        message: Full alert text.

    The outcome is printed; delivery errors are not raised.
    """
    import re  # local import: only this function needs it

    # Simplify the message for better TTS delivery: take the first sentence.
    # A sentence ends at a period followed by whitespace or end of text, so
    # dots inside IP addresses ("10.0.0.1") or decimals ("21.2") no longer
    # cut the message short.
    simplified_message = re.split(r"\.(?:\s|$)", message, maxsplit=1)[0]
    url = f"{config.HOME_ASSISTANT_URL}/api/services/tts/speak"
    headers = {
        "Authorization": f"Bearer {config.HOME_ASSISTANT_TOKEN}",
        "Content-Type": "application/json",
    }
    data = {
        "entity_id": "tts.google_en_com",
        "media_player_entity_id": config.GOOGLE_HOME_SPEAKER_ID,
        "message": simplified_message,
    }
    try:
        # Without a timeout an unresponsive Home Assistant would block the
        # monitoring loop indefinitely.
        response = requests.post(url, headers=headers, json=data, timeout=10)
        if response.status_code == 200:
            print("Google Home alert sent successfully.")
        else:
            print(f"Error sending Google Home alert: {response.status_code} - {response.text}")
    except Exception as e:
        print(f"Error sending Google Home alert: {e}")
# --- Main Script Logic ---
daily_events = []  # anomaly reports collected since the last daily recap

CYCLE_INTERVAL_SECONDS = 300  # run a monitoring cycle every 5 minutes


def is_anomaly_report(llm_response):
    """Return True if *llm_response* is an anomaly report.

    An empty/None response and the "OK" verdict are not anomalies. The
    verdict is compared case-insensitively, ignoring surrounding whitespace,
    quotes and a trailing "." or "!", so replies such as "OK." or "ok" are
    not mistaken for a report.
    """
    if not llm_response:
        return False
    verdict = llm_response.strip().strip("\"'").rstrip(".!")
    return verdict.upper() != "OK"


def recap_is_due(current_time, today, recap_time, last_recap_date):
    """Return True if the daily recap should be handled now.

    Args:
        current_time: Current local time as "HH:MM".
        today: Current local date as a string.
        recap_time: Configured recap time as "HH:MM".
        last_recap_date: Date string of the last handled recap, or None.

    The recap is due once the recap time has been reached and it has not
    been handled yet today. Zero-padded "HH:MM" strings compare in
    chronological order, so a cycle that wakes up a few minutes after the
    recap time still triggers it.
    """
    return current_time >= recap_time and last_recap_date != today


def collect_monitoring_data():
    """Run every collector and bundle the results.

    Returns the combined dict, or None when the system logs or the network
    metrics are unavailable.
    """
    system_logs = get_system_logs()
    network_metrics = get_network_metrics()
    cpu_temp = get_cpu_temperature()
    gpu_temp = get_gpu_temperature()
    login_attempts = get_login_attempts()
    if not (system_logs and network_metrics):
        return None
    return {
        "system_logs": system_logs,
        "network_metrics": network_metrics,
        "cpu_temperature": cpu_temp,
        "gpu_temperature": gpu_temp,
        "login_attempts": login_attempts,
    }


def analyze_monitoring_data(combined_data):
    """Store *combined_data*, then return the LLM's verdict on it."""
    data_storage.store_data(combined_data)
    return analyze_data_with_llm(combined_data, data_storage.calculate_baselines())


def alert_if_high_severity(llm_response):
    """Send Discord and Google Home alerts if the report mentions "high"."""
    if "high" in llm_response.lower():
        send_discord_alert(llm_response)
        send_google_home_alert(llm_response)


def main():
    """Run one cycle in test mode, otherwise monitor forever."""
    if config.TEST_MODE:
        print("Running in test mode...")
        combined_data = collect_monitoring_data()
        if combined_data is not None:
            llm_response = analyze_monitoring_data(combined_data)
            if is_anomaly_report(llm_response):
                print(f"Anomaly detected: {llm_response}")
                alert_if_high_severity(llm_response)
            else:
                print("No anomaly detected.")
        return

    # If the agent starts after today's recap time, today's recap counts as
    # already handled; the first recap then goes out tomorrow.
    startup = time.localtime()
    last_recap_date = None
    if time.strftime("%H:%M", startup) > config.DAILY_RECAP_TIME:
        last_recap_date = time.strftime("%Y-%m-%d", startup)

    while True:
        print("Running monitoring cycle...")
        combined_data = collect_monitoring_data()
        if combined_data is not None:
            llm_response = analyze_monitoring_data(combined_data)
            if is_anomaly_report(llm_response):
                daily_events.append(llm_response)
                alert_if_high_severity(llm_response)

        # Daily Recap Logic
        # The cycle runs roughly every five minutes, so an exact "HH:MM"
        # match would usually be missed; trigger on the first cycle at or
        # after the recap time instead, once per day.
        now = time.localtime()
        today = time.strftime("%Y-%m-%d", now)
        if recap_is_due(time.strftime("%H:%M", now), today,
                        config.DAILY_RECAP_TIME, last_recap_date):
            if daily_events:
                recap_message = "\n".join(daily_events)
                send_discord_alert(f"**Daily Recap:**\n{recap_message}")
                daily_events.clear()  # Reset for the next day
            last_recap_date = today

        time.sleep(CYCLE_INTERVAL_SECONDS)


if __name__ == "__main__":
    main()