""" Main Window Module The main application window that integrates all components: - Dashboard with all panels - Tab navigation - Menu bar - Worker thread management - File dialogs and processing """ import sys from datetime import datetime from pathlib import Path from typing import Optional, Dict from PyQt5.QtWidgets import (QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QTabWidget, QFrame, QLabel, QFileDialog, QMessageBox, QPushButton, QStyle) from PyQt5.QtCore import Qt, QThreadPool, QTimer, QSize from PyQt5.QtGui import QFont, QPixmap, QIcon from ui.panels import (SystemStatusPanel, QuickActionsPanel, RecentResultsPanel, SystemInfoPanel, LiveFeedPanel) from ui.tabs import RipenessTab, QualityTab, MaturityTab, ParametersTab, ReportsTab from ui.dialogs import AboutDialog, HelpDialog, ManualInputDialog, CameraAppCheckDialog from models import AudioModel, DefectModel, LoculeModel, MaturityModel, ShapeModel from workers import AudioWorker, DefectWorker, LoculeWorker, MaturityWorker, ShapeWorker from utils.config import (WINDOW_TITLE, WINDOW_WIDTH, WINDOW_HEIGHT, DEVICE_ID, get_device, DEFAULT_DIRS, FILE_FILTERS, PROJECT_ROOT) from utils.data_manager import DataManager from utils.process_utils import get_missing_camera_apps, get_running_camera_apps from utils.camera_automation import SecondLookAutomation, EOSUtilityAutomation, AnalyzIRAutomation, CameraAutomationError from resources.styles import MAIN_WINDOW_STYLE, TAB_WIDGET_STYLE, HEADER_ICON_BUTTON_STYLE class DuDONGMainWindow(QMainWindow): """ Main application window for DuDONG Grading System. Integrates all UI components, manages worker threads, and handles user interactions for ripeness and quality classification. Attributes: thread_pool: QThreadPool for managing worker threads models: Dict of loaded AI models status_panels: Dict of UI panel references """ def __init__(self): """Initialize the main window.""" super().__init__() self.setWindowTitle(WINDOW_TITLE) self.setGeometry(100, 100, WINDOW_WIDTH, WINDOW_HEIGHT) # Initialize thread pool self.thread_pool = QThreadPool() print(f"Thread pool initialized with {self.thread_pool.maxThreadCount()} threads") # Initialize data manager for persistence self.data_manager = DataManager() print("Data manager initialized") # Initialize models (will be loaded on demand) self.models = { 'audio': None, 'defect': None, 'locule': None, 'maturity': None, 'shape': None } # Track processing state self.is_processing = False self.report_results = {} # Store results for reports tab self.current_analysis_id = None # Track current analysis for saving self.analysis_start_time = None # Track when analysis started # Track application start time for uptime calculation self.app_start_time = datetime.now() # Cache GPU status to avoid repeated checks self._gpu_status_cache = None self._last_model_count = 0 # Initialize UI self.init_ui() # Load models in background self.load_models() # Start timer for status updates self.init_timer() def init_ui(self): """Initialize the user interface.""" # Set window style self.setStyleSheet(MAIN_WINDOW_STYLE) # Central widget central_widget = QWidget() self.setCentralWidget(central_widget) main_layout = QVBoxLayout(central_widget) main_layout.setContentsMargins(0, 0, 0, 0) main_layout.setSpacing(0) # Header header = self.create_header() main_layout.addWidget(header) # Tab widget for different views self.tab_widget = QTabWidget() self.tab_widget.setStyleSheet(TAB_WIDGET_STYLE) # Dashboard tab (main) dashboard = self.create_dashboard() self.tab_widget.addTab(dashboard, "Dashboard") # Processing tabs - HIDDEN FOR NOW self.ripeness_tab = RipenessTab() self.ripeness_tab.load_audio_requested.connect(self.on_ripeness_load_audio) ripeness_index = self.tab_widget.addTab(self.ripeness_tab, "Ripeness") self.tab_widget.setTabVisible(ripeness_index, False) self.quality_tab = QualityTab() self.quality_tab.load_image_requested.connect(self.on_quality_load_image) quality_index = self.tab_widget.addTab(self.quality_tab, "Quality") self.tab_widget.setTabVisible(quality_index, False) self.maturity_tab = MaturityTab() self.maturity_tab.load_tiff_requested.connect(self.on_maturity_load_tiff) maturity_index = self.tab_widget.addTab(self.maturity_tab, "Maturity") self.tab_widget.setTabVisible(maturity_index, False) # Placeholder tabs (to be implemented in future) - HIDDEN FOR NOW self.parameters_tab = ParametersTab() parameters_index = self.tab_widget.addTab(self.parameters_tab, "Parameters") self.tab_widget.setTabVisible(parameters_index, False) self.reports_tab = ReportsTab() self.reports_tab.go_to_dashboard.connect(self.on_go_to_dashboard) self.tab_widget.addTab(self.reports_tab, "Reports") main_layout.addWidget(self.tab_widget) # Status bar status_bar = self.create_status_bar() main_layout.addWidget(status_bar) # Initial update of system info panel with existing data self.update_system_info_panel() def create_header(self) -> QFrame: """ Create the application header. Returns: QFrame: Header widget """ header = QFrame() header.setFixedHeight(80) header.setStyleSheet("background-color: #2c3e50;") layout = QHBoxLayout(header) layout.setContentsMargins(20, 10, 20, 10) # Logo on the left logo_path = PROJECT_ROOT / "assets" / "logos" / "dudong_logo.png" if logo_path.exists(): logo_label = QLabel() logo_pixmap = QPixmap(str(logo_path)) # Scale logo to fit header height (80px - margins) scaled_logo = logo_pixmap.scaledToHeight(60, Qt.SmoothTransformation) logo_label.setPixmap(scaled_logo) logo_label.setFixedWidth(60) layout.addWidget(logo_label) layout.addSpacing(15) else: # Debug: print path not found print(f"[DEBUG] Logo not found at: {logo_path}") # Title title = QLabel(WINDOW_TITLE) title.setStyleSheet("color: white; font-size: 22px; font-weight: bold;") layout.addWidget(title) layout.addStretch() # Icon buttons on the right # Help/Support button help_btn = QPushButton() help_icon = self.style().standardIcon(QStyle.SP_MessageBoxQuestion) help_btn.setIcon(help_icon) help_btn.setIconSize(QSize(24, 24)) help_btn.setStyleSheet(HEADER_ICON_BUTTON_STYLE) help_btn.setToolTip("Help & Support (F1)") help_btn.clicked.connect(self.show_help) layout.addWidget(help_btn) # About/Info button about_btn = QPushButton() about_icon = self.style().standardIcon(QStyle.SP_MessageBoxInformation) about_btn.setIcon(about_icon) about_btn.setIconSize(QSize(24, 24)) about_btn.setStyleSheet(HEADER_ICON_BUTTON_STYLE) about_btn.setToolTip("About DuDONG") about_btn.clicked.connect(self.show_about) layout.addWidget(about_btn) # Exit button exit_btn = QPushButton() exit_icon = self.style().standardIcon(QStyle.SP_MessageBoxCritical) exit_btn.setIcon(exit_icon) exit_btn.setIconSize(QSize(24, 24)) exit_btn.setStyleSheet(HEADER_ICON_BUTTON_STYLE) exit_btn.setToolTip("Exit Application (Ctrl+Q)") exit_btn.clicked.connect(self.close) layout.addWidget(exit_btn) return header def create_dashboard(self) -> QWidget: """ Create the main dashboard view. Returns: QWidget: Dashboard widget """ dashboard = QWidget() layout = QVBoxLayout(dashboard) # Top row: Status, Actions, and Results top_layout = QHBoxLayout() # System Status Panel (left) self.status_panel = SystemStatusPanel() self.status_panel.setMinimumWidth(360) # Pass models reference to status panel self.status_panel.set_models_reference(self.models) top_layout.addWidget(self.status_panel, 1) # Middle column: Quick Actions and Recent Results middle_layout = QVBoxLayout() # Quick Actions Panel self.actions_panel = QuickActionsPanel() self.actions_panel.setMinimumWidth(380) self.actions_panel.setMaximumHeight(350) # Increased for new button # Connect signals self.actions_panel.analyze_durian_clicked.connect(self.on_analyze_durian_clicked) self.actions_panel.ripeness_clicked.connect(self.on_ripeness_clicked) self.actions_panel.quality_clicked.connect(self.on_quality_clicked) self.actions_panel.calibration_clicked.connect(self.on_calibration_clicked) self.actions_panel.batch_clicked.connect(self.on_batch_clicked) middle_layout.addWidget(self.actions_panel) # Recent Results Panel (pass DataManager for database integration) self.results_panel = RecentResultsPanel(data_manager=self.data_manager) self.results_panel.setMinimumWidth(380) # Connect view button signal to handler self.results_panel.view_analysis_requested.connect(self.on_view_analysis) middle_layout.addWidget(self.results_panel) top_layout.addLayout(middle_layout, 2) layout.addLayout(top_layout) # Bottom row: System Info and Live Feeds bottom_layout = QHBoxLayout() # System Information Panel self.info_panel = SystemInfoPanel() self.info_panel.setMinimumWidth(560) bottom_layout.addWidget(self.info_panel, 2) # Live Feed Panel self.feed_panel = LiveFeedPanel() bottom_layout.addWidget(self.feed_panel, 1) layout.addLayout(bottom_layout) return dashboard def create_status_bar(self) -> QFrame: """ Create the application status bar. Returns: QFrame: Status bar widget """ status_bar = QFrame() status_bar.setFixedHeight(40) status_bar.setStyleSheet("background-color: #34495e;") layout = QHBoxLayout(status_bar) layout.setContentsMargins(20, 0, 20, 0) # Left side: detailed status self.status_text = QLabel("Ripeness Classifier Active | Model: RipeNet | GPU: -- | Processing: IDLE") self.status_text.setStyleSheet("color: #ecf0f1; font-size: 12px;") layout.addWidget(self.status_text) layout.addStretch() # Right side: ready indicator self.ready_indicator = QLabel("● READY FOR TESTING") self.ready_indicator.setStyleSheet("color: #27ae60; font-size: 12px; font-weight: bold;") layout.addWidget(self.ready_indicator) return status_bar def init_timer(self): """Initialize update timer.""" self.timer = QTimer() self.timer.timeout.connect(self.update_status_bar) self.timer.start(1000) # Update every second def update_status_bar(self): """ Update status bar with current time and info. Optimized to minimize overhead: - GPU status is cached and only checked once at startup - Model count is cached until it changes - Only text updates when status actually changes """ # Only update if footer components exist if not hasattr(self, 'status_text') or not hasattr(self, 'ready_indicator'): return # Get model load status (lightweight check) model_status = self.get_model_load_status() loaded_count = sum(1 for status in model_status.values() if status) # Cache GPU status after first check (it won't change during runtime) if self._gpu_status_cache is None: try: import torch self._gpu_status_cache = "Active" if torch.cuda.is_available() else "N/A" except: self._gpu_status_cache = "N/A" gpu_status = self._gpu_status_cache # Get processing status - only show "Processing" when actually processing if self.is_processing: processing_status = "Processing" ready_text = "● PROCESSING" ready_color = "#f39c12" # Orange else: processing_status = "IDLE" ready_text = "● READY FOR TESTING" ready_color = "#27ae60" # Green # Build status text (only if something changed to reduce UI updates) models_info = f"{loaded_count}/5" status = f"DuDONG Active | Model: {models_info} | GPU: {gpu_status} | Processing: {processing_status}" # Only update text if it actually changed if self.status_text.text() != status: self.status_text.setText(status) if self.ready_indicator.text() != ready_text: self.ready_indicator.setText(ready_text) self.ready_indicator.setStyleSheet(f"color: {ready_color}; font-size: 12px; font-weight: bold;") def load_models(self): """Load AI models in background.""" device = get_device() try: # Audio model (CPU for TensorFlow) print("Loading audio model...") self.models['audio'] = AudioModel(device='cpu') if self.models['audio'].load(): self.status_panel.update_model_status('ripeness', 'online', 'Loaded') print("✓ Audio model loaded") else: self.status_panel.update_model_status('ripeness', 'offline', 'Failed') print("✗ Audio model failed to load") except Exception as e: print(f"Error loading audio model: {e}") self.status_panel.update_model_status('ripeness', 'offline', 'Error') try: # Defect model (GPU) print("Loading defect model...") self.models['defect'] = DefectModel(device=device) if self.models['defect'].load(): self.status_panel.update_model_status('quality', 'online', 'Loaded') print("✓ Defect model loaded") else: self.status_panel.update_model_status('quality', 'offline', 'Failed') print("✗ Defect model failed to load") except Exception as e: print(f"Error loading defect model: {e}") self.status_panel.update_model_status('quality', 'offline', 'Error') try: # Locule model (GPU) print("Loading locule model...") self.models['locule'] = LoculeModel(device=device) if self.models['locule'].load(): self.status_panel.update_model_status('defect', 'online', 'Loaded') print("✓ Locule model loaded") else: self.status_panel.update_model_status('defect', 'offline', 'Failed') print("✗ Locule model failed to load") except Exception as e: print(f"Error loading locule model: {e}") self.status_panel.update_model_status('defect', 'offline', 'Error') try: # Maturity model (GPU) print("Loading maturity model...") self.models['maturity'] = MaturityModel(device=device) if self.models['maturity'].load(): print("✓ Maturity model loaded") else: print("✗ Maturity model failed to load") except Exception as e: print(f"Error loading maturity model: {e}") try: # Shape model (GPU) print("Loading shape model...") shape_model_path = PROJECT_ROOT / "model_files" / "shape.pt" self.models['shape'] = ShapeModel(str(shape_model_path), device=device) if self.models['shape'].load(): print("✓ Shape model loaded") else: print("✗ Shape model failed to load") except Exception as e: print(f"Error loading shape model: {e}") # Refresh system status display after all models are loaded if hasattr(self, 'status_panel'): self.status_panel.refresh_status() print("✓ System status panel updated") def get_model_load_status(self) -> Dict[str, bool]: """ Get the load status of all AI models. Returns: Dict mapping model key to loaded status """ return { 'audio': self.models['audio'].is_loaded if self.models['audio'] else False, 'defect': self.models['defect'].is_loaded if self.models['defect'] else False, 'locule': self.models['locule'].is_loaded if self.models['locule'] else False, 'maturity': self.models['maturity'].is_loaded if self.models['maturity'] else False, 'shape': self.models['shape'].is_loaded if self.models['shape'] else False, } def update_system_info_panel(self): """ Update System Information Panel with real-time statistics. Gathers data from: - DataManager: daily count, average processing time, model accuracy stats - App state: uptime Then calls update methods on self.info_panel to display the values. Note: Memory info is displayed in SystemStatusPanel, not here. """ try: if not hasattr(self, 'info_panel'): return # Calculate uptime uptime_delta = datetime.now() - self.app_start_time uptime_hours = uptime_delta.seconds // 3600 uptime_minutes = (uptime_delta.seconds % 3600) // 60 # Get statistics from DataManager daily_count = self.data_manager.get_daily_analysis_count() avg_processing_time = self.data_manager.get_average_processing_time() accuracy_stats = self.data_manager.get_model_accuracy_stats() # Update panel with all values self.info_panel.update_uptime(uptime_hours, uptime_minutes) self.info_panel.update_throughput(daily_count) self.info_panel.update_processing_time(avg_processing_time) # Update model accuracy stats for model_name, accuracy in accuracy_stats.items(): self.info_panel.update_accuracy(model_name, accuracy) print("✓ System info panel updated") except Exception as e: print(f"Error updating system info panel: {e}") import traceback traceback.print_exc() # ==================== Quick Action Handlers ==================== def on_analyze_durian_clicked(self, manual_mode: bool): """ Handle analyze durian button click. Args: manual_mode: True if manual input mode is selected, False for auto mode """ if manual_mode: # Manual mode: Show dialog for file selection self._handle_manual_input_mode() else: # Auto mode: Check if camera apps are running self._handle_auto_mode() def _handle_auto_mode(self): """Handle automatic camera control mode.""" print("Checking for camera applications...") # Check which camera apps are running missing_apps = get_missing_camera_apps() running_apps = get_running_camera_apps() print(f"Running camera apps: {running_apps}") print(f"Missing camera apps: {missing_apps}") if missing_apps: # Show dialog about missing apps dialog = CameraAppCheckDialog(missing_apps, self) dialog.exec_() print("Auto mode requires all camera applications to be running.") return # All apps are running - proceed with automated capture print("All camera applications detected. Attempting automated capture...") # Start with 2nd Look (multispectral) automation self._attempt_second_look_capture() def _attempt_second_look_capture(self): """Attempt to capture multispectral image from 2nd Look.""" try: print("Initializing 2nd Look automation...") # Create automation instance second_look = SecondLookAutomation() # Check if window is open if not second_look.is_window_open(): QMessageBox.warning( self, "2nd Look Not Responsive", "2nd Look window is not open or not responding.\n\n" "Please ensure 2nd Look is properly running before attempting automated capture." ) print("2nd Look window not found or not responsive") return print("Capturing multispectral image from 2nd Look...") # Perform capture captured_file = second_look.capture() if captured_file: print(f"Successfully captured: {captured_file}") # Create inputs dict with captured multispectral file inputs = { 'dslr': '', 'multispectral': captured_file, 'thermal': '', 'audio': '' } # Process the capture self._process_manual_inputs(inputs) # Cleanup automation instance try: second_look.cleanup() except Exception as e: print(f"Warning during cleanup: {e}") else: QMessageBox.warning( self, "Capture Failed", "Failed to capture multispectral image from 2nd Look.\n\n" "Please verify:\n" "1. 2nd Look has an image loaded\n" "2. The camera is properly connected\n" "3. File system has write permissions" ) print("Capture failed - file not created") except CameraAutomationError as e: QMessageBox.critical( self, "Camera Automation Error", f"Error during automated capture:\n\n{str(e)}" ) print(f"Automation error: {e}") except Exception as e: QMessageBox.critical( self, "Unexpected Error", f"Unexpected error during automated capture:\n\n{str(e)}" ) print(f"Unexpected error: {e}") def _attempt_eos_capture(self): """Attempt to capture image from EOS Utility DSLR.""" try: print("Initializing EOS Utility automation...") # Create automation instance eos_utility = EOSUtilityAutomation() # Check if window is open if not eos_utility.is_window_open(): QMessageBox.warning( self, "EOS Utility Not Responsive", "EOS Utility window is not open or not responding.\n\n" "Please ensure EOS Utility is properly running before attempting automated capture." ) print("EOS Utility window not found or not responsive") return print("Capturing image from EOS Utility...") # Perform capture captured_file = eos_utility.capture() if captured_file: print(f"Successfully captured: {captured_file}") # Create inputs dict with captured DSLR file inputs = { 'dslr': captured_file, 'multispectral': '', 'thermal': '', 'audio': '' } # Process the capture self._process_manual_inputs(inputs) # Cleanup automation instance try: eos_utility.cleanup() except Exception as e: print(f"Warning during cleanup: {e}") else: QMessageBox.warning( self, "Capture Failed", "Failed to capture image from EOS Utility.\n\n" "Please verify:\n" "1. EOS Utility has a camera connected\n" "2. The camera is properly initialized\n" "3. File system has write permissions" ) print("Capture failed - file not created") except CameraAutomationError as e: QMessageBox.critical( self, "Camera Automation Error", f"Error during automated capture:\n\n{str(e)}" ) print(f"Automation error: {e}") except Exception as e: QMessageBox.critical( self, "Unexpected Error", f"Unexpected error during automated capture:\n\n{str(e)}" ) print(f"Unexpected error: {e}") def _attempt_analyzir_capture(self): """Attempt to capture thermal data from AnalyzIR.""" try: print("Initializing AnalyzIR automation...") # Create automation instance analyzir = AnalyzIRAutomation() # Check if window is open if not analyzir.is_window_open(): QMessageBox.warning( self, "AnalyzIR Not Responsive", "AnalyzIR Venus window is not open or not responding.\n\n" "Please ensure AnalyzIR is properly running before attempting automated capture." ) print("AnalyzIR window not found or not responsive") return print("Capturing thermal data from AnalyzIR...") # Perform capture captured_file = analyzir.capture() if captured_file: print(f"Successfully captured: {captured_file}") # Create inputs dict with captured thermal file inputs = { 'dslr': '', 'multispectral': '', 'thermal': captured_file, 'audio': '' } # Process the capture self._process_manual_inputs(inputs) # Cleanup automation instance try: analyzir.cleanup() except Exception as e: print(f"Warning during cleanup: {e}") else: QMessageBox.warning( self, "Capture Failed", "Failed to capture thermal data from AnalyzIR.\n\n" "Please verify:\n" "1. AnalyzIR has thermal image data loaded\n" "2. The IR camera (FOTRIC 323) is properly connected\n" "3. File system has write permissions" ) print("Capture failed - file not created") except CameraAutomationError as e: QMessageBox.critical( self, "Camera Automation Error", f"Error during automated capture:\n\n{str(e)}" ) print(f"Automation error: {e}") except Exception as e: QMessageBox.critical( self, "Unexpected Error", f"Unexpected error during automated capture:\n\n{str(e)}" ) print(f"Unexpected error: {e}") def _handle_manual_input_mode(self): """Handle manual input mode with file dialogs.""" print("Opening manual input dialog...") dialog = ManualInputDialog(self) dialog.inputs_confirmed.connect(self._process_manual_inputs) dialog.exec_() def _process_manual_inputs(self, inputs: dict): """ Process manual camera inputs with RGB and multispectral models. Args: inputs: Dictionary with keys 'dslr_side', 'dslr_top', 'multispectral', 'thermal', 'audio' """ print("Processing manual inputs:") print(f" DSLR Side: {inputs.get('dslr_side', 'Not provided')}") print(f" DSLR Top: {inputs.get('dslr_top', 'Not provided')}") print(f" Multispectral: {inputs.get('multispectral', 'Not provided')}") print(f" Thermal: {inputs.get('thermal', 'Not provided')}") print(f" Audio: {inputs.get('audio', 'Not provided')}") # Create analysis record report_id = f"DUR-{datetime.now().strftime('%Y%m%d-%H%M%S')}" self.current_analysis_id = self.data_manager.create_analysis(report_id, DEVICE_ID) self.analysis_start_time = datetime.now() if not self.current_analysis_id: QMessageBox.critical(self, "Error", "Failed to create analysis record. Cannot proceed.") return print(f"Created analysis record: {report_id} (ID: {self.current_analysis_id})") # Save input files for input_type, file_path in inputs.items(): if file_path: self.data_manager.save_input_file(self.current_analysis_id, input_type, file_path) # Navigate to Reports tab self.tab_widget.setCurrentIndex(5) # Store inputs in reports tab self.reports_tab.input_data = inputs self.reports_tab.current_analysis_id = self.current_analysis_id self.reports_tab.current_report_id = report_id # Store the actual report ID self.reports_tab.data_manager = self.data_manager # Reset results and tracking self.report_results = {} self.models_to_run = [] # Track which models we'll run # Determine which models to run based on inputs if inputs.get('dslr_side'): print("DSLR Side (Defect Model) detected...") self.models_to_run.append('defect') if inputs.get('dslr_top'): print("DSLR Top (Locule Model) detected...") self.models_to_run.append('locule') if inputs.get('multispectral'): print("Multispectral TIFF (Maturity Model) detected...") self.models_to_run.append('maturity') # Shape processing uses dslr_side if available AND model is loaded if inputs.get('dslr_side') and self.models.get('shape') and self.models['shape'].is_loaded: print("Shape Classification will be processed...") self.models_to_run.append('shape') # Audio ripeness processing uses audio file if available AND model is loaded if inputs.get('audio') and self.models.get('audio') and self.models['audio'].is_loaded: print("Audio Ripeness Classification will be processed...") self.models_to_run.append('audio') elif inputs.get('audio'): print("⚠️ Audio file provided but audio model not loaded!") # Start loading state if models to process if len(self.models_to_run) > 0: self.is_processing = True self.reports_tab.set_loading(True) else: # No models to run, just show the inputs self.reports_tab.generate_report(inputs) return # Process DSLR Side View with Defect Model if inputs.get('dslr_side'): worker = DefectWorker(inputs['dslr_side'], self.models['defect']) worker.signals.started.connect(lambda: self.on_worker_started("Defect Analysis")) worker.signals.result_ready.connect(self.on_defect_report_result) worker.signals.error.connect(lambda msg: self.on_worker_error_manual(msg, 'defect')) worker.signals.finished.connect(lambda: self.on_worker_finished()) worker.signals.progress.connect(self.on_worker_progress) self.thread_pool.start(worker) # Process DSLR Top View with Locule Model if inputs.get('dslr_top'): worker = LoculeWorker(inputs['dslr_top'], self.models['locule']) worker.signals.started.connect(lambda: self.on_worker_started("Locule Analysis")) worker.signals.result_ready.connect(self.on_locule_report_result) worker.signals.error.connect(lambda msg: self.on_worker_error_manual(msg, 'locule')) worker.signals.finished.connect(lambda: self.on_worker_finished()) worker.signals.progress.connect(self.on_worker_progress) self.thread_pool.start(worker) # Process Multispectral with Maturity Model if inputs.get('multispectral'): worker = MaturityWorker(inputs['multispectral'], self.models['maturity']) worker.signals.started.connect(lambda: self.on_worker_started("Maturity Classification")) worker.signals.result_ready.connect(self.on_maturity_report_result) worker.signals.error.connect(lambda msg: self.on_worker_error_manual(msg, 'maturity')) worker.signals.finished.connect(lambda: self.on_worker_finished()) worker.signals.progress.connect(self.on_worker_progress) self.thread_pool.start(worker) # Process DSLR Side View with Shape Model (uses same image as defect) if inputs.get('dslr_side') and self.models.get('shape') and self.models['shape'].is_loaded: worker = ShapeWorker(inputs['dslr_side'], self.models['shape']) worker.signals.started.connect(lambda: self.on_worker_started("Shape Classification")) worker.signals.result_ready.connect(self.on_shape_report_result) worker.signals.error.connect(lambda msg: self.on_worker_error_manual(msg, 'shape')) worker.signals.finished.connect(lambda: self.on_worker_finished()) worker.signals.progress.connect(self.on_worker_progress) self.thread_pool.start(worker) # Process Audio File with Audio Ripeness Model if inputs.get('audio') and self.models.get('audio') and self.models['audio'].is_loaded: print(f"Starting AudioWorker for: {inputs['audio']}") worker = AudioWorker(inputs['audio'], self.models['audio']) worker.signals.started.connect(lambda: self.on_worker_started("Audio Ripeness Classification")) worker.signals.result_ready.connect(self.on_audio_report_result) worker.signals.error.connect(lambda msg: self.on_worker_error_manual(msg, 'audio')) worker.signals.finished.connect(lambda: self.on_worker_finished()) worker.signals.progress.connect(self.on_worker_progress) self.thread_pool.start(worker) def on_ripeness_clicked(self): """Handle ripeness classifier button click - switch to Ripeness tab.""" # Switch to Ripeness tab (index 1) self.tab_widget.setCurrentIndex(1) # The tab will handle file loading via its own signal # Trigger file dialog immediately self.on_ripeness_load_audio() def on_ripeness_load_audio(self): """Handle audio file loading for ripeness classification.""" if self.is_processing: QMessageBox.warning(self, "Processing", "Please wait for current processing to complete.") return # Open file dialog for audio # Use non-native dialog to avoid Windows shell freezing issues file_path, _ = QFileDialog.getOpenFileName( self, "Select Audio File", DEFAULT_DIRS['audio'], FILE_FILTERS['audio'], options=QFileDialog.DontUseNativeDialog ) if not file_path: return print(f"Processing audio file: {file_path}") self.is_processing = True self.current_audio_file = file_path # Store for result handler # Set loading state on ripeness tab self.ripeness_tab.set_loading(True) # Create and start worker worker = AudioWorker(file_path, self.models['audio']) worker.signals.started.connect(lambda: self.on_worker_started("Audio Processing")) worker.signals.result_ready.connect(self.on_audio_result) worker.signals.error.connect(self.on_worker_error) worker.signals.finished.connect(lambda: self.on_worker_finished()) worker.signals.progress.connect(self.on_worker_progress) self.thread_pool.start(worker) def on_quality_clicked(self): """Handle quality classifier button click - switch to Quality tab.""" # Switch to Quality tab (index 2) self.tab_widget.setCurrentIndex(2) # Trigger file dialog via the control panel # Use QTimer to ensure the tab is fully visible before opening dialog if hasattr(self.quality_tab, 'control_panel'): QTimer.singleShot(100, self.quality_tab.control_panel._open_file_dialog) def on_quality_load_image(self): """Handle image file loading for quality classification.""" if self.is_processing: QMessageBox.warning(self, "Processing", "Please wait for current processing to complete.") return # Open file dialog for image # Use non-native dialog to avoid Windows shell freezing issues file_path, _ = QFileDialog.getOpenFileName( self, "Select Image File", DEFAULT_DIRS['image'], FILE_FILTERS['image'], options=QFileDialog.DontUseNativeDialog ) if not file_path: return print(f"Processing image file: {file_path}") self.is_processing = True self.current_image_file = file_path # Store for result handler # Set loading state on quality tab self.quality_tab.set_loading(True) # Create and start worker worker = DefectWorker(file_path, self.models['defect']) worker.signals.started.connect(lambda: self.on_worker_started("Defect Detection")) worker.signals.result_ready.connect(self.on_defect_result) worker.signals.error.connect(self.on_worker_error) worker.signals.finished.connect(lambda: self.on_worker_finished()) worker.signals.progress.connect(self.on_worker_progress) self.thread_pool.start(worker) def on_maturity_load_tiff(self): """Handle TIFF file loading for maturity classification.""" if self.is_processing: QMessageBox.warning(self, "Processing", "Please wait for current processing to complete.") return # Open file dialog for TIFF # Use non-native dialog to avoid Windows shell freezing issues file_path, _ = QFileDialog.getOpenFileName( self, "Select Multispectral TIFF File", DEFAULT_DIRS.get('image', str(Path.home())), FILE_FILTERS.get('tiff', "TIFF Files (*.tif *.tiff);;All Files (*.*)"), options=QFileDialog.DontUseNativeDialog ) if not file_path: return print(f"Processing TIFF file: {file_path}") self.is_processing = True self.current_tiff_file = file_path # Store for result handler # Set loading state on maturity tab self.maturity_tab.set_loading(True) # Create and start worker worker = MaturityWorker(file_path, self.models['maturity']) worker.signals.started.connect(lambda: self.on_worker_started("Maturity Classification")) worker.signals.result_ready.connect(self.on_maturity_result) worker.signals.error.connect(self.on_worker_error) worker.signals.finished.connect(lambda: self.on_worker_finished()) worker.signals.progress.connect(self.on_worker_progress) self.thread_pool.start(worker) def on_calibration_clicked(self): """Handle calibration button click.""" QMessageBox.information( self, "System Calibration", "Calibration feature coming soon!\n\n" "This will allow you to:\n" "- Adjust detection thresholds\n" "- Fine-tune model parameters\n" "- Calibrate camera settings" ) def on_batch_clicked(self): """Handle batch mode button click.""" QMessageBox.information( self, "Batch Processing", "Batch mode coming soon!\n\n" "This will allow you to:\n" "- Process multiple files at once\n" "- Export results to CSV/Excel\n" "- Generate batch reports" ) # ==================== Worker Signal Handlers ==================== def on_worker_started(self, task_name: str): """Handle worker started signal.""" print(f"{task_name} started") self.status_text.setText(f"Processing: {task_name}...") def on_worker_progress(self, percentage: int, message: str): """Handle worker progress signal.""" print(f"Progress: {percentage}% - {message}") self.status_text.setText(f"{message} ({percentage}%)") def on_worker_finished(self): """Handle worker finished signal.""" self.is_processing = False self.status_text.setText("Ready") print("Processing completed") def on_worker_error(self, error_msg: str): """Handle worker error signal.""" self.is_processing = False QMessageBox.critical(self, "Processing Error", f"An error occurred:\n\n{error_msg}") print(f"Error: {error_msg}") def on_worker_error_manual(self, error_msg: str, model_name: str): """ Handle worker error in manual input mode. Instead of crashing, record the error and continue with other models. """ print(f"Error in {model_name} model: {error_msg}") # Mark this model as completed with no result (error case) self.report_results[model_name] = { 'error': True, 'error_msg': error_msg } # Check if all models have answered (success or error) self._check_all_reports_ready() def on_audio_result(self, waveform_image, spectrogram_image, class_name, confidence, probabilities, knock_count): """Handle audio processing result for Ripeness tab (standalone mode).""" print(f"Audio result: {class_name} ({confidence:.2%}) - {knock_count} knocks") # Update ripeness tab with spectrogram results self.ripeness_tab.update_results( spectrogram_image, class_name.capitalize() if class_name else "Unknown", probabilities, self.current_audio_file ) # Map quality (for now, use confidence-based grading) if confidence >= 0.90: quality = "Grade A" elif confidence >= 0.75: quality = "Grade B" else: quality = "Grade C" # Add to results table self.results_panel.add_result(class_name.capitalize() if class_name else "Unknown", quality, confidence) def on_maturity_result(self, gradcam_image, class_name, confidence, probabilities): """Handle maturity processing result.""" print(f"Maturity result: {class_name} ({confidence:.2f}%)") # Update maturity tab with results self.maturity_tab.update_results( gradcam_image, class_name, probabilities, self.current_tiff_file ) # Add to results table (if available) # This can be extended later to add maturity results to dashboard def on_maturity_report_result(self, gradcam_image, class_name, confidence, probabilities): """Handle maturity processing result for Reports tab.""" print(f"Maturity report result: {class_name} ({confidence:.2f}%)") self.report_results['maturity'] = { 'gradcam_image': gradcam_image, 'class_name': class_name, 'confidence': confidence, 'probabilities': probabilities } # Save result to database if self.current_analysis_id: # Convert confidence from percentage to 0-1 if needed conf_value = confidence / 100.0 if confidence > 1.0 else confidence self.data_manager.save_result( self.current_analysis_id, 'maturity', { 'predicted_class': class_name, 'confidence': conf_value, 'probabilities': probabilities, 'processing_time': 0.0, 'metadata': {} } ) # Save Grad-CAM visualization if gradcam_image: self.data_manager.save_visualization( self.current_analysis_id, 'maturity_gradcam', gradcam_image, 'png' ) self._check_all_reports_ready() def on_defect_report_result(self, annotated_image, primary_class, class_counts, total_detections): """Handle defect processing result for Reports tab (side view).""" print(f"Defect report result: {primary_class} ({total_detections} detections)") self.report_results['defect'] = { 'annotated_image': annotated_image, 'primary_class': primary_class, 'class_counts': class_counts, 'total_detections': total_detections } # Save result to database if self.current_analysis_id: metadata = { 'total_detections': total_detections, 'class_counts': class_counts } self.data_manager.save_result( self.current_analysis_id, 'defect', { 'predicted_class': primary_class, 'confidence': 0.85, # Default confidence 'probabilities': {}, 'processing_time': 0.0, 'metadata': metadata } ) # Save visualization if annotated_image: self.data_manager.save_visualization( self.current_analysis_id, 'defect_annotated', annotated_image, 'jpg' ) self._check_all_reports_ready() def on_locule_report_result(self, annotated_image, locule_count): """Handle locule processing result for Reports tab (top view).""" print(f"Locule report result: {locule_count} locules detected") self.report_results['locule'] = { 'annotated_image': annotated_image, 'locule_count': locule_count } # Save result to database if self.current_analysis_id: metadata = { 'locule_count': locule_count } self.data_manager.save_result( self.current_analysis_id, 'locule', { 'predicted_class': f'{locule_count} locules', 'confidence': 0.90, # Default confidence 'probabilities': {}, 'processing_time': 0.0, 'metadata': metadata } ) # Save visualization if annotated_image: self.data_manager.save_visualization( self.current_analysis_id, 'locule_annotated', annotated_image, 'jpg' ) self._check_all_reports_ready() def on_shape_report_result(self, annotated_image, shape_class, class_id, confidence): """Handle shape classification result for Reports tab.""" print(f"Shape report result: {shape_class} (confidence: {confidence:.3f})") self.report_results['shape'] = { 'annotated_image': annotated_image, 'shape_class': shape_class, 'class_id': class_id, 'confidence': confidence } # Save result to database if self.current_analysis_id: self.data_manager.save_result( self.current_analysis_id, 'shape', { 'predicted_class': shape_class, 'confidence': confidence, 'probabilities': {}, 'processing_time': 0.0, 'metadata': {'class_id': class_id} } ) # Save visualization if annotated_image: self.data_manager.save_visualization( self.current_analysis_id, 'shape_annotated', annotated_image, 'jpg' ) self._check_all_reports_ready() def on_audio_report_result(self, waveform_image, spectrogram_image, ripeness_class, confidence, probabilities, knock_count): """Handle audio ripeness classification result for Reports tab.""" print(f"✓ Audio report result: {ripeness_class} ({confidence:.2%}) - {knock_count} knocks") print(f" Waveform image: {type(waveform_image)} size={waveform_image.size() if hasattr(waveform_image, 'size') else 'N/A'}") print(f" Spectrogram image: {type(spectrogram_image)} size={spectrogram_image.size() if hasattr(spectrogram_image, 'size') else 'N/A'}") print(f" Probabilities: {probabilities}") self.report_results['audio'] = { 'waveform_image': waveform_image, 'spectrogram_image': spectrogram_image, 'ripeness_class': ripeness_class, 'confidence': confidence, 'probabilities': probabilities, 'knock_count': knock_count } # Save result to database if self.current_analysis_id: # Convert confidence from percentage to 0-1 if needed conf_value = confidence / 100.0 if confidence > 1.0 else confidence metadata = { 'knock_count': knock_count } self.data_manager.save_result( self.current_analysis_id, 'audio', { 'predicted_class': ripeness_class, 'confidence': conf_value, 'probabilities': probabilities, 'processing_time': 0.0, 'metadata': metadata } ) # Save waveform visualization if waveform_image: self.data_manager.save_visualization( self.current_analysis_id, 'audio_waveform', waveform_image, 'png' ) # Save spectrogram visualization if spectrogram_image: self.data_manager.save_visualization( self.current_analysis_id, 'audio_spectrogram', spectrogram_image, 'png' ) self._check_all_reports_ready() def _check_all_reports_ready(self): """ Check if all pending reports are ready (success or error), then generate combined report with available data. """ if not hasattr(self, 'models_to_run'): return # Safety check # Check if all models have reported (success or error) models_answered = set(self.report_results.keys()) models_expected = set(self.models_to_run) print(f"Models to run: {models_expected}") print(f"Models answered: {models_answered}") if models_answered >= models_expected and len(models_expected) > 0: # All models have answered (success or failure) print("All models answered - generating report with available data...") self.reports_tab.generate_report_with_rgb_and_multispectral( self.reports_tab.input_data, self.report_results ) # Finalize analysis in database if self.current_analysis_id and hasattr(self.reports_tab, 'current_overall_grade'): grade = getattr(self.reports_tab, 'current_overall_grade', 'B') description = getattr(self.reports_tab, 'current_grade_description', '') if self.analysis_start_time: total_time = (datetime.now() - self.analysis_start_time).total_seconds() else: total_time = 0.0 self.data_manager.finalize_analysis( self.current_analysis_id, grade, description, total_time ) print(f"Finalized analysis with grade {grade}") # Always refresh recent results panel to show new analysis (after report is generated) self.results_panel.refresh_from_database() # Update system info panel with new statistics self.update_system_info_panel() self.reports_tab.set_loading(False) self.is_processing = False def on_defect_result(self, annotated_image, primary_class, class_counts, total_detections): """Handle defect detection result.""" print(f"Defect result: {primary_class} ({total_detections} detections)") # Update quality tab with results self.quality_tab.update_results( annotated_image, primary_class, class_counts, total_detections, self.current_image_file ) # Map to quality grade if primary_class == "No Defects": quality = "Grade A" confidence = 95.0 elif primary_class == "Minor Defects": quality = "Grade B" confidence = 80.0 else: # Reject quality = "Grade C" confidence = 70.0 # Add to results table (use "N/A" for ripeness since it's quality-only) self.results_panel.add_result("N/A", quality, confidence) # ==================== Menu Handlers ==================== def show_about(self): """Show the about dialog.""" dialog = AboutDialog(self) dialog.exec_() def show_help(self): """Show the help dialog.""" dialog = HelpDialog(self) dialog.exec_() def on_go_to_dashboard(self): """Handle request to go back to dashboard from reports tab.""" self.tab_widget.setCurrentIndex(0) # Switch to Dashboard (index 0) def on_view_analysis(self, report_id: str): """ Handle view analysis request from recent results panel. Args: report_id: The report ID to load and display """ # Validate report_id if not report_id or report_id == 'N/A': QMessageBox.warning(self, "Error", f"Invalid report ID: {report_id}") return # Ensure data manager is set if not self.data_manager: QMessageBox.warning(self, "Error", "Data manager not initialized") return self.reports_tab.data_manager = self.data_manager # Switch to Reports tab (index 5) self.tab_widget.setCurrentIndex(5) # Load the analysis from database success = self.reports_tab.load_analysis_from_db(report_id) if not success: QMessageBox.warning(self, "Error", f"Could not load analysis: {report_id}") def closeEvent(self, event): """Handle window close event.""" # Wait for all threads to finish if self.thread_pool.activeThreadCount() > 0: reply = QMessageBox.question( self, "Processing in Progress", "Processing is still running. Are you sure you want to exit?", QMessageBox.Yes | QMessageBox.No, QMessageBox.No ) if reply == QMessageBox.No: event.ignore() return print("Application closing...") self.thread_pool.waitForDone(3000) # Wait up to 3 seconds event.accept()