| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661 |
- """
- Data Manager Module
- Handles all data persistence operations for analysis results.
- Manages database operations and file storage for analysis data.
- """
- import json
- import logging
- import sqlite3
- import shutil
- from datetime import datetime, timezone
- from pathlib import Path
- from typing import Optional, Dict, List, Any
- from .db_schema import get_database_connection, close_database_connection, init_database
- from .config import DATABASE_PATH, ANALYSES_DIR
- logger = logging.getLogger(__name__)
- class DataManager:
- """
- Manages persistent storage of analysis data.
-
- Handles:
- - Database operations for metadata
- - File system storage for binary files
- - Analysis record lifecycle
- """
-
- def __init__(self):
- """Initialize the data manager."""
- self.db_path = DATABASE_PATH
- self.analyses_dir = ANALYSES_DIR
-
- # Initialize database
- init_database(self.db_path)
-
- logger.info(f"DataManager initialized with database: {self.db_path}")
-
- def create_analysis(self, report_id: str, device_id: str) -> Optional[int]:
- """
- Create a new analysis record.
-
- Args:
- report_id: Unique report identifier (e.g., "DUR-20250115-143022")
- device_id: Device identifier
-
- Returns:
- int: Analysis ID or None if failed
- """
- try:
- conn = get_database_connection(self.db_path)
- if not conn:
- logger.error("Failed to connect to database")
- return None
-
- cursor = conn.cursor()
- cursor.execute("""
- INSERT INTO analyses (report_id, timestamp, device_id)
- VALUES (?, ?, ?)
- """, (report_id, datetime.now(), device_id))
-
- analysis_id = cursor.lastrowid
- conn.commit()
- close_database_connection(conn)
-
- # Create directory structure for this analysis
- analysis_dir = self.analyses_dir / report_id
- (analysis_dir / "inputs").mkdir(parents=True, exist_ok=True)
- (analysis_dir / "results").mkdir(parents=True, exist_ok=True)
- (analysis_dir / "reports").mkdir(parents=True, exist_ok=True)
-
- logger.info(f"Created analysis record: {report_id} (ID: {analysis_id})")
- return analysis_id
-
- except Exception as e:
- logger.error(f"Error creating analysis: {e}")
- return None
-
- def save_input_file(self, analysis_id: int, input_type: str, original_path: str) -> bool:
- """
- Save an input file and record it in the database.
-
- Args:
- analysis_id: Analysis ID
- input_type: Type of input ('dslr_side', 'dslr_top', 'multispectral', 'thermal', 'audio')
- original_path: Path to the original file
-
- Returns:
- bool: True if successful
- """
- try:
- original_file = Path(original_path)
- if not original_file.exists():
- logger.error(f"Input file not found: {original_path}")
- return False
-
- # Get analysis record to find report_id
- conn = get_database_connection(self.db_path)
- if not conn:
- logger.error("Failed to connect to database")
- return False
-
- cursor = conn.cursor()
- cursor.execute("SELECT report_id FROM analyses WHERE id = ?", (analysis_id,))
- result = cursor.fetchone()
- close_database_connection(conn)
-
- if not result:
- logger.error(f"Analysis not found: {analysis_id}")
- return False
-
- report_id = result[0]
-
- # Determine destination filename
- # Map input_type to file extension if needed
- filename = original_file.name
- dest_dir = self.analyses_dir / report_id / "inputs"
- dest_path = dest_dir / filename
-
- # Copy file
- shutil.copy2(original_file, dest_path)
- file_size = dest_path.stat().st_size
-
- # Record in database
- conn = get_database_connection(self.db_path)
- if not conn:
- logger.error("Failed to connect to database")
- return False
-
- cursor = conn.cursor()
- relative_path = f"inputs/{filename}"
- cursor.execute("""
- INSERT INTO analysis_inputs (analysis_id, input_type, original_path, saved_path, file_size)
- VALUES (?, ?, ?, ?, ?)
- """, (analysis_id, input_type, original_path, relative_path, file_size))
-
- conn.commit()
- close_database_connection(conn)
-
- logger.info(f"Saved input file: {input_type} -> {relative_path}")
- return True
-
- except Exception as e:
- logger.error(f"Error saving input file: {e}")
- return False
-
- def save_result(self, analysis_id: int, model_type: str, result_dict: Dict[str, Any]) -> bool:
- """
- Save model result to database.
-
- Args:
- analysis_id: Analysis ID
- model_type: Type of model ('defect', 'locule', 'maturity', 'shape', 'audio')
- result_dict: Dictionary containing result data:
- - predicted_class: Predicted class name
- - confidence: Confidence value (0-1)
- - probabilities: Dict of class probabilities
- - processing_time: Time in seconds
- - metadata: Additional model-specific data (optional)
-
- Returns:
- bool: True if successful
- """
- try:
- predicted_class = result_dict.get('predicted_class', 'Unknown')
- confidence = result_dict.get('confidence', 0.0)
- probabilities = result_dict.get('probabilities', {})
- processing_time = result_dict.get('processing_time', 0.0)
- metadata = result_dict.get('metadata', {})
-
- # Ensure confidence is 0-1 range
- if confidence > 1.0:
- confidence = confidence / 100.0
-
- # Convert to JSON strings
- probabilities_json = json.dumps(probabilities)
- metadata_json = json.dumps(metadata)
-
- conn = get_database_connection(self.db_path)
- if not conn:
- logger.error("Failed to connect to database")
- return False
-
- cursor = conn.cursor()
- cursor.execute("""
- INSERT INTO analysis_results
- (analysis_id, model_type, predicted_class, confidence, probabilities, processing_time, metadata)
- VALUES (?, ?, ?, ?, ?, ?, ?)
- """, (analysis_id, model_type, predicted_class, confidence, probabilities_json, processing_time, metadata_json))
-
- conn.commit()
- close_database_connection(conn)
-
- logger.info(f"Saved result: {model_type} -> {predicted_class} ({confidence*100:.1f}%)")
- return True
-
- except Exception as e:
- logger.error(f"Error saving result: {e}")
- return False
-
- def save_visualization(self, analysis_id: int, viz_type: str, image_data: Any, format_ext: str = 'png') -> bool:
- """
- Save visualization image to file system.
-
- Args:
- analysis_id: Analysis ID
- viz_type: Type of visualization ('defect_annotated', 'locule_annotated', etc.)
- image_data: Image data (QImage, numpy array, or file path)
- format_ext: File extension for saved image
-
- Returns:
- bool: True if successful
- """
- try:
- from PyQt5.QtGui import QImage, QPixmap
- import numpy as np
-
- # Get report_id
- conn = get_database_connection(self.db_path)
- if not conn:
- logger.error("Failed to connect to database")
- return False
-
- cursor = conn.cursor()
- cursor.execute("SELECT report_id FROM analyses WHERE id = ?", (analysis_id,))
- result = cursor.fetchone()
- close_database_connection(conn)
-
- if not result:
- logger.error(f"Analysis not found: {analysis_id}")
- return False
-
- report_id = result[0]
- results_dir = self.analyses_dir / report_id / "results"
-
- # Generate filename
- filename = f"{viz_type}.{format_ext}"
- file_path = results_dir / filename
-
- # Save based on image_data type
- if isinstance(image_data, str):
- # It's a file path, copy it
- shutil.copy2(image_data, file_path)
- elif isinstance(image_data, QImage):
- # PyQt QImage
- pixmap = QPixmap.fromImage(image_data)
- pixmap.save(str(file_path))
- elif isinstance(image_data, QPixmap):
- # PyQt QPixmap
- image_data.save(str(file_path))
- elif isinstance(image_data, np.ndarray):
- # Numpy array - save as PNG using OpenCV
- import cv2
- # Convert RGB to BGR if needed
- if len(image_data.shape) == 3 and image_data.shape[2] == 3:
- image_data = cv2.cvtColor(image_data, cv2.COLOR_RGB2BGR)
- cv2.imwrite(str(file_path), image_data)
- else:
- logger.error(f"Unsupported image data type: {type(image_data)}")
- return False
-
- # Record in database
- conn = get_database_connection(self.db_path)
- if not conn:
- logger.error("Failed to connect to database")
- return False
-
- cursor = conn.cursor()
- relative_path = f"results/{filename}"
- cursor.execute("""
- INSERT INTO analysis_visualizations (analysis_id, visualization_type, file_path)
- VALUES (?, ?, ?)
- """, (analysis_id, viz_type, relative_path))
-
- conn.commit()
- close_database_connection(conn)
-
- logger.info(f"Saved visualization: {viz_type} -> {relative_path}")
- return True
-
- except Exception as e:
- logger.error(f"Error saving visualization: {e}")
- return False
-
- def finalize_analysis(self, analysis_id: int, overall_grade: str, grade_description: str, total_time: float) -> bool:
- """
- Finalize an analysis record with results.
-
- Args:
- analysis_id: Analysis ID
- overall_grade: Grade ('A', 'B', or 'C')
- grade_description: Description of the grade
- total_time: Total processing time in seconds
-
- Returns:
- bool: True if successful
- """
- try:
- conn = get_database_connection(self.db_path)
- if not conn:
- logger.error("Failed to connect to database")
- return False
-
- cursor = conn.cursor()
- cursor.execute("""
- UPDATE analyses
- SET overall_grade = ?, grade_description = ?, processing_time = ?
- WHERE id = ?
- """, (overall_grade, grade_description, total_time, analysis_id))
-
- conn.commit()
- close_database_connection(conn)
-
- logger.info(f"Finalized analysis: ID={analysis_id}, Grade={overall_grade}")
- return True
-
- except Exception as e:
- logger.error(f"Error finalizing analysis: {e}")
- return False
-
- def get_analysis(self, report_id: str) -> Optional[Dict[str, Any]]:
- """
- Retrieve complete analysis data.
-
- Args:
- report_id: Report ID to retrieve
-
- Returns:
- Dict with analysis data or None if not found
- """
- try:
- conn = get_database_connection(self.db_path)
- if not conn:
- logger.error("Failed to connect to database")
- return None
-
- cursor = conn.cursor()
-
- # Get analysis record
- cursor.execute("""
- SELECT id, report_id, timestamp, device_id, overall_grade, grade_description, processing_time, created_at
- FROM analyses
- WHERE report_id = ?
- """, (report_id,))
-
- analysis_row = cursor.fetchone()
- if not analysis_row:
- logger.warning(f"Analysis not found: {report_id}")
- close_database_connection(conn)
- return None
-
- analysis_id = analysis_row[0]
-
- # Get inputs
- cursor.execute("""
- SELECT input_type, original_path, saved_path, file_size
- FROM analysis_inputs
- WHERE analysis_id = ?
- ORDER BY created_at
- """, (analysis_id,))
-
- inputs = [dict(zip(['input_type', 'original_path', 'saved_path', 'file_size'], row))
- for row in cursor.fetchall()]
-
- # Get results
- cursor.execute("""
- SELECT model_type, predicted_class, confidence, probabilities, processing_time, metadata
- FROM analysis_results
- WHERE analysis_id = ?
- ORDER BY created_at
- """, (analysis_id,))
-
- results = []
- for row in cursor.fetchall():
- result_dict = {
- 'model_type': row[0],
- 'predicted_class': row[1],
- 'confidence': row[2],
- 'probabilities': json.loads(row[3] or '{}'),
- 'processing_time': row[4],
- 'metadata': json.loads(row[5] or '{}'),
- }
- results.append(result_dict)
-
- # Get visualizations
- cursor.execute("""
- SELECT visualization_type, file_path
- FROM analysis_visualizations
- WHERE analysis_id = ?
- ORDER BY created_at
- """, (analysis_id,))
-
- visualizations = [dict(zip(['visualization_type', 'file_path'], row))
- for row in cursor.fetchall()]
-
- close_database_connection(conn)
-
- return {
- 'id': analysis_id,
- 'report_id': analysis_row[1],
- 'timestamp': analysis_row[2],
- 'device_id': analysis_row[3],
- 'overall_grade': analysis_row[4],
- 'grade_description': analysis_row[5],
- 'processing_time': analysis_row[6],
- 'created_at': analysis_row[7],
- 'inputs': inputs,
- 'results': results,
- 'visualizations': visualizations,
- }
-
- except Exception as e:
- logger.error(f"Error retrieving analysis: {e}")
- return None
-
- def list_recent_analyses(self, limit: int = 50) -> List[Dict[str, Any]]:
- """
- Get list of recent analyses.
-
- Args:
- limit: Maximum number of analyses to return
-
- Returns:
- List of analysis records with created_at formatted in local timezone
- """
- try:
- conn = get_database_connection(self.db_path)
- if not conn:
- logger.error("Failed to connect to database")
- return []
-
- cursor = conn.cursor()
- cursor.execute("""
- SELECT id, report_id, timestamp, device_id, overall_grade, processing_time, created_at
- FROM analyses
- ORDER BY created_at DESC
- LIMIT ?
- """, (limit,))
-
- analyses = []
- for row in cursor.fetchall():
- # Convert created_at from UTC to local timezone
- created_at = row[6]
- if created_at:
- try:
- # Parse the UTC datetime string from database
- utc_datetime = datetime.strptime(created_at, "%Y-%m-%d %H:%M:%S")
- # Mark it as UTC
- utc_datetime = utc_datetime.replace(tzinfo=timezone.utc)
- # Convert to local timezone
- local_datetime = utc_datetime.astimezone()
- # Format for display
- created_at = local_datetime.strftime("%Y-%m-%d %H:%M:%S")
- except Exception as e:
- # If parsing fails, keep original value
- logger.debug(f"Failed to parse datetime: {e}")
- pass
-
- analyses.append({
- 'id': row[0],
- 'report_id': row[1],
- 'timestamp': row[2],
- 'device_id': row[3],
- 'overall_grade': row[4],
- 'processing_time': row[5],
- 'created_at': created_at,
- })
-
- close_database_connection(conn)
- return analyses
-
- except Exception as e:
- logger.error(f"Error listing analyses: {e}")
- return []
-
- def get_analysis_file_path(self, report_id: str, relative_path: str) -> Optional[Path]:
- """
- Get full path to a file stored in analysis folder.
-
- Args:
- report_id: Report ID
- relative_path: Relative path (e.g., 'inputs/image.jpg' or 'results/defect_annotated.png')
-
- Returns:
- Full file path or None if not found
- """
- file_path = self.analyses_dir / report_id / relative_path
- if file_path.exists():
- return file_path
- return None
-
- def export_analysis_to_dict(self, report_id: str) -> Optional[Dict[str, Any]]:
- """
- Export analysis data for reports display.
- Converts file paths to full paths for use in UI.
-
- Args:
- report_id: Report ID to export
-
- Returns:
- Analysis data with full file paths or None if not found
- """
- analysis = self.get_analysis(report_id)
- if not analysis:
- return None
-
- # Convert relative paths to full paths
- for input_item in analysis['inputs']:
- if input_item['saved_path']:
- full_path = self.get_analysis_file_path(report_id, input_item['saved_path'])
- input_item['full_path'] = str(full_path) if full_path else None
-
- for viz_item in analysis['visualizations']:
- if viz_item['file_path']:
- full_path = self.get_analysis_file_path(report_id, viz_item['file_path'])
- viz_item['full_path'] = str(full_path) if full_path else None
-
- return analysis
-
- def get_daily_analysis_count(self) -> int:
- """
- Get count of analyses created today.
-
- Returns:
- int: Number of analyses created today (UTC date)
- """
- try:
- conn = get_database_connection(self.db_path)
- if not conn:
- logger.error("Failed to connect to database")
- return 0
-
- cursor = conn.cursor()
-
- # Get today's date in UTC
- today = datetime.now(timezone.utc).date()
-
- # Query analyses created today
- cursor.execute("""
- SELECT COUNT(*)
- FROM analyses
- WHERE DATE(created_at) = ?
- """, (str(today),))
-
- result = cursor.fetchone()
- close_database_connection(conn)
-
- count = result[0] if result else 0
- logger.info(f"Daily analysis count: {count}")
- return count
-
- except Exception as e:
- logger.error(f"Error getting daily analysis count: {e}")
- return 0
-
- def get_average_processing_time(self, limit: int = 100) -> float:
- """
- Get average processing time from recent analyses.
-
- Args:
- limit: Maximum number of recent analyses to consider
-
- Returns:
- float: Average processing time in seconds (0.0 if no data)
- """
- try:
- conn = get_database_connection(self.db_path)
- if not conn:
- logger.error("Failed to connect to database")
- return 0.0
-
- cursor = conn.cursor()
-
- # Query average processing time from recent analyses
- cursor.execute("""
- SELECT AVG(processing_time)
- FROM (
- SELECT processing_time
- FROM analyses
- WHERE processing_time IS NOT NULL AND processing_time > 0
- ORDER BY created_at DESC
- LIMIT ?
- )
- """, (limit,))
-
- result = cursor.fetchone()
- close_database_connection(conn)
-
- avg_time = float(result[0]) if result and result[0] else 0.0
- logger.info(f"Average processing time: {avg_time:.2f}s")
- return avg_time
-
- except Exception as e:
- logger.error(f"Error getting average processing time: {e}")
- return 0.0
-
- def get_model_accuracy_stats(self, limit: int = 100) -> Dict[str, float]:
- """
- Get average confidence scores per model type from recent results.
-
- This calculates the average confidence (as a proxy for accuracy) for each model
- from the most recent analyses.
-
- Args:
- limit: Maximum number of recent analyses to consider
-
- Returns:
- Dict mapping model type to average confidence percentage (0-100)
- Example: {'audio': 94.2, 'defect': 87.5, 'locule': 91.8, 'maturity': 88.3, 'shape': 89.1}
- """
- try:
- conn = get_database_connection(self.db_path)
- if not conn:
- logger.error("Failed to connect to database")
- return {}
-
- cursor = conn.cursor()
-
- # Get most recent analysis IDs
- cursor.execute("""
- SELECT id FROM analyses
- ORDER BY created_at DESC
- LIMIT ?
- """, (limit,))
-
- recent_analysis_ids = [row[0] for row in cursor.fetchall()]
-
- if not recent_analysis_ids:
- close_database_connection(conn)
- logger.info("No analyses found for accuracy calculation")
- return {}
-
- # Format IDs for SQL IN clause
- ids_placeholder = ','.join('?' * len(recent_analysis_ids))
-
- # Query average confidence per model type
- cursor.execute(f"""
- SELECT model_type, AVG(confidence * 100)
- FROM analysis_results
- WHERE analysis_id IN ({ids_placeholder})
- GROUP BY model_type
- """, recent_analysis_ids)
-
- results = cursor.fetchall()
- close_database_connection(conn)
-
- # Build result dictionary
- accuracy_stats = {}
- for model_type, avg_confidence in results:
- if model_type and avg_confidence is not None:
- accuracy_stats[model_type] = round(avg_confidence, 1)
-
- logger.info(f"Model accuracy stats: {accuracy_stats}")
- return accuracy_stats
-
- except Exception as e:
- logger.error(f"Error getting model accuracy stats: {e}")
- return {}
|