import imageio import io import requests from flask import Flask, request, jsonify, make_response import base64 import cv2 import time from datetime import datetime import json import plate_detect import plate_ocr config = {} with open("config.json", "r") as config_file: config = json.load(config_file) def print_current_time(): now = datetime.now() print("\n", now) def extract_from_base64_str(img64): splits = img64.split(";base64,") if len(splits) == 2: img64 = splits[1] img_binary = base64.b64decode(img64) # img = io_imread(io.BytesIO(base64.b64decode(b64_string))) return img_binary def check_plate_quality(lp_elements): head, mid, tail = lp_elements # if len(tail) < 4: return 0 return len(head) + len(mid) + len(tail) def make_automatic_res(plate, track_id, confidence): return { "vehicle": { "plate": plate, "vehicle_id": str(track_id), "confidence": str(confidence), "open_door": "0" } } app = Flask(__name__) """ Description: API for intops factory only Input: {track_id, image, mode, type, frame} Output: {name, person_id (track_id), confidence, open_door} """ @app.route("/automatic/get-from-frame", methods=["POST"]) def get_from_frame_automatic(): data = request.json track_id = data.get("vehicle_id", 0) try: print_current_time() print (request.path) start_time = time.time() img64 = data.get("image", "") mode = data.get("mode", "long") # ["long", "square"] type_come = data.get("type", "in") # in / out gate_id = data.get("gate_id", "1") frame64 = data.get("frame", "") img_binary = extract_from_base64_str(img64) img_np = imageio.imread(io.BytesIO(img_binary)) frame_binary= extract_from_base64_str(frame64) frame_np = imageio.imread(io.BytesIO(frame_binary)) frame_np = cv2.cvtColor(frame_np, cv2.COLOR_RGB2BGR) if config['save_img']: cv2.imwrite("img/"+str(start_time)+"_frame.jpg", frame_np) # cv2.imwrite("plate.jpg", Ilp) Ilp, mode = plate_detect.run(img_np) if Ilp is None: return make_automatic_res("", track_id, 0) split_plate = mode == "square" ocr_result, elements = plate_ocr.run(mode, Ilp, "full", split_plate) confidence = check_plate_quality(elements) if confidence > 10 or confidence < 7: print("\t [WARNING] Too long/short plate:", ocr_result) confidence = 0 return make_automatic_res("", track_id, confidence) if config['save_img']: cv2.imwrite("img/"+str(start_time)+"_" + ocr_result + ".jpg", Ilp) if config["save_logs_api"]["enable"]: log_start_time = time.time() res = requests.request( config["save_logs_api"]["push_log"]["method"], config["save_logs_api"]["push_log"]["url"], json={ "plate": ocr_result.replace("-", ""), "plate_image": base64.b64encode(cv2.imencode(".jpg", Ilp)[1]).decode("utf-8"), "frame_image": frame64, "gate_id" : gate_id, "type": type_come, "time": data['frametime'] } ) log_end_time = time.time() print("\tAPI time:", log_end_time - log_start_time) dataRes = json.loads(res.text) print ("\tWebServer response:", dataRes["status"]) if dataRes["status"] and config["open_door_api"]["enable"]: zk_start_time = time.time() if config["os"] == "linux": requests.post(config['open_door'][type_come]['url'], json={"door": config['open_door'][type_come]['door']}) if config["os"] == "windows": requests.post(config["open_door"][type_come]["url"], headers={"Content-Type" : "text/html","doorId": config["open_door"][type_come]["door"]}) zk_end_time = time.time() print("\tZK time:", zk_end_time - zk_start_time) end_time = time.time() print("\tTotal time:", end_time - start_time) return make_automatic_res(ocr_result, track_id, confidence) except Exception as ex: print(ex) return make_automatic_res("", track_id, 0) @app.route("/automatic/vehicle", methods=["POST"]) def get_from_vehicle(): data = request.json track_id = data.get("vehicle_id", 0) try: print_current_time() print (request.path) start_time = time.time() img64 = data.get("image", "") mode = data.get("mode", "long") # ["long", "square"] type_come = data.get("type", "in") # in / out gate_id = data.get("gate_id", "1") frame64 = data.get("frame", "") img_binary = extract_from_base64_str(img64) img_np = imageio.imread(io.BytesIO(img_binary)) frame_binary= extract_from_base64_str(frame64) frame_np = imageio.imread(io.BytesIO(frame_binary)) frame_np = cv2.cvtColor(frame_np, cv2.COLOR_RGB2BGR) # cv2.imwrite("plate.jpg", Ilp) Ilps, modes = plate_detect.run2(img_np) if Ilps is None: return make_automatic_res("", track_id, 0) longest = '' longest_i = 0 for i, Ilp in enumerate(Ilps): mode = modes[i] split_plate = mode == "square" ocr_result, elements = plate_ocr.run(mode, Ilp, "full", split_plate) if len(ocr_result) > len(longest): longest = ocr_result longest_i = i print(len(Ilps), longest_i, longest) ocr_result = longest Ilp = Ilps[longest_i] confidence = check_plate_quality(elements) if confidence > 10 or confidence < 7: print("\t [WARNING] Too long/short plate:", ocr_result) confidence = 0 return make_automatic_res("", track_id, confidence) ocr_result = ocr_result.replace("-", "") if config["save_logs_api"]["enable"]: log_start_time = time.time() res = requests.request( config["save_logs_api"]["push_log"]["method"], config["save_logs_api"]["push_log"]["url"], json={ "plate": ocr_result.replace("-", ""), "plate_image": base64.b64encode(cv2.imencode(".jpg", Ilp)[1]).decode("utf-8"), "frame_image": frame64, "gate_id" : gate_id, "type": type_come, "time": data['frametime'] } ) log_end_time = time.time() print("\tAPI time:", log_end_time - log_start_time) dataRes = json.loads(res.text) print ("\tWebServer response:", dataRes["status"]) if dataRes["status"] and config["open_door_api"]["enable"]: zk_start_time = time.time() if config["os"] == "linux": requests.post(config['open_door'][type_come]['url'], json={"door": config['open_door'][type_come]['door']}) if config["os"] == "windows": requests.post(config["open_door"][type_come]["url"], headers={"Content-Type" : "text/html","doorId": config["open_door"][type_come]["door"]}) zk_end_time = time.time() print("\tZK time:", zk_end_time - zk_start_time) end_time = time.time() print("\tTotal time:", end_time - start_time) ocr_result = ocr_result.replace("-", "") if config['save_img']: cv2.imwrite("img2/"+str(start_time)+"_" + ocr_result + ".jpg", Ilp) cv2.imwrite("img2/"+str(start_time)+"_frame.jpg", frame_np) return make_automatic_res(ocr_result, track_id, confidence) except Exception as ex: print(ex) return make_automatic_res("", track_id, 0) """ Description: API to get plate OCR from image of plate Note: This API is not exacted than get-from-frame in most situations because it does not re-detect plate to align the plate Input: img64: base64 img string, mode: "long" or "square", display: "full" or "row2" (This options is applied into square plates having 2 lines) Output: ocr: "" """ @app.route("/get-from-plate", methods=["POST"]) def get_from_plate(): try: print_current_time() print (request.path) start_time = time.time() data = request.json img64 = data.get("img64", "") if img64 == "": return {"ocr": ""} mode = data.get("mode", "long") # ["long", "square"] display = data.get("display", "full") # ["full", "row2"] print("\t Mode:", mode) img_binary = extract_from_base64_str(img64) img_np = imageio.imread(io.BytesIO(img_binary)) ocr_result, elements = plate_ocr.run(mode, img_np, display, False) end_time = time.time() print("\tTotal time:", end_time - start_time) return {"ocr": ocr_result} except Exception as ex: print (ex) return {"ocr": ""} """ Description: API to get plate OCR from a large image that contains the plate Note: This API requires the input image is squared evenif OCR on long plate !!! For best result, resize input img to 272x272 Input: img64: base64 img string, mode: "long" or "square", display: "full" or "row2" (This options is applied into square plates having 2 lines) Output: ocr: "", plate: "" (img base64 string for plate image) """ @app.route("/get-from-frame", methods=["POST"]) def get_from_frame(): try: print_current_time() print(request.path) start_time = time.time() data = request.json img64 = data.get("img64", "") if img64 == "": return {"ocr": "", "plate": ""} client_mode = data.get("mode", "long") # ["long", "square"] display = data.get("display", "full") # ["full", "row2"] img_binary = extract_from_base64_str(img64) img_np = imageio.imread(io.BytesIO(img_binary)) Ilp, mode = plate_detect.run(img_np) if Ilp is None: Ilp = cv2.cvtColor(img_np, cv2.COLOR_RGB2GRAY) Ilp = cv2.cvtColor(Ilp, cv2.COLOR_GRAY2BGR) ocr_result, elements = plate_ocr.run(client_mode, Ilp, display, False) end_time = time.time() print("\tTotal time:", end_time - start_time) return {"ocr": ocr_result, "plate": img64} ocr_result, elements = plate_ocr.run(mode, Ilp, display, False) plate_result = base64.b64encode(cv2.imencode(".jpg", Ilp)[1]).decode("utf-8") end_time = time.time() print("\tTotal time:", end_time - start_time) return { "ocr": ocr_result, "plate": plate_result } except Exception as ex: return { "ocr": "", "plate": "" } print(ex)