#!/usr/bin/env python3 import subprocess from flask import Flask, request, jsonify, send_file from datetime import datetime import os import json import uuid import shutil import base64 from io import BytesIO import tempfile import cv2 import numpy as np app = Flask(__name__) DATA_DIR = os.environ.get('DATA_DIR', 'data') os.makedirs(DATA_DIR, exist_ok=True) def image_data_url_to_binary(image_data_url): if image_data_url.startswith('data:image/jpeg;base64,'): return base64.b64decode(image_data_url.split(',')[1]), 'image/jpeg' elif image_data_url.startswith('data:image/png;base64,'): return base64.b64decode(image_data_url.split(',')[1]), 'image/png' else: raise ValueError(f"Unsupported image data URL: {image_data_url}") @app.route('/') def index(): return "Emblem research API\n" @app.route('/event/', methods=['POST']) def event(category): d = os.path.join(DATA_DIR, category, datetime.now().strftime("%Y-%m-%d")) os.makedirs(d, exist_ok=True) fname = datetime.now().strftime("%H%M%S") + '-' + str(uuid.uuid4()) with open(os.path.join(d, fname), 'wb') as f: data = request.get_data() f.write(data) return { "ok": True, "message": "Event saved", } def get_folders(): return [ x for x in os.listdir(DATA_DIR + "/json") if os.path.isdir(os.path.join(DATA_DIR, "json", x)) ] @app.route('/api/folders') def folders(): x = get_folders() ret = [] for d in x: ret.append({ "folder": d, "count": len(os.listdir(os.path.join(DATA_DIR, "json", d))), }) return { "ok": True, "folders": ret, } @app.route("/api/qrtool", methods=["POST"]) def qrtool(): data = request.get_json() image_data_url = data.get("image_data_url") binary, mimetype = image_data_url_to_binary(image_data_url) if mimetype == 'image/png': suffix = ".png" elif mimetype == 'image/jpeg': suffix = ".jpg" else: raise ValueError(f"Unsupported image type: {mimetype}") with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as f: f.write(binary) qrtool = os.path.abspath("../alg/qrtool") if not os.path.exists(qrtool): return { "ok": False, "error": f"qrtool not found: {qrtool}", } cmd = ["./qrtool", data['cmd'], f.name] print(cmd) try: sp = subprocess.Popen(cmd, cwd=os.path.dirname(qrtool), stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = sp.communicate() return { "exit_code": sp.returncode, "stdout": stdout.decode('utf-8'), "stderr": stderr.decode('utf-8'), } except Exception as e: return { "ok": False, "error": str(e), } @app.route('/api/datapoints') def datapoints(): folder = request.args.get('folder') start = int(request.args.get('start', 0)) count = int(request.args.get('count', 10)) folder_dir = os.path.join(DATA_DIR, "json", folder) x = os.listdir(folder_dir) total = len(x) x = x[start:start+count] datapoints = [] for f in x: try: datapoints.append(load_datapoint(os.path.join(folder_dir, f))) except Exception as e: print(e) return { "ok": True, "datapoints": datapoints, "total": total, "start": start, "count": count, } def load_datapoint(path): with open(path, 'rb') as f: data = json.load(f) data["path"] = path data["datetime"] = get_file_datetime(path) return data def get_file_datetime(path): return datetime.fromtimestamp(os.path.getmtime(path)) @app.route('/api/image') def image(): folder = request.args.get('folder') name = request.args.get('name') path = os.path.join(DATA_DIR, "json", folder, name) with open(path, 'rb') as f: data = json.load(f) binary, mimetype = image_data_url_to_binary(data["image_data_url"]) buf = BytesIO(binary) return send_file(buf, mimetype=mimetype) @app.route('/api/datapoint') def datapoint(): folder = request.args.get('folder') name = request.args.get('name') path = os.path.join(DATA_DIR, "json", folder, name) return { "ok": True, "datapoint": load_datapoint(path), } def do_copy_to_folder(paths, target_folder): for path in paths: src = os.path.join(DATA_DIR, "json", path) dst = os.path.join(DATA_DIR, "json", target_folder) os.makedirs(dst, exist_ok=True) print(f"Copying {src} to {dst}") shutil.copy2(src, dst) @app.route('/api/copy-to-folder', methods=['POST']) def copy_to_folder(): data = request.get_json() paths = data.get('paths') target_folder = data.get('target_folder') do_copy_to_folder(paths, target_folder) return { "ok": True, "message": "Datapoints copied to folder", } def do_qrtool(image, cmd): with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as f: image.save(f.name) qrtool = os.path.abspath("../alg/qrtool") cmd = ["./qrtool", cmd, f.name] print(cmd) sp = subprocess.Popen(cmd, cwd=os.path.dirname(qrtool), stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = sp.communicate() return stdout.decode('utf-8') def calc_sharpness(image_data_url): image = image_data_url_to_image(image_data_url) # 转换为灰度图像 gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) # 高斯滤波 blurred = cv2.GaussianBlur(gray, (5, 5), 0) # 计算Sobel梯度 sobelx = cv2.Sobel(blurred, cv2.CV_64F, 1, 0, ksize=3) sobely = cv2.Sobel(blurred, cv2.CV_64F, 0, 1, ksize=3) # 计算梯度幅值 magnitude = np.sqrt(sobelx**2 + sobely**2) # 计算清晰度 sharpness = np.mean(magnitude) / (image.shape[0] * image.shape[1]) * 1000 return sharpness def image_data_url_to_image(image_data_url): binary, mimetype = image_data_url_to_binary(image_data_url) return cv2.imdecode(np.frombuffer(binary, np.uint8), cv2.IMREAD_COLOR) def get_image_data_url(path): mimetype = "image/jpeg" if path.endswith(".png"): mimetype = "image/png" with open(path, "rb") as f: binary = f.read() return "data:" + mimetype + ";base64," + base64.b64encode(binary).decode('utf-8') def get_image_dot_area(image): with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as f: cv2.imwrite(f.name, image) qrtool = os.path.abspath("../alg/qrtool") cmd = ["./qrtool", "dot", f.name] if 0 == subprocess.call(cmd, cwd=os.path.dirname(qrtool)): return get_image_data_url(f.name + ".dot.jpg") def analyze_datapoint(data): image = image_data_url_to_image(data['image_data_url']) # data["clarity"] = do_qrtool(image, "clarity").split()[-1] # data["analyze_result"] = data data["analyze_result"] = {} data["dot_area"] = get_image_dot_area(image) if data["dot_area"]: data["analyze_result"]["dot_area_sharpness"] = calc_sharpness(data["dot_area"]) return data @app.route('/api/analyze', methods=['POST']) def analyze(): data = request.get_json() analyze_datapoint(data) return data if __name__ == '__main__': app.run(host='0.0.0.0', port=26966, debug=True)