import sqlite3 import json from datetime import datetime, timedelta, timezone import logging logger = logging.getLogger(__name__) DATABASE_FILE = 'monitoring.db' def initialize_database(): """Initializes the database and creates tables if they don't exist.""" try: conn = sqlite3.connect(DATABASE_FILE) cursor = conn.cursor() # Main table for monitoring data cursor.execute(""" CREATE TABLE IF NOT EXISTS monitoring_data ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL ) """) # Table for network metrics cursor.execute(""" CREATE TABLE IF NOT EXISTS network_metrics ( id INTEGER PRIMARY KEY AUTOINCREMENT, monitoring_data_id INTEGER, rtt_avg REAL, packet_loss_rate REAL, FOREIGN KEY (monitoring_data_id) REFERENCES monitoring_data (id) ) """) # Table for temperatures cursor.execute(""" CREATE TABLE IF NOT EXISTS temperatures ( id INTEGER PRIMARY KEY AUTOINCREMENT, monitoring_data_id INTEGER, cpu_temp REAL, gpu_temp REAL, FOREIGN KEY (monitoring_data_id) REFERENCES monitoring_data (id) ) """) # Table for login attempts cursor.execute(""" CREATE TABLE IF NOT EXISTS login_attempts ( id INTEGER PRIMARY KEY AUTOINCREMENT, monitoring_data_id INTEGER, log_line TEXT, FOREIGN KEY (monitoring_data_id) REFERENCES monitoring_data (id) ) """) # Table for Nmap scans cursor.execute(""" CREATE TABLE IF NOT EXISTS nmap_scans ( id INTEGER PRIMARY KEY AUTOINCREMENT, monitoring_data_id INTEGER, scan_data TEXT, FOREIGN KEY (monitoring_data_id) REFERENCES monitoring_data (id) ) """) # Table for Docker status cursor.execute(""" CREATE TABLE IF NOT EXISTS docker_status ( id INTEGER PRIMARY KEY AUTOINCREMENT, monitoring_data_id INTEGER, container_name TEXT, status TEXT, FOREIGN KEY (monitoring_data_id) REFERENCES monitoring_data (id) ) """) # Table for syslog cursor.execute(""" CREATE TABLE IF NOT EXISTS syslog ( id INTEGER PRIMARY KEY AUTOINCREMENT, monitoring_data_id INTEGER, log_data TEXT, FOREIGN KEY (monitoring_data_id) REFERENCES monitoring_data (id) ) """) # Table for ufw logs cursor.execute(""" CREATE TABLE IF NOT EXISTS ufw_logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, monitoring_data_id INTEGER, log_line TEXT, FOREIGN KEY (monitoring_data_id) REFERENCES monitoring_data (id) ) """) conn.commit() conn.close() logger.info("Database initialized successfully.") except sqlite3.Error as e: logger.error(f"Error initializing database: {e}") def store_data(new_data): """Stores new monitoring data in the database.""" try: conn = sqlite3.connect(DATABASE_FILE) cursor = conn.cursor() # Insert into main table cursor.execute("INSERT INTO monitoring_data (timestamp) VALUES (?)", (new_data['timestamp'],)) monitoring_data_id = cursor.lastrowid # Insert into network_metrics if 'network_metrics' in new_data: nm = new_data['network_metrics'] cursor.execute("INSERT INTO network_metrics (monitoring_data_id, rtt_avg, packet_loss_rate) VALUES (?, ?, ?)", (monitoring_data_id, nm.get('rtt_avg'), nm.get('packet_loss_rate'))) # Insert into temperatures if 'cpu_temperature' in new_data or 'gpu_temperature' in new_data: cpu_temp = new_data.get('cpu_temperature', {}).get('cpu_temperature') gpu_temp = new_data.get('gpu_temperature', {}).get('gpu_temperature') cursor.execute("INSERT INTO temperatures (monitoring_data_id, cpu_temp, gpu_temp) VALUES (?, ?, ?)", (monitoring_data_id, cpu_temp, gpu_temp)) # Insert into login_attempts if 'login_attempts' in new_data and new_data['login_attempts'].get('failed_login_attempts'): for line in new_data['login_attempts']['failed_login_attempts']: cursor.execute("INSERT INTO login_attempts (monitoring_data_id, log_line) VALUES (?, ?)", (monitoring_data_id, line)) # Insert into nmap_scans if 'nmap_results' in new_data: cursor.execute("INSERT INTO nmap_scans (monitoring_data_id, scan_data) VALUES (?, ?)", (monitoring_data_id, json.dumps(new_data['nmap_results']))) # Insert into docker_status if 'docker_container_status' in new_data: for name, status in new_data['docker_container_status'].get('docker_container_status', {}).items(): cursor.execute("INSERT INTO docker_status (monitoring_data_id, container_name, status) VALUES (?, ?, ?)", (monitoring_data_id, name, status)) # Insert into syslog if 'system_logs' in new_data: for log in new_data['system_logs'].get('syslog', []): cursor.execute("INSERT INTO syslog (monitoring_data_id, log_data) VALUES (?, ?)", (monitoring_data_id, json.dumps(log))) # Insert into ufw_logs if 'ufw_logs' in new_data: for line in new_data['ufw_logs']: cursor.execute("INSERT INTO ufw_logs (monitoring_data_id, log_line) VALUES (?, ?)", (monitoring_data_id, line)) conn.commit() conn.close() except sqlite3.Error as e: logger.error(f"Error storing data: {e}") def calculate_baselines(): """Calculates baseline metrics from data in the last 24 hours.""" try: conn = sqlite3.connect(DATABASE_FILE) cursor = conn.cursor() twenty_four_hours_ago = (datetime.now(timezone.utc) - timedelta(hours=24)).isoformat() # Calculate average RTT and packet loss cursor.execute(""" SELECT AVG(nm.rtt_avg), AVG(nm.packet_loss_rate) FROM network_metrics nm JOIN monitoring_data md ON nm.monitoring_data_id = md.id WHERE md.timestamp > ? """, (twenty_four_hours_ago,)) avg_rtt, avg_packet_loss = cursor.fetchone() # Calculate average temperatures cursor.execute(""" SELECT AVG(t.cpu_temp), AVG(t.gpu_temp) FROM temperatures t JOIN monitoring_data md ON t.monitoring_data_id = md.id WHERE md.timestamp > ? """, (twenty_four_hours_ago,)) avg_cpu_temp, avg_gpu_temp = cursor.fetchone() # Get baseline open ports cursor.execute(""" SELECT ns.scan_data FROM nmap_scans ns JOIN monitoring_data md ON ns.monitoring_data_id = md.id WHERE md.timestamp > ? ORDER BY md.timestamp DESC LIMIT 1 """, (twenty_four_hours_ago,)) latest_nmap_scan = cursor.fetchone() host_ports = {} if latest_nmap_scan: scan_data = json.loads(latest_nmap_scan[0]) if 'hosts' in scan_data: for host_info in scan_data['hosts']: host_ip = host_info['ip'] if host_ip not in host_ports: host_ports[host_ip] = set() for port_info in host_info.get('open_ports', []): host_ports[host_ip].add(port_info['port']) for host, ports in host_ports.items(): host_ports[host] = sorted(list(ports)) conn.close() return { 'avg_rtt': avg_rtt or 0, 'packet_loss': avg_packet_loss or 0, 'avg_cpu_temp': avg_cpu_temp or 0, 'avg_gpu_temp': avg_gpu_temp or 0, 'host_ports': host_ports } except sqlite3.Error as e: logger.error(f"Error calculating baselines: {e}") return {} def enforce_retention_policy(retention_days=7): """Enforces the data retention policy by deleting old data.""" try: conn = sqlite3.connect(DATABASE_FILE) cursor = conn.cursor() retention_cutoff = (datetime.now(timezone.utc) - timedelta(days=retention_days)).isoformat() # Find old monitoring_data IDs cursor.execute("SELECT id FROM monitoring_data WHERE timestamp < ?", (retention_cutoff,)) old_ids = [row[0] for row in cursor.fetchall()] if not old_ids: logger.info("No old data to delete.") conn.close() return # Create a placeholder string for the IN clause placeholders = ','.join('?' for _ in old_ids) # Delete from child tables cursor.execute(f"DELETE FROM network_metrics WHERE monitoring_data_id IN ({placeholders})", old_ids) cursor.execute(f"DELETE FROM temperatures WHERE monitoring_data_id IN ({placeholders})", old_ids) cursor.execute(f"DELETE FROM login_attempts WHERE monitoring_data_id IN ({placeholders})", old_ids) cursor.execute(f"DELETE FROM nmap_scans WHERE monitoring_data_id IN ({placeholders})", old_ids) cursor.execute(f"DELETE FROM docker_status WHERE monitoring_data_id IN ({placeholders})", old_ids) cursor.execute(f"DELETE FROM syslog WHERE monitoring_data_id IN ({placeholders})", old_ids) cursor.execute(f"DELETE FROM ufw_logs WHERE monitoring_data_id IN ({placeholders})", old_ids) # Delete from the main table cursor.execute(f"DELETE FROM monitoring_data WHERE id IN ({placeholders})", old_ids) conn.commit() conn.close() logger.info(f"Deleted {len(old_ids)} old records.") except sqlite3.Error as e: logger.error(f"Error enforcing retention policy: {e}")