# Code-viewer listing header (page residue): 55 lines, 2.1 KiB, Python
import multiprocessing
import queue
import time

import cv2
import numpy as np
from pypylon import pylon

from read_ini import ConfigReader


class CameraProcess(multiprocessing.Process):
    """Standalone process that grabs frames from a pylon camera.

    Every successfully grabbed frame is offered to ``image_queue``; when the
    queue is full the frame is dropped instead of blocking the grab loop.
    The loop keeps running until :meth:`stop` clears the shared ``running``
    flag or the camera stops grabbing.
    """

    def __init__(self, image_queue, exposure_time=20000):
        """Store the capture settings; the camera itself is opened in ``run``.

        Args:
            image_queue: Queue that receives the grabbed image arrays.
            exposure_time: Exposure time in microseconds. Because the default
                is 20000, the value from ``ConfigReader`` is only used when
                ``None`` is passed explicitly.
        """
        super().__init__()
        self.image_queue = image_queue
        self.config_reader = ConfigReader()
        if exposure_time is None:
            exposure_time = self.config_reader.get_exposure_time()
        self.exposure_time = exposure_time
        # Shared flag (signed char) so the parent can ask the child to stop.
        self.running = multiprocessing.Value('b', True)
        self.camera = None

    def _close_camera(self):
        """Close the camera if one was created; a failing close is reported, not raised."""
        if self.camera is None:
            return
        try:
            self.camera.Close()
        except Exception as e:
            print(f"相機關閉失敗: {e}")

    def connect_camera(self):
        """Open the first available camera and apply the exposure time.

        Returns:
            bool: ``True`` when the camera is open and configured, ``False``
            when any step failed (the error is printed and the device is
            closed again so it is not left half-initialised).
        """
        try:
            self.camera = pylon.InstantCamera(
                pylon.TlFactory.GetInstance().CreateFirstDevice()
            )
            self.camera.Open()
            self.camera.ExposureTime.SetValue(float(self.exposure_time))
            print(f"相機連線成功,曝光時間: {self.exposure_time} 微秒")
            return True
        except Exception as e:
            print(f"相機連線失敗: {e}")
            # Open() may have succeeded before the failure; release the device.
            self._close_camera()
            return False

    def run(self):
        """Process entry point: grab frames until asked to stop.

        Returns immediately when the camera cannot be connected. Errors
        raised while grabbing (including the retrieve timeout) still
        propagate, but the camera is always closed first.
        """
        if not self.connect_camera():
            return

        try:
            self.camera.StartGrabbing(pylon.GrabStrategy_LatestImageOnly)

            while self.running.value and self.camera.IsGrabbing():
                grab_result = self.camera.RetrieveResult(
                    5000, pylon.TimeoutHandling_ThrowException
                )
                try:
                    if grab_result.GrabSucceeded():
                        try:
                            # Non-blocking put: never stall the grab loop.
                            self.image_queue.put_nowait(grab_result.Array)
                        except queue.Full:
                            pass  # consumer is behind; drop this frame
                finally:
                    grab_result.Release()
                time.sleep(0.05)  # throttle the capture rate to limit CPU load
        finally:
            self._close_camera()
            print("相機關閉")

    def stop(self, timeout=2.0):
        """Stop the capture process, preferring a clean shutdown.

        The shared flag is cleared first so the grab loop can exit and close
        the camera on its own. The process is only terminated forcibly if it
        is still alive after ``timeout`` seconds. Calling this on a process
        that was never started (or has already exited) is safe.

        Args:
            timeout: Seconds to wait for the graceful exit before terminating.
        """
        self.running.value = False  # makes the while loop in run() exit
        if self.is_alive():
            self.join(timeout)
            if self.is_alive():
                # Graceful exit did not happen in time; force it.
                self.terminate()
                self.join(timeout)
        print("相機擷取進程已終止")