172 lines
6.1 KiB
Python
172 lines
6.1 KiB
Python
import json
|
|
import subprocess
|
|
import time
|
|
from syslog_rfc5424_parser import parse
|
|
import jc
|
|
import ollama
|
|
from discord_webhook import DiscordWebhook, DiscordEmbed
|
|
import requests
|
|
import config
|
|
|
|
# --- Data Ingestion & Parsing Functions ---
|
|
|
|
def get_system_logs():
|
|
"""
|
|
Simulates collecting and parsing a system log entry.
|
|
|
|
This function uses a mock syslog entry and parses it using the
|
|
syslog-rfc5424-parser library.
|
|
|
|
Returns:
|
|
dict: A dictionary representing the parsed log entry.
|
|
"""
|
|
mock_log_entry = '<165>1 2025-08-15T12:00:00Z my-host app-name - - [meta sequenceId="1"] { "log": "Failed login attempt for user \'root\' from 10.0.0.1" }'
|
|
parsed_log = parse(mock_log_entry)
|
|
# The message part is a string, so we need to parse it as JSON
|
|
# In a real scenario, you might need to handle non-json messages
|
|
if parsed_log.message:
|
|
try:
|
|
log_content = json.loads(parsed_log.message)
|
|
# We can merge the log content with the parsed log for a more complete picture
|
|
# For now, we'll just return the content of the log message
|
|
return log_content
|
|
except json.JSONDecodeError:
|
|
return {"log": parsed_log.message}
|
|
return {}
|
|
|
|
def get_network_metrics():
|
|
"""
|
|
Simulates collecting and parsing network data by running the ping command.
|
|
|
|
This function uses the `ping` command to generate network statistics
|
|
and the `jc` library to parse the output into a structured format.
|
|
|
|
Returns:
|
|
dict: A dictionary containing the parsed network metrics.
|
|
"""
|
|
# We ping a reliable address, like Google's DNS, 3 times.
|
|
# The '-c 3' argument is for Linux/macOS. For Windows, it would be '-n 3'.
|
|
# Since the target is an Ubuntu server, we'll use '-c'.
|
|
try:
|
|
ping_output = subprocess.run(['ping', '-c', '3', '8.8.8.8'], capture_output=True, text=True, check=True).stdout
|
|
parsed_metrics = jc.parse('ping', ping_output)
|
|
# We're interested in the summary statistics
|
|
if parsed_metrics:
|
|
return parsed_metrics[0]
|
|
except (subprocess.CalledProcessError, FileNotFoundError) as e:
|
|
# Handle cases where ping fails or is not installed
|
|
return {"error": str(e)}
|
|
return {}
|
|
|
|
|
|
# --- LLM Interaction Function ---
|
|
|
|
def analyze_data_with_llm(data):
|
|
"""
|
|
Analyzes the given data with a local LLM to detect anomalies.
|
|
|
|
Args:
|
|
data (dict): The structured data to analyze.
|
|
|
|
Returns:
|
|
str: The raw response text from the LLM.
|
|
"""
|
|
structured_data_as_string = json.dumps(data, indent=2)
|
|
prompt = f"""Role: You are a dedicated and expert system administrator. Your primary role is to identify anomalies and provide concise, actionable reports.
|
|
Instruction: Analyze the following system and network data for any activity that appears out of place or different. Consider unusual values, errors, or unexpected patterns as anomalies.
|
|
Context: Here is the system data in JSON format for your analysis: {structured_data_as_string}
|
|
Output Request: If you find an anomaly, provide a report as a single, coherent, natural language paragraph. The report must clearly state the anomaly, its potential cause, and its severity (e.g., high, medium, low). If no anomaly is found, respond with \"OK\".
|
|
Reasoning Hint: Think step by step to come to your conclusion. This is very important."""
|
|
|
|
try:
|
|
response = ollama.generate(
|
|
model="llama3.1:8b",
|
|
prompt=prompt
|
|
)
|
|
return response['response'].strip()
|
|
except Exception as e:
|
|
return f"Error communicating with Ollama: {e}"
|
|
|
|
# --- Alerting Functions ---
|
|
|
|
def send_discord_alert(message):
|
|
"""
|
|
Sends an alert message to a Discord webhook.
|
|
|
|
Args:
|
|
message (str): The message to send.
|
|
"""
|
|
if config.DISCORD_WEBHOOK_URL == "YOUR_DISCORD_WEBHOOK_URL_HERE":
|
|
print("Skipping Discord alert: Webhook URL not configured.")
|
|
return
|
|
|
|
webhook = DiscordWebhook(url=config.DISCORD_WEBHOOK_URL)
|
|
embed = DiscordEmbed(title="Anomaly Detected!", description=message, color='FF0000')
|
|
webhook.add_embed(embed)
|
|
try:
|
|
response = webhook.execute()
|
|
print("Discord alert sent.")
|
|
except Exception as e:
|
|
print(f"Error sending Discord alert: {e}")
|
|
|
|
def send_google_home_alert(message):
|
|
"""
|
|
Sends an alert message to a Google Home speaker via Home Assistant.
|
|
|
|
Args:
|
|
message (str): The message to be spoken.
|
|
"""
|
|
# Long or complex messages should be simplified for better Text-to-Speech delivery.
|
|
if config.HOME_ASSISTANT_URL == "http://YOUR_HOME_ASSISTANT_IP:8123":
|
|
print("Skipping Google Home alert: Home Assistant URL not configured.")
|
|
return
|
|
|
|
url = f"{config.HOME_ASSISTANT_URL}/api/services/tts/speak"
|
|
headers = {
|
|
"Authorization": f"Bearer {config.HOME_ASSISTANT_TOKEN}",
|
|
"Content-Type": "application/json",
|
|
}
|
|
payload = {
|
|
"entity_id": "tts.google_en_com",
|
|
"media_player_entity_id": config.GOOGLE_HOME_SPEAKER_ID,
|
|
"message": message,
|
|
}
|
|
|
|
try:
|
|
response = requests.post(url, headers=headers, json=payload)
|
|
response.raise_for_status() # Raise an exception for bad status codes
|
|
print("Google Home alert sent.")
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"Error sending Google Home alert: {e}")
|
|
|
|
# --- Main Script Logic ---
|
|
|
|
def main():
|
|
"""
|
|
The main execution loop for the monitoring agent.
|
|
"""
|
|
while True:
|
|
print("--- Running Monitoring Cycle ---")
|
|
system_logs = get_system_logs()
|
|
network_metrics = get_network_metrics()
|
|
|
|
combined_data = {
|
|
"system_logs": system_logs,
|
|
"network_metrics": network_metrics
|
|
}
|
|
|
|
llm_response = analyze_data_with_llm(combined_data)
|
|
print(f"LLM Response: {llm_response}")
|
|
|
|
if llm_response != "OK":
|
|
print("Anomaly detected, sending alerts...")
|
|
send_discord_alert(llm_response)
|
|
send_google_home_alert(llm_response)
|
|
|
|
print("--- Cycle Complete, sleeping for 5 minutes ---")
|
|
time.sleep(300) # 300 seconds = 5 minutes
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
|