205 lines
7.7 KiB
Python
205 lines
7.7 KiB
Python
import sys
|
||
import time
|
||
from PyQt5 import QtWidgets, QtGui, QtCore
|
||
from PyQt5.QtCore import QThread, pyqtSignal
|
||
from pypylon import pylon
|
||
import cv2
|
||
import numpy as np
|
||
from camera_window import Ui_MainWindow
|
||
|
||
class CameraThread(QThread):
|
||
update_image_signal = pyqtSignal(np.ndarray) # 訊號:傳遞影像數據
|
||
|
||
def __init__(self, camera, exposure_time=5000, continuous=False):
|
||
super().__init__()
|
||
self.camera = camera
|
||
self.exposure_time = exposure_time
|
||
self.continuous = continuous
|
||
self.running = True # 控制執行狀態
|
||
|
||
def run(self):
|
||
"""影像擷取執行緒"""
|
||
try:
|
||
if not self.camera or not self.camera.IsOpen():
|
||
return
|
||
|
||
# 設定曝光時間
|
||
self.camera.ExposureTime.SetValue(float(self.exposure_time))
|
||
|
||
if not self.continuous: # 單張擷取
|
||
self.camera.StartGrabbing(1)
|
||
else: # 連續擷取
|
||
self.camera.StartGrabbing(pylon.GrabStrategy_LatestImageOnly)
|
||
|
||
while self.running and self.camera.IsGrabbing():
|
||
grab_result = self.camera.RetrieveResult(5000, pylon.TimeoutHandling_ThrowException)
|
||
if grab_result.GrabSucceeded():
|
||
self.update_image_signal.emit(grab_result.Array) # 傳遞影像數據到 GUI
|
||
grab_result.Release()
|
||
|
||
if not self.continuous: # 單張擷取結束
|
||
break
|
||
time.sleep(0.05) # 限制擷取速度,避免 CPU 過載
|
||
|
||
except Exception as e:
|
||
print(f"影像擷取錯誤: {e}")
|
||
|
||
def stop(self):
|
||
"""停止執行緒"""
|
||
self.running = False
|
||
if self.camera and self.camera.IsGrabbing():
|
||
self.camera.StopGrabbing()
|
||
self.quit()
|
||
self.wait() # 確保執行緒完全停止
|
||
|
||
|
||
class Camera(QtWidgets.QMainWindow, Ui_MainWindow):
|
||
def __init__(self):
|
||
super(Camera, self).__init__()
|
||
self.setupUi(self)
|
||
|
||
# 連接按鈕事件
|
||
self.bt_camera_connect.clicked.connect(self.connect_camera)
|
||
self.bt_OneShot.clicked.connect(self.one_shot_capture)
|
||
self.bt_KeetShot.clicked.connect(self.keep_shot_capture)
|
||
self.bt_camera_close.clicked.connect(self.close_camera)
|
||
|
||
# ⚠️ 使用 QLineEdit 取代 QTextEdit,並限制只能輸入數字
|
||
self.Ex_time.setValidator(QtGui.QIntValidator(1, 1000000, self)) # 限制 1~1000000 微秒
|
||
self.Ex_time.textChanged.connect(self.update_exposure_time) # 監聽輸入變更
|
||
|
||
# 初始化變數
|
||
self.camera = None
|
||
self.camera_thread = None
|
||
self.current_image = None
|
||
self.latest_image = None # 用於存儲最新影像
|
||
self.last_exposure_time = None # 記錄最後的曝光時間,避免重複更新
|
||
|
||
def connect_camera(self):
|
||
"""連接相機"""
|
||
try:
|
||
self.camera = pylon.InstantCamera(pylon.TlFactory.GetInstance().CreateFirstDevice())
|
||
self.camera.Open()
|
||
print("相機連線成功")
|
||
return True
|
||
except Exception as e:
|
||
print(f"相機連線失敗: {e}")
|
||
return False
|
||
|
||
def get_exposure_time(self):
|
||
"""取得曝光時間,若為空則使用預設值"""
|
||
exposure_time_text = self.Ex_time.text().strip()
|
||
|
||
if exposure_time_text.isdigit():
|
||
return int(exposure_time_text)
|
||
|
||
self.statusbar.showMessage("請輸入有效的曝光時間(整數),使用預設值 5000 微秒")
|
||
return 5000 # 預設曝光時間
|
||
|
||
def update_exposure_time(self):
|
||
"""當曝光時間變更時,立即更新相機設定並重新擷取影像"""
|
||
if not (self.camera and self.camera.IsOpen()):
|
||
return
|
||
|
||
try:
|
||
new_exposure_time = self.get_exposure_time()
|
||
|
||
# 確保曝光時間在相機允許的範圍內
|
||
min_exp = self.camera.ExposureTime.GetMin()
|
||
max_exp = self.camera.ExposureTime.GetMax()
|
||
|
||
if new_exposure_time < min_exp or new_exposure_time > max_exp:
|
||
self.statusbar.showMessage(f"曝光時間超出範圍 ({min_exp} ~ {max_exp}),請輸入有效值")
|
||
return
|
||
|
||
# 避免重複設定相同的曝光時間
|
||
if new_exposure_time == self.last_exposure_time:
|
||
return
|
||
|
||
self.last_exposure_time = new_exposure_time
|
||
self.camera.ExposureTime.SetValue(float(new_exposure_time)) # 設定曝光時間
|
||
self.statusbar.showMessage(f"曝光時間更新為 {new_exposure_time} 微秒")
|
||
|
||
# 重新擷取影像
|
||
if self.camera_thread and self.camera_thread.isRunning():
|
||
self.stop_keep_shot()
|
||
self.keep_shot_capture()
|
||
else:
|
||
self.one_shot_capture()
|
||
|
||
except Exception as e:
|
||
self.statusbar.showMessage(f"設定曝光時間失敗: {e}")
|
||
|
||
def one_shot_capture(self):
|
||
"""單張擷取"""
|
||
if not (self.camera and self.camera.IsOpen()):
|
||
self.statusbar.showMessage("請先連接相機")
|
||
return
|
||
|
||
self.stop_keep_shot()
|
||
exposure_time = self.get_exposure_time()
|
||
self.camera_thread = CameraThread(self.camera, exposure_time, continuous=False)
|
||
self.camera_thread.update_image_signal.connect(self.process_image)
|
||
self.camera_thread.start()
|
||
|
||
def keep_shot_capture(self):
|
||
"""連續擷取"""
|
||
if not (self.camera and self.camera.IsOpen()):
|
||
self.statusbar.showMessage("請先連接相機")
|
||
return
|
||
|
||
# 檢查 `camera_thread` 是否已存在並運行
|
||
if self.camera_thread and self.camera_thread.isRunning():
|
||
self.stop_keep_shot()
|
||
self.statusbar.showMessage("停止連續取像")
|
||
else:
|
||
exposure_time = self.get_exposure_time()
|
||
self.camera_thread = CameraThread(self.camera, exposure_time, continuous=True)
|
||
self.camera_thread.update_image_signal.connect(self.process_image)
|
||
self.camera_thread.start()
|
||
self.statusbar.showMessage("開始連續取像")
|
||
|
||
def stop_keep_shot(self):
|
||
"""停止連續擷取"""
|
||
if self.camera_thread:
|
||
self.camera_thread.stop()
|
||
self.camera_thread.wait() # 確保執行緒完全終止
|
||
self.camera_thread = None
|
||
|
||
def process_image(self, image):
|
||
"""處理擷取的影像"""
|
||
self.current_image = image
|
||
self.display_original_image()
|
||
|
||
def display_original_image(self):
|
||
"""顯示原始影像"""
|
||
if self.current_image is not None:
|
||
image_bgr = cv2.cvtColor(self.current_image, cv2.COLOR_BayerBG2BGR) if len(
|
||
self.current_image.shape) == 2 else self.current_image
|
||
height, width, channel = image_bgr.shape
|
||
bytes_per_line = 3 * width
|
||
qimage = QtGui.QImage(image_bgr.data, width, height, bytes_per_line, QtGui.QImage.Format_BGR888)
|
||
pixmap = QtGui.QPixmap.fromImage(qimage).scaled(self.label.size(), QtCore.Qt.KeepAspectRatio)
|
||
self.label.setPixmap(pixmap)
|
||
|
||
def close_camera(self):
|
||
"""關閉相機"""
|
||
if self.camera and self.camera.IsOpen():
|
||
self.stop_keep_shot() # 確保先停止擷取
|
||
self.camera.Close()
|
||
self.statusbar.showMessage("相機已關閉")
|
||
self.camera = None
|
||
|
||
def closeEvent(self, event):
|
||
"""關閉程式時確保相機與執行緒完全停止"""
|
||
self.stop_keep_shot() # 確保相機執行緒已經停止
|
||
self.close_camera() # 確保相機關閉
|
||
event.accept()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
app = QtWidgets.QApplication(sys.argv)
|
||
window = Camera()
|
||
window.show()
|
||
sys.exit(app.exec_())
|