154 lines
6.2 KiB
Python
154 lines
6.2 KiB
Python
import sys
|
||
import cv2
|
||
import numpy as np
|
||
import multiprocessing
|
||
from PyQt5 import QtWidgets, QtGui, QtCore
|
||
from PyQt5.QtCore import QTimer
|
||
from ultralytics import YOLO
|
||
|
||
from Detection_window import Ui_MainWindow
|
||
from camera.camera_process import CameraProcess
|
||
from read_ini import ConfigReader
|
||
from log_handler import LogHandler
|
||
from Yolo_predict import YoloPredict
|
||
|
||
class DetectionApp(QtWidgets.QMainWindow, Ui_MainWindow):
|
||
def __init__(self):
|
||
super(DetectionApp, self).__init__()
|
||
self.setupUi(self)
|
||
|
||
self.log_handler = LogHandler() # ✅ 建立 LogHandler 物件
|
||
self.log_handler.write_log("程式啟動") # ✅ 寫入 log: 程式啟動
|
||
|
||
# ✅ 讀取 `exposure_time`
|
||
self.config_reader = ConfigReader()
|
||
self.exposure_time = self.config_reader.get_exposure_time()
|
||
|
||
# ✅ 先初始化 image_queue,再啟動相機擷取
|
||
self.image_queue = multiprocessing.Queue(maxsize=1)
|
||
self.camera_process = None # 相機進程尚未啟動
|
||
self.latest_image = None # 存儲最新影像
|
||
|
||
# ✅ 載入 YOLO模型
|
||
try:
|
||
self.model = YOLO("model/best.pt")
|
||
self.log_handler.write_log("YOLO 模型載入成功")
|
||
except Exception as e:
|
||
self.log_handler.write_log(f"⚠️ YOLO 模型載入失敗: {e}")
|
||
self.model = None
|
||
|
||
# ✅ 連接按鈕事件
|
||
self.bt_KeepShot.clicked.connect(self.KeepShot)
|
||
self.bt_StopKeepShot.clicked.connect(self.StopKeepShot)
|
||
self.bt_detection.clicked.connect(self.detection)
|
||
|
||
# 鎖定檢測按鈕及停止相機連續取像
|
||
self.bt_detection.setEnabled(False)
|
||
self.bt_StopKeepShot.setEnabled(False)
|
||
|
||
# ✅ 設定 QTimer,每 100ms 更新影像
|
||
self.timer = QTimer(self)
|
||
self.timer.timeout.connect(self.update_view_origin)
|
||
self.timer.start(100) # 每 100ms 更新一次影像
|
||
|
||
def KeepShot(self):
|
||
""" 啟動相機擷取 """
|
||
if self.camera_process is None or not self.camera_process.is_alive():
|
||
self.StopKeepShot() # 確保先停止舊的 Process,避免衝突
|
||
|
||
# ✅ 重新建立 Queue,避免殘留影像影響新擷取
|
||
self.image_queue = multiprocessing.Queue(maxsize=1)
|
||
|
||
# ✅ 使用 `exposure_time` 啟動相機
|
||
self.camera_process = CameraProcess(self.image_queue, exposure_time=self.exposure_time)
|
||
self.camera_process.start()
|
||
self.log_handler.write_log("相機啟動") # ✅ 寫入 log: 相機啟動
|
||
self.statusbar.showMessage(f"開始擷取影像 (曝光時間: {self.exposure_time} 微秒)")
|
||
|
||
# 按鈕setEnable
|
||
self.bt_KeepShot.setEnabled(False)
|
||
self.bt_StopKeepShot.setEnabled(True)
|
||
self.bt_detection.setEnabled(True)
|
||
else:
|
||
print("相機已經在擷取")
|
||
|
||
def StopKeepShot(self):
|
||
""" 停止相機擷取 """
|
||
if self.camera_process and self.camera_process.is_alive():
|
||
self.camera_process.stop() # 停止擷取
|
||
self.camera_process.join() # 等待進程完全結束
|
||
self.camera_process = None
|
||
print("已停止影像擷取")
|
||
self.log_handler.write_log("相機停止") # ✅ 寫入 log: 相機停止
|
||
|
||
# 按鈕setEnable
|
||
self.bt_KeepShot.setEnabled(True)
|
||
self.bt_StopKeepShot.setEnabled(False)
|
||
self.bt_detection.setEnabled(False)
|
||
# ✅ 清空 Queue,確保新擷取不會讀取到舊影像
|
||
while not self.image_queue.empty():
|
||
try:
|
||
self.image_queue.get_nowait()
|
||
except:
|
||
break
|
||
|
||
def update_view_origin(self):
|
||
""" 從 Queue 獲取影像並顯示在 QLabel (view_origin) 上 """
|
||
if not self.image_queue.empty():
|
||
image = self.image_queue.get() # 取得最新影像
|
||
self.latest_image = image # 存儲最新影像
|
||
self.display_image(image, self.view_origin)
|
||
|
||
def display_image(self, image, label):
|
||
""" 顯示影像到 QLabel """
|
||
try:
|
||
image_bgr = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) if len(image.shape) == 2 else image
|
||
height, width, channel = image_bgr.shape
|
||
bytes_per_line = 3 * width
|
||
qimage = QtGui.QImage(image_bgr.data, width, height, bytes_per_line, QtGui.QImage.Format_BGR888)
|
||
pixmap = QtGui.QPixmap.fromImage(qimage).scaled(label.size(), QtCore.Qt.KeepAspectRatio)
|
||
label.setPixmap(pixmap)
|
||
except Exception as e:
|
||
self.log_handler.write_log(f"⚠️ 顯示影像時發生錯誤: {e}")
|
||
|
||
def detection(self):
|
||
""" 啟動 YOLO 推論執行緒 """
|
||
if self.latest_image is None:
|
||
self.log_handler.write_log("⚠️ 無影像可進行推論")
|
||
return
|
||
|
||
# ✅ 啟動 YOLO 推論執行緒
|
||
self.yolo_thread = YoloPredict(
|
||
image=self.latest_image,
|
||
model_path="model/best.pt",
|
||
callback=self.on_detection_complete # ✅ 設定回呼函數
|
||
)
|
||
self.yolo_thread.start()
|
||
self.log_handler.write_log("YOLO 推論執行中...")
|
||
|
||
def on_detection_complete(self, result_image, save_path):
|
||
""" YOLO 推論完成後的回呼函數 """
|
||
self.display_image(result_image, self.view_predict)
|
||
self.statusbar.showMessage(f"推論結果已儲存: {save_path}")
|
||
self.log_handler.write_log(f"推論結果儲存至: {save_path}")
|
||
|
||
def closeEvent(self, event):
|
||
reply = QtWidgets.QMessageBox.question(
|
||
self, "確認", "確定要關閉應用程式嗎?",
|
||
QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No, QtWidgets.QMessageBox.No
|
||
)
|
||
|
||
if reply == QtWidgets.QMessageBox.Yes:
|
||
self.StopKeepShot() # 停止相機
|
||
self.log_handler.write_log("程式關閉") # ✅ 寫入 log: 程式關閉
|
||
event.accept() # 允許視窗關閉
|
||
else:
|
||
event.ignore() # 阻止視窗關閉
|
||
|
||
if __name__ == "__main__":
|
||
multiprocessing.freeze_support() # 在 Windows 上執行 multiprocessing 需要這行
|
||
app = QtWidgets.QApplication(sys.argv)
|
||
window = DetectionApp()
|
||
window.show()
|
||
sys.exit(app.exec_())
|