""" Camera Automation Module Provides GUI automation for controlling external camera applications. Uses pywinauto for Windows-based window automation and keyboard/mouse control. Supports: - 2nd Look (Multispectral Camera) - EOS Utility (DSLR Camera) - AnalyzIR (Thermal Camera) - planned """ import os import sys import time import tempfile import shutil from pathlib import Path from abc import ABC, abstractmethod from typing import Optional, Tuple from datetime import datetime import logging # Configure logging logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) try: from pywinauto import Application, findwindows from pywinauto.keyboard import send_keys PYWINAUTO_AVAILABLE = True except ImportError: PYWINAUTO_AVAILABLE = False logger.warning("pywinauto not available - GUI automation will be limited") try: import pyautogui PYAUTOGUI_AVAILABLE = True except ImportError: PYAUTOGUI_AVAILABLE = False logger.warning("pyautogui not available - fallback automation unavailable") class CameraAutomationError(Exception): """Raised when camera automation fails.""" pass class CameraAutomation(ABC): """ Abstract base class for camera application automation. All camera automation implementations should inherit from this class and implement the required methods. """ def __init__(self, app_name: str, window_title: str = None): """ Initialize camera automation. Args: app_name: Name of the application (e.g., "2nd Look", "EOS Utility") window_title: Optional window title to search for """ self.app_name = app_name self.window_title = window_title or app_name self.app = None self.window = None self.last_capture_time = None logger.info(f"Initialized {self.__class__.__name__} for {app_name}") @abstractmethod def find_window(self) -> bool: """ Find and connect to the application window. Returns: bool: True if window found and connected, False otherwise """ pass @abstractmethod def is_window_open(self) -> bool: """ Check if the application window is currently open and responsive. Returns: bool: True if window is open and responsive, False otherwise """ pass @abstractmethod def capture(self, output_dir: str = None) -> Optional[str]: """ Perform automated capture from the application. Args: output_dir: Directory to save captured file (optional) Returns: str: Path to captured file if successful, None otherwise """ pass def get_last_image(self) -> Optional[str]: """ Retrieve the path to the last captured image. Returns: str: Path to last image if available, None otherwise """ # Default implementation - should be overridden by subclasses return None class SecondLookAutomation(CameraAutomation): """ Automation for 2nd Look multispectral camera application. Handles: - Finding and connecting to 2nd Look window - Clicking Record button and Trigger Software - Monitoring for TIFF file creation in Recordings directory - Stopping the recording - Cleanup of temporary files Recording directory: C:\\Users\\[USERNAME]\\Documents\\IO Industries\\2ndLook\\Recordings\\ File pattern: Recording_YYYY-MM-DD_HH_MM_SS\\TIFF\\Camera\\Image_XXXXXX.tif """ # Configuration constants RECORDING_BASE_DIR = Path.home() / "Documents" / "IO Industries" / "2ndLook" / "Recordings" TRIGGER_WAIT_TIME = 2 # seconds to wait after trigger FILE_WATCH_TIMEOUT = 15 # seconds to wait for file creation STOP_WAIT_TIME = 1 # seconds to wait after clicking stop # Button coordinates (calibrate with calibrate_2ndlook_buttons.py if needed) RECORD_BUTTON_POS = (64, 1422) # Red circle at bottom left TRIGGER_SOFTWARE_POS = (125, 357) # Toolbar button STOP_BUTTON_POS = (319, 1422) # Bottom stop button def __init__(self, default_save_dir: str = None): """ Initialize 2nd Look automation. Args: default_save_dir: Default directory where 2nd Look saves files """ super().__init__("2nd Look", "2ndLook - Control") self.default_save_dir = Path(default_save_dir) if default_save_dir else None self.last_image_path = None self.temp_dir = Path(tempfile.gettempdir()) / "dudong_2ndlook" self.temp_dir.mkdir(exist_ok=True) logger.info(f"2nd Look temp directory: {self.temp_dir}") logger.debug(f"Button coordinates - Record: {self.RECORD_BUTTON_POS}, Trigger: {self.TRIGGER_SOFTWARE_POS}, Stop: {self.STOP_BUTTON_POS}") def find_window(self) -> bool: """ Find and connect to 2nd Look window. Returns: bool: True if found, False otherwise """ if not PYWINAUTO_AVAILABLE: logger.error("pywinauto not available - cannot find window") return False try: logger.info(f"Searching for '{self.window_title}' window...") # Try to find window by title windows = findwindows.find_windows(title_re=f".*{self.window_title}.*") if windows: logger.info(f"Found {len(windows)} window(s) matching '{self.window_title}'") window_handle = windows[0] # Try to connect to application using the window handle try: # Use the window handle directly with pywinauto self.app = Application(backend="uia").connect(handle=window_handle) self.window = self.app.window(handle=window_handle) logger.info(f"Connected to 2nd Look (handle: {window_handle})") return True except Exception as e: logger.warning(f"Could not connect via UIA backend: {e}") try: # Fallback: try using top_level_only self.app = Application(backend="uia").connect(top_level_only=False) logger.info("Connected to application using fallback method") return True except Exception as e2: logger.warning(f"Fallback connection also failed: {e2}") return False else: logger.warning(f"No window found matching '{self.window_title}'") return False except Exception as e: logger.error(f"Error finding 2nd Look window: {e}") return False def is_window_open(self) -> bool: """ Check if 2nd Look is open and responsive. Returns: bool: True if open and responsive, False otherwise """ try: if not PYWINAUTO_AVAILABLE: logger.debug("pywinauto not available - cannot check window state") return False # Check for window existence windows = findwindows.find_windows(title_re=f".*{self.window_title}.*") if windows: logger.debug("2nd Look window is open") return True else: logger.debug("2nd Look window not found") return False except Exception as e: logger.warning(f"Error checking window state: {e}") return False def capture(self, output_dir: str = None) -> Optional[str]: """ Capture multispectral image from 2nd Look using Record and Trigger Software. Workflow: 1. Click Record button (red circle) 2. Click Trigger Software button 3. Wait for TIFF file in Recordings directory 4. Click Stop button 5. Copy captured file to output directory Args: output_dir: Directory to save TIFF (uses temp dir if not specified) Returns: str: Path to captured TIFF file, or None if capture failed """ output_dir = Path(output_dir) if output_dir else self.temp_dir output_dir.mkdir(parents=True, exist_ok=True) try: logger.info("Starting 2nd Look capture workflow...") # Step 1: Verify window is open if not self.is_window_open(): logger.error("2nd Look window is not open") raise CameraAutomationError("2nd Look application not found or not running") # Step 2: Reconnect to window if needed if self.app is None or self.window is None: if not self.find_window(): raise CameraAutomationError("Could not connect to 2nd Look window") # Step 3: Bring window to focus try: logger.debug("Bringing 2nd Look to focus...") self.window.set_focus() time.sleep(0.5) except Exception as e: logger.warning(f"Could not set focus: {e}") # Step 4: Click Record button (red circle at bottom left) logger.info("Clicking Record button...") self._click_record_button() time.sleep(1) # Step 5: Click Trigger Software button logger.info("Clicking Trigger Software button...") self._click_trigger_software_button() time.sleep(self.TRIGGER_WAIT_TIME) # Step 6: Wait for file creation in Recordings directory logger.info(f"Waiting for TIFF file in {self.RECORDING_BASE_DIR}...") captured_file = self._wait_for_recording_file() if not captured_file: logger.error("Timeout waiting for TIFF file creation") raise CameraAutomationError("Timeout waiting for file - capture may have failed") logger.info(f"File detected: {captured_file}") # Step 7: Click Stop button to end recording logger.info("Clicking Stop button...") self._click_stop_button() time.sleep(self.STOP_WAIT_TIME) # Step 8: Copy captured file to output directory logger.info(f"Copying file to output directory: {output_dir}") output_file = self._copy_captured_file(captured_file, output_dir) if output_file: logger.info(f"Successfully captured: {output_file}") self.last_image_path = str(output_file) self.last_capture_time = datetime.now() return str(output_file) else: logger.error("Failed to copy captured file") raise CameraAutomationError("Failed to copy captured file to output directory") except Exception as e: logger.error(f"Capture failed: {e}") return None def get_last_image(self) -> Optional[str]: """ Get the path to the last captured image. Returns: str: Path to last image if available, None otherwise """ if self.last_image_path and Path(self.last_image_path).exists(): logger.debug(f"Returning last image: {self.last_image_path}") return self.last_image_path logger.debug("No valid last image path found") return None def set_button_coordinates(self, record_pos: Tuple[int, int] = None, trigger_pos: Tuple[int, int] = None, stop_pos: Tuple[int, int] = None) -> None: """ Set button coordinates (for calibration). Args: record_pos: (x, y) coordinates for Record button trigger_pos: (x, y) coordinates for Trigger Software button stop_pos: (x, y) coordinates for Stop button """ if record_pos: self.RECORD_BUTTON_POS = record_pos logger.info(f"Record button coordinates set to {record_pos}") if trigger_pos: self.TRIGGER_SOFTWARE_POS = trigger_pos logger.info(f"Trigger Software button coordinates set to {trigger_pos}") if stop_pos: self.STOP_BUTTON_POS = stop_pos logger.info(f"Stop button coordinates set to {stop_pos}") def _click_record_button(self) -> bool: """ Click the Record button (red circle at bottom left). Uses coordinates from RECORD_BUTTON_POS (can be calibrated). Returns: bool: True if click was attempted, False otherwise """ try: if not PYAUTOGUI_AVAILABLE: logger.warning("pyautogui not available - cannot click buttons") return False record_x, record_y = self.RECORD_BUTTON_POS logger.debug(f"Clicking Record button at ({record_x}, {record_y})") pyautogui.click(record_x, record_y) return True except Exception as e: logger.error(f"Error clicking Record button: {e}") return False def _click_trigger_software_button(self) -> bool: """ Click the Trigger Software button in the toolbar. Uses coordinates from TRIGGER_SOFTWARE_POS (can be calibrated). Returns: bool: True if click was attempted, False otherwise """ try: if not PYAUTOGUI_AVAILABLE: logger.warning("pyautogui not available - cannot click buttons") return False trigger_x, trigger_y = self.TRIGGER_SOFTWARE_POS logger.debug(f"Clicking Trigger Software button at ({trigger_x}, {trigger_y})") pyautogui.click(trigger_x, trigger_y) return True except Exception as e: logger.error(f"Error clicking Trigger Software button: {e}") return False def _click_stop_button(self) -> bool: """ Click the Stop button (next to pause at bottom). Uses coordinates from STOP_BUTTON_POS (can be calibrated). Returns: bool: True if click was attempted, False otherwise """ try: if not PYAUTOGUI_AVAILABLE: logger.warning("pyautogui not available - cannot click buttons") return False stop_x, stop_y = self.STOP_BUTTON_POS logger.debug(f"Clicking Stop button at ({stop_x}, {stop_y})") pyautogui.click(stop_x, stop_y) return True except Exception as e: logger.error(f"Error clicking Stop button: {e}") return False def _wait_for_recording_file(self, timeout: int = None) -> Optional[Path]: """ Wait for a TIFF file to be created in the 2nd Look Recordings directory. File pattern: Recording_YYYY-MM-DD_HH_MM_SS/TIFF/Camera/Image_XXXXXX.tif Args: timeout: Maximum time to wait in seconds Returns: Path: Path to created TIFF file, or None if timeout """ timeout = timeout or self.FILE_WATCH_TIMEOUT start_time = time.time() if not self.RECORDING_BASE_DIR.exists(): logger.error(f"Recording directory not found: {self.RECORDING_BASE_DIR}") return None logger.debug(f"Monitoring for TIFF files in: {self.RECORDING_BASE_DIR}") while time.time() - start_time < timeout: try: # Search for TIFF files in subdirectories # Pattern: Recording_*/TIFF/Camera/*.tif tiff_files = list(self.RECORDING_BASE_DIR.glob("*/TIFF/Camera/*.tif")) if tiff_files: # Return the most recently created file newest_file = max(tiff_files, key=lambda p: p.stat().st_mtime) logger.info(f"Found TIFF file: {newest_file}") return newest_file time.sleep(0.5) except Exception as e: logger.warning(f"Error monitoring recording directory: {e}") time.sleep(1) logger.warning(f"No TIFF file created in {self.RECORDING_BASE_DIR} within {timeout} seconds") return None def _copy_captured_file(self, source_file: Path, output_dir: Path) -> Optional[Path]: """ Copy captured TIFF file to output directory. Args: source_file: Path to source TIFF file output_dir: Directory to copy file to Returns: Path: Path to copied file, or None if copy failed """ try: if not source_file.exists(): logger.error(f"Source file not found: {source_file}") return None # Create output filename with timestamp timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_filename = f"2ndlook_capture_{timestamp}.tif" output_file = output_dir / output_filename logger.debug(f"Copying {source_file} to {output_file}") shutil.copy2(source_file, output_file) logger.info(f"File copied successfully: {output_file}") return output_file except Exception as e: logger.error(f"Error copying file: {e}") return None def set_default_save_directory(self, directory: str) -> None: """ Set the default save directory for 2nd Look exports. Args: directory: Path to save directory """ self.default_save_dir = Path(directory) logger.info(f"Default save directory set to: {self.default_save_dir}") def cleanup(self) -> None: """Clean up resources and temporary files.""" try: if self.window: try: # Try to close the app gracefully logger.debug("Closing 2nd Look window...") self.window.close() except Exception as e: logger.debug(f"Could not close window: {e}") # Clean up old temp files (older than 1 hour) if self.temp_dir.exists(): now = time.time() for tiff_file in self.temp_dir.glob("*.tif*"): if now - tiff_file.stat().st_mtime > 3600: # 1 hour try: tiff_file.unlink() logger.debug(f"Cleaned up old file: {tiff_file.name}") except Exception as e: logger.warning(f"Could not delete {tiff_file}: {e}") logger.info("Cleanup completed") except Exception as e: logger.warning(f"Error during cleanup: {e}") class EOSUtilityAutomation(CameraAutomation): """ Automation for EOS Utility DSLR camera application. Handles: - Finding and connecting to EOS Utility window - Clicking the capture button (X: 318, Y: 134) - Monitoring for JPG file creation in Pictures\\EOS-Utility directory - Copying captured file to output directory Pictures directory structure: C:\\Users\\[USERNAME]\\Pictures\\EOS-Utility\\[DATE]\\IMG_[SEQUENCE].JPG Example: C:\\Users\\AIDurian\\Pictures\\EOS-Utility\\2025_12_04\\IMG_0001.JPG """ # Configuration constants PICTURES_BASE_DIR = Path.home() / "Pictures" / "EOS-Utility" CAPTURE_BUTTON_POS = (318, 134) # Click position for capture button FILE_WATCH_TIMEOUT = 10 # seconds to wait for file creation def __init__(self, default_save_dir: str = None): """ Initialize EOS Utility automation. Args: default_save_dir: Default directory where captures should be saved """ super().__init__("EOS Utility", "EOS M50m2") self.default_save_dir = Path(default_save_dir) if default_save_dir else None self.last_image_path = None self.temp_dir = Path(tempfile.gettempdir()) / "dudong_eos" self.temp_dir.mkdir(exist_ok=True) logger.info(f"EOS Utility temp directory: {self.temp_dir}") logger.debug(f"Capture button coordinates: {self.CAPTURE_BUTTON_POS}") def find_window(self) -> bool: """ Find and connect to EOS Utility window. Returns: bool: True if found, False otherwise """ if not PYWINAUTO_AVAILABLE: logger.error("pywinauto not available - cannot find window") return False try: logger.info(f"Searching for '{self.window_title}' window...") # Try to find window by title windows = findwindows.find_windows(title_re=f".*{self.window_title}.*") if windows: logger.info(f"Found {len(windows)} window(s) matching '{self.window_title}'") window_handle = windows[0] try: self.app = Application(backend="uia").connect(handle=window_handle) self.window = self.app.window(handle=window_handle) logger.info(f"Connected to EOS Utility (handle: {window_handle})") return True except Exception as e: logger.warning(f"Could not connect via UIA backend: {e}") try: self.app = Application(backend="uia").connect(top_level_only=False) logger.info("Connected to application using fallback method") return True except Exception as e2: logger.warning(f"Fallback connection also failed: {e2}") return False else: logger.warning(f"No window found matching '{self.window_title}'") return False except Exception as e: logger.error(f"Error finding EOS Utility window: {e}") return False def is_window_open(self) -> bool: """ Check if EOS Utility is open and responsive. Returns: bool: True if open and responsive, False otherwise """ try: if not PYWINAUTO_AVAILABLE: logger.debug("pywinauto not available - cannot check window state") return False windows = findwindows.find_windows(title_re=f".*{self.window_title}.*") if windows: logger.debug("EOS Utility window is open") return True else: logger.debug("EOS Utility window not found") return False except Exception as e: logger.warning(f"Error checking window state: {e}") return False def capture(self, output_dir: str = None) -> Optional[str]: """ Capture image from EOS Utility DSLR camera. Workflow: 1. Bring window to focus 2. Click capture button (X: 318, Y: 134) 3. Wait for JPG file in Pictures\\EOS-Utility\\[DATE] directory 4. Copy captured file to output directory Args: output_dir: Directory to save JPG (uses temp dir if not specified) Returns: str: Path to captured JPG file, or None if capture failed """ output_dir = Path(output_dir) if output_dir else self.temp_dir output_dir.mkdir(parents=True, exist_ok=True) try: logger.info("Starting EOS Utility capture workflow...") # Step 1: Verify window is open if not self.is_window_open(): logger.error("EOS Utility window is not open") raise CameraAutomationError("EOS Utility application not found or not running") # Step 2: Reconnect to window if needed if self.app is None or self.window is None: if not self.find_window(): raise CameraAutomationError("Could not connect to EOS Utility window") # Step 3: Bring window to focus try: logger.debug("Bringing EOS Utility to focus...") self.window.set_focus() time.sleep(0.5) except Exception as e: logger.warning(f"Could not set focus: {e}") # Step 4: Get the latest file timestamp before capture latest_file_before = self._get_latest_capture_file() logger.debug(f"Latest file before capture: {latest_file_before}") # Step 5: Click capture button logger.info("Clicking capture button...") self._click_capture_button() time.sleep(1) # Step 6: Wait for new file creation logger.info(f"Waiting for JPG file in {self.PICTURES_BASE_DIR}...") captured_file = self._wait_for_capture_file(latest_file_before) if not captured_file: logger.error("Timeout waiting for JPG file creation") raise CameraAutomationError("Timeout waiting for file - capture may have failed") logger.info(f"File detected: {captured_file}") # Step 7: Copy captured file to output directory logger.info(f"Copying file to output directory: {output_dir}") output_file = self._copy_captured_file(captured_file, output_dir) if output_file: logger.info(f"Successfully captured: {output_file}") self.last_image_path = str(output_file) self.last_capture_time = datetime.now() return str(output_file) else: logger.error("Failed to copy captured file") raise CameraAutomationError("Failed to copy captured file to output directory") except Exception as e: logger.error(f"Capture failed: {e}") return None def get_last_image(self) -> Optional[str]: """ Get the path to the last captured image. Returns: str: Path to last image if available, None otherwise """ if self.last_image_path and Path(self.last_image_path).exists(): logger.debug(f"Returning last image: {self.last_image_path}") return self.last_image_path logger.debug("No valid last image path found") return None def set_button_coordinates(self, capture_pos: Tuple[int, int] = None) -> None: """ Set capture button coordinates (for calibration). Args: capture_pos: (x, y) coordinates for capture button """ if capture_pos: self.CAPTURE_BUTTON_POS = capture_pos logger.info(f"Capture button coordinates set to {capture_pos}") def _click_capture_button(self) -> bool: """ Click the capture button in EOS Utility. Uses coordinates from CAPTURE_BUTTON_POS (can be calibrated). Returns: bool: True if click was attempted, False otherwise """ try: if not PYAUTOGUI_AVAILABLE: logger.warning("pyautogui not available - cannot click buttons") return False capture_x, capture_y = self.CAPTURE_BUTTON_POS logger.debug(f"Clicking capture button at ({capture_x}, {capture_y})") pyautogui.click(capture_x, capture_y) return True except Exception as e: logger.error(f"Error clicking capture button: {e}") return False def _get_latest_capture_file(self) -> Optional[Path]: """ Get the latest JPG file in the EOS Utility Pictures directory. Returns: Path: Path to latest JPG file, or None if no files exist """ try: if not self.PICTURES_BASE_DIR.exists(): logger.debug(f"EOS Utility Pictures directory not found yet: {self.PICTURES_BASE_DIR}") return None # Search for JPG files recursively jpg_files = list(self.PICTURES_BASE_DIR.glob("**/IMG_*.JPG")) if jpg_files: latest = max(jpg_files, key=lambda p: p.stat().st_mtime) logger.debug(f"Found latest capture file: {latest}") return latest logger.debug("No capture files found in Pictures directory") return None except Exception as e: logger.warning(f"Error getting latest capture file: {e}") return None def _wait_for_capture_file(self, file_before: Optional[Path] = None, timeout: int = None) -> Optional[Path]: """ Wait for a new JPG file to be created in the EOS Utility Pictures directory. File pattern: Pictures\\EOS-Utility\\[DATE]\\IMG_[SEQUENCE].JPG Args: file_before: Previous file to compare against (detect new file) timeout: Maximum time to wait in seconds Returns: Path: Path to newly created JPG file, or None if timeout """ timeout = timeout or self.FILE_WATCH_TIMEOUT start_time = time.time() if not self.PICTURES_BASE_DIR.exists(): logger.debug(f"Creating Pictures directory: {self.PICTURES_BASE_DIR}") return None logger.debug(f"Monitoring for JPG files in: {self.PICTURES_BASE_DIR}") while time.time() - start_time < timeout: try: # Search for JPG files jpg_files = list(self.PICTURES_BASE_DIR.glob("**/IMG_*.JPG")) if jpg_files: # Get the most recently created file newest_file = max(jpg_files, key=lambda p: p.stat().st_mtime) # If we had a previous file, ensure this is a new one if file_before is None or newest_file != file_before: logger.info(f"Found new JPG file: {newest_file}") return newest_file time.sleep(0.5) except Exception as e: logger.warning(f"Error monitoring Pictures directory: {e}") time.sleep(1) logger.warning(f"No new JPG file created in {self.PICTURES_BASE_DIR} within {timeout} seconds") return None def _copy_captured_file(self, source_file: Path, output_dir: Path) -> Optional[Path]: """ Copy captured JPG file to output directory. Args: source_file: Path to source JPG file output_dir: Directory to copy file to Returns: Path: Path to copied file, or None if copy failed """ try: if not source_file.exists(): logger.error(f"Source file not found: {source_file}") return None # Create output filename with timestamp timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_filename = f"eos_capture_{timestamp}.jpg" output_file = output_dir / output_filename logger.debug(f"Copying {source_file} to {output_file}") shutil.copy2(source_file, output_file) logger.info(f"File copied successfully: {output_file}") return output_file except Exception as e: logger.error(f"Error copying file: {e}") return None def set_default_save_directory(self, directory: str) -> None: """ Set the default save directory for EOS Utility exports. Args: directory: Path to save directory """ self.default_save_dir = Path(directory) logger.info(f"Default save directory set to: {self.default_save_dir}") def cleanup(self) -> None: """Clean up resources and temporary files.""" try: if self.window: try: logger.debug("Closing EOS Utility window...") self.window.close() except Exception as e: logger.debug(f"Could not close window: {e}") # Clean up old temp files (older than 1 hour) if self.temp_dir.exists(): now = time.time() for jpg_file in self.temp_dir.glob("*.jpg"): if now - jpg_file.stat().st_mtime > 3600: # 1 hour try: jpg_file.unlink() logger.debug(f"Cleaned up old file: {jpg_file.name}") except Exception as e: logger.warning(f"Could not delete {jpg_file}: {e}") logger.info("Cleanup completed") except Exception as e: logger.warning(f"Error during cleanup: {e}") class AnalyzIRAutomation(CameraAutomation): """ Automation for AnalyzIR thermal camera application (FOTRIC 323). Handles: - Finding and connecting to IR Camera and AnalyzIR Venus windows - Taking snapshot in IR Camera window - Performing automated click sequence to export thermal data - Monitoring for CSV file creation in Pictures\\AnalyzIR directory - Copying captured CSV file to output directory Workflow: 1. Check IR Camera window (SN:0803009169) 2. Left-click (X: 2515, Y: 898) to take snapshot in IR Camera 3. Switch to AnalyzIR Venus window 4. Right-click (X: 567, Y: 311) on latest data (from snapshot) 5. Left-click menu option (X: 694, Y: 450) 6. Left-click (X: 2370, Y: 109) - export/menu option 7. Left-click (X: 963, Y: 396) - confirmation/export 8. Left-click (X: 1713, Y: 1296) - button 9. Left-click (X: 1424, Y: 896) - save location 10. Left-click (X: 2522, Y: 26) - confirm 11. Left-click (X: 1497, Y: 892) - close/finalize File structure: C:\\Users\\[USERNAME]\\Pictures\\AnalyzIR\\YYYY-MM-DD_HHMMSS_CSV\\YYYY-MM-DD_HHMMSS.csv Example: C:\\Users\\AIDurian\\Pictures\\AnalyzIR\\2025-12-04_155903_CSV\\2025-12-04_155903.csv """ # Configuration constants PICTURES_BASE_DIR = Path.home() / "Pictures" / "AnalyzIR" FILE_WATCH_TIMEOUT = 20 # seconds to wait for file creation CLICK_DELAY = 0.3 # delay between clicks # IR Camera window detection IR_CAMERA_WINDOW_TITLE = "IR Camera(SN:0803009169)" ANALYZIR_VENUS_WINDOW_TITLE = "AnalyzIR Venus" # Click coordinates for IR Camera snapshot IR_CAMERA_SNAPSHOT_POS = (2515, 898) # Take snapshot button in IR Camera window # AnalyzIR Venus click sequence ANALYZIR_VENUS_CLICKS = [ (567, 311, "right"), # Right-click on latest data (694, 450, "left"), # Click menu option (2370, 109, "left"), # Export/menu option (963, 396, "left"), # Confirmation/export button (1713, 1296, "left"), # Button (1424, 896, "left"), # Save location (2522, 26, "left"), # Confirm (1497, 892, "left"), # Close/finalize ] def __init__(self, default_save_dir: str = None): """ Initialize AnalyzIR automation. Args: default_save_dir: Default directory where captures should be saved """ super().__init__("AnalyzIR", self.ANALYZIR_VENUS_WINDOW_TITLE) self.default_save_dir = Path(default_save_dir) if default_save_dir else None self.last_csv_path = None self.temp_dir = Path(tempfile.gettempdir()) / "dudong_analyzir" self.temp_dir.mkdir(exist_ok=True) self.ir_camera_window = None logger.info(f"AnalyzIR temp directory: {self.temp_dir}") logger.debug(f"IR Camera window title: {self.IR_CAMERA_WINDOW_TITLE}") logger.debug(f"AnalyzIR Venus window title: {self.ANALYZIR_VENUS_WINDOW_TITLE}") def find_window(self) -> bool: """ Find and connect to AnalyzIR Venus window. Returns: bool: True if found, False otherwise """ if not PYWINAUTO_AVAILABLE: logger.error("pywinauto not available - cannot find window") return False try: logger.info(f"Searching for '{self.ANALYZIR_VENUS_WINDOW_TITLE}' window...") # Try to find AnalyzIR Venus window with multiple patterns windows = [] try: # Try exact match first windows = findwindows.find_windows(title_re=f".*{self.ANALYZIR_VENUS_WINDOW_TITLE}.*") if not windows: # Try partial match windows = findwindows.find_windows(title_re=".*AnalyzIR.*") logger.debug("Using partial 'AnalyzIR' pattern match") except Exception as e: logger.warning(f"Error searching for windows: {e}") if windows: logger.info(f"Found {len(windows)} window(s) matching search criteria") # Find the best match - prioritize exact title match window_handle = None for i, win in enumerate(windows): try: app_temp = Application(backend="uia").connect(handle=win, timeout=2) window_temp = app_temp.window(handle=win) title = window_temp.window_text() if hasattr(window_temp, 'window_text') else "Unknown" logger.debug(f" Window {i}: Handle={win}, Title='{title}'") # Prioritize "AnalyzIR Venus" without extra suffixes/paths if "AnalyzIR Venus" in title and "File" not in title: window_handle = win logger.debug(f" -> Selected as best match (exact title)") break elif window_handle is None: # Keep as fallback if no exact match found window_handle = win except Exception as e: logger.debug(f" Window {i}: Handle={win}, (could not get title: {e})") if window_handle is None: window_handle = win # Use as fallback if window_handle is None: window_handle = windows[0] # Final fallback try: self.app = Application(backend="uia").connect(handle=window_handle) self.window = self.app.window(handle=window_handle) logger.info(f"Connected to AnalyzIR Venus (handle: {window_handle})") except Exception as e: logger.warning(f"Could not connect via UIA backend: {e}") try: self.app = Application(backend="uia").connect(top_level_only=False) logger.info("Connected to application using fallback method") except Exception as e2: logger.warning(f"Fallback connection also failed: {e2}") return False # Also try to find and store IR Camera window with multiple patterns logger.debug(f"Searching for IR Camera window...") ir_windows = [] try: # Try exact match ir_windows = findwindows.find_windows(title_re=f".*{self.IR_CAMERA_WINDOW_TITLE}.*") if not ir_windows: # Try partial match - just "IR Camera" ir_windows = findwindows.find_windows(title_re=".*IR Camera.*") logger.debug("Using partial 'IR Camera' pattern match") except Exception as e: logger.debug(f"Error searching for IR Camera window: {e}") if ir_windows: self.ir_camera_window = ir_windows[0] try: app_temp = Application(backend="uia").connect(handle=self.ir_camera_window, timeout=2) window_temp = app_temp.window(handle=self.ir_camera_window) title = window_temp.window_text() if hasattr(window_temp, 'window_text') else "Unknown" logger.info(f"Found IR Camera window (handle: {self.ir_camera_window}, title: '{title}')") except Exception as e: logger.info(f"Found IR Camera window (handle: {self.ir_camera_window})") else: logger.debug("IR Camera window not found (it may not be open yet)") return True else: logger.warning(f"No window found matching AnalyzIR criteria") logger.debug("Make sure AnalyzIR Venus is open and visible") return False except Exception as e: logger.error(f"Error finding AnalyzIR window: {e}") import traceback logger.debug(traceback.format_exc()) return False def is_window_open(self) -> bool: """ Check if AnalyzIR Venus window is open and responsive. Returns: bool: True if open and responsive, False otherwise """ try: if not PYWINAUTO_AVAILABLE: logger.debug("pywinauto not available - cannot check window state") return False windows = findwindows.find_windows(title_re=f".*{self.ANALYZIR_VENUS_WINDOW_TITLE}.*") if windows: logger.debug("AnalyzIR Venus window is open") return True else: logger.debug("AnalyzIR Venus window not found") return False except Exception as e: logger.warning(f"Error checking window state: {e}") return False def capture(self, output_dir: str = None) -> Optional[str]: """ Capture thermal data from AnalyzIR and export as CSV. Workflow: 1. Find and focus IR Camera window 2. Click close button in IR Camera window 3. Find and focus AnalyzIR Venus window 4. Perform automated click sequence to export data 5. Wait for CSV file creation in Pictures\\AnalyzIR directory 6. Copy captured CSV file to output directory Args: output_dir: Directory to save CSV (uses temp dir if not specified) Returns: str: Path to captured CSV file, or None if capture failed """ output_dir = Path(output_dir) if output_dir else self.temp_dir output_dir.mkdir(parents=True, exist_ok=True) try: logger.info("Starting AnalyzIR capture workflow...") # Step 1: Verify windows are open if not self.is_window_open(): logger.error("AnalyzIR Venus window is not open") raise CameraAutomationError("AnalyzIR Venus application not found or not running") # Step 2: Reconnect to windows if needed if self.app is None or self.window is None: if not self.find_window(): raise CameraAutomationError("Could not connect to AnalyzIR window") # Step 3: Take snapshot in IR Camera window self._take_ir_snapshot() time.sleep(2) # Wait for snapshot to be processed and data to reach AnalyzIR # Step 4: Bring AnalyzIR Venus to focus try: logger.debug("Bringing AnalyzIR Venus to focus...") self.window.set_focus() time.sleep(0.5) # Move mouse to the AnalyzIR window to ensure it's active if PYAUTOGUI_AVAILABLE: # Move to center of expected window area pyautogui.moveTo(1000, 400) time.sleep(0.2) except Exception as e: logger.warning(f"Could not set focus: {e}") # Step 5: Get the latest file timestamp before capture latest_file_before = self._get_latest_csv_file() logger.debug(f"Latest CSV file before capture: {latest_file_before}") # Step 6: Perform automated click sequence to export data logger.info("Performing automated click sequence to export thermal data...") self._perform_export_sequence() # Step 7: Wait for new CSV file creation logger.info(f"Waiting for CSV file in {self.PICTURES_BASE_DIR}...") captured_file = self._wait_for_csv_file(latest_file_before) if not captured_file: logger.error("Timeout waiting for CSV file creation") raise CameraAutomationError("Timeout waiting for CSV file - capture may have failed") logger.info(f"CSV file detected: {captured_file}") # Step 8: Copy captured file to output directory logger.info(f"Copying file to output directory: {output_dir}") output_file = self._copy_captured_file(captured_file, output_dir) if output_file: logger.info(f"Successfully captured: {output_file}") self.last_csv_path = str(output_file) self.last_capture_time = datetime.now() return str(output_file) else: logger.error("Failed to copy captured file") raise CameraAutomationError("Failed to copy captured file to output directory") except Exception as e: logger.error(f"Capture failed: {e}") return None def get_last_image(self) -> Optional[str]: """ Get the path to the last captured CSV file. Returns: str: Path to last CSV file if available, None otherwise """ if self.last_csv_path and Path(self.last_csv_path).exists(): logger.debug(f"Returning last CSV: {self.last_csv_path}") return self.last_csv_path logger.debug("No valid last CSV path found") return None def _take_ir_snapshot(self) -> bool: """ Take a snapshot in the IR Camera window. Steps: 1. Focus/activate IR Camera window 2. Click the snapshot button at coordinates (2515, 898) 3. Wait for snapshot to be processed Returns: bool: True if click was attempted, False otherwise """ try: if not PYAUTOGUI_AVAILABLE: logger.warning("pyautogui not available - cannot click buttons") return False # Step 1: Try to focus IR Camera window if found if self.ir_camera_window: try: logger.debug(f"Focusing IR Camera window (handle: {self.ir_camera_window})...") ir_app = Application(backend="uia").connect(handle=self.ir_camera_window, timeout=2) ir_window = ir_app.window(handle=self.ir_camera_window) # Bring window to foreground and focus ir_window.set_focus() time.sleep(0.3) logger.info("IR Camera window focused") except Exception as e: logger.debug(f"Could not focus IR Camera window: {e}") # Continue anyway - click might still work else: logger.debug("IR Camera window handle not available - clicking anyway") # Step 2: Click snapshot button snapshot_x, snapshot_y = self.IR_CAMERA_SNAPSHOT_POS logger.debug(f"Clicking IR Camera snapshot button at ({snapshot_x}, {snapshot_y})") pyautogui.click(snapshot_x, snapshot_y) time.sleep(0.5) # Increased wait logger.info("Snapshot taken in IR Camera") return True except Exception as e: logger.warning(f"Could not take IR Camera snapshot (non-critical): {e}") return False def _perform_export_sequence(self) -> bool: """ Perform the automated click sequence to export thermal data from AnalyzIR Venus. Sequence: 1. Right-click at (567, 311) on latest snapshot data 2. Left-click menu option at (694, 450) 3. Left-click export/menu at (2370, 109) 4. Left-click confirmation at (963, 396) 5. Left-click button at (1713, 1296) 6. Left-click save location at (1424, 896) 7. Left-click confirm at (2522, 26) 8. Left-click close/finalize at (1497, 892) Returns: bool: True if sequence completed, False otherwise """ try: if not PYAUTOGUI_AVAILABLE: logger.warning("pyautogui not available - cannot perform click sequence") return False for i, (x, y, click_type) in enumerate(self.ANALYZIR_VENUS_CLICKS, 1): logger.debug(f"Click {i}/{len(self.ANALYZIR_VENUS_CLICKS)}: {click_type}-click at ({x}, {y})") if click_type == "right": pyautogui.click(x, y, button='right') else: # left pyautogui.click(x, y) time.sleep(self.CLICK_DELAY) logger.info("Export sequence completed successfully") return True except Exception as e: logger.error(f"Error performing export sequence: {e}") return False def _get_latest_csv_file(self) -> Optional[Path]: """ Get the latest CSV file in the AnalyzIR Pictures directory. Returns: Path: Path to latest CSV file, or None if no files exist """ try: if not self.PICTURES_BASE_DIR.exists(): logger.debug(f"AnalyzIR Pictures directory not found yet: {self.PICTURES_BASE_DIR}") return None # Search for CSV files in subdirectories (pattern: YYYY-MM-DD_HHMMSS_CSV/*.csv) csv_files = list(self.PICTURES_BASE_DIR.glob("*_CSV/*.csv")) if csv_files: latest = max(csv_files, key=lambda p: p.stat().st_mtime) logger.debug(f"Found latest CSV file: {latest}") return latest logger.debug("No CSV files found in Pictures directory") return None except Exception as e: logger.warning(f"Error getting latest CSV file: {e}") return None def _wait_for_csv_file(self, file_before: Optional[Path] = None, timeout: int = None) -> Optional[Path]: """ Wait for a new CSV file to be created in the AnalyzIR Pictures directory. File pattern: Pictures\\AnalyzIR\\YYYY-MM-DD_HHMMSS_CSV\\YYYY-MM-DD_HHMMSS.csv Args: file_before: Previous file to compare against (detect new file) timeout: Maximum time to wait in seconds Returns: Path: Path to newly created CSV file, or None if timeout """ timeout = timeout or self.FILE_WATCH_TIMEOUT start_time = time.time() if not self.PICTURES_BASE_DIR.exists(): logger.debug(f"Creating Pictures/AnalyzIR directory: {self.PICTURES_BASE_DIR}") self.PICTURES_BASE_DIR.mkdir(parents=True, exist_ok=True) logger.debug(f"Monitoring for CSV files in: {self.PICTURES_BASE_DIR}") while time.time() - start_time < timeout: try: # Search for CSV files in subdirectories csv_files = list(self.PICTURES_BASE_DIR.glob("*_CSV/*.csv")) if csv_files: # Get the most recently created file newest_file = max(csv_files, key=lambda p: p.stat().st_mtime) # If we had a previous file, ensure this is a new one if file_before is None or newest_file != file_before: logger.info(f"Found new CSV file: {newest_file}") return newest_file time.sleep(0.5) except Exception as e: logger.warning(f"Error monitoring AnalyzIR directory: {e}") time.sleep(1) logger.warning(f"No new CSV file created in {self.PICTURES_BASE_DIR} within {timeout} seconds") return None def _copy_captured_file(self, source_file: Path, output_dir: Path) -> Optional[Path]: """ Copy captured CSV file to output directory. Args: source_file: Path to source CSV file output_dir: Directory to copy file to Returns: Path: Path to copied file, or None if copy failed """ try: if not source_file.exists(): logger.error(f"Source file not found: {source_file}") return None # Create output filename with timestamp timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_filename = f"analyzir_thermal_{timestamp}.csv" output_file = output_dir / output_filename logger.debug(f"Copying {source_file} to {output_file}") shutil.copy2(source_file, output_file) logger.info(f"File copied successfully: {output_file}") return output_file except Exception as e: logger.error(f"Error copying file: {e}") return None def set_default_save_directory(self, directory: str) -> None: """ Set the default save directory for AnalyzIR exports. Args: directory: Path to save directory """ self.default_save_dir = Path(directory) logger.info(f"Default save directory set to: {self.default_save_dir}") def cleanup(self) -> None: """Clean up resources and temporary files.""" try: if self.window: try: logger.debug("Closing AnalyzIR window...") self.window.close() except Exception as e: logger.debug(f"Could not close window: {e}") # Clean up old temp files (older than 1 hour) if self.temp_dir.exists(): now = time.time() for csv_file in self.temp_dir.glob("*.csv"): if now - csv_file.stat().st_mtime > 3600: # 1 hour try: csv_file.unlink() logger.debug(f"Cleaned up old file: {csv_file.name}") except Exception as e: logger.warning(f"Could not delete {csv_file}: {e}") logger.info("Cleanup completed") except Exception as e: logger.warning(f"Error during cleanup: {e}") # Debug utility function def debug_analyzir_windows(): """ Debug function to find and display all open windows that match AnalyzIR patterns. Useful for troubleshooting window detection issues. """ if not PYWINAUTO_AVAILABLE: print("pywinauto not available") return print("\n" + "=" * 70) print("AnalyzIR Window Detection Debug") print("=" * 70) try: # Find all windows print("\nSearching for AnalyzIR-related windows...") patterns = [ ("AnalyzIR Venus", ".*AnalyzIR Venus.*"), ("AnalyzIR (any)", ".*AnalyzIR.*"), ("IR Camera (exact)", ".*IR Camera\\(SN:0803009169\\).*"), ("IR Camera (partial)", ".*IR Camera.*"), ] for pattern_name, pattern in patterns: print(f"\nPattern: {pattern_name}") print(f"Regex: {pattern}") try: windows = findwindows.find_windows(title_re=pattern) if windows: print(f" Found {len(windows)} window(s):") for i, win_handle in enumerate(windows): try: app = Application(backend="uia").connect(handle=win_handle, timeout=1) window = app.window(handle=win_handle) title = window.window_text() if hasattr(window, 'window_text') else "Unknown" print(f" {i+1}. Handle: {win_handle}, Title: '{title}'") except Exception as e: print(f" {i+1}. Handle: {win_handle}, (could not get title)") else: print(" No windows found") except Exception as e: print(f" Error: {e}") print("\n" + "=" * 70) except Exception as e: print(f"Error during debug: {e}") import traceback traceback.print_exc() # Factory function for creating camera automation instances def create_camera_automation(camera_type: str, **kwargs) -> Optional[CameraAutomation]: """ Factory function to create camera automation instances. Args: camera_type: Type of camera ("2nd_look", "eos_utility", "analyzir") **kwargs: Additional arguments passed to the automation class Returns: CameraAutomation: Automation instance, or None if type not recognized """ camera_type = camera_type.lower().strip() if camera_type in ("2nd_look", "2ndlook", "multispectral"): return SecondLookAutomation(**kwargs) elif camera_type in ("eos_utility", "eosutility", "dslr"): return EOSUtilityAutomation(**kwargs) elif camera_type in ("analyzir", "thermal", "fotric"): return AnalyzIRAutomation(**kwargs) else: logger.error(f"Unknown camera type: {camera_type}") return None