This repository has been archived on 2020-09-28. You can view files and clone it, but cannot push or open issues or pull requests.
AIParking_Engine_deploy2/app.py

320 lines
11 KiB
Python

import imageio
import io
import requests
from flask import Flask, request, jsonify, make_response
import base64
import cv2
import time
from datetime import datetime
import json
import plate_detect
import plate_ocr
# Service settings are read once, at import time, from config.json in the
# working directory; the handlers below look up their feature switches and
# endpoint URLs in this module-level object.
with open("config.json", "r") as config_file:
    config = json.load(config_file)
def print_current_time():
    """Print a newline followed by the current local timestamp to stdout."""
    print("\n", datetime.now())
def extract_from_base64_str(img64):
    """Decode a base64 string into raw bytes.

    The input may be a bare base64 payload or a data-URI style string of the
    form "<header>;base64,<payload>". In the second case only the payload
    after the marker is decoded.

    Args:
        img64: Base64 text, optionally preceded by a ";base64," header.

    Returns:
        The decoded bytes.
    """
    marker = ";base64,"
    # The header is stripped only when the marker occurs exactly once; any
    # other input is passed to the decoder unchanged.
    if img64.count(marker) == 1:
        img64 = img64.partition(marker)[2]
    return base64.b64decode(img64)
def check_plate_quality(lp_elements):
    """Return the total character count of the three plate segments.

    Args:
        lp_elements: Iterable of exactly three sized items (head, mid, tail).

    Returns:
        Sum of the lengths of the three segments; the handlers in this file
        use this number as the plate "confidence".
    """
    head, mid, tail = lp_elements
    return sum(map(len, (head, mid, tail)))
def make_automatic_res(plate, track_id, confidence):
    """Build the response payload shared by the /automatic/* endpoints.

    Args:
        plate: Recognised plate text (empty string when nothing was read).
        track_id: Vehicle identifier supplied by the caller; stringified.
        confidence: Plate quality score; stringified.

    Returns:
        A dict with a single "vehicle" entry; "open_door" is always "0".
    """
    vehicle = dict(
        plate=plate,
        vehicle_id=str(track_id),
        confidence=str(confidence),
        open_door="0",
    )
    return {"vehicle": vehicle}
# Flask application object; the @app.route decorators in this file register
# their handlers on it.
app = Flask(__name__)
"""
Description: API for intops factory only
Input: {vehicle_id, image, mode, type, gate_id, frame, frametime}
Output: {vehicle: {plate, vehicle_id, confidence, open_door}}
"""
@app.route("/automatic/get-from-frame", methods=["POST"])
def get_from_frame_automatic():
    """Read one plate from a posted image, push a log entry and open the gate.

    Request JSON keys read by this handler:
        vehicle_id: Echoed back in the response (default 0).
        image: Base64 image passed to the plate detector.
        frame: Base64 full frame; saved to disk and forwarded to the log API.
        type: "in" or "out" (default "in"); selects the door configuration.
        gate_id: Forwarded to the log API (default "1").
        frametime: Forwarded to the log API; required when log pushing is
            enabled.

    Returns:
        The dict built by make_automatic_res. The plate is empty and the
        confidence is 0 whenever no plate is found, the plate length is
        outside 7..10 characters, or any step raises.
    """
    # silent=True turns a missing or malformed JSON body into None instead of
    # an exception raised before the try block, so bad requests get the same
    # empty response as every other failure of this endpoint.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    track_id = data.get("vehicle_id", 0)
    try:
        print_current_time()
        print(request.path)
        start_time = time.time()
        img64 = data.get("image", "")
        type_come = data.get("type", "in")  # in / out
        gate_id = data.get("gate_id", "1")
        frame64 = data.get("frame", "")
        # Outbound HTTP calls must not block the worker forever; the limit
        # (seconds) can be overridden with an optional "request_timeout"
        # entry in config.json.
        http_timeout = config.get("request_timeout", 30)

        img_np = imageio.imread(io.BytesIO(extract_from_base64_str(img64)))
        frame_np = imageio.imread(io.BytesIO(extract_from_base64_str(frame64)))
        frame_np = cv2.cvtColor(frame_np, cv2.COLOR_RGB2BGR)
        if config['save_img']:
            cv2.imwrite("img/"+str(start_time)+"_frame.jpg", frame_np)

        # The detector decides the plate layout itself; the "mode" value sent
        # by the client is not used by this endpoint.
        Ilp, mode = plate_detect.run(img_np)
        if Ilp is None:
            return make_automatic_res("", track_id, 0)
        split_plate = mode == "square"
        ocr_result, elements = plate_ocr.run(mode, Ilp, "full", split_plate)
        confidence = check_plate_quality(elements)
        if confidence > 10 or confidence < 7:
            print("\t [WARNING] Too long/short plate:", ocr_result)
            confidence = 0
            return make_automatic_res("", track_id, confidence)
        if config['save_img']:
            cv2.imwrite("img/"+str(start_time)+"_" + ocr_result + ".jpg", Ilp)

        if config["save_logs_api"]["enable"]:
            log_start_time = time.time()
            res = requests.request(
                config["save_logs_api"]["push_log"]["method"],
                config["save_logs_api"]["push_log"]["url"],
                json={
                    "plate": ocr_result.replace("-", ""),
                    "plate_image": base64.b64encode(cv2.imencode(".jpg", Ilp)[1]).decode("utf-8"),
                    "frame_image": frame64,
                    "gate_id": gate_id,
                    "type": type_come,
                    "time": data['frametime'],
                },
                timeout=http_timeout,
            )
            log_end_time = time.time()
            print("\tAPI time:", log_end_time - log_start_time)
            dataRes = json.loads(res.text)
            print("\tWebServer response:", dataRes["status"])
            # The door is only triggered after the log server accepted the plate.
            if dataRes["status"] and config["open_door_api"]["enable"]:
                zk_start_time = time.time()
                door_cfg = config["open_door"][type_come]
                if config["os"] == "linux":
                    requests.post(door_cfg["url"], json={"door": door_cfg["door"]}, timeout=http_timeout)
                elif config["os"] == "windows":
                    requests.post(
                        door_cfg["url"],
                        headers={"Content-Type": "text/html", "doorId": door_cfg["door"]},
                        timeout=http_timeout,
                    )
                zk_end_time = time.time()
                print("\tZK time:", zk_end_time - zk_start_time)
        end_time = time.time()
        print("\tTotal time:", end_time - start_time)
        return make_automatic_res(ocr_result, track_id, confidence)
    except Exception as ex:
        # Best-effort endpoint: report the failure and answer with an empty plate.
        print(ex)
        return make_automatic_res("", track_id, 0)
@app.route("/automatic/vehicle", methods=["POST"])
def get_from_vehicle():
    """Read the best plate among several candidates, log it and open the gate.

    The detector may return several plate crops for the posted image. Each
    crop is run through OCR and the longest text wins (the first one on ties).

    Request JSON keys read by this handler:
        vehicle_id: Echoed back in the response (default 0).
        image: Base64 image passed to the plate detector.
        frame: Base64 full frame; saved to disk and forwarded to the log API.
        type: "in" or "out" (default "in"); selects the door configuration.
        gate_id: Forwarded to the log API (default "1").
        frametime: Forwarded to the log API; required when log pushing is
            enabled.

    Returns:
        The dict built by make_automatic_res, with dashes removed from the
        plate. The plate is empty and the confidence is 0 whenever no
        candidate is found, the plate length is outside 7..10 characters, or
        any step raises.
    """
    # silent=True turns a missing or malformed JSON body into None instead of
    # an exception raised before the try block, so bad requests get the same
    # empty response as every other failure of this endpoint.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    track_id = data.get("vehicle_id", 0)
    try:
        print_current_time()
        print(request.path)
        start_time = time.time()
        img64 = data.get("image", "")
        type_come = data.get("type", "in")  # in / out
        gate_id = data.get("gate_id", "1")
        frame64 = data.get("frame", "")
        # Outbound HTTP calls must not block the worker forever; the limit
        # (seconds) can be overridden with an optional "request_timeout"
        # entry in config.json.
        http_timeout = config.get("request_timeout", 30)

        img_np = imageio.imread(io.BytesIO(extract_from_base64_str(img64)))
        frame_np = imageio.imread(io.BytesIO(extract_from_base64_str(frame64)))
        frame_np = cv2.cvtColor(frame_np, cv2.COLOR_RGB2BGR)

        Ilps, modes = plate_detect.run2(img_np)
        if Ilps is None or len(Ilps) == 0:
            return make_automatic_res("", track_id, 0)

        # Keep the OCR elements together with the winning text so that the
        # quality check below scores the plate that is actually returned, not
        # whichever candidate happened to be processed last.
        longest = ''
        longest_i = 0
        longest_elements = None
        for i, (candidate, mode) in enumerate(zip(Ilps, modes)):
            split_plate = mode == "square"
            ocr_result, elements = plate_ocr.run(mode, candidate, "full", split_plate)
            if longest_elements is None or len(ocr_result) > len(longest):
                longest = ocr_result
                longest_i = i
                longest_elements = elements
        print(len(Ilps), longest_i, longest)
        if longest_elements is None:
            # No candidate could be paired with a mode, so nothing was read.
            return make_automatic_res("", track_id, 0)

        ocr_result = longest
        Ilp = Ilps[longest_i]
        confidence = check_plate_quality(longest_elements)
        if confidence > 10 or confidence < 7:
            print("\t [WARNING] Too long/short plate:", ocr_result)
            confidence = 0
            return make_automatic_res("", track_id, confidence)
        # Dashes are stripped once; the cleaned text is used for the log
        # entry, the saved file name and the response.
        ocr_result = ocr_result.replace("-", "")

        if config["save_logs_api"]["enable"]:
            log_start_time = time.time()
            res = requests.request(
                config["save_logs_api"]["push_log"]["method"],
                config["save_logs_api"]["push_log"]["url"],
                json={
                    "plate": ocr_result,
                    "plate_image": base64.b64encode(cv2.imencode(".jpg", Ilp)[1]).decode("utf-8"),
                    "frame_image": frame64,
                    "gate_id": gate_id,
                    "type": type_come,
                    "time": data['frametime'],
                },
                timeout=http_timeout,
            )
            log_end_time = time.time()
            print("\tAPI time:", log_end_time - log_start_time)
            dataRes = json.loads(res.text)
            print("\tWebServer response:", dataRes["status"])
            # The door is only triggered after the log server accepted the plate.
            if dataRes["status"] and config["open_door_api"]["enable"]:
                zk_start_time = time.time()
                door_cfg = config["open_door"][type_come]
                if config["os"] == "linux":
                    requests.post(door_cfg["url"], json={"door": door_cfg["door"]}, timeout=http_timeout)
                elif config["os"] == "windows":
                    requests.post(
                        door_cfg["url"],
                        headers={"Content-Type": "text/html", "doorId": door_cfg["door"]},
                        timeout=http_timeout,
                    )
                zk_end_time = time.time()
                print("\tZK time:", zk_end_time - zk_start_time)
        end_time = time.time()
        print("\tTotal time:", end_time - start_time)
        if config['save_img']:
            cv2.imwrite("img2/"+str(start_time)+"_" + ocr_result + ".jpg", Ilp)
            cv2.imwrite("img2/"+str(start_time)+"_frame.jpg", frame_np)
        return make_automatic_res(ocr_result, track_id, confidence)
    except Exception as ex:
        # Best-effort endpoint: report the failure and answer with an empty plate.
        print(ex)
        return make_automatic_res("", track_id, 0)
"""
Description: API to get plate OCR from image of plate
Note: This API is less accurate than get-from-frame in most situations
because it does not re-detect the plate to align it
Input:
img64: base64 img string,
mode: "long" or "square",
display: "full" or "row2" (This option applies to square plates that have 2 lines)
Output:
ocr: ""
"""
@app.route("/get-from-plate", methods=["POST"])
def get_from_plate():
    """Run OCR directly on a posted plate image, without plate detection.

    Request JSON keys read: img64 (base64 image), mode (default "long") and
    display (default "full").

    Returns:
        {"ocr": <text>}; the text is empty when img64 is missing or empty, or
        when any step raises.
    """
    try:
        print_current_time()
        print(request.path)
        started_at = time.time()
        payload = request.json
        encoded_img = payload.get("img64", "")
        if encoded_img == "":
            return {"ocr": ""}
        mode = payload.get("mode", "long")  # ["long", "square"]
        display = payload.get("display", "full")  # ["full", "row2"]
        print("\t Mode:", mode)
        plate_np = imageio.imread(io.BytesIO(extract_from_base64_str(encoded_img)))
        ocr_result, _elements = plate_ocr.run(mode, plate_np, display, False)
        finished_at = time.time()
        print("\tTotal time:", finished_at - started_at)
        return {"ocr": ocr_result}
    except Exception as ex:
        print(ex)
        return {"ocr": ""}
"""
Description: API to get plate OCR from a large image that contains the plate
Note: This API requires the input image to be square, even when running OCR on a long plate!
For best result, resize input img to 272x272
Input:
img64: base64 img string,
mode: "long" or "square",
display: "full" or "row2" (This option applies to square plates that have 2 lines)
Output:
ocr: "",
plate: "" (img base64 string for plate image)
"""
@app.route("/get-from-frame", methods=["POST"])
def get_from_frame():
    """Detect a plate in a posted image and run OCR on it.

    Request JSON keys read: img64 (base64 image), mode (default "long", used
    only when detection fails) and display (default "full").

    Returns:
        {"ocr": <text>, "plate": <base64 string>}. When a plate is detected,
        "plate" is the JPEG-encoded crop. When detection fails, OCR runs on a
        grayscale copy of the whole input and "plate" echoes the img64 string
        that was received. Both values are empty when img64 is missing or
        empty, or when any step raises.
    """
    try:
        print_current_time()
        print(request.path)
        start_time = time.time()
        data = request.json
        img64 = data.get("img64", "")
        if img64 == "":
            return {"ocr": "", "plate": ""}
        client_mode = data.get("mode", "long")  # ["long", "square"]
        display = data.get("display", "full")  # ["full", "row2"]
        img_binary = extract_from_base64_str(img64)
        img_np = imageio.imread(io.BytesIO(img_binary))
        Ilp, mode = plate_detect.run(img_np)
        if Ilp is None:
            # No plate found: fall back to OCR on the whole image, converted
            # to gray and back to three channels, using the layout the client
            # asked for.
            Ilp = cv2.cvtColor(img_np, cv2.COLOR_RGB2GRAY)
            Ilp = cv2.cvtColor(Ilp, cv2.COLOR_GRAY2BGR)
            ocr_result, elements = plate_ocr.run(client_mode, Ilp, display, False)
            end_time = time.time()
            print("\tTotal time:", end_time - start_time)
            return {"ocr": ocr_result, "plate": img64}
        ocr_result, elements = plate_ocr.run(mode, Ilp, display, False)
        plate_result = base64.b64encode(cv2.imencode(".jpg", Ilp)[1]).decode("utf-8")
        end_time = time.time()
        print("\tTotal time:", end_time - start_time)
        return {
            "ocr": ocr_result,
            "plate": plate_result
        }
    except Exception as ex:
        # Report the failure before returning; the print used to sit after
        # the return statement and therefore never ran.
        print(ex)
        return {
            "ocr": "",
            "plate": ""
        }