Files
LLM-Powered-Monitoring-Agent/data_storage.py
Spencer e119bc7194 feat: Update baseline calculations and LLM prompts
- Change baseline calculations to use integers instead of floats to simplify data.
- Update LLM constraints and prompt for more accurate anomaly detection.
- Refine known_issues to reduce false positives.
- Update PROGRESS.md with new TODO items.
2025-08-21 12:12:15 -05:00

61 lines
2.2 KiB
Python

import json
import os
from datetime import datetime, timedelta, timezone
# Path of the JSON file that load_data() reads and store_data() rewrites.
# It is a relative path, so it resolves against the process's current
# working directory.
DATA_FILE = 'monitoring_data.json'
def load_data():
    """Load every stored monitoring record from DATA_FILE.

    Returns:
        The decoded JSON content of DATA_FILE, or an empty list when the
        file does not exist yet.

    Raises:
        json.JSONDecodeError: If the file exists but does not contain valid
            JSON. This is intentionally not swallowed: returning an empty
            list here would let a later store_data() call overwrite
            whatever is still recoverable from the file.
    """
    # EAFP: open the file directly instead of checking os.path.exists()
    # first, so a file that disappears between the check and the open still
    # yields an empty list rather than an unexpected FileNotFoundError.
    try:
        with open(DATA_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        return []
def store_data(new_data):
    """Append one record to the stored data and rewrite DATA_FILE.

    The whole data set is loaded, extended with ``new_data`` and written
    back as indented JSON.

    Args:
        new_data: A JSON-serializable record to append.

    Raises:
        TypeError: If ``new_data`` cannot be serialized to JSON. In that
            case the previously stored file is left untouched.
    """
    data = load_data()
    data.append(new_data)

    # Serialize into a sibling temp file and swap it into place. Opening
    # DATA_FILE itself with 'w' would truncate it before json.dump() runs,
    # so any failure during serialization would wipe the stored history.
    # The temp file lives next to DATA_FILE so os.replace() stays on one
    # filesystem.
    tmp_path = DATA_FILE + '.tmp'
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=4)
        os.replace(tmp_path, DATA_FILE)
    except BaseException:
        # Best-effort cleanup of the partial temp file; the original error
        # is what the caller needs to see, so it is re-raised unchanged.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
def _calculate_average(data, key1, key2):
    """Return the integer average of ``d[key1][key2]`` over a list of dicts.

    Records are skipped when ``key1`` is missing or does not map to a dict,
    when ``key2`` is missing, or when the value is not a real number (for
    example the "N/A" placeholder, ``None`` or any other string).

    Args:
        data: Iterable of record dicts.
        key1: Top-level key holding the nested dict.
        key2: Key inside the nested dict holding the numeric value.

    Returns:
        The mean of the usable values truncated to an int, or 0 when no
        usable value was found.
    """
    values = []
    for record in data:
        section = record.get(key1) if isinstance(record, dict) else None
        if not isinstance(section, dict):
            # A missing, null or string-valued section has nothing to average.
            continue
        value = section.get(key2)
        # bool is a subclass of int, but a True/False flag is not a
        # measurement, so it is excluded explicitly.
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            values.append(value)
    return int(sum(values) / len(values)) if values else 0
def _parse_utc_timestamp(raw):
    """Parse an ISO-8601 timestamp string into an aware UTC datetime, or None if unusable."""
    if not isinstance(raw, str):
        return None
    text = raw.strip()
    # datetime.fromisoformat() only accepts a trailing 'Z' from Python 3.11
    # on, so it is rewritten as an explicit UTC offset.
    if text.endswith(('Z', 'z')):
        text = text[:-1] + '+00:00'
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        return None
    if parsed.tzinfo is None:
        # Timestamps without an offset are taken to be UTC.
        return parsed.replace(tzinfo=timezone.utc)
    # Convert (rather than overwrite) an explicit offset so the instant is
    # preserved.
    return parsed.astimezone(timezone.utc)


def calculate_baselines():
    """Compute baseline metrics from the records of the last 24 hours.

    Records without a parseable ``timestamp`` are ignored instead of
    aborting the whole calculation.

    Returns:
        An empty dict when there is no stored data or no record from the
        last 24 hours. Otherwise a dict with:
          - 'avg_rtt', 'packet_loss', 'avg_cpu_temp', 'avg_gpu_temp':
            integer averages produced by _calculate_average().
          - 'host_ports': mapping of host IP to the sorted list of every
            port seen open for that host in the window.
    """
    data = load_data()
    if not data:
        return {}

    # For simplicity, the last 24 hours of data are averaged; more complex
    # logic can be added here. The cutoff is computed once so that every
    # record is compared against the same instant.
    cutoff = datetime.now(timezone.utc) - timedelta(hours=24)
    recent_data = []
    for record in data:
        if not isinstance(record, dict):
            continue
        stamp = _parse_utc_timestamp(record.get('timestamp'))
        if stamp is not None and stamp > cutoff:
            recent_data.append(record)
    if not recent_data:
        return {}

    baseline_metrics = {
        'avg_rtt': _calculate_average(recent_data, 'network_metrics', 'rtt_avg'),
        'packet_loss': _calculate_average(recent_data, 'network_metrics', 'packet_loss_rate'),
        'avg_cpu_temp': _calculate_average(recent_data, 'cpu_temperature', 'cpu_temperature'),
        'avg_gpu_temp': _calculate_average(recent_data, 'gpu_temperature', 'gpu_temperature'),
    }

    # Baseline for open ports from nmap scans: union of the ports seen per
    # host across the window.
    host_ports = {}
    for record in recent_data:
        nmap_results = record.get('nmap_results')
        if not isinstance(nmap_results, dict):
            continue
        for host_info in nmap_results.get('hosts') or []:
            if not isinstance(host_info, dict) or 'ip' not in host_info:
                continue
            # A host with no open ports still gets an (empty) entry.
            ports = host_ports.setdefault(host_info['ip'], set())
            for port_info in host_info.get('open_ports') or []:
                if isinstance(port_info, dict) and 'port' in port_info:
                    ports.add(port_info['port'])

    # Convert sets to sorted lists for JSON serialization.
    baseline_metrics['host_ports'] = {
        host: sorted(ports) for host, ports in host_ports.items()
    }
    return baseline_metrics