- Change baseline calculations to use integers instead of floats to simplify data. - Update LLM constraints and prompt for more accurate anomaly detection. - Refine known_issues to reduce false positives. - Update PROGRESS.md with new TODO items.
61 lines
2.2 KiB
Python
61 lines
2.2 KiB
Python
import json
|
|
import os
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
# Path of the JSON file that load_data() reads and store_data() rewrites.
# It is a relative path, so it resolves against the current working directory.
DATA_FILE = 'monitoring_data.json'
|
|
|
|
def load_data():
    """Load the JSON content of DATA_FILE.

    Returns:
        The decoded JSON value stored in DATA_FILE, or an empty list when the
        file does not exist or contains nothing but whitespace.

    Raises:
        json.JSONDecodeError: If the file has content that is not valid JSON.
            This is deliberately not swallowed, so damaged data is noticed
            instead of being treated as "no records".
    """
    # EAFP: opening directly avoids the window between an existence check and
    # open() in which the file could disappear.
    try:
        with open(DATA_FILE, 'r', encoding='utf-8') as f:
            raw = f.read()
    except FileNotFoundError:
        return []

    # A zero-length file (e.g. left by an interrupted write) holds no records.
    if not raw.strip():
        return []
    return json.loads(raw)
|
|
|
|
def store_data(new_data):
    """Append one record to the JSON list stored in DATA_FILE.

    Args:
        new_data: JSON-serializable value to append to the existing content
            returned by load_data().

    Raises:
        TypeError: If the combined data cannot be serialized by json.dump.
            In that case DATA_FILE is left exactly as it was.
    """
    data = load_data()
    data.append(new_data)

    # Serialize into a sibling temp file and swap it in afterwards. Opening
    # DATA_FILE itself with 'w' would truncate it before json.dump runs, so a
    # serialization error or a crash mid-write would destroy every record.
    tmp_path = DATA_FILE + '.tmp'
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=4)
        os.replace(tmp_path, DATA_FILE)
    except Exception:
        # Do not leave a half-written temp file behind.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
|
|
|
|
def _calculate_average(data, key1, key2):
|
|
"""Helper function to calculate the average of a nested key in a list of dicts."""
|
|
values = [d[key1][key2] for d in data if key1 in d and key2 in d[key1] and d[key1][key2] != "N/A"]
|
|
return int(sum(values) / len(values)) if values else 0
|
|
|
|
def _parse_utc_timestamp(raw):
    """Parse an ISO-8601 string into an aware UTC datetime, or return None."""
    if not isinstance(raw, str):
        return None
    text = raw.strip()
    # datetime.fromisoformat() only accepts a trailing 'Z' from Python 3.11 on,
    # so drop it; the resulting naive value is then treated as UTC below.
    if text.endswith('Z'):
        text = text[:-1]
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        return None
    if parsed.tzinfo is None:
        # Naive timestamps are taken to be UTC already.
        return parsed.replace(tzinfo=timezone.utc)
    # An explicit offset must be converted, not overwritten, or the instant
    # would shift by the size of the offset.
    return parsed.astimezone(timezone.utc)


def calculate_baselines():
    """Compute baseline metrics from the records of the last 24 hours.

    Records are read with load_data(). A record is used only if it is a dict
    with a parseable 'timestamp' newer than 24 hours before now (UTC); records
    with a missing or malformed timestamp are ignored rather than aborting the
    whole calculation.

    Returns:
        An empty dict when there is no stored data or no recent record.
        Otherwise a dict with:
            'avg_rtt', 'packet_loss', 'avg_cpu_temp', 'avg_gpu_temp':
                values produced by _calculate_average() over the recent
                records.
            'host_ports': dict mapping each host 'ip' seen in
                nmap_results['hosts'] to the sorted list of every 'port'
                reported in its 'open_ports' within the window.
    """
    data = load_data()
    if not data:
        return {}

    # For simplicity, we average the last 24 hours of data.
    # The cutoff is computed once so every record is compared to the same instant.
    cutoff = datetime.now(timezone.utc) - timedelta(hours=24)
    recent_data = []
    for record in data:
        if not isinstance(record, dict):
            continue
        stamp = _parse_utc_timestamp(record.get('timestamp'))
        if stamp is not None and stamp > cutoff:
            recent_data.append(record)

    if not recent_data:
        return {}

    baseline_metrics = {
        'avg_rtt': _calculate_average(recent_data, 'network_metrics', 'rtt_avg'),
        'packet_loss': _calculate_average(recent_data, 'network_metrics', 'packet_loss_rate'),
        'avg_cpu_temp': _calculate_average(recent_data, 'cpu_temperature', 'cpu_temperature'),
        'avg_gpu_temp': _calculate_average(recent_data, 'gpu_temperature', 'gpu_temperature'),
    }

    # Baseline for open ports from nmap scans: union of ports seen per host.
    host_ports = {}
    for record in recent_data:
        nmap_results = record.get('nmap_results')
        if not isinstance(nmap_results, dict):
            continue
        for host_info in nmap_results.get('hosts') or []:
            # Skip malformed host entries instead of failing the whole baseline.
            if not isinstance(host_info, dict) or 'ip' not in host_info:
                continue
            ports = host_ports.setdefault(host_info['ip'], set())
            for port_info in host_info.get('open_ports') or []:
                if isinstance(port_info, dict) and 'port' in port_info:
                    ports.add(port_info['port'])

    # Convert sets to sorted lists for JSON serialization.
    baseline_metrics['host_ports'] = {
        host: sorted(ports) for host, ports in host_ports.items()
    }

    return baseline_metrics