import multiprocessing
import queue
import time

import cv2
import numpy as np
from pypylon import pylon

from read_ini import ConfigReader


class CameraProcess(multiprocessing.Process):
    """Camera acquisition running in its own process.

    After ``start()``, the process connects to the first camera device
    reported by pylon, grabs frames with the ``LatestImageOnly`` strategy
    and puts each successfully grabbed frame on ``image_queue``. Frames are
    dropped (not queued) while the queue is full.
    """

    def __init__(self, image_queue, exposure_time=20000):
        """Store the output queue and resolve the exposure time.

        Args:
            image_queue: Queue that receives the grabbed images. It must
                provide ``put_nowait`` and raise ``queue.Full`` when full
                (as ``multiprocessing.Queue`` does).
            exposure_time: Exposure time applied to the camera (reported in
                microseconds by the connection message). Pass ``None`` to
                use the value returned by
                ``ConfigReader.get_exposure_time()``.
        """
        super().__init__()
        self.image_queue = image_queue
        # Configuration source used for the exposure-time fallback.
        self.config_reader = ConfigReader()
        self.exposure_time = (
            exposure_time
            if exposure_time is not None
            else self.config_reader.get_exposure_time()
        )
        # Shared flag read by the grab loop; cleared by stop().
        self.running = multiprocessing.Value('b', True)
        self.camera = None

    def _close_camera(self) -> bool:
        """Close the camera if one was created; report instead of raising.

        Returns:
            True when ``Close()`` completed, False when there is no camera
            object or closing failed.
        """
        if self.camera is None:
            return False
        try:
            self.camera.Close()
        except Exception as e:
            # Cleanup runs in except/finally paths; raising here would mask
            # the error that triggered the cleanup.
            print(f"相機關閉失敗: {e}")
            return False
        return True

    def connect_camera(self) -> bool:
        """Open the first available camera and apply the exposure time.

        Returns:
            True when the camera was opened and configured, False otherwise.
            On failure the error is printed and a camera that was already
            opened is closed again so the device is not left held.
        """
        try:
            self.camera = pylon.InstantCamera(
                pylon.TlFactory.GetInstance().CreateFirstDevice()
            )
            self.camera.Open()
            self.camera.ExposureTime.SetValue(float(self.exposure_time))
        except Exception as e:
            print(f"相機連線失敗: {e}")
            # Open() may have succeeded before the failing call.
            self._close_camera()
            return False
        print(f"相機連線成功,曝光時間: {self.exposure_time} 微秒")
        return True

    def run(self) -> None:
        """Process entry point: grab frames until stopped.

        Loops while the ``running`` flag is set and the camera is grabbing.
        An exception from the camera (e.g. the 5000 ms retrieve timeout,
        which is configured to throw) still propagates and ends the
        process, but the camera is always closed first.
        """
        if not self.connect_camera():
            return

        try:
            self.camera.StartGrabbing(pylon.GrabStrategy_LatestImageOnly)
            while self.running.value and self.camera.IsGrabbing():
                grab_result = self.camera.RetrieveResult(
                    5000, pylon.TimeoutHandling_ThrowException
                )
                try:
                    if grab_result.GrabSucceeded():
                        image = grab_result.Array  # image data
                        try:
                            # Non-blocking put: a separate full() check is
                            # racy and a blocking put() could stall the
                            # loop (and therefore stop()) indefinitely.
                            self.image_queue.put_nowait(image)
                        except queue.Full:
                            pass  # consumer is behind; drop this frame
                finally:
                    # Always hand the buffer back, even if queuing failed.
                    grab_result.Release()
                time.sleep(0.05)  # throttle acquisition to limit CPU load
        finally:
            if self._close_camera():
                print("相機關閉")

    def stop(self, timeout=6.0) -> None:
        """Stop the acquisition process, preferring a clean shutdown.

        Clears the ``running`` flag and waits for the process to leave its
        loop so the camera gets closed. The process is force-terminated
        only if it is still alive after ``timeout`` seconds.

        Args:
            timeout: Seconds to wait for a clean exit. The default covers
                one 5000 ms retrieve wait plus the 0.05 s loop sleep.
        """
        self.running.value = False  # lets the while loop in run() end
        # is_alive() is False for a process that was never started, where
        # join()/terminate() would raise.
        if self.is_alive():
            self.join(timeout)
            if self.is_alive():
                # Fallback, e.g. the child is blocked flushing queue data
                # nobody consumes. Forced termination skips the child's
                # cleanup, so it is the last resort.
                self.terminate()
                self.join(1.0)
        print("相機擷取進程已終止")