"""
Base Worker Module

Base class for all worker threads in the DuDONG system.
Provides common functionality for async processing with Qt signals.
"""

from typing import Any
import logging

from PyQt5.QtCore import QRunnable, QObject, pyqtSignal, pyqtSlot

logger = logging.getLogger(__name__)


class WorkerSignals(QObject):
    """
    Base signals for worker threads.

    QRunnable is not a QObject and therefore cannot declare signals itself,
    so each worker owns an instance of this class and emits through it.

    Signals:
        started: Emitted when worker starts processing
        finished: Emitted when worker completes (success or failure)
        error: Emitted when an error occurs (error_message: str)
        progress: Emitted to report progress (percentage: int, message: str)
    """

    started = pyqtSignal()
    finished = pyqtSignal()
    error = pyqtSignal(str)  # error message
    progress = pyqtSignal(int, str)  # percentage, message


class BaseWorker(QRunnable):
    """
    Base class for worker threads.

    Subclasses implement process(); run() wraps that call with the
    started / error / finished signal emissions. Instances are meant to be
    handed to a QThreadPool so the work does not block the UI.

    Attributes:
        signals: WorkerSignals instance for emitting signals
    """

    def __init__(self) -> None:
        """Initialize the base worker with fresh signals and no cancellation."""
        super().__init__()
        self.signals = WorkerSignals()
        # Cooperative cancellation flag: set by cancel(), read by is_cancelled().
        self._is_cancelled = False

    def process(self) -> Any:
        """
        Process the worker's task.

        This method must be implemented by all subclasses. Note that run()
        does not use the return value; a subclass that needs to deliver
        results has to do so itself (for example through its own signal).

        Returns:
            Any: Processing results (format depends on worker type)

        Raises:
            NotImplementedError: If not overridden by subclass
            Exception: Any errors during processing
        """
        raise NotImplementedError("Subclasses must implement process() method")

    @pyqtSlot()
    def run(self) -> None:
        """
        Run the worker. This is the QRunnable entry point.

        Workflow:
            1. Emit the started signal
            2. Call process()
            3. On any Exception, log it with its traceback and emit the
               error signal with a message string
            4. Always emit the finished signal, on success or failure
        """
        worker_name = self.__class__.__name__
        try:
            logger.info("%s started", worker_name)
            self.signals.started.emit()

            # Call the subclass's process method
            self.process()

            logger.info("%s finished", worker_name)
        except Exception as exc:
            # Top-level boundary of the worker: nothing above this frame can
            # handle the error, so report it instead of letting it propagate.
            error_msg = f"{worker_name} error: {exc!s}"
            # logger.exception logs at ERROR level and appends the traceback,
            # which a plain logger.error(error_msg) would discard.
            logger.exception(error_msg)
            self.signals.error.emit(error_msg)
        finally:
            # Listeners rely on finished firing whether or not process() failed.
            self.signals.finished.emit()

    def cancel(self) -> None:
        """
        Request cancellation of the worker.

        Note: This only sets a flag - the process() method must check
        is_cancelled() (or self._is_cancelled) periodically to actually stop.
        """
        logger.info("%s cancellation requested", self.__class__.__name__)
        self._is_cancelled = True

    def is_cancelled(self) -> bool:
        """
        Check if worker has been cancelled.

        Returns:
            bool: True if cancel() has been called, False otherwise
        """
        return self._is_cancelled

    def emit_progress(self, percentage: int, message: str = "") -> None:
        """
        Emit a progress update through the progress signal.

        Args:
            percentage: Progress percentage (0-100); passed through unchanged
            message: Optional progress message
        """
        self.signals.progress.emit(percentage, message)