Added Test Mode
This commit is contained in:
269
monitor_agent.py
269
monitor_agent.py
@@ -1,184 +1,165 @@
|
||||
import json
|
||||
import platform
|
||||
import subprocess
|
||||
import time
|
||||
import logging
|
||||
from syslog_rfc5424_parser import SyslogMessage
|
||||
import jc
|
||||
import ollama
|
||||
from discord_webhook import DiscordWebhook, DiscordEmbed
|
||||
import requests
|
||||
import config
|
||||
# LLM-Powered Monitoring Agent
|
||||
|
||||
# --- Logging Configuration ---
|
||||
logging.basicConfig(level=logging.INFO,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||
handlers=[
|
||||
logging.FileHandler("monitor_agent.log"),
|
||||
logging.StreamHandler()
|
||||
])
|
||||
import time
|
||||
import json
|
||||
import subprocess
|
||||
import ollama
|
||||
from discord_webhook import DiscordWebhook
|
||||
import requests
|
||||
import data_storage
|
||||
|
||||
# Load configuration
|
||||
import config
|
||||
|
||||
# --- Data Ingestion & Parsing Functions ---
|
||||
|
||||
def get_system_logs():
|
||||
"""
|
||||
Simulates collecting and parsing a system log entry.
|
||||
|
||||
This function uses a mock syslog entry and parses it using the
|
||||
syslog-rfc5424-parser library.
|
||||
|
||||
Returns:
|
||||
dict: A dictionary representing the parsed log entry.
|
||||
"""
|
||||
mock_log_entry = '<165>1 2025-08-15T12:00:00Z my-host app-name - - [meta sequenceId="1"] { "log": "Failed login attempt for user \'root\' from 10.0.0.1" }'
|
||||
"""Simulates collecting and parsing system logs."""
|
||||
# Mock log entry for demonstration
|
||||
mock_log_entry = '{"timestamp": "2025-08-15T12:00:00Z", "log": "Failed login attempt for user \'root\' from 10.0.0.1"}'
|
||||
try:
|
||||
parsed_log = SyslogMessage.parse(mock_log_entry)
|
||||
if parsed_log.msg:
|
||||
try:
|
||||
log_content = json.loads(parsed_log.msg)
|
||||
return log_content
|
||||
except json.JSONDecodeError:
|
||||
logging.warning(f"Could not parse log message as JSON: {parsed_log.msg}")
|
||||
return {"log": parsed_log.msg}
|
||||
except Exception as e:
|
||||
logging.error(f"Error parsing syslog message: {e}")
|
||||
return {}
|
||||
parsed_log = json.loads(mock_log_entry)
|
||||
return parsed_log
|
||||
except json.JSONDecodeError as e:
|
||||
print(f"Error parsing system log: {e}")
|
||||
return None
|
||||
|
||||
def get_network_metrics():
|
||||
"""
|
||||
Simulates collecting and parsing network data by running the ping command.
|
||||
|
||||
This function uses the `ping` command to generate network statistics
|
||||
and the `jc` library to parse the output into a structured format.
|
||||
|
||||
Returns:
|
||||
dict: A dictionary containing the parsed network metrics.
|
||||
"""
|
||||
ping_param = '-n' if platform.system() == "Windows" else '-c'
|
||||
"""Simulates collecting and parsing network data."""
|
||||
# Mock ping output for demonstration
|
||||
mock_ping_output = '''{"destination_ip":"8.8.8.8","data_bytes":56,"pattern":null,"destination":"8.8.8.8","duplicates":0,"packets_transmitted":3,"packets_received":3,"packet_loss_percent":0.0,"time_ms":2003.0,"round_trip_ms_min":18.79,"round_trip_ms_avg":21.212,"round_trip_ms_max":22.787,"round_trip_ms_stddev":1.738,"responses":[{"type":"reply","timestamp":null,"bytes":64,"response_ip":"8.8.8.8","icmp_seq":1,"ttl":111,"time_ms":18.8,"duplicate":false},{"type":"reply","timestamp":null,"bytes":64,"response_ip":"8.8.8.8","icmp_seq":2,"ttl":111,"time_ms":22.8,"duplicate":false},{"type":"reply","timestamp":null,"bytes":64,"response_ip":"8.8.8.8","icmp_seq":3,"ttl":111,"time_ms":22.1,"duplicate":false}]}'''
|
||||
try:
|
||||
ping_output = subprocess.run(['ping', ping_param, '3', '8.8.8.8'], capture_output=True, text=True, check=True).stdout
|
||||
parsed_metrics = jc.parse('ping', ping_output)
|
||||
if parsed_metrics and isinstance(parsed_metrics, list):
|
||||
return parsed_metrics[0]
|
||||
else:
|
||||
logging.warning("Could not parse ping output with jc. Returning raw output.")
|
||||
return {"ping_output": ping_output}
|
||||
except (subprocess.CalledProcessError, FileNotFoundError) as e:
|
||||
logging.error(f"Error running ping command: {e}")
|
||||
return {"error": str(e)}
|
||||
return {}
|
||||
|
||||
parsed_ping = json.loads(mock_ping_output)
|
||||
if parsed_ping:
|
||||
return {
|
||||
"packets_transmitted": parsed_ping.get("packets_transmitted"),
|
||||
"packets_received": parsed_ping.get("packets_received"),
|
||||
"packet_loss_percent": parsed_ping.get("packet_loss_percent"),
|
||||
"round_trip_ms_avg": parsed_ping.get("round_trip_ms_avg"),
|
||||
}
|
||||
return None
|
||||
except json.JSONDecodeError as e:
|
||||
print(f"Error parsing network metrics: {e}")
|
||||
return None
|
||||
|
||||
# --- LLM Interaction Function ---
|
||||
|
||||
def analyze_data_with_llm(data):
|
||||
def analyze_data_with_llm(data, baselines):
|
||||
"""Analyzes data with the local LLM."""
|
||||
prompt = f"""
|
||||
**Role:** You are a dedicated and expert system administrator. Your primary role is to identify anomalies and provide concise, actionable reports.
|
||||
|
||||
**Instruction:** Analyze the following system and network data for any activity that appears out of place or different. Consider unusual values, errors, or unexpected patterns as anomalies. Compare the current data with the historical baseline data to identify significant deviations.
|
||||
|
||||
**Context:**
|
||||
Here is the system data in JSON format for your analysis: {json.dumps(data, indent=2)}
|
||||
|
||||
**Historical Baseline Data:**
|
||||
{json.dumps(baselines, indent=2)}
|
||||
|
||||
**Output Request:** If you find an anomaly, provide a report as a single, coherent, natural language paragraph. The report must clearly state the anomaly, its potential cause, and its severity (e.g., high, medium, low). If no anomaly is found, respond with "OK".
|
||||
|
||||
**Reasoning Hint:** Think step by step to come to your conclusion. This is very important.
|
||||
"""
|
||||
Analyzes the given data with a local LLM to detect anomalies.
|
||||
|
||||
Args:
|
||||
data (dict): The structured data to analyze.
|
||||
|
||||
Returns:
|
||||
str: The raw response text from the LLM.
|
||||
"""
|
||||
structured_data_as_string = json.dumps(data, indent=2)
|
||||
prompt = f"""Role: You are a dedicated and expert system administrator. Your primary role is to identify anomalies and provide concise, actionable reports.
|
||||
Instruction: Analyze the following system and network data for any activity that appears out of place or different. Consider unusual values, errors, or unexpected patterns as anomalies.
|
||||
Context: Here is the system data in JSON format for your analysis: {structured_data_as_string}
|
||||
Output Request: If you find an anomaly, provide a report as a single, coherent, natural language paragraph. The report must clearly state the anomaly, its potential cause, and its severity (e.g., high, medium, low). If no anomaly is found, respond with \"OK\".
|
||||
Reasoning Hint: Think step by step to come to your conclusion. This is very important."""
|
||||
|
||||
try:
|
||||
client = ollama.Client(host=config.OLLAMA_HOST)
|
||||
response = client.generate(
|
||||
model="llama3.1:8b",
|
||||
prompt=prompt
|
||||
)
|
||||
response = ollama.generate(model="llama3.1:8b", prompt=prompt)
|
||||
return response['response'].strip()
|
||||
except Exception as e:
|
||||
logging.error(f"Error communicating with Ollama: {e}")
|
||||
return f"Error communicating with Ollama: {e}"
|
||||
print(f"Error interacting with LLM: {e}")
|
||||
return None
|
||||
|
||||
|
||||
# --- Alerting Functions ---
|
||||
|
||||
def send_discord_alert(message):
|
||||
"""
|
||||
Sends an alert message to a Discord webhook.
|
||||
|
||||
Args:
|
||||
message (str): The message to send.
|
||||
"""
|
||||
if config.DISCORD_WEBHOOK_URL == "YOUR_DISCORD_WEBHOOK_URL_HERE":
|
||||
logging.info("Skipping Discord alert: Webhook URL not configured.")
|
||||
return
|
||||
|
||||
webhook = DiscordWebhook(url=config.DISCORD_WEBHOOK_URL)
|
||||
embed = DiscordEmbed(title="Anomaly Detected!", description=message, color='FF0000')
|
||||
webhook.add_embed(embed)
|
||||
"""Sends an alert to Discord."""
|
||||
webhook = DiscordWebhook(url=config.DISCORD_WEBHOOK_URL, content=message)
|
||||
try:
|
||||
response = webhook.execute()
|
||||
logging.info("Discord alert sent.")
|
||||
if response.status_code == 200:
|
||||
print("Discord alert sent successfully.")
|
||||
else:
|
||||
print(f"Error sending Discord alert: {response.status_code} - {response.content}")
|
||||
except Exception as e:
|
||||
logging.error(f"Error sending Discord alert: {e}")
|
||||
print(f"Error sending Discord alert: {e}")
|
||||
|
||||
def send_google_home_alert(message):
|
||||
"""
|
||||
Sends an alert message to a Google Home speaker via Home Assistant.
|
||||
|
||||
Args:
|
||||
message (str): The message to be spoken.
|
||||
"""
|
||||
if config.HOME_ASSISTANT_URL == "http://YOUR_HOME_ASSISTANT_IP:8123":
|
||||
logging.info("Skipping Google Home alert: Home Assistant URL not configured.")
|
||||
return
|
||||
"""Sends an alert to a Google Home speaker via Home Assistant."""
|
||||
# Simplify the message for better TTS delivery
|
||||
simplified_message = message.split('.')[0] # Take the first sentence
|
||||
|
||||
url = f"{config.HOME_ASSISTANT_URL}/api/services/tts/speak"
|
||||
headers = {
|
||||
"Authorization": f"Bearer {config.HOME_ASSISTANT_TOKEN}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
payload = {
|
||||
data = {
|
||||
"entity_id": "tts.google_en_com",
|
||||
"media_player_entity_id": config.GOOGLE_HOME_SPEAKER_ID,
|
||||
"message": message,
|
||||
"message": simplified_message,
|
||||
}
|
||||
|
||||
try:
|
||||
response = requests.post(url, headers=headers, json=payload)
|
||||
response.raise_for_status()
|
||||
logging.info("Google Home alert sent.")
|
||||
except requests.exceptions.RequestException as e:
|
||||
logging.error(f"Error sending Google Home alert: {e}")
|
||||
response = requests.post(url, headers=headers, json=data)
|
||||
if response.status_code == 200:
|
||||
print("Google Home alert sent successfully.")
|
||||
else:
|
||||
print(f"Error sending Google Home alert: {response.status_code} - {response.text}")
|
||||
except Exception as e:
|
||||
print(f"Error sending Google Home alert: {e}")
|
||||
|
||||
# --- Main Script Logic ---
|
||||
|
||||
def main():
|
||||
"""
|
||||
The main execution loop for the monitoring agent.
|
||||
"""
|
||||
while True:
|
||||
logging.info("--- Running Monitoring Cycle ---")
|
||||
system_logs = get_system_logs()
|
||||
logging.info(f"System Logs: {system_logs}")
|
||||
network_metrics = get_network_metrics()
|
||||
logging.info(f"Network Metrics: {network_metrics}")
|
||||
|
||||
combined_data = {
|
||||
"system_logs": system_logs,
|
||||
"network_metrics": network_metrics
|
||||
}
|
||||
logging.info(f"Combined Data: {json.dumps(combined_data, indent=2)}")
|
||||
|
||||
llm_response = analyze_data_with_llm(combined_data)
|
||||
logging.info(f"LLM Response: {llm_response}")
|
||||
|
||||
if llm_response != "OK":
|
||||
logging.info("Anomaly detected, sending alerts...")
|
||||
send_discord_alert(llm_response)
|
||||
send_google_home_alert(llm_response)
|
||||
|
||||
logging.info("--- Cycle Complete, sleeping for 5 minutes ---")
|
||||
time.sleep(300) # 300 seconds = 5 minutes
|
||||
daily_events = []
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
if config.TEST_MODE:
|
||||
print("Running in test mode...")
|
||||
system_logs = get_system_logs()
|
||||
network_metrics = get_network_metrics()
|
||||
|
||||
if system_logs and network_metrics:
|
||||
combined_data = {
|
||||
"system_logs": system_logs,
|
||||
"network_metrics": network_metrics
|
||||
}
|
||||
data_storage.store_data(combined_data)
|
||||
|
||||
llm_response = analyze_data_with_llm(combined_data, data_storage.calculate_baselines())
|
||||
|
||||
if llm_response and llm_response != "OK":
|
||||
print(f"Anomaly detected: {llm_response}")
|
||||
if "high" in llm_response.lower():
|
||||
send_discord_alert(llm_response)
|
||||
send_google_home_alert(llm_response)
|
||||
else:
|
||||
print("No anomaly detected.")
|
||||
else:
|
||||
while True:
|
||||
print("Running monitoring cycle...")
|
||||
system_logs = get_system_logs()
|
||||
network_metrics = get_network_metrics()
|
||||
|
||||
if system_logs and network_metrics:
|
||||
combined_data = {
|
||||
"system_logs": system_logs,
|
||||
"network_metrics": network_metrics
|
||||
}
|
||||
|
||||
llm_response = analyze_data_with_llm(combined_data, data_storage.calculate_baselines())
|
||||
|
||||
if llm_response and llm_response != "OK":
|
||||
daily_events.append(llm_response)
|
||||
if "high" in llm_response.lower():
|
||||
send_discord_alert(llm_response)
|
||||
send_google_home_alert(llm_response)
|
||||
|
||||
# Daily Recap Logic
|
||||
current_time = time.strftime("%H:%M")
|
||||
if current_time == config.DAILY_RECAP_TIME and daily_events:
|
||||
recap_message = "\n".join(daily_events)
|
||||
send_discord_alert(f"**Daily Recap:**\n{recap_message}")
|
||||
daily_events = [] # Reset for the next day
|
||||
|
||||
time.sleep(300) # Run every 5 minutes
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user