AP/camera/camera_process.py
2025-03-10 21:06:08 +08:00

55 lines
2.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import numpy as np
import cv2
import multiprocessing
from pypylon import pylon
from read_ini import ConfigReader
class CameraProcess(multiprocessing.Process):
""" 相機擷取獨立進程 """
def __init__(self, image_queue, exposure_time=20000):
super(CameraProcess, self).__init__()
self.image_queue = image_queue
self.config_reader = ConfigReader() # ✅ 創建 `ConfigReader` 物件
self.exposure_time = exposure_time if exposure_time is not None else self.config_reader.get_exposure_time()
self.running = multiprocessing.Value('b', True) # 控制進程是否運行
self.camera = None
def connect_camera(self):
""" 連接相機 """
try:
self.camera = pylon.InstantCamera(pylon.TlFactory.GetInstance().CreateFirstDevice())
self.camera.Open()
self.camera.ExposureTime.SetValue(float(self.exposure_time))
print(f"相機連線成功,曝光時間: {self.exposure_time} 微秒")
return True
except Exception as e:
print(f"相機連線失敗: {e}")
return False
def run(self):
""" 進程啟動後執行影像擷取 """
if not self.connect_camera():
return
self.camera.StartGrabbing(pylon.GrabStrategy_LatestImageOnly)
while self.running.value and self.camera.IsGrabbing():
grab_result = self.camera.RetrieveResult(5000, pylon.TimeoutHandling_ThrowException)
if grab_result.GrabSucceeded():
image = grab_result.Array # 取得影像數據
if not self.image_queue.full():
self.image_queue.put(image) # 將影像放入 Queue
grab_result.Release()
time.sleep(0.05) # 控制擷取速度,避免 CPU 過載
self.camera.Close()
print("相機關閉")
def stop(self):
""" 停止相機擷取進程 """
self.running.value = False # 設定為 False讓 while 迴圈停止
self.terminate() # 強制終止進程
print("相機擷取進程已終止")