AP/camera/camera_process.py

55 lines
2.1 KiB
Python
Raw Permalink Normal View History

2025-03-10 11:30:13 +08:00
import time
import numpy as np
import cv2
import multiprocessing
from pypylon import pylon
2025-03-10 21:06:08 +08:00
from read_ini import ConfigReader
2025-03-10 11:30:13 +08:00
class CameraProcess(multiprocessing.Process):
""" 相機擷取獨立進程 """
def __init__(self, image_queue, exposure_time=20000):
super(CameraProcess, self).__init__()
self.image_queue = image_queue
2025-03-10 21:06:08 +08:00
self.config_reader = ConfigReader() # ✅ 創建 `ConfigReader` 物件
self.exposure_time = exposure_time if exposure_time is not None else self.config_reader.get_exposure_time()
2025-03-10 11:30:13 +08:00
self.running = multiprocessing.Value('b', True) # 控制進程是否運行
self.camera = None
def connect_camera(self):
""" 連接相機 """
try:
self.camera = pylon.InstantCamera(pylon.TlFactory.GetInstance().CreateFirstDevice())
self.camera.Open()
self.camera.ExposureTime.SetValue(float(self.exposure_time))
2025-03-10 21:06:08 +08:00
print(f"相機連線成功,曝光時間: {self.exposure_time} 微秒")
2025-03-10 11:30:13 +08:00
return True
except Exception as e:
print(f"相機連線失敗: {e}")
return False
def run(self):
""" 進程啟動後執行影像擷取 """
if not self.connect_camera():
return
self.camera.StartGrabbing(pylon.GrabStrategy_LatestImageOnly)
while self.running.value and self.camera.IsGrabbing():
2025-03-10 21:06:08 +08:00
grab_result = self.camera.RetrieveResult(5000, pylon.TimeoutHandling_ThrowException)
2025-03-10 11:30:13 +08:00
if grab_result.GrabSucceeded():
image = grab_result.Array # 取得影像數據
if not self.image_queue.full():
self.image_queue.put(image) # 將影像放入 Queue
grab_result.Release()
time.sleep(0.05) # 控制擷取速度,避免 CPU 過載
self.camera.Close()
print("相機關閉")
def stop(self):
""" 停止相機擷取進程 """
2025-03-10 13:45:43 +08:00
self.running.value = False # 設定為 False讓 while 迴圈停止
self.terminate() # 強制終止進程
print("相機擷取進程已終止")