# File: LLM-Powered-Monitoring-Agent/monitor_agent.py
# Repository listing metadata: 184 lines, 6.6 KiB, Python

import json
import platform
import subprocess
import time
import logging
from syslog_rfc5424_parser import SyslogMessage
import jc
import ollama
from discord_webhook import DiscordWebhook, DiscordEmbed
import requests
import config
# --- Logging Configuration ---
# Records at INFO and above go to two places: the "monitor_agent.log" file
# and the console stream.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
_log_file_handler = logging.FileHandler("monitor_agent.log")
_log_console_handler = logging.StreamHandler()
logging.basicConfig(
    handlers=[_log_file_handler, _log_console_handler],
    format=_LOG_FORMAT,
    level=logging.INFO,
)
# --- Data Ingestion & Parsing Functions ---
def get_system_logs():
    """
    Produce one parsed system log record from a hard-coded sample.

    A fixed RFC 5424 syslog line is parsed with SyslogMessage.parse, and the
    message part of that line is then decoded as JSON.

    Returns:
        The decoded JSON message when decoding succeeds;
        {"log": <raw message>} when the message is not valid JSON;
        an empty dict when the message is empty or any other error occurs.
    """
    sample_entry = '<165>1 2025-08-15T12:00:00Z my-host app-name - - [meta sequenceId="1"] { "log": "Failed login attempt for user \'root\' from 10.0.0.1" }'
    try:
        message_body = SyslogMessage.parse(sample_entry).msg
        if not message_body:
            return {}
        try:
            return json.loads(message_body)
        except json.JSONDecodeError:
            # The message is plain text rather than JSON; pass it on as-is.
            logging.warning(f"Could not parse log message as JSON: {message_body}")
            return {"log": message_body}
    except Exception as err:
        logging.error(f"Error parsing syslog message: {err}")
        return {}
def get_network_metrics():
    """
    Collects network metrics by pinging 8.8.8.8 three times.

    The `ping` command output is handed to the `jc` library, which turns it
    into structured data. If the output cannot be parsed, the raw text is
    returned instead so the caller still has something to analyze.

    Returns:
        dict: One of
            - the parsed ping metrics,
            - {"ping_output": <raw stdout>} when parsing fails, or
            - {"error": <description>} when the ping command itself fails
              (non-zero exit status, timeout, or `ping` not installed).
    """
    # Windows spells the "number of echo requests" option differently.
    count_flag = '-n' if platform.system() == "Windows" else '-c'
    try:
        ping_output = subprocess.run(
            ['ping', count_flag, '3', '8.8.8.8'],
            capture_output=True,
            text=True,
            check=True,
            timeout=60,  # never let a stuck ping stall the monitoring loop
        ).stdout
    except (subprocess.CalledProcessError,
            subprocess.TimeoutExpired,
            FileNotFoundError) as e:
        logging.error(f"Error running ping command: {e}")
        return {"error": str(e)}

    try:
        parsed_metrics = jc.parse('ping', ping_output)
    except Exception as e:
        # A parser failure (e.g. an output format jc does not understand)
        # must not take the agent down; fall back to the raw text.
        logging.warning(f"Could not parse ping output with jc ({e}). Returning raw output.")
        return {"ping_output": ping_output}

    # jc's ping parser yields a single dictionary; the list form is still
    # accepted for parsers/versions that return a list of records.
    if isinstance(parsed_metrics, dict) and parsed_metrics:
        return parsed_metrics
    if isinstance(parsed_metrics, list) and parsed_metrics:
        return parsed_metrics[0]

    logging.warning("Could not parse ping output with jc. Returning raw output.")
    return {"ping_output": ping_output}
# --- LLM Interaction Function ---
def analyze_data_with_llm(data):
    """
    Ask the local Ollama model whether the supplied data contains anomalies.

    The data is serialized to indented JSON, embedded in a fixed prompt, and
    sent to the "llama3.1:8b" model on the host named by config.OLLAMA_HOST.

    Args:
        data (dict): The structured data to analyze; must be JSON-serializable.

    Returns:
        str: The model's reply with surrounding whitespace removed, or a
        message starting with "Error communicating with Ollama: " if the
        request fails for any reason.
    """
    structured_data_as_string = json.dumps(data, indent=2)
    prompt = f"""Role: You are a dedicated and expert system administrator. Your primary role is to identify anomalies and provide concise, actionable reports.
Instruction: Analyze the following system and network data for any activity that appears out of place or different. Consider unusual values, errors, or unexpected patterns as anomalies.
Context: Here is the system data in JSON format for your analysis: {structured_data_as_string}
Output Request: If you find an anomaly, provide a report as a single, coherent, natural language paragraph. The report must clearly state the anomaly, its potential cause, and its severity (e.g., high, medium, low). If no anomaly is found, respond with \"OK\".
Reasoning Hint: Think step by step to come to your conclusion. This is very important."""
    try:
        reply = ollama.Client(host=config.OLLAMA_HOST).generate(
            model="llama3.1:8b",
            prompt=prompt,
        )
        answer = reply['response'].strip()
    except Exception as err:
        # The failure text is both logged and handed back to the caller.
        failure = f"Error communicating with Ollama: {err}"
        logging.error(failure)
        return failure
    return answer
# --- Alerting Functions ---
def send_discord_alert(message):
    """
    Sends an alert message to a Discord webhook as a red embed.

    Nothing is sent when config.DISCORD_WEBHOOK_URL is empty or still holds
    the placeholder value. Delivery problems are logged, never raised.

    Args:
        message (str): The message to send. Text longer than Discord's embed
            description limit is truncated and suffixed with "...".
    """
    webhook_url = config.DISCORD_WEBHOOK_URL
    if not webhook_url or webhook_url == "YOUR_DISCORD_WEBHOOK_URL_HERE":
        logging.info("Skipping Discord alert: Webhook URL not configured.")
        return

    # Discord rejects embeds whose description exceeds 4096 characters, and
    # LLM reports can be long, so trim rather than lose the whole alert.
    max_description = 4096
    description = str(message)
    if len(description) > max_description:
        description = description[:max_description - 3] + "..."

    webhook = DiscordWebhook(url=webhook_url)
    webhook.add_embed(
        DiscordEmbed(title="Anomaly Detected!", description=description, color='FF0000')
    )
    try:
        response = webhook.execute()
    except Exception as e:
        logging.error(f"Error sending Discord alert: {e}")
        return

    # execute() hands back the HTTP response instead of raising on a 4xx/5xx
    # status, so check the status before claiming success.
    status_code = getattr(response, "status_code", None)
    if status_code is not None and status_code >= 400:
        logging.error(f"Error sending Discord alert: HTTP {status_code}")
        return
    logging.info("Discord alert sent.")
def send_google_home_alert(message):
    """
    Sends an alert message to a Google Home speaker via Home Assistant.

    Calls Home Assistant's tts/speak service over its REST API. Nothing is
    sent when config.HOME_ASSISTANT_URL is empty or still holds the
    placeholder value. Request failures (including timeouts and HTTP error
    statuses) are logged, never raised.

    Args:
        message (str): The message to be spoken.
    """
    base_url = config.HOME_ASSISTANT_URL
    if not base_url or base_url == "http://YOUR_HOME_ASSISTANT_IP:8123":
        logging.info("Skipping Google Home alert: Home Assistant URL not configured.")
        return

    # Tolerate a trailing slash in the configured base URL.
    url = f"{base_url.rstrip('/')}/api/services/tts/speak"
    headers = {
        "Authorization": f"Bearer {config.HOME_ASSISTANT_TOKEN}",
        "Content-Type": "application/json",
    }
    payload = {
        "entity_id": "tts.google_en_com",
        "media_player_entity_id": config.GOOGLE_HOME_SPEAKER_ID,
        "message": message,
    }
    try:
        # Without a timeout, requests waits forever on an unresponsive host,
        # which would freeze the whole monitoring loop.
        response = requests.post(url, headers=headers, json=payload, timeout=15)
        response.raise_for_status()
        logging.info("Google Home alert sent.")
    except requests.exceptions.RequestException as e:
        logging.error(f"Error sending Google Home alert: {e}")
# --- Main Script Logic ---
def _is_ok_response(llm_response):
    """Returns True when the LLM reply is the "no anomaly" sentinel.

    The prompt asks the model to answer "OK", but models commonly wrap the
    word in quotes, add a full stop, or change its case. Those variants are
    all accepted so that they do not trigger false alerts.
    """
    normalized = llm_response.strip().strip("\"'`.!").strip()
    return normalized.upper() == "OK"


def _run_monitoring_cycle():
    """Collects data once, asks the LLM to analyze it, and alerts on anomalies."""
    logging.info("--- Running Monitoring Cycle ---")
    system_logs = get_system_logs()
    logging.info(f"System Logs: {system_logs}")
    network_metrics = get_network_metrics()
    logging.info(f"Network Metrics: {network_metrics}")
    combined_data = {
        "system_logs": system_logs,
        "network_metrics": network_metrics
    }
    logging.info(f"Combined Data: {json.dumps(combined_data, indent=2)}")
    llm_response = analyze_data_with_llm(combined_data)
    logging.info(f"LLM Response: {llm_response}")
    # Anything other than the OK sentinel is treated as an anomaly report.
    # That includes the error text returned when the LLM cannot be reached.
    if not _is_ok_response(llm_response):
        logging.info("Anomaly detected, sending alerts...")
        send_discord_alert(llm_response)
        send_google_home_alert(llm_response)


def main():
    """
    The main execution loop for the monitoring agent.

    Runs one monitoring cycle, sleeps for five minutes, and repeats forever.
    """
    while True:
        _run_monitoring_cycle()
        logging.info("--- Cycle Complete, sleeping for 5 minutes ---")
        time.sleep(300)  # 300 seconds = 5 minutes


if __name__ == "__main__":
    main()