# base_worker.py

"""
Base Worker Module

Base class for all worker threads in the DuDONG system.
Provides common functionality for async processing with Qt signals.
"""

from typing import Any
import logging

from PyQt5.QtCore import QRunnable, QObject, pyqtSignal, pyqtSlot

# Module-level logger; BaseWorker uses it to record start, finish, error and
# cancellation events.
logger = logging.getLogger(__name__)
  10. class WorkerSignals(QObject):
  11. """
  12. Base signals for worker threads.
  13. All workers emit these common signals for UI updates.
  14. Signals:
  15. started: Emitted when worker starts processing
  16. finished: Emitted when worker completes (success or failure)
  17. error: Emitted when an error occurs (error_message: str)
  18. progress: Emitted to report progress (percentage: int, message: str)
  19. """
  20. started = pyqtSignal()
  21. finished = pyqtSignal()
  22. error = pyqtSignal(str) # error message
  23. progress = pyqtSignal(int, str) # percentage, message
  24. class BaseWorker(QRunnable):
  25. """
  26. Base class for worker threads.
  27. All worker classes should inherit from this and implement the process() method.
  28. Workers run in QThreadPool for non-blocking UI updates.
  29. Attributes:
  30. signals: WorkerSignals instance for emitting signals
  31. """
  32. def __init__(self):
  33. """Initialize the base worker."""
  34. super().__init__()
  35. self.signals = WorkerSignals()
  36. self._is_cancelled = False
  37. def process(self) -> Any:
  38. """
  39. Process the worker's task.
  40. This method must be implemented by all subclasses.
  41. Should perform the actual work and return results.
  42. Returns:
  43. Any: Processing results (format depends on worker type)
  44. Raises:
  45. NotImplementedError: If not overridden by subclass
  46. Exception: Any errors during processing
  47. """
  48. raise NotImplementedError("Subclasses must implement process() method")
  49. @pyqtSlot()
  50. def run(self):
  51. """
  52. Run the worker.
  53. This is called by QThreadPool. It handles the workflow:
  54. 1. Emit started signal
  55. 2. Call process() method
  56. 3. Emit finished signal
  57. 4. Catch and emit any errors
  58. """
  59. try:
  60. logger.info(f"{self.__class__.__name__} started")
  61. self.signals.started.emit()
  62. # Call the subclass's process method
  63. self.process()
  64. logger.info(f"{self.__class__.__name__} finished")
  65. except Exception as e:
  66. error_msg = f"{self.__class__.__name__} error: {str(e)}"
  67. logger.error(error_msg)
  68. self.signals.error.emit(error_msg)
  69. finally:
  70. self.signals.finished.emit()
  71. def cancel(self):
  72. """
  73. Request cancellation of the worker.
  74. Note: This sets a flag - the process() method must check
  75. self._is_cancelled periodically to actually stop.
  76. """
  77. logger.info(f"{self.__class__.__name__} cancellation requested")
  78. self._is_cancelled = True
  79. def is_cancelled(self) -> bool:
  80. """
  81. Check if worker has been cancelled.
  82. Returns:
  83. bool: True if cancelled, False otherwise
  84. """
  85. return self._is_cancelled
  86. def emit_progress(self, percentage: int, message: str = ""):
  87. """
  88. Emit a progress update.
  89. Args:
  90. percentage: Progress percentage (0-100)
  91. message: Optional progress message
  92. """
  93. self.signals.progress.emit(percentage, message)