AP/detection.py

154 lines
6.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import multiprocessing
import queue
import sys

import cv2
import numpy as np
from PyQt5 import QtCore, QtGui, QtWidgets
from PyQt5.QtCore import QTimer
from ultralytics import YOLO

from camera.camera_process import CameraProcess
from Detection_window import Ui_MainWindow
from log_handler import LogHandler
from read_ini import ConfigReader
from Yolo_predict import YoloPredict
class DetectionApp(QtWidgets.QMainWindow, Ui_MainWindow):
    """Main window that previews camera frames and runs YOLO detection on demand.

    Frames arrive from a ``CameraProcess`` through a single-slot
    ``multiprocessing.Queue``; a ``QTimer`` polls that queue and shows the most
    recent frame in ``view_origin``.  The detection button hands the latest
    frame to a ``YoloPredict`` worker, whose result is shown in
    ``view_predict``.
    """

    # Weights file used both for the start-up load check and by the worker.
    MODEL_PATH = "model/best.pt"
    # Interval, in milliseconds, at which the preview polls the image queue.
    REFRESH_INTERVAL_MS = 100
    # Seconds to wait on each join attempt while the camera process shuts down.
    JOIN_POLL_SECONDS = 0.1

    def __init__(self):
        """Build the UI, read configuration, load the model and wire up events."""
        super().__init__()
        self.setupUi(self)

        self.log_handler = LogHandler()
        self.log_handler.write_log("程式啟動")

        # Exposure time comes from the project's configuration reader.
        self.config_reader = ConfigReader()
        self.exposure_time = self.config_reader.get_exposure_time()

        # The queue must exist before the preview timer starts polling it.
        self.image_queue = multiprocessing.Queue(maxsize=1)
        self.camera_process = None  # no camera process has been started yet
        self.latest_image = None  # most recent frame taken from the queue

        # Load the YOLO model; a failure is logged instead of aborting start-up.
        # NOTE(review): self.model is not referenced anywhere else in this
        # class; detection() passes the weights path to YoloPredict instead.
        try:
            self.model = YOLO(self.MODEL_PATH)
            self.log_handler.write_log("YOLO 模型載入成功")
        except Exception as e:
            self.log_handler.write_log(f"⚠️ YOLO 模型載入失敗: {e}")
            self.model = None

        # Button wiring.
        self.bt_KeepShot.clicked.connect(self.KeepShot)
        self.bt_StopKeepShot.clicked.connect(self.StopKeepShot)
        self.bt_detection.clicked.connect(self.detection)

        # Detection and "stop" only make sense while the camera is capturing.
        self.bt_detection.setEnabled(False)
        self.bt_StopKeepShot.setEnabled(False)

        # Poll the image queue periodically to refresh the preview.
        self.timer = QTimer(self)
        self.timer.timeout.connect(self.update_view_origin)
        self.timer.start(self.REFRESH_INTERVAL_MS)

    def KeepShot(self):
        """Start continuous capture unless a camera process is already running."""
        if self.camera_process is not None and self.camera_process.is_alive():
            print("相機已經在擷取")
            return

        # Clean up any previous (dead) process before starting a new one.
        self.StopKeepShot()

        # A fresh queue guarantees no stale frame leaks into the new session.
        self.image_queue = multiprocessing.Queue(maxsize=1)

        self.camera_process = CameraProcess(
            self.image_queue, exposure_time=self.exposure_time
        )
        self.camera_process.start()
        self.log_handler.write_log("相機啟動")
        self.statusbar.showMessage(f"開始擷取影像 (曝光時間: {self.exposure_time} 微秒)")

        self.bt_KeepShot.setEnabled(False)
        self.bt_StopKeepShot.setEnabled(True)
        self.bt_detection.setEnabled(True)

    def StopKeepShot(self):
        """Stop continuous capture, reset the buttons and discard queued frames."""
        process = self.camera_process
        if process is not None and process.is_alive():
            process.stop()
            # A process that has put data on a multiprocessing.Queue may not
            # exit until that data has been consumed, so keep draining while
            # waiting rather than blocking in a bare join().
            # Assumes CameraProcess.join accepts ``timeout`` like
            # multiprocessing.Process.join -- TODO confirm.
            while process.is_alive():
                self._drain_image_queue()
                process.join(timeout=self.JOIN_POLL_SECONDS)
            print("已停止影像擷取")
            self.log_handler.write_log("相機停止")
        self.camera_process = None

        # Reset the buttons unconditionally so the UI also recovers when the
        # camera process has died on its own.
        self.bt_KeepShot.setEnabled(True)
        self.bt_StopKeepShot.setEnabled(False)
        self.bt_detection.setEnabled(False)

        # Make sure a later capture session never sees an old frame.
        self._drain_image_queue()

    def _drain_image_queue(self):
        """Discard every frame that can currently be read from the image queue."""
        while True:
            try:
                self.image_queue.get_nowait()
            except queue.Empty:
                return
            except (EOFError, OSError, ValueError) as e:
                # Best effort: a closed or broken queue must not crash the UI.
                self.log_handler.write_log(f"⚠️ 清空影像佇列時發生錯誤: {e}")
                return

    def update_view_origin(self):
        """Show the newest queued frame, if any, in ``view_origin``."""
        # get_nowait() never blocks the GUI thread; empty() followed by a
        # blocking get() could, because empty() is not reliable for
        # multiprocessing queues.
        try:
            image = self.image_queue.get_nowait()
        except queue.Empty:
            return
        self.latest_image = image
        self.display_image(image, self.view_origin)

    def display_image(self, image, label):
        """Render ``image`` into ``label``, scaled with its aspect ratio kept.

        ``image`` is handled as a NumPy array: a single-channel array is
        expanded to three channels, and a three-channel array is passed to Qt
        as BGR.  Any failure is written to the log instead of being raised.
        """
        try:
            if image.ndim == 2 or (image.ndim == 3 and image.shape[2] == 1):
                # Single-channel input needs GRAY2BGR; RGB2BGR rejects it.
                image_bgr = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
            else:
                image_bgr = image
            if image_bgr.ndim != 3 or image_bgr.shape[2] != 3:
                raise ValueError(f"unsupported image shape: {image_bgr.shape}")

            # QImage reads the raw buffer row by row, so rows must be packed.
            image_bgr = np.ascontiguousarray(image_bgr)
            height, width = image_bgr.shape[:2]
            bytes_per_line = 3 * width
            qimage = QtGui.QImage(
                image_bgr.data,
                width,
                height,
                bytes_per_line,
                QtGui.QImage.Format_BGR888,
            )
            pixmap = QtGui.QPixmap.fromImage(qimage).scaled(
                label.size(), QtCore.Qt.KeepAspectRatio
            )
            label.setPixmap(pixmap)
        except Exception as e:
            self.log_handler.write_log(f"⚠️ 顯示影像時發生錯誤: {e}")

    def detection(self):
        """Start a ``YoloPredict`` worker on the most recently displayed frame."""
        if self.latest_image is None:
            self.log_handler.write_log("⚠️ 無影像可進行推論")
            return

        # NOTE(review): if YoloPredict invokes the callback from its own
        # thread, on_detection_complete touches widgets off the GUI thread;
        # confirm it marshals the call (e.g. through a Qt signal).
        self.yolo_thread = YoloPredict(
            image=self.latest_image,
            model_path=self.MODEL_PATH,
            callback=self.on_detection_complete,
        )
        self.yolo_thread.start()
        self.log_handler.write_log("YOLO 推論執行中...")

    def on_detection_complete(self, result_image, save_path):
        """Show the detection result and report where it was saved."""
        self.display_image(result_image, self.view_predict)
        self.statusbar.showMessage(f"推論結果已儲存: {save_path}")
        self.log_handler.write_log(f"推論結果儲存至: {save_path}")

    def closeEvent(self, event):
        """Ask for confirmation; on "Yes" stop the camera and accept the close."""
        reply = QtWidgets.QMessageBox.question(
            self, "確認", "確定要關閉應用程式嗎?",
            QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No, QtWidgets.QMessageBox.No
        )
        if reply == QtWidgets.QMessageBox.Yes:
            self.StopKeepShot()
            self.log_handler.write_log("程式關閉")
            event.accept()
        else:
            event.ignore()
if __name__ == "__main__":
    # Must run first in the entry script so multiprocessing works in frozen
    # Windows executables.
    multiprocessing.freeze_support()

    qt_app = QtWidgets.QApplication(sys.argv)
    main_window = DetectionApp()
    main_window.show()

    # Run the Qt event loop and use its return value as the process exit code.
    exit_code = qt_app.exec_()
    sys.exit(exit_code)