319 lines
11 KiB
Python
319 lines
11 KiB
Python
import imageio
|
|
import io
|
|
import requests
|
|
from flask import Flask, request, jsonify, make_response
|
|
import base64
|
|
import cv2
|
|
import time
|
|
from datetime import datetime
|
|
import json
|
|
import plate_detect
|
|
import plate_ocr
|
|
|
|
|
|
config = {}
|
|
with open("config.json", "r") as config_file:
|
|
config = json.load(config_file)
|
|
|
|
|
|
def print_current_time():
|
|
now = datetime.now()
|
|
print("\n", now)
|
|
|
|
|
|
def extract_from_base64_str(img64):
|
|
splits = img64.split(";base64,")
|
|
if len(splits) == 2:
|
|
img64 = splits[1]
|
|
|
|
img_binary = base64.b64decode(img64)
|
|
# img = io_imread(io.BytesIO(base64.b64decode(b64_string)))
|
|
return img_binary
|
|
|
|
|
|
def check_plate_quality(lp_elements):
|
|
head, mid, tail = lp_elements
|
|
# if len(tail) < 4: return 0
|
|
return len(head) + len(mid) + len(tail)
|
|
|
|
|
|
def make_automatic_res(plate, track_id, confidence):
|
|
return {
|
|
"vehicle": {
|
|
"plate": plate,
|
|
"vehicle_id": str(track_id),
|
|
"confidence": str(confidence),
|
|
"open_door": "0"
|
|
}
|
|
}
|
|
|
|
|
|
app = Flask(__name__)
|
|
|
|
"""
|
|
Description: API for intops factory only
|
|
Input: {track_id, image, mode, type, frame}
|
|
Output: {name, person_id (track_id), confidence, open_door}
|
|
"""
|
|
|
|
|
|
@app.route("/automatic/get-from-frame", methods=["POST"])
|
|
def get_from_frame_automatic():
|
|
data = request.json
|
|
track_id = data.get("vehicle_id", 0)
|
|
|
|
try:
|
|
print_current_time()
|
|
print (request.path)
|
|
start_time = time.time()
|
|
|
|
img64 = data.get("image", "")
|
|
mode = data.get("mode", "long") # ["long", "square"]
|
|
type_come = data.get("type", "in") # in / out
|
|
gate_id = data.get("gate_id", "1")
|
|
frame64 = data.get("frame", "")
|
|
img_binary = extract_from_base64_str(img64)
|
|
img_np = imageio.imread(io.BytesIO(img_binary))
|
|
|
|
frame_binary= extract_from_base64_str(frame64)
|
|
frame_np = imageio.imread(io.BytesIO(frame_binary))
|
|
frame_np = cv2.cvtColor(frame_np, cv2.COLOR_RGB2BGR)
|
|
if config['save_img']:
|
|
cv2.imwrite("img/"+str(start_time)+"_frame.jpg", frame_np)
|
|
|
|
# cv2.imwrite("plate.jpg", Ilp)
|
|
Ilp, mode = plate_detect.run(img_np)
|
|
print("\t Mode:", mode)
|
|
|
|
if Ilp is None:
|
|
print("\t No plate found")
|
|
return make_automatic_res("", track_id, 0)
|
|
|
|
split_plate = mode == "square"
|
|
|
|
ocr_start_time = time.time()
|
|
ocr_result, elements = plate_ocr.run(mode, Ilp, "full", split_plate)
|
|
ocr_end_time = time.time()
|
|
print("\tOCR time:", ocr_end_time - ocr_start_time)
|
|
|
|
confidence = check_plate_quality(elements)
|
|
if confidence > 10 or confidence < 7:
|
|
print("\t [WARNING] Too long/short plate:", ocr_result)
|
|
confidence = 0
|
|
return make_automatic_res("", track_id, confidence)
|
|
|
|
if config['save_img']:
|
|
cv2.imwrite("img/"+str(start_time)+"_" + ocr_result + ".jpg", Ilp)
|
|
if config["save_logs_api"]["enable"]:
|
|
log_start_time = time.time()
|
|
res = requests.request(
|
|
config["save_logs_api"]["push_log"]["method"],
|
|
config["save_logs_api"]["push_log"]["url"],
|
|
json={
|
|
"plate": ocr_result.replace("-", ""),
|
|
"plate_image": base64.b64encode(cv2.imencode(".jpg", Ilp)[1]).decode("utf-8"),
|
|
"frame_image": frame64,
|
|
"gate_id" : gate_id,
|
|
"type": type_come,
|
|
"time": data['frametime']
|
|
}
|
|
)
|
|
log_end_time = time.time()
|
|
print("\tAPI time:", log_end_time - log_start_time)
|
|
|
|
dataRes = json.loads(res.text)
|
|
print ("\tWebServer response:", dataRes["status"])
|
|
if dataRes["status"] and config["open_door_api"]["enable"]:
|
|
zk_start_time = time.time()
|
|
if config["os"] == "linux":
|
|
requests.post(config['open_door'][type_come]['url'], json={"door": config['open_door'][type_come]['door']})
|
|
if config["os"] == "windows":
|
|
requests.post(config["open_door"][type_come]["url"], headers={"Content-Type" : "text/html","doorId": config["open_door"][type_come]["door"]})
|
|
zk_end_time = time.time()
|
|
print("\tZK time:", zk_end_time - zk_start_time)
|
|
|
|
end_time = time.time()
|
|
print("\tTotal time:", end_time - start_time)
|
|
return make_automatic_res(ocr_result, track_id, confidence)
|
|
except Exception as ex:
|
|
print(ex)
|
|
return make_automatic_res("", track_id, 0)
|
|
|
|
@app.route("/automatic/vehicle", methods=["POST"])
|
|
def get_from_vehicle():
|
|
data = request.json
|
|
track_id = data.get("vehicle_id", 0)
|
|
|
|
try:
|
|
print_current_time()
|
|
print (request.path)
|
|
start_time = time.time()
|
|
|
|
img64 = data.get("image", "")
|
|
mode = data.get("mode", "long") # ["long", "square"]
|
|
type_come = data.get("type", "in") # in / out
|
|
gate_id = data.get("gate_id", "1")
|
|
frame64 = data.get("frame", "")
|
|
img_binary = extract_from_base64_str(img64)
|
|
img_np = imageio.imread(io.BytesIO(img_binary))
|
|
|
|
frame_binary= extract_from_base64_str(frame64)
|
|
frame_np = imageio.imread(io.BytesIO(frame_binary))
|
|
frame_np = cv2.cvtColor(frame_np, cv2.COLOR_RGB2BGR)
|
|
|
|
# cv2.imwrite("plate.jpg", Ilp)
|
|
Ilps, modes = plate_detect.run2(img_np)
|
|
print("\t Mode:", mode)
|
|
|
|
if Ilps is None:
|
|
print("\t No plate found")
|
|
return make_automatic_res("", track_id, 0)
|
|
|
|
ocr_start_time = time.time()
|
|
longest = ''
|
|
longest_i = 0
|
|
for i, Ilp in enumerate(Ilps):
|
|
mode = modes[i]
|
|
split_plate = mode == "square"
|
|
|
|
ocr_result, elements = plate_ocr.run(mode, Ilp, "full", split_plate)
|
|
ocr_end_time = time.time()
|
|
if len(ocr_result) > len(longest):
|
|
longest = ocr_result
|
|
longest_i = i
|
|
print(len(Ilps), longest_i, longest)
|
|
ocr_result = longest
|
|
Ilp = Ilps[longest_i]
|
|
|
|
print("\tOCR time:", ocr_end_time - ocr_start_time)
|
|
|
|
confidence = check_plate_quality(elements)
|
|
if confidence > 10 or confidence < 7:
|
|
print("\t [WARNING] Too long/short plate:", ocr_result)
|
|
confidence = 0
|
|
return make_automatic_res("", track_id, confidence)
|
|
|
|
ocr_result = ocr_result.replace("-", "")
|
|
if config["save_logs_api"]["enable"]:
|
|
log_start_time = time.time()
|
|
res = requests.request(
|
|
config["save_logs_api"]["push_log"]["method"],
|
|
config["save_logs_api"]["push_log"]["url"],
|
|
json={
|
|
"plate": ocr_result.replace("-", ""),
|
|
"plate_image": base64.b64encode(cv2.imencode(".jpg", Ilp)[1]).decode("utf-8"),
|
|
"frame_image": frame64,
|
|
"gate_id" : gate_id,
|
|
"type": type_come,
|
|
"time": data['frametime']
|
|
}
|
|
)
|
|
log_end_time = time.time()
|
|
print("\tAPI time:", log_end_time - log_start_time)
|
|
|
|
dataRes = json.loads(res.text)
|
|
print ("\tWebServer response:", dataRes["status"])
|
|
if dataRes["status"] and config["open_door_api"]["enable"]:
|
|
zk_start_time = time.time()
|
|
if config["os"] == "linux":
|
|
requests.post(config['open_door'][type_come]['url'], json={"door": config['open_door'][type_come]['door']})
|
|
if config["os"] == "windows":
|
|
requests.post(config["open_door"][type_come]["url"], headers={"Content-Type" : "text/html","doorId": config["open_door"][type_come]["door"]})
|
|
zk_end_time = time.time()
|
|
print("\tZK time:", zk_end_time - zk_start_time)
|
|
|
|
end_time = time.time()
|
|
print("\tTotal time:", end_time - start_time)
|
|
ocr_result = ocr_result.replace("-", "")
|
|
if config['save_img']:
|
|
cv2.imwrite("img2/"+str(start_time)+"_" + ocr_result + ".jpg", Ilp)
|
|
cv2.imwrite("img2/"+str(start_time)+"_frame.jpg", frame_np)
|
|
return make_automatic_res(ocr_result, track_id, confidence)
|
|
except Exception as ex:
|
|
print(ex)
|
|
return make_automatic_res("", track_id, 0)
|
|
"""
|
|
Description: API to get plate OCR from image of plate
|
|
Note: This API is not exacted than get-from-frame in most situations
|
|
because it does not re-detect plate to align the plate
|
|
Input:
|
|
img64: base64 img string,
|
|
mode: "long" or "square",
|
|
display: "full" or "row2" (This options is applied into square plates having 2 lines)
|
|
Output:
|
|
ocr: ""
|
|
"""
|
|
|
|
|
|
@app.route("/get-from-plate", methods=["POST"])
|
|
def get_from_plate():
|
|
try:
|
|
print_current_time()
|
|
print (request.path)
|
|
start_time = time.time()
|
|
data = request.json
|
|
|
|
img64 = data.get("img64", "")
|
|
if img64 == "": return {"ocr": ""}
|
|
|
|
mode = data.get("mode", "long") # ["long", "square"]
|
|
display = data.get("display", "full") # ["full", "row2"]
|
|
print("\t Mode:", mode)
|
|
|
|
img_binary = extract_from_base64_str(img64)
|
|
img_np = imageio.imread(io.BytesIO(img_binary))
|
|
ocr_result, elements = plate_ocr.run(mode, img_np, display, False)
|
|
end_time = time.time()
|
|
print("\tTotal time:", end_time - start_time)
|
|
return {"ocr": ocr_result}
|
|
except Exception as ex:
|
|
print (ex)
|
|
|
|
|
|
"""
|
|
Description: API to get plate OCR from a large image that contains the plate
|
|
Note: This API requires the input image is squared evenif OCR on long plate !!!
|
|
For best result, resize input img to 272x272
|
|
Input:
|
|
img64: base64 img string,
|
|
mode: "long" or "square",
|
|
display: "full" or "row2" (This options is applied into square plates having 2 lines)
|
|
Output:
|
|
ocr: "",
|
|
plate: "" (img base64 string for plate image)
|
|
"""
|
|
|
|
|
|
@app.route("/get-from-frame", methods=["POST"])
|
|
def get_from_frame():
|
|
try:
|
|
print_current_time()
|
|
print(request.path)
|
|
start_time = time.time()
|
|
data = request.json
|
|
img64 = data.get("img64", "")
|
|
if img64 == "": return {"ocr": "", "plate": ""}
|
|
mode = data.get("mode", "long") # ["long", "square"]
|
|
display = data.get("display", "full") # ["full", "row2"]
|
|
img_binary = extract_from_base64_str(img64)
|
|
img_np = imageio.imread(io.BytesIO(img_binary))
|
|
Ilp, mode = plate_detect.run(img_np)
|
|
# detect_stop = time.time()
|
|
# print "Detect time:", detect_stop - detect_start
|
|
if Ilp is None: return {"ocr": "", "plate": ""}
|
|
print("\tMode:", mode)
|
|
|
|
split_plate = mode == "square"
|
|
ocr_result, elements = plate_ocr.run(mode, Ilp, display, split_plate)
|
|
plate_result = base64.b64encode(cv2.imencode(".jpg", Ilp)[1]).decode("utf-8")
|
|
|
|
end_time = time.time()
|
|
print("\tTotal time:", end_time - start_time)
|
|
return {
|
|
"ocr": ocr_result,
|
|
"plate": plate_result
|
|
}
|
|
|
|
except Exception as ex:
|
|
print(ex)
|