"""
Data Manager Module

Handles all data persistence operations for analysis results.
Manages database operations and file storage for analysis data.
"""

import json
import logging
import sqlite3
import shutil
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Dict, Iterator, List, Any

from .db_schema import get_database_connection, close_database_connection, init_database
from .config import DATABASE_PATH, ANALYSES_DIR

logger = logging.getLogger(__name__)


def _json_default(value: Any) -> Any:
    """
    Fallback serializer for ``json.dumps``.

    Converts objects exposing ``tolist()`` (NumPy scalars and arrays) to plain
    Python values; anything else is rejected exactly as ``json`` would.
    """
    if hasattr(value, "tolist"):
        return value.tolist()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


class DataManager:
    """
    Manages persistent storage of analysis data.

    Handles:
    - Database operations for metadata
    - File system storage for binary files
    - Analysis record lifecycle

    Public methods never raise: failures are logged and reported through the
    return value (``None``, ``False``, ``0``, ``0.0``, ``[]`` or ``{}``).
    """

    def __init__(self):
        """Initialize the data manager and make sure the database exists."""
        self.db_path = DATABASE_PATH
        self.analyses_dir = ANALYSES_DIR

        # Initialize database
        init_database(self.db_path)
        logger.info(f"DataManager initialized with database: {self.db_path}")

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @contextmanager
    def _connection(self) -> Iterator[Any]:
        """
        Yield a database connection and always close it afterwards.

        Yields ``None`` (after logging an error) when no connection could be
        opened, so callers must check the yielded value. Closing happens in a
        ``finally`` block, which prevents leaking the connection when the
        body raises.
        """
        conn = get_database_connection(self.db_path)
        if not conn:
            logger.error("Failed to connect to database")
        try:
            yield conn
        finally:
            if conn:
                close_database_connection(conn)

    def _get_report_id(self, analysis_id: int) -> Optional[str]:
        """
        Look up the report_id belonging to an analysis row.

        Returns:
            The report_id, or None when the connection failed or the
            analysis does not exist (both cases are logged).
        """
        with self._connection() as conn:
            if not conn:
                return None
            cursor = conn.cursor()
            cursor.execute("SELECT report_id FROM analyses WHERE id = ?", (analysis_id,))
            row = cursor.fetchone()

        if not row:
            logger.error(f"Analysis not found: {analysis_id}")
            return None
        return row[0]

    @staticmethod
    def _write_image(image_data: Any, file_path: Path) -> bool:
        """
        Write ``image_data`` to ``file_path``.

        Supports a source file path (str or Path), a PyQt5 QImage/QPixmap, or
        a NumPy array. Returns False for unsupported types and when the
        underlying writer reports failure.
        """
        if isinstance(image_data, (str, Path)):
            # It's a file path, copy it (no GUI/NumPy dependency needed).
            shutil.copy2(image_data, file_path)
            return True

        from PyQt5.QtGui import QImage, QPixmap
        import numpy as np

        if isinstance(image_data, QImage):
            image_data = QPixmap.fromImage(image_data)

        if isinstance(image_data, QPixmap):
            saved = image_data.save(str(file_path))
        elif isinstance(image_data, np.ndarray):
            import cv2

            # 3-channel arrays are treated as RGB and converted to OpenCV's BGR order.
            if image_data.ndim == 3 and image_data.shape[2] == 3:
                image_data = cv2.cvtColor(image_data, cv2.COLOR_RGB2BGR)
            saved = cv2.imwrite(str(file_path), image_data)
        else:
            logger.error(f"Unsupported image data type: {type(image_data)}")
            return False

        # Both QPixmap.save and cv2.imwrite signal failure via a False return
        # instead of raising, so the result has to be checked explicitly.
        if not saved:
            logger.error(f"Failed to write visualization file: {file_path}")
        return bool(saved)

    @staticmethod
    def _utc_to_local_display(created_at: Any) -> Any:
        """
        Convert a ``"%Y-%m-%d %H:%M:%S"`` string, treated as UTC, to local time.

        Returns the value unchanged when it is empty or cannot be parsed or
        converted.
        """
        if not created_at:
            return created_at
        try:
            utc_datetime = datetime.strptime(created_at, "%Y-%m-%d %H:%M:%S")
            utc_datetime = utc_datetime.replace(tzinfo=timezone.utc)
            return utc_datetime.astimezone().strftime("%Y-%m-%d %H:%M:%S")
        except (ValueError, TypeError, OverflowError, OSError) as e:
            # Best effort: keep the original value if parsing/conversion fails.
            logger.debug(f"Failed to parse datetime: {e}")
            return created_at

    # ------------------------------------------------------------------
    # Write operations
    # ------------------------------------------------------------------

    def create_analysis(self, report_id: str, device_id: str) -> Optional[int]:
        """
        Create a new analysis record and its directory structure.

        Args:
            report_id: Unique report identifier (e.g., "DUR-20250115-143022")
            device_id: Device identifier

        Returns:
            int: Analysis ID or None if failed
        """
        try:
            with self._connection() as conn:
                if not conn:
                    return None

                cursor = conn.cursor()
                # NOTE(review): binding a datetime relies on sqlite3's datetime
                # adapter (the default one is deprecated since Python 3.12)
                # unless db_schema registers its own - confirm.
                cursor.execute("""
                    INSERT INTO analyses (report_id, timestamp, device_id)
                    VALUES (?, ?, ?)
                """, (report_id, datetime.now(), device_id))
                analysis_id = cursor.lastrowid
                conn.commit()

            # Create directory structure for this analysis
            analysis_dir = self.analyses_dir / report_id
            for subdir in ("inputs", "results", "reports"):
                (analysis_dir / subdir).mkdir(parents=True, exist_ok=True)

            logger.info(f"Created analysis record: {report_id} (ID: {analysis_id})")
            return analysis_id

        except Exception as e:
            logger.exception(f"Error creating analysis: {e}")
            return None

    def save_input_file(self, analysis_id: int, input_type: str, original_path: str) -> bool:
        """
        Copy an input file into the analysis folder and record it in the database.

        Args:
            analysis_id: Analysis ID
            input_type: Type of input ('dslr_side', 'dslr_top', 'multispectral', 'thermal', 'audio')
            original_path: Path to the original file

        Returns:
            bool: True if successful
        """
        try:
            original_file = Path(original_path)
            if not original_file.exists():
                logger.error(f"Input file not found: {original_path}")
                return False

            report_id = self._get_report_id(analysis_id)
            if report_id is None:
                return False

            # The file keeps its original name inside <report_id>/inputs.
            filename = original_file.name
            dest_dir = self.analyses_dir / report_id / "inputs"
            # Recreate the folder if it was removed after create_analysis().
            dest_dir.mkdir(parents=True, exist_ok=True)
            dest_path = dest_dir / filename

            # Copy file
            shutil.copy2(original_file, dest_path)
            file_size = dest_path.stat().st_size

            # Record in database (path stored relative to the analysis folder)
            relative_path = f"inputs/{filename}"
            with self._connection() as conn:
                if not conn:
                    return False

                cursor = conn.cursor()
                cursor.execute("""
                    INSERT INTO analysis_inputs
                        (analysis_id, input_type, original_path, saved_path, file_size)
                    VALUES (?, ?, ?, ?, ?)
                """, (analysis_id, input_type, str(original_path), relative_path, file_size))
                conn.commit()

            logger.info(f"Saved input file: {input_type} -> {relative_path}")
            return True

        except Exception as e:
            logger.exception(f"Error saving input file: {e}")
            return False

    def save_result(self, analysis_id: int, model_type: str, result_dict: Dict[str, Any]) -> bool:
        """
        Save model result to database.

        Args:
            analysis_id: Analysis ID
            model_type: Type of model ('defect', 'locule', 'maturity', 'shape', 'audio')
            result_dict: Dictionary containing result data:
                - predicted_class: Predicted class name
                - confidence: Confidence value (0-1); values above 1 are
                  treated as percentages and divided by 100
                - probabilities: Dict of class probabilities
                - processing_time: Time in seconds
                - metadata: Additional model-specific data (optional)

        Returns:
            bool: True if successful
        """
        try:
            predicted_class = result_dict.get('predicted_class', 'Unknown')
            # float() also accepts NumPy scalars, which sqlite3 cannot bind directly.
            confidence = float(result_dict.get('confidence', 0.0))
            probabilities = result_dict.get('probabilities', {})
            processing_time = result_dict.get('processing_time', 0.0)
            metadata = result_dict.get('metadata', {})

            # Ensure confidence is 0-1 range
            if confidence > 1.0:
                confidence = confidence / 100.0

            # Convert to JSON strings (NumPy values are converted to plain Python)
            probabilities_json = json.dumps(probabilities, default=_json_default)
            metadata_json = json.dumps(metadata, default=_json_default)

            with self._connection() as conn:
                if not conn:
                    return False

                cursor = conn.cursor()
                cursor.execute("""
                    INSERT INTO analysis_results
                        (analysis_id, model_type, predicted_class, confidence,
                         probabilities, processing_time, metadata)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                """, (analysis_id, model_type, predicted_class, confidence,
                      probabilities_json, processing_time, metadata_json))
                conn.commit()

            logger.info(f"Saved result: {model_type} -> {predicted_class} ({confidence*100:.1f}%)")
            return True

        except Exception as e:
            logger.exception(f"Error saving result: {e}")
            return False

    def save_visualization(self, analysis_id: int, viz_type: str, image_data: Any,
                           format_ext: str = 'png') -> bool:
        """
        Save visualization image to file system and record it in the database.

        Args:
            analysis_id: Analysis ID
            viz_type: Type of visualization ('defect_annotated', 'locule_annotated', etc.)
            image_data: Image data (QImage, QPixmap, numpy array, or file path
                given as str or Path)
            format_ext: File extension for saved image

        Returns:
            bool: True if successful; False when the analysis is unknown, the
            image type is unsupported, or the image could not be written
        """
        try:
            report_id = self._get_report_id(analysis_id)
            if report_id is None:
                return False

            results_dir = self.analyses_dir / report_id / "results"
            # Recreate the folder if it was removed after create_analysis().
            results_dir.mkdir(parents=True, exist_ok=True)

            # Generate filename
            filename = f"{viz_type}.{format_ext}"
            file_path = results_dir / filename

            # Only record the visualization once the file was actually written.
            if not self._write_image(image_data, file_path):
                return False

            # Record in database (path stored relative to the analysis folder)
            relative_path = f"results/{filename}"
            with self._connection() as conn:
                if not conn:
                    return False

                cursor = conn.cursor()
                cursor.execute("""
                    INSERT INTO analysis_visualizations
                        (analysis_id, visualization_type, file_path)
                    VALUES (?, ?, ?)
                """, (analysis_id, viz_type, relative_path))
                conn.commit()

            logger.info(f"Saved visualization: {viz_type} -> {relative_path}")
            return True

        except Exception as e:
            logger.exception(f"Error saving visualization: {e}")
            return False

    def finalize_analysis(self, analysis_id: int, overall_grade: str,
                          grade_description: str, total_time: float) -> bool:
        """
        Finalize an analysis record with results.

        Args:
            analysis_id: Analysis ID
            overall_grade: Grade ('A', 'B', or 'C')
            grade_description: Description of the grade
            total_time: Total processing time in seconds

        Returns:
            bool: True if successful; False if no analysis has this ID
        """
        try:
            with self._connection() as conn:
                if not conn:
                    return False

                cursor = conn.cursor()
                cursor.execute("""
                    UPDATE analyses
                    SET overall_grade = ?, grade_description = ?, processing_time = ?
                    WHERE id = ?
                """, (overall_grade, grade_description, total_time, analysis_id))
                updated_rows = cursor.rowcount
                conn.commit()

            # An UPDATE matching no row is not an SQL error, so detect it here
            # rather than reporting a finalization that never happened.
            if updated_rows == 0:
                logger.error(f"Analysis not found: {analysis_id}")
                return False

            logger.info(f"Finalized analysis: ID={analysis_id}, Grade={overall_grade}")
            return True

        except Exception as e:
            logger.exception(f"Error finalizing analysis: {e}")
            return False

    # ------------------------------------------------------------------
    # Read operations
    # ------------------------------------------------------------------

    def get_analysis(self, report_id: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve complete analysis data.

        Args:
            report_id: Report ID to retrieve

        Returns:
            Dict with the analysis columns plus 'inputs', 'results' and
            'visualizations' lists, or None if not found
        """
        input_keys = ['input_type', 'original_path', 'saved_path', 'file_size']
        viz_keys = ['visualization_type', 'file_path']

        try:
            with self._connection() as conn:
                if not conn:
                    return None

                cursor = conn.cursor()

                # Get analysis record
                cursor.execute("""
                    SELECT id, report_id, timestamp, device_id, overall_grade,
                           grade_description, processing_time, created_at
                    FROM analyses
                    WHERE report_id = ?
                """, (report_id,))
                analysis_row = cursor.fetchone()

                if not analysis_row:
                    logger.warning(f"Analysis not found: {report_id}")
                    return None

                analysis_id = analysis_row[0]

                # Get inputs
                cursor.execute("""
                    SELECT input_type, original_path, saved_path, file_size
                    FROM analysis_inputs
                    WHERE analysis_id = ?
                    ORDER BY created_at
                """, (analysis_id,))
                inputs = [dict(zip(input_keys, row)) for row in cursor.fetchall()]

                # Get results; JSON columns are decoded, NULL/empty becomes {}
                cursor.execute("""
                    SELECT model_type, predicted_class, confidence, probabilities,
                           processing_time, metadata
                    FROM analysis_results
                    WHERE analysis_id = ?
                    ORDER BY created_at
                """, (analysis_id,))
                results = [
                    {
                        'model_type': row[0],
                        'predicted_class': row[1],
                        'confidence': row[2],
                        'probabilities': json.loads(row[3] or '{}'),
                        'processing_time': row[4],
                        'metadata': json.loads(row[5] or '{}'),
                    }
                    for row in cursor.fetchall()
                ]

                # Get visualizations
                cursor.execute("""
                    SELECT visualization_type, file_path
                    FROM analysis_visualizations
                    WHERE analysis_id = ?
                    ORDER BY created_at
                """, (analysis_id,))
                visualizations = [dict(zip(viz_keys, row)) for row in cursor.fetchall()]

            return {
                'id': analysis_id,
                'report_id': analysis_row[1],
                'timestamp': analysis_row[2],
                'device_id': analysis_row[3],
                'overall_grade': analysis_row[4],
                'grade_description': analysis_row[5],
                'processing_time': analysis_row[6],
                'created_at': analysis_row[7],
                'inputs': inputs,
                'results': results,
                'visualizations': visualizations,
            }

        except Exception as e:
            logger.exception(f"Error retrieving analysis: {e}")
            return None

    def list_recent_analyses(self, limit: int = 50) -> List[Dict[str, Any]]:
        """
        Get list of recent analyses, newest first.

        Args:
            limit: Maximum number of analyses to return

        Returns:
            List of analysis records with created_at formatted in local timezone
        """
        try:
            with self._connection() as conn:
                if not conn:
                    return []

                cursor = conn.cursor()
                cursor.execute("""
                    SELECT id, report_id, timestamp, device_id, overall_grade,
                           processing_time, created_at
                    FROM analyses
                    ORDER BY created_at DESC
                    LIMIT ?
                """, (limit,))
                rows = cursor.fetchall()

            return [
                {
                    'id': row[0],
                    'report_id': row[1],
                    'timestamp': row[2],
                    'device_id': row[3],
                    'overall_grade': row[4],
                    'processing_time': row[5],
                    # Convert created_at from UTC to local timezone for display
                    'created_at': self._utc_to_local_display(row[6]),
                }
                for row in rows
            ]

        except Exception as e:
            logger.exception(f"Error listing analyses: {e}")
            return []

    def get_analysis_file_path(self, report_id: str, relative_path: str) -> Optional[Path]:
        """
        Get full path to a file stored in analysis folder.

        Args:
            report_id: Report ID
            relative_path: Relative path (e.g., 'inputs/image.jpg' or 'results/defect_annotated.png')

        Returns:
            Full file path or None if not found
        """
        file_path = self.analyses_dir / report_id / relative_path
        return file_path if file_path.exists() else None

    def export_analysis_to_dict(self, report_id: str) -> Optional[Dict[str, Any]]:
        """
        Export analysis data for reports display.

        Adds a 'full_path' entry (absolute location as str, or None when the
        file is missing) to every input and visualization that has a stored
        relative path.

        Args:
            report_id: Report ID to export

        Returns:
            Analysis data with full file paths or None if not found
        """
        analysis = self.get_analysis(report_id)
        if not analysis:
            return None

        # Convert relative paths to full paths
        for items, path_key in ((analysis['inputs'], 'saved_path'),
                                (analysis['visualizations'], 'file_path')):
            for item in items:
                if item[path_key]:
                    full_path = self.get_analysis_file_path(report_id, item[path_key])
                    item['full_path'] = str(full_path) if full_path else None

        return analysis

    # ------------------------------------------------------------------
    # Statistics
    # ------------------------------------------------------------------

    def get_daily_analysis_count(self) -> int:
        """
        Get count of analyses created today.

        Returns:
            int: Number of analyses created today (UTC date)
        """
        try:
            # Get today's date in UTC
            today = datetime.now(timezone.utc).date()

            with self._connection() as conn:
                if not conn:
                    return 0

                cursor = conn.cursor()
                # Query analyses created today
                cursor.execute("""
                    SELECT COUNT(*)
                    FROM analyses
                    WHERE DATE(created_at) = ?
                """, (str(today),))
                result = cursor.fetchone()

            count = result[0] if result else 0
            logger.info(f"Daily analysis count: {count}")
            return count

        except Exception as e:
            logger.exception(f"Error getting daily analysis count: {e}")
            return 0

    def get_average_processing_time(self, limit: int = 100) -> float:
        """
        Get average processing time from recent analyses.

        Only analyses with a positive, non-NULL processing_time are considered.

        Args:
            limit: Maximum number of recent analyses to consider

        Returns:
            float: Average processing time in seconds (0.0 if no data)
        """
        try:
            with self._connection() as conn:
                if not conn:
                    return 0.0

                cursor = conn.cursor()
                # Query average processing time from recent analyses
                cursor.execute("""
                    SELECT AVG(processing_time)
                    FROM (
                        SELECT processing_time
                        FROM analyses
                        WHERE processing_time IS NOT NULL AND processing_time > 0
                        ORDER BY created_at DESC
                        LIMIT ?
                    )
                """, (limit,))
                result = cursor.fetchone()

            # AVG over zero rows yields NULL, which maps to 0.0 here.
            avg_time = float(result[0]) if result and result[0] else 0.0
            logger.info(f"Average processing time: {avg_time:.2f}s")
            return avg_time

        except Exception as e:
            logger.exception(f"Error getting average processing time: {e}")
            return 0.0

    def get_model_accuracy_stats(self, limit: int = 100) -> Dict[str, float]:
        """
        Get average confidence scores per model type from recent results.

        This calculates the average confidence (as a proxy for accuracy)
        for each model from the most recent analyses.

        Args:
            limit: Maximum number of recent analyses to consider

        Returns:
            Dict mapping model type to average confidence percentage (0-100),
            rounded to one decimal.
            Example: {'audio': 94.2, 'defect': 87.5, 'locule': 91.8, 'maturity': 88.3, 'shape': 89.1}
        """
        try:
            with self._connection() as conn:
                if not conn:
                    return {}

                cursor = conn.cursor()

                # Get most recent analysis IDs
                cursor.execute("""
                    SELECT id FROM analyses
                    ORDER BY created_at DESC
                    LIMIT ?
                """, (limit,))
                recent_analysis_ids = [row[0] for row in cursor.fetchall()]

                if not recent_analysis_ids:
                    logger.info("No analyses found for accuracy calculation")
                    return {}

                # One '?' per ID for the SQL IN clause; values stay parameterized.
                ids_placeholder = ','.join('?' * len(recent_analysis_ids))

                # Query average confidence per model type
                cursor.execute(f"""
                    SELECT model_type, AVG(confidence * 100)
                    FROM analysis_results
                    WHERE analysis_id IN ({ids_placeholder})
                    GROUP BY model_type
                """, recent_analysis_ids)
                results = cursor.fetchall()

            # Build result dictionary, skipping rows without a model type or value
            accuracy_stats = {
                model_type: round(avg_confidence, 1)
                for model_type, avg_confidence in results
                if model_type and avg_confidence is not None
            }

            logger.info(f"Model accuracy stats: {accuracy_stats}")
            return accuracy_stats

        except Exception as e:
            logger.exception(f"Error getting model accuracy stats: {e}")
            return {}