feat: Implement data retention policy

- Replaced `data_storage.py` with `database.py` to use SQLite instead of a JSON file for data storage.
- Added an `enforce_retention_policy` function to `database.py` to delete data older than 7 days.
- Called this function from the main monitoring loop in `monitor_agent.py` (see the sketch below).
- Added Docker container monitoring.
- Updated `.gitignore` to ignore `monitoring.db`.
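
The wiring in `monitor_agent.py` is not part of this file's diff, so the following is only a minimal sketch of how the new functions are expected to be called each cycle. `collect_metrics()` and the 60-second interval are placeholders; `initialize_database`, `store_data`, and `enforce_retention_policy` are the functions added in `database.py` below.

```python
# Sketch of the loop in monitor_agent.py (not shown in this diff).
import time
from datetime import datetime, timezone

import database


def collect_metrics():
    # Placeholder: the real agent gathers network metrics, temperatures,
    # login attempts, nmap results, Docker status and syslog entries.
    return {'timestamp': datetime.now(timezone.utc).isoformat()}


def main():
    database.initialize_database()
    while True:
        new_data = collect_metrics()
        database.store_data(new_data)
        # Drop anything older than the 7-day retention window on every pass.
        database.enforce_retention_policy(retention_days=7)
        time.sleep(60)  # assumed polling interval


if __name__ == '__main__':
    main()
```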
2025-09-15 13:12:05 -05:00
parent 0b64f2ed03
commit 07c768a4cf
16 changed files with 1750356 additions and 74 deletions

database.py Executable file

@@ -0,0 +1,245 @@
import sqlite3
import json
from datetime import datetime, timedelta, timezone
import logging
logger = logging.getLogger(__name__)
DATABASE_FILE = 'monitoring.db'
def initialize_database():
    """Initializes the database and creates tables if they don't exist."""
    try:
        conn = sqlite3.connect(DATABASE_FILE)
        cursor = conn.cursor()
        # Main table for monitoring data
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS monitoring_data (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL
            )
        """)
        # Table for network metrics
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS network_metrics (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                monitoring_data_id INTEGER,
                rtt_avg REAL,
                packet_loss_rate REAL,
                FOREIGN KEY (monitoring_data_id) REFERENCES monitoring_data (id)
            )
        """)
        # Table for temperatures
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS temperatures (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                monitoring_data_id INTEGER,
                cpu_temp REAL,
                gpu_temp REAL,
                FOREIGN KEY (monitoring_data_id) REFERENCES monitoring_data (id)
            )
        """)
        # Table for login attempts
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS login_attempts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                monitoring_data_id INTEGER,
                log_line TEXT,
                FOREIGN KEY (monitoring_data_id) REFERENCES monitoring_data (id)
            )
        """)
        # Table for Nmap scans
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS nmap_scans (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                monitoring_data_id INTEGER,
                scan_data TEXT,
                FOREIGN KEY (monitoring_data_id) REFERENCES monitoring_data (id)
            )
        """)
        # Table for Docker status
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS docker_status (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                monitoring_data_id INTEGER,
                container_name TEXT,
                status TEXT,
                FOREIGN KEY (monitoring_data_id) REFERENCES monitoring_data (id)
            )
        """)
        # Table for syslog
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS syslog (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                monitoring_data_id INTEGER,
                log_data TEXT,
                FOREIGN KEY (monitoring_data_id) REFERENCES monitoring_data (id)
            )
        """)
        conn.commit()
        conn.close()
        logger.info("Database initialized successfully.")
    except sqlite3.Error as e:
        logger.error(f"Error initializing database: {e}")

def store_data(new_data):
    """Stores new monitoring data in the database."""
    try:
        conn = sqlite3.connect(DATABASE_FILE)
        cursor = conn.cursor()
        # Insert into main table
        cursor.execute("INSERT INTO monitoring_data (timestamp) VALUES (?)", (new_data['timestamp'],))
        monitoring_data_id = cursor.lastrowid
        # Insert into network_metrics
        if 'network_metrics' in new_data:
            nm = new_data['network_metrics']
            cursor.execute("INSERT INTO network_metrics (monitoring_data_id, rtt_avg, packet_loss_rate) VALUES (?, ?, ?)",
                           (monitoring_data_id, nm.get('rtt_avg'), nm.get('packet_loss_rate')))
        # Insert into temperatures
        if 'cpu_temperature' in new_data or 'gpu_temperature' in new_data:
            cpu_temp = new_data.get('cpu_temperature', {}).get('cpu_temperature')
            gpu_temp = new_data.get('gpu_temperature', {}).get('gpu_temperature')
            cursor.execute("INSERT INTO temperatures (monitoring_data_id, cpu_temp, gpu_temp) VALUES (?, ?, ?)",
                           (monitoring_data_id, cpu_temp, gpu_temp))
        # Insert into login_attempts
        if 'login_attempts' in new_data and new_data['login_attempts'].get('failed_login_attempts'):
            for line in new_data['login_attempts']['failed_login_attempts']:
                cursor.execute("INSERT INTO login_attempts (monitoring_data_id, log_line) VALUES (?, ?)",
                               (monitoring_data_id, line))
        # Insert into nmap_scans
        if 'nmap_results' in new_data:
            cursor.execute("INSERT INTO nmap_scans (monitoring_data_id, scan_data) VALUES (?, ?)",
                           (monitoring_data_id, json.dumps(new_data['nmap_results'])))
        # Insert into docker_status
        if 'docker_container_status' in new_data:
            for name, status in new_data['docker_container_status'].get('docker_container_status', {}).items():
                cursor.execute("INSERT INTO docker_status (monitoring_data_id, container_name, status) VALUES (?, ?, ?)",
                               (monitoring_data_id, name, status))
        # Insert into syslog
        if 'system_logs' in new_data:
            for log in new_data['system_logs'].get('syslog', []):
                cursor.execute("INSERT INTO syslog (monitoring_data_id, log_data) VALUES (?, ?)",
                               (monitoring_data_id, json.dumps(log)))
        conn.commit()
        conn.close()
    except sqlite3.Error as e:
        logger.error(f"Error storing data: {e}")

def calculate_baselines():
    """Calculates baseline metrics from data in the last 24 hours."""
    try:
        conn = sqlite3.connect(DATABASE_FILE)
        cursor = conn.cursor()
        twenty_four_hours_ago = (datetime.now(timezone.utc) - timedelta(hours=24)).isoformat()
        # Calculate average RTT and packet loss
        cursor.execute("""
            SELECT AVG(nm.rtt_avg), AVG(nm.packet_loss_rate)
            FROM network_metrics nm
            JOIN monitoring_data md ON nm.monitoring_data_id = md.id
            WHERE md.timestamp > ?
        """, (twenty_four_hours_ago,))
        avg_rtt, avg_packet_loss = cursor.fetchone()
        # Calculate average temperatures
        cursor.execute("""
            SELECT AVG(t.cpu_temp), AVG(t.gpu_temp)
            FROM temperatures t
            JOIN monitoring_data md ON t.monitoring_data_id = md.id
            WHERE md.timestamp > ?
        """, (twenty_four_hours_ago,))
        avg_cpu_temp, avg_gpu_temp = cursor.fetchone()
        # Get baseline open ports
        cursor.execute("""
            SELECT ns.scan_data
            FROM nmap_scans ns
            JOIN monitoring_data md ON ns.monitoring_data_id = md.id
            WHERE md.timestamp > ?
            ORDER BY md.timestamp DESC
            LIMIT 1
        """, (twenty_four_hours_ago,))
        latest_nmap_scan = cursor.fetchone()
        host_ports = {}
        if latest_nmap_scan:
            scan_data = json.loads(latest_nmap_scan[0])
            if 'hosts' in scan_data:
                for host_info in scan_data['hosts']:
                    host_ip = host_info['ip']
                    if host_ip not in host_ports:
                        host_ports[host_ip] = set()
                    for port_info in host_info.get('open_ports', []):
                        host_ports[host_ip].add(port_info['port'])
                for host, ports in host_ports.items():
                    host_ports[host] = sorted(list(ports))
        conn.close()
        return {
            'avg_rtt': avg_rtt or 0,
            'packet_loss': avg_packet_loss or 0,
            'avg_cpu_temp': avg_cpu_temp or 0,
            'avg_gpu_temp': avg_gpu_temp or 0,
            'host_ports': host_ports
        }
    except sqlite3.Error as e:
        logger.error(f"Error calculating baselines: {e}")
        return {}

def enforce_retention_policy(retention_days=7):
    """Enforces the data retention policy by deleting old data."""
    try:
        conn = sqlite3.connect(DATABASE_FILE)
        cursor = conn.cursor()
        retention_cutoff = (datetime.now(timezone.utc) - timedelta(days=retention_days)).isoformat()
        # Find old monitoring_data IDs
        cursor.execute("SELECT id FROM monitoring_data WHERE timestamp < ?", (retention_cutoff,))
        old_ids = [row[0] for row in cursor.fetchall()]
        if not old_ids:
            logger.info("No old data to delete.")
            conn.close()
            return
        # Create a placeholder string for the IN clause
        placeholders = ','.join('?' for _ in old_ids)
        # Delete from child tables
        cursor.execute(f"DELETE FROM network_metrics WHERE monitoring_data_id IN ({placeholders})", old_ids)
        cursor.execute(f"DELETE FROM temperatures WHERE monitoring_data_id IN ({placeholders})", old_ids)
        cursor.execute(f"DELETE FROM login_attempts WHERE monitoring_data_id IN ({placeholders})", old_ids)
        cursor.execute(f"DELETE FROM nmap_scans WHERE monitoring_data_id IN ({placeholders})", old_ids)
        cursor.execute(f"DELETE FROM docker_status WHERE monitoring_data_id IN ({placeholders})", old_ids)
        cursor.execute(f"DELETE FROM syslog WHERE monitoring_data_id IN ({placeholders})", old_ids)
        # Delete from the main table
        cursor.execute(f"DELETE FROM monitoring_data WHERE id IN ({placeholders})", old_ids)
        conn.commit()
        conn.close()
        logger.info(f"Deleted {len(old_ids)} old records.")
    except sqlite3.Error as e:
        logger.error(f"Error enforcing retention policy: {e}")