新增 function 上傳 南側門進出資料-1

This commit is contained in:
威勝 張 2024-04-29 14:49:02 +08:00
parent 0fe01e5508
commit ef8c2ecc9c
3 changed files with 60 additions and 315 deletions

View File

@ -5,6 +5,9 @@
<orderEntry type="inheritedJdk" /> <orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" /> <orderEntry type="sourceFolder" forTests="false" />
</component> </component>
<component name="PackageRequirementsSettings">
<option name="requirementsPath" value="" />
</component>
<component name="TemplatesService"> <component name="TemplatesService">
<option name="TEMPLATE_CONFIGURATION" value="Jinja2" /> <option name="TEMPLATE_CONFIGURATION" value="Jinja2" />
<option name="TEMPLATE_FOLDERS"> <option name="TEMPLATE_FOLDERS">
@ -13,4 +16,7 @@
</list> </list>
</option> </option>
</component> </component>
<component name="TestRunnerService">
<option name="PROJECT_TEST_RUNNER" value="Unittests" />
</component>
</module> </module>

View File

@ -50,10 +50,10 @@ class Parking_spaces_API():
return return
class South_API():
class Start_API():
def __init__(self): def __init__(self):
self.token = self.User_token()['token'] self.token = self.User_token()['token']
def User_token(self): def User_token(self):
API = "/Users/authenticate" API = "/Users/authenticate"
data = { data = {
@ -65,318 +65,13 @@ class Start_API():
data = response.json() data = response.json()
return data return data
# 獲取CAM全部資料 # 更新 token
def get_all_cam_data(self): def token_update(self):
API = "/api/Parking_spaces_cam"
headers = {
'Content-type': 'application/json',
"Authorization": self.token
}
data = Parking_spaces_API(API, headers).get()
return data
# 獲取ROI某筆資料
def get_roi_pass_data(self,parking_spaces_name):
API = "/api/Parking_spaces_roi_pass/" + parking_spaces_name
headers = CaseInsensitiveDict();
headers['Accept'] = 'application/json'
data = Parking_spaces_API(API, headers).get()
return data
# 獲取CAM某停車場資料
def get_one_cam_data(self,parking_spaces_name):
API = "/api/Parking_spaces_cam/parking_spaces_name-" + parking_spaces_name
headers = {
'Content-type': 'application/json',
"Authorization": self.token
}
cam_data = Parking_spaces_API(API, headers).get()
return cam_data
# 獲取ROI某筆資料
def get_roi_violation_data(self, parking_spaces_name):
API = "/api/Parking_spaces_roi_violation/parking_spaces_name-" + parking_spaces_name
headers = CaseInsensitiveDict();
headers['Accept'] = 'application/json'
data = Parking_spaces_API(API, headers).get()
return data
class Pass_API():
def __init__(self):
self.token = self.User_token()['token'] self.token = self.User_token()['token']
def User_token(self):
API = "/Users/authenticate"
data = {
'username': "admin",
'password': "admin"
}
headers = {'Content-type': 'application/json'}
response = Parking_spaces_API(API, headers).post(data)
data = response.json()
return data
# 獲取get_cam的資料
def get_cam(self,parking_spaces_name):
API = "/api/Parking_spaces_cam/parking_spaces_name-" + parking_spaces_name
headers = {
'Content-type': 'application/json',
"Authorization": self.token
}
data = Parking_spaces_API(API, headers).get()
return data
# 獲取get_cam_ptz的資料
def get_cam_ptz(self,parking_spaces_name):
API = "/api/Parking_spaces_cam_ptz_pass/parking_spaces_name-" + parking_spaces_name
headers = CaseInsensitiveDict();
headers['Accept'] = 'application/json'
data = Parking_spaces_API(API, headers).get()
return data
# 更新cam_ptz的資料
def put_cam_ptz(self,data, IP):
parking_spaces_name = data['parking_spaces_name']
port = data['rtsp_port']
API = "/api/Parking_spaces_cam_ptz_pass/parking_spaces_name-" + parking_spaces_name + "-IP-" + IP + "-port-" + port
headers = {
'accept': '*/*',
'Content-type': 'application/json'}
res = Parking_spaces_API(API, headers).put(data)
print(res)
# 上傳資料(當前) # 上傳資料(當前)
def post_data_instant(self,data): def post_data(self, data):
API = "/api/Parking_spaces_instant/" API = "/api/Yuntech_in_car_table"
headers = {'Content-type': 'application/json'}
response = Parking_spaces_API(API, headers).post(data)
print(response)
# 上傳資料(歷史)
def post_data_history(self,data):
API = "/api/Parking_spaces_history/"
headers = {'Content-type': 'application/json'}
response = Parking_spaces_API(API, headers).post(data)
print(response)
# 刪除資料
def delete_data(self,plate):
API = "/api/Parking_spaces_instant/" + plate
headers = {'Content-type': 'application/json'}
response = Parking_spaces_API(API, headers).delete()
print(response)
# 字幕機 數據+1
def LED_add(self,parking_spaces_name):
API = "/api/Parking_spaces_total_table/Parking_spaces_total_table_add-"+parking_spaces_name
headers = CaseInsensitiveDict();
headers['Accept'] = 'application/json'
data = Parking_spaces_API(API, headers).get()
# 字幕機 數據-1
def LED_minus(self,parking_spaces_name):
API = "/api/Parking_spaces_total_table/Parking_spaces_total_table_subtract-"+parking_spaces_name
headers = CaseInsensitiveDict();
headers['Accept'] = 'application/json'
data = Parking_spaces_API(API, headers).get()
def get_num(self,parking_spaces_name):
API = "/api/Parking_spaces_total_table/" + parking_spaces_name
headers = CaseInsensitiveDict();
headers['Accept'] = 'application/json'
data = Parking_spaces_API(API, headers).getSingleData()
num = data['parking_spaces_now_num'][0]
return num
class Car_num_check_API():
def __init__(self):
self.token = self.User_token()['token']
def User_token(self):
API = "/Users/authenticate"
data = {
'username': "admin",
'password': "admin"
}
headers = {'Content-type': 'application/json'}
response = Parking_spaces_API(API, headers).post(data)
data = response.json()
return data
# 獲取get_cam的資料
def get_cam(self,parking_spaces_name):
API = "/api/Parking_spaces_cam/parking_spaces_name-" + parking_spaces_name
headers = {
'Content-type': 'application/json',
"Authorization": self.token
}
data = Parking_spaces_API(API, headers).get()
return data
# 獲取get_cam_ptz的資料
def get_cam_ptz(self,parking_spaces_name):
API = "/api/Parking_spaces_cam_ptz_car_num_check/parking_spaces_name-" + parking_spaces_name
headers = CaseInsensitiveDict();
headers['Accept'] = 'application/json'
data = Parking_spaces_API(API, headers).get()
return data
# 更新cam_ptz的資料
def put_can_ptz(self,data):
parking_spaces_name = data['parking_spaces_name']
serial_num = data['serial_num']
port = data['rtsp_port']
API = "/api/Parking_spaces_cam_ptz_car_num_check/parking_spaces_name-" + parking_spaces_name + "-serial_num-" + serial_num
# print(API)
headers = {
'accept': '*/*',
'Content-type': 'application/json'}
res = Parking_spaces_API(API, headers).put(data)
# print(res)
# 獲取get_cam_roi的資料
def get_cam_roi(self,parking_spaces_name):
API = "/api/Parking_spaces_roi_car_num_check/parking_spaces_name-" + parking_spaces_name
headers = {
'Content-type': 'application/json',
"Authorization": self.token
}
data = Parking_spaces_API(API, headers).get()
return data
def get_num(self,parking_spaces_name):
API = "/api/Parking_spaces_total_table/" + parking_spaces_name
headers = {
'Content-type': 'application/json',
"Authorization": self.token
}
data = Parking_spaces_API(API, headers).getSingleData()
num = data['parking_spaces_now_num'][0]
return num
def get_all_num(self,parking_spaces_name):
API = "/api/Parking_spaces_total_table/" + parking_spaces_name
headers = {
'Content-type': 'application/json',
"Authorization": self.token
}
data = Parking_spaces_API(API, headers).getSingleData()
num = data['parking_spaces_total_num'][0]
return num
def put_parking_spaces_name_num(self,parking_spaces_name,num):
API = "/api/Parking_spaces_total_table/" + parking_spaces_name
headers = {
'Content-type': 'application/json',
"Authorization": self.token
}
data = Parking_spaces_API(API, headers).getSingleData()
data_1 = {
"parking_spaces_name":data['parking_spaces_name'][0],
"parking_spaces_total_num": data['parking_spaces_total_num'][0],
"longitude": data['longitude'][0],
"latitude": data['latitude'][0],
"parking_spaces_now_num": str(num),
}
API = "/api/Parking_spaces_total_table/" + parking_spaces_name
# print(API)
headers = {
'accept': '*/*',
'Content-type': 'application/json'}
res = Parking_spaces_API(API, headers).put(data_1)
print(res)
API = "/api/Parking_spaces_lcd_instand/parking_space_area-" + parking_spaces_name
headers = {
'Content-type': 'application/json',
"Authorization": self.token
}
data = Parking_spaces_API(API, headers).get()
for i in range(0,len(data)):
if data['parking_spaces_name'][i] == parking_spaces_name:
data_1 = {
"parking_spaces_name":data['parking_spaces_name'][i],
"lcd_ip":data['lcd_ip'][i],
"parking_space_amount":str(num)
}
API = "/api/Parking_spaces_lcd_instand/" + data['lcd_ip'][i]
# print(API)
headers = {
'accept': '*/*',
'Content-type': 'application/json'}
res = Parking_spaces_API(API, headers).put(data_1)
print(res)
#return num
class Violation_API():
def __init__(self):
self.token = self.User_token()['token']
def User_token(self):
API = "/Users/authenticate"
data = {
'username': "admin",
'password': "admin"
}
headers = {'Content-type': 'application/json'}
response = Parking_spaces_API(API, headers).post(data)
data = response.json()
return data
# 獲取get_cam的資料
def get_cam(self,parking_spaces_name):
API = "/api/Parking_spaces_cam/parking_spaces_name-" + parking_spaces_name
headers = {
'Content-type': 'application/json',
"Authorization": self.token
}
data = Parking_spaces_API(API, headers).get()
return data
# 獲取get_cam_ptz的資料
def get_cam_ptz(self,parking_spaces_name):
API = "/api/Parking_spaces_cam_ptz_violation/parking_spaces_name-" + parking_spaces_name
headers = CaseInsensitiveDict();
headers['Accept'] = 'application/json'
data = Parking_spaces_API(API, headers).get()
return data
# 更新cam_ptz的資料
def put_can_ptz(self,data):
parking_spaces_name = data['parking_spaces_name']
port = data['rtsp_port']
serial_num = data['serial_num']
API = "/api/Parking_spaces_cam_ptz_violation/parking_spaces_name-" + parking_spaces_name + "-rtsp_port-" + port+"-serial_num-"+serial_num
print(API)
headers = {
'accept': '*/*',
'Content-type': 'application/json'}
res = Parking_spaces_API(API, headers).put(data)
print(res)
def post_violation_car(self,data):
API = "/api/Violation_car_table/"
headers = { headers = {
'Content-type': 'application/json', 'Content-type': 'application/json',
"Authorization": self.token "Authorization": self.token
@ -399,3 +94,4 @@ class Violation_API():

View File

@ -8,7 +8,7 @@ import re
from flask import render_template from flask import render_template
from flask import Flask, request, jsonify from flask import Flask, request, jsonify
from pyModbusTCP.client import ModbusClient from pyModbusTCP.client import ModbusClient
from Class.API_class import South_API
import datetime import datetime
import shutil import shutil
@ -421,12 +421,16 @@ class Webcam(threading.Thread):
time.sleep(1) time.sleep(1)
class Camera(mp.Process): class Camera(mp.Process):
def __init__(self, data, parent=None): def __init__(self, data, parent=None):
super().__init__(parent) super().__init__(parent)
self.url = data['rtsp_url'] self.url = data['rtsp_url']
self.parking_spaces_name = data['rtsp_name'] self.parking_spaces_name = data['rtsp_name']
self.IO_IP = data['IP'] self.IO_IP = data['IP']
self.api = South_API()
def run(self): def run(self):
# self.IO = IO_Webapi(self.IO_IP) # self.IO = IO_Webapi(self.IO_IP)
@ -493,7 +497,6 @@ class Camera(mp.Process):
self.licence_check(cars_data) self.licence_check(cars_data)
self.IO_control() self.IO_control()
# self.all_open() # self.all_open()
self.state_check()
img = self.img.copy() img = self.img.copy()
self.show_img(car_img) self.show_img(car_img)
@ -529,6 +532,8 @@ class Camera(mp.Process):
licence_img = car_img[licence_box_location[1]:licence_box_location[1] + licence_box_location[3], licence_img = car_img[licence_box_location[1]:licence_box_location[1] + licence_box_location[3],
licence_box_location[0]:licence_box_location[0] + licence_box_location[2]] licence_box_location[0]:licence_box_location[0] + licence_box_location[2]]
self.flag_1 = True self.flag_1 = True
if len(self.licence_list)==0:
self.car_img = img
result = self.ocr.licence_detect(licence_img) result = self.ocr.licence_detect(licence_img)
if result != [[]]: if result != [[]]:
@ -564,7 +569,7 @@ class Camera(mp.Process):
self.licence_fps = self.licence_fps + 1 self.licence_fps = self.licence_fps + 1
if self.licence_fps > 3 and len(self.licence_list) > 2: if self.licence_fps > 3 and len(self.licence_list) > 2:
self.licence_num = self.licence_vote() self.licence_num = self.licence_vote()
if self.licence_num in self.white_licence_list: #if self.licence_num in self.white_licence_list:
self.open = True self.open = True
self.flag_1 = False self.flag_1 = False
@ -596,6 +601,7 @@ class Camera(mp.Process):
print("open") print("open")
self.text = "open" self.text = "open"
self.open_time = time.time() self.open_time = time.time()
self.state_check()
self.flag_2 = False self.flag_2 = False
self.IO.IO.out_1_clicked() self.IO.IO.out_1_clicked()
if self.open == True and (close_time - self.open_time) > 1: if self.open == True and (close_time - self.open_time) > 1:
@ -613,6 +619,23 @@ class Camera(mp.Process):
# 判斷狀態並上傳到API # 判斷狀態並上傳到API
def state_check(self): def state_check(self):
if self.open == True:
if self.flag_2 == True:
print(f'{self.licence_num}-進入')
cv2.imwrite(f'img/licence/{self.licence_num}.jpg', self.car_img) # 写入图像路径
img_base64 = self.img_to_base64(self.car_img)
data = {
"location": "南側門",
"license_plate_number": self.licence_num,
"car_img": img_base64,
"in_time": "2024-02-04T19:15:09.4371245+08:00",
"out_time": "2024-02-04T19:15:09.4371245+08:00"
}
try:
self.api.post_data(data)
except BaseException as e:
print(e)
print("error_api")
pass pass
## 將指定時間影片儲存 ## 將指定時間影片儲存
@ -718,6 +741,18 @@ class Camera(mp.Process):
cv2.imshow(f'{self.parking_spaces_name}', img) cv2.imshow(f'{self.parking_spaces_name}', img)
cv2.waitKey(1) cv2.waitKey(1)
# 圖片 轉 base64
def img_to_base64(self,img):
_, buffer = cv2.imencode('.jpg', img)
img_bytes = buffer.tobytes()
# 編碼為 base64 字串
img_base64 = base64.b64encode(img_bytes).decode('utf-8')
# 拼接 base64 字串
data_uri = "data:image/jpeg;base64," + img_base64
return data_uri
if __name__ == '__main__': if __name__ == '__main__':
@ -730,6 +765,14 @@ if __name__ == '__main__':
# rtsp_url_list = [basepath+'test_2.avi']# , basepath+'test_1.avi'] # rtsp_url_list = [basepath+'test_2.avi']# , basepath+'test_1.avi']
rtsp_name_list = ['南側門出口', '南側門入口'] rtsp_name_list = ['南側門出口', '南側門入口']
IO_IP_list = ["192.168.100.201", "192.168.100.204"] IO_IP_list = ["192.168.100.201", "192.168.100.204"]
# 開發用
# rtsp_url_list = ['rtsp://admin:admin123@192.168.100.102', 'rtsp://admin:admin123@192.168.100.101']
rtsp_url_list = [basepath+'test1.avi']# , basepath+'test_1.avi']
rtsp_name_list = ['南側門入口']
IO_IP_list = [ "192.168.1.200"]
webcam_list = [] webcam_list = []
for i in range(0, len(rtsp_url_list)): for i in range(0, len(rtsp_url_list)):
data = { data = {