"""PyQt5 front end for grabbing single or continuous frames from a Basler camera."""

import sys
import time

import cv2
import numpy as np
from PyQt5 import QtCore, QtGui, QtWidgets
from PyQt5.QtCore import QThread, pyqtSignal
from pypylon import pylon

from camera_window import Ui_MainWindow

# Exposure (microseconds) used when the input field holds no valid integer.
# The fallback status-bar message spells this value out literally.
DEFAULT_EXPOSURE_US = 5000
# Maximum time (milliseconds) to wait for one frame before pylon raises.
GRAB_TIMEOUT_MS = 5000
# Pause between continuous frames, to keep CPU usage down.
FRAME_INTERVAL_S = 0.05


class CameraThread(QThread):
    """Worker thread that grabs frames and hands them to the GUI via a signal.

    Attributes:
        camera: The pylon ``InstantCamera`` to grab from (may be ``None``).
        exposure_time: Exposure time applied before grabbing starts.
        continuous: ``True`` to grab until stopped, ``False`` for one frame.
        running: Cleared by :meth:`stop` to end the grab loop.
    """

    # Signal carrying each grabbed frame to the GUI thread.
    update_image_signal = pyqtSignal(np.ndarray)

    def __init__(self, camera, exposure_time=5000, continuous=False):
        super().__init__()
        self.camera = camera
        self.exposure_time = exposure_time
        self.continuous = continuous
        self.running = True  # loop-control flag

    def run(self):
        """Grab one frame, or frames until stopped, emitting each one.

        Errors are reported on stdout rather than raised, because nothing
        can catch an exception that escapes a ``QThread.run``.
        """
        try:
            if not self.camera or not self.camera.IsOpen():
                return

            # Apply the requested exposure before grabbing starts.
            self.camera.ExposureTime.SetValue(float(self.exposure_time))

            if self.continuous:
                self.camera.StartGrabbing(pylon.GrabStrategy_LatestImageOnly)
            else:
                # Grab exactly one image.
                self.camera.StartGrabbing(1)

            while self.running and self.camera.IsGrabbing():
                grab_result = self.camera.RetrieveResult(
                    GRAB_TIMEOUT_MS, pylon.TimeoutHandling_ThrowException
                )
                try:
                    if grab_result.GrabSucceeded():
                        # Hand the frame over to the GUI.
                        self.update_image_signal.emit(grab_result.Array)
                finally:
                    # Always return the buffer, even if emitting failed.
                    grab_result.Release()

                if not self.continuous:
                    break
                time.sleep(FRAME_INTERVAL_S)
        except Exception as e:
            print(f"影像擷取錯誤: {e}")
        finally:
            # Never leave the camera grabbing after a failure; otherwise the
            # next StartGrabbing call would be rejected.
            try:
                if self.camera and self.camera.IsGrabbing():
                    self.camera.StopGrabbing()
            except Exception as e:
                print(f"影像擷取錯誤: {e}")

    def stop(self):
        """Ask the grab loop to end and block until the thread has finished."""
        self.running = False
        if self.camera and self.camera.IsGrabbing():
            self.camera.StopGrabbing()
        self.quit()
        self.wait()  # make sure the thread is fully stopped


class Camera(QtWidgets.QMainWindow, Ui_MainWindow):
    """Main window: connects to the camera and displays captured frames."""

    def __init__(self):
        super(Camera, self).__init__()
        self.setupUi(self)

        # Wire up the buttons.
        self.bt_camera_connect.clicked.connect(self.connect_camera)
        self.bt_OneShot.clicked.connect(self.one_shot_capture)
        self.bt_KeetShot.clicked.connect(self.keep_shot_capture)
        self.bt_camera_close.clicked.connect(self.close_camera)

        # The exposure field only accepts integers from 1 to 1000000
        # (microseconds); every edit is forwarded to the camera.
        self.Ex_time.setValidator(QtGui.QIntValidator(1, 1000000, self))
        self.Ex_time.textChanged.connect(self.update_exposure_time)

        # State.
        self.camera = None
        self.camera_thread = None
        self.current_image = None
        self.latest_image = None  # most recent frame
        self.last_exposure_time = None  # last value applied, to skip repeats

    def connect_camera(self):
        """Open the first available camera.

        Returns:
            ``True`` when a camera is open afterwards, ``False`` on failure.
        """
        if self.camera and self.camera.IsOpen():
            # Already connected: reconnecting would replace the working
            # camera object while a grab thread may still be using it.
            return True
        try:
            device = pylon.TlFactory.GetInstance().CreateFirstDevice()
            camera = pylon.InstantCamera(device)
            camera.Open()
        except Exception as e:
            print(f"相機連線失敗: {e}")
            return False
        # Publish the camera only once it is known to be open.
        self.camera = camera
        print("相機連線成功")
        return True

    def get_exposure_time(self):
        """Return the exposure time from the input field.

        Falls back to ``DEFAULT_EXPOSURE_US`` (and says so in the status
        bar) when the field does not hold a plain integer.
        """
        exposure_time_text = self.Ex_time.text().strip()
        if exposure_time_text.isdigit():
            return int(exposure_time_text)
        self.statusbar.showMessage("請輸入有效的曝光時間(整數),使用預設值 5000 微秒")
        return DEFAULT_EXPOSURE_US

    def update_exposure_time(self):
        """Apply an edited exposure time to the camera and grab again.

        A running continuous capture is restarted as continuous; in every
        other case a single frame is grabbed.
        """
        if not (self.camera and self.camera.IsOpen()):
            return

        try:
            new_exposure_time = self.get_exposure_time()

            # Reject values outside the range the camera reports.
            min_exp = self.camera.ExposureTime.GetMin()
            max_exp = self.camera.ExposureTime.GetMax()
            if new_exposure_time < min_exp or new_exposure_time > max_exp:
                self.statusbar.showMessage(f"曝光時間超出範圍 ({min_exp} ~ {max_exp}),請輸入有效值")
                return

            # Nothing to do if this value is already applied.
            if new_exposure_time == self.last_exposure_time:
                return

            self.camera.ExposureTime.SetValue(float(new_exposure_time))
            # Record the value only after the camera accepted it, so a
            # failed write can be retried with the same value.
            self.last_exposure_time = new_exposure_time
            self.statusbar.showMessage(f"曝光時間更新為 {new_exposure_time} 微秒")

            # Grab again in the mode that was active before the change.
            thread = self.camera_thread
            was_continuous = bool(
                thread and thread.isRunning() and thread.continuous
            )
            if was_continuous:
                self.stop_keep_shot()
                self.keep_shot_capture()
            else:
                self.one_shot_capture()

        except Exception as e:
            self.statusbar.showMessage(f"設定曝光時間失敗: {e}")

    def one_shot_capture(self):
        """Grab a single frame, stopping any capture already in progress."""
        if not (self.camera and self.camera.IsOpen()):
            self.statusbar.showMessage("請先連接相機")
            return

        self.stop_keep_shot()
        exposure_time = self.get_exposure_time()
        self.camera_thread = CameraThread(self.camera, exposure_time, continuous=False)
        self.camera_thread.update_image_signal.connect(self.process_image)
        self.camera_thread.start()

    def keep_shot_capture(self):
        """Toggle continuous capture on or off."""
        if not (self.camera and self.camera.IsOpen()):
            self.statusbar.showMessage("請先連接相機")
            return

        thread = self.camera_thread
        if thread and thread.isRunning() and thread.continuous:
            # Continuous capture is active: this click turns it off.
            self.stop_keep_shot()
            self.statusbar.showMessage("停止連續取像")
            return

        # A single shot still in flight is finished off first rather than
        # being mistaken for a running continuous capture.
        self.stop_keep_shot()
        exposure_time = self.get_exposure_time()
        self.camera_thread = CameraThread(self.camera, exposure_time, continuous=True)
        self.camera_thread.update_image_signal.connect(self.process_image)
        self.camera_thread.start()
        self.statusbar.showMessage("開始連續取像")

    def stop_keep_shot(self):
        """Stop the capture thread, if any, and forget it."""
        if self.camera_thread:
            self.camera_thread.stop()
            self.camera_thread.wait()  # make sure the thread has ended
            self.camera_thread = None

    def process_image(self, image):
        """Store a freshly grabbed frame and show it."""
        self.current_image = image
        self.display_original_image()

    def display_original_image(self):
        """Render ``current_image`` into the preview label."""
        if self.current_image is None:
            return

        frame = self.current_image
        if len(frame.shape) == 2:
            # NOTE(review): 2-D frames are demosaiced as BayerBG; confirm
            # this matches the camera's actual pixel format.
            image_bgr = cv2.cvtColor(frame, cv2.COLOR_BayerBG2BGR)
        else:
            image_bgr = frame

        # QImage reads the raw buffer row by row, so it must be contiguous.
        image_bgr = np.ascontiguousarray(image_bgr)
        height, width = image_bgr.shape[:2]
        bytes_per_line = image_bgr.strides[0]
        qimage = QtGui.QImage(
            image_bgr.data, width, height, bytes_per_line,
            QtGui.QImage.Format_BGR888,
        )
        pixmap = QtGui.QPixmap.fromImage(qimage).scaled(
            self.label.size(), QtCore.Qt.KeepAspectRatio
        )
        self.label.setPixmap(pixmap)

    def close_camera(self):
        """Stop capturing and close the camera if it is open."""
        if self.camera and self.camera.IsOpen():
            self.stop_keep_shot()  # stop grabbing before closing
            self.camera.Close()
            self.statusbar.showMessage("相機已關閉")
            self.camera = None

    def closeEvent(self, event):
        """Shut down the capture thread and the camera when the window closes."""
        self.stop_keep_shot()  # the thread may exist even if the camera is closed
        self.close_camera()
        event.accept()


if __name__ == "__main__":
    app = QtWidgets.QApplication(sys.argv)
    window = Camera()
    window.show()
    sys.exit(app.exec_())