themblem/research/server.py
2025-03-25 13:29:58 +08:00

257 lines
7.9 KiB
Python
Executable File

#!/usr/bin/env python3
import subprocess
from flask import Flask, request, jsonify, send_file
from datetime import datetime
import os
import json
import uuid
import shutil
import base64
from io import BytesIO
import tempfile
import cv2
import numpy as np
from roi_lib import *
# Flask application object; the route decorators below register handlers on it.
app = Flask(__name__)
# Root directory for stored data. Taken from the DATA_DIR environment variable,
# falling back to "data" (relative to the current working directory).
DATA_DIR = os.environ.get('DATA_DIR', 'data')
# Create the data directory at import time so handlers can write into it.
os.makedirs(DATA_DIR, exist_ok=True)
def image_data_url_to_binary(image_data_url):
    """Decode a base64 JPEG or PNG data URL into raw bytes.

    Args:
        image_data_url: A string of the form
            ``data:image/jpeg;base64,<payload>`` or
            ``data:image/png;base64,<payload>``.

    Returns:
        A ``(binary, mimetype)`` tuple where ``mimetype`` is ``'image/jpeg'``
        or ``'image/png'``.

    Raises:
        ValueError: If the argument is not a string or does not start with
            one of the supported prefixes.
    """
    # Callers pass values straight from request JSON, so a missing key arrives
    # here as None; report it as a ValueError instead of an AttributeError.
    if not isinstance(image_data_url, str):
        raise ValueError(
            "Unsupported image data URL: expected str, got "
            f"{type(image_data_url).__name__}"
        )
    for mimetype in ('image/jpeg', 'image/png'):
        prefix = f'data:{mimetype};base64,'
        if image_data_url.startswith(prefix):
            return base64.b64decode(image_data_url[len(prefix):]), mimetype
    # Only echo the beginning of the URL: the payload can be megabytes long
    # and would otherwise flood logs and error responses.
    preview = image_data_url[:64]
    if len(image_data_url) > 64:
        preview += '...'
    raise ValueError(f"Unsupported image data URL: {preview}")
@app.route('/')
def index():
    """Serve a plain-text banner identifying the service."""
    banner = "Emblem research API\n"
    return banner
@app.route('/event/<category>', methods=['POST'])
def event(category):
    """Store the raw request body as a new event file.

    The body is written to
    ``DATA_DIR/<category>/<YYYY-MM-DD>/<HHMMSS>-<uuid4>``.

    Args:
        category: URL path segment used as the sub-directory name.

    Returns:
        A dict with ``ok`` and ``message`` on success, or an
        ``(error dict, 400)`` tuple when ``category`` is not a plain
        directory name.
    """
    # "." and ".." (or anything containing a path separator) would place the
    # file outside the intended per-category directory.
    if category in ('.', '..') or os.path.basename(category) != category:
        return {
            "ok": False,
            "error": "Invalid category",
        }, 400
    # Take a single timestamp so the date directory and the time prefix cannot
    # disagree when a request straddles midnight.
    now = datetime.now()
    d = os.path.join(DATA_DIR, category, now.strftime("%Y-%m-%d"))
    os.makedirs(d, exist_ok=True)
    fname = now.strftime("%H%M%S") + '-' + str(uuid.uuid4())
    data = request.get_data()
    with open(os.path.join(d, fname), 'wb') as f:
        f.write(data)
    return {
        "ok": True,
        "message": "Event saved",
    }
def get_folders():
    """Return the names of the sub-directories of ``DATA_DIR/json``.

    Returns:
        A sorted list of directory names; empty when ``DATA_DIR/json`` does
        not exist (only ``DATA_DIR`` itself is created at startup).
    """
    json_root = os.path.join(DATA_DIR, "json")
    try:
        entries = os.listdir(json_root)
    except FileNotFoundError:
        return []
    # Sort so callers get a stable order regardless of the filesystem.
    return sorted(
        x for x in entries
        if os.path.isdir(os.path.join(json_root, x))
    )
@app.route('/api/folders')
def folders():
    """List every data folder together with the number of entries it holds."""
    summary = [
        {
            "folder": name,
            "count": len(os.listdir(os.path.join(DATA_DIR, "json", name))),
        }
        for name in get_folders()
    ]
    return {"ok": True, "folders": summary}
@app.route("/api/qrtool", methods=["POST"])
def qrtool():
    """Run the external ``qrtool`` binary on an uploaded image.

    Expects a JSON object with:
        image_data_url: base64 JPEG/PNG data URL of the image to process.
        cmd: sub-command passed to ``qrtool`` as its first argument.

    Returns:
        ``{"exit_code", "stdout", "stderr"}`` when the tool ran; otherwise
        ``{"ok": False, "error": ...}`` (with HTTP 400 when the request body
        is malformed).
    """
    data = request.get_json()
    if not isinstance(data, dict):
        return {"ok": False, "error": "Expected a JSON object"}, 400
    tool_cmd = data.get("cmd")
    image_data_url = data.get("image_data_url")
    if not isinstance(tool_cmd, str) or not isinstance(image_data_url, str):
        return {
            "ok": False,
            "error": "Both 'cmd' and 'image_data_url' must be strings",
        }, 400
    try:
        binary, mimetype = image_data_url_to_binary(image_data_url)
    except ValueError as e:
        return {"ok": False, "error": str(e)}, 400
    suffix = {'image/png': ".png", 'image/jpeg': ".jpg"}.get(mimetype)
    if suffix is None:
        return {"ok": False, "error": f"Unsupported image type: {mimetype}"}, 400

    # Check for the tool before creating the temp file so the early return
    # leaves nothing behind.
    tool_path = os.path.abspath("../alg/qrtool")
    if not os.path.exists(tool_path):
        return {
            "ok": False,
            "error": f"qrtool not found: {tool_path}",
        }

    # delete=False because the tool opens the file by name; the file is closed
    # (and therefore flushed) when the ``with`` block ends, before the tool runs.
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as f:
        f.write(binary)
    # ``tool_cmd`` is passed as a single argv element; no shell is involved.
    cmd = ["./qrtool", tool_cmd, f.name]
    print(cmd)
    try:
        sp = subprocess.run(
            cmd,
            cwd=os.path.dirname(tool_path),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        return {
            "exit_code": sp.returncode,
            # errors="replace": tool output is not guaranteed to be valid UTF-8.
            "stdout": sp.stdout.decode('utf-8', errors='replace'),
            "stderr": sp.stderr.decode('utf-8', errors='replace'),
        }
    except (OSError, subprocess.SubprocessError) as e:
        return {
            "ok": False,
            "error": str(e),
        }
    finally:
        # Remove the temp input; it was previously left behind on every call.
        try:
            os.remove(f.name)
        except OSError:
            pass
@app.route('/api/datapoints')
def datapoints():
    """Return one page of datapoints from a folder under ``DATA_DIR/json``.

    Query parameters:
        folder: name of the folder to list (required).
        start: index of the first entry to return (default 0).
        count: maximum number of entries to return (default 10).

    Returns:
        A dict with ``ok``, ``datapoints``, ``total``, ``start`` and
        ``count``; or an ``(error dict, status)`` tuple for a bad request
        (400) or an unknown folder (404). Files that fail to load are
        skipped.
    """
    folder = request.args.get('folder')
    if not folder:
        return {"ok": False, "error": "Missing 'folder' parameter"}, 400
    try:
        start = max(0, int(request.args.get('start', 0)))
        count = max(0, int(request.args.get('count', 10)))
    except ValueError:
        return {"ok": False, "error": "'start' and 'count' must be integers"}, 400

    # Refuse folder values such as "../.." that resolve outside DATA_DIR/json.
    json_root = os.path.abspath(os.path.join(DATA_DIR, "json"))
    folder_dir = os.path.join(DATA_DIR, "json", folder)
    if not os.path.abspath(folder_dir).startswith(json_root + os.sep):
        return {"ok": False, "error": "Invalid folder"}, 400
    if not os.path.isdir(folder_dir):
        return {"ok": False, "error": "Folder not found"}, 404

    # os.listdir order is arbitrary; sort so that pages are stable between
    # requests.
    names = sorted(os.listdir(folder_dir))
    total = len(names)
    loaded = []
    for name in names[start:start + count]:
        try:
            loaded.append(load_datapoint(os.path.join(folder_dir, name)))
        except (OSError, ValueError, TypeError) as e:
            # Best effort: one unreadable or malformed file must not fail the
            # whole page.
            print(f"Skipping {name}: {e}")
    return {
        "ok": True,
        "datapoints": loaded,
        "total": total,
        "start": start,
        "count": count,
    }
def load_datapoint(path):
    """Read a datapoint JSON file and tag it with where it came from.

    The parsed object gains two keys: ``path`` (the path it was read from)
    and ``datetime`` (the value returned by ``get_file_datetime``).
    """
    with open(path, 'rb') as fh:
        record = json.load(fh)
    record["path"] = path
    record["datetime"] = get_file_datetime(path)
    return record
def get_file_datetime(path):
    """Return the file's last-modification time as a ``datetime``."""
    mtime = os.stat(path).st_mtime
    return datetime.fromtimestamp(mtime)
@app.route('/api/image')
def image():
    """Serve the image embedded in a stored datapoint.

    Query parameters:
        folder: folder under ``DATA_DIR/json`` (required).
        name: datapoint file name inside that folder (required).

    Returns:
        The decoded image with its mimetype, or an ``(error dict, status)``
        tuple for a bad request (400) or a missing file (404).
    """
    folder = request.args.get('folder')
    name = request.args.get('name')
    if not folder or not name:
        return {"ok": False, "error": "Missing 'folder' or 'name' parameter"}, 400
    # Refuse parameters that resolve outside DATA_DIR/json (e.g. "..").
    json_root = os.path.abspath(os.path.join(DATA_DIR, "json"))
    path = os.path.join(DATA_DIR, "json", folder, name)
    if not os.path.abspath(path).startswith(json_root + os.sep):
        return {"ok": False, "error": "Invalid path"}, 400
    if not os.path.isfile(path):
        return {"ok": False, "error": "Datapoint not found"}, 404
    with open(path, 'rb') as f:
        data = json.load(f)
    binary, mimetype = image_data_url_to_binary(data["image_data_url"])
    return send_file(BytesIO(binary), mimetype=mimetype)
@app.route('/api/datapoint')
def datapoint():
    """Return a single stored datapoint.

    Query parameters:
        folder: folder under ``DATA_DIR/json`` (required).
        name: datapoint file name inside that folder (required).

    Returns:
        ``{"ok": True, "datapoint": ...}``, or an ``(error dict, status)``
        tuple for a bad request (400) or a missing file (404).
    """
    folder = request.args.get('folder')
    name = request.args.get('name')
    if not folder or not name:
        return {"ok": False, "error": "Missing 'folder' or 'name' parameter"}, 400
    # Refuse parameters that resolve outside DATA_DIR/json (e.g. "..").
    json_root = os.path.abspath(os.path.join(DATA_DIR, "json"))
    path = os.path.join(DATA_DIR, "json", folder, name)
    if not os.path.abspath(path).startswith(json_root + os.sep):
        return {"ok": False, "error": "Invalid path"}, 400
    if not os.path.isfile(path):
        return {"ok": False, "error": "Datapoint not found"}, 404
    return {
        "ok": True,
        "datapoint": load_datapoint(path),
    }
def do_copy_to_folder(paths, target_folder):
    """Copy datapoint files into a folder under ``DATA_DIR/json``.

    Args:
        paths: Iterable of source paths relative to ``DATA_DIR/json``.
            ``None`` is treated as an empty list.
        target_folder: Destination folder relative to ``DATA_DIR/json``;
            created if it does not exist.

    Raises:
        ValueError: If a source path or the target folder resolves outside
            ``DATA_DIR/json``. Nothing is copied in that case.
        OSError: Propagated from the filesystem operations.
    """
    json_root = os.path.abspath(os.path.join(DATA_DIR, "json"))

    def _resolve(relative):
        # Join like before, but reject anything ("..", absolute paths) that
        # would let a request read or write outside the data directory.
        full = os.path.join(DATA_DIR, "json", relative)
        if not os.path.abspath(full).startswith(json_root + os.sep):
            raise ValueError(f"Path escapes data directory: {relative}")
        return full

    # Validate everything up front so a bad entry cannot leave a partial copy.
    sources = [_resolve(path) for path in (paths or [])]
    if not sources:
        return
    dst = _resolve(target_folder)
    # The destination is the same for every file; create it once.
    os.makedirs(dst, exist_ok=True)
    for src in sources:
        print(f"Copying {src} to {dst}")
        shutil.copy2(src, dst)
@app.route('/api/copy-to-folder', methods=['POST'])
def copy_to_folder():
    """Copy the listed datapoints into a target folder.

    Expects a JSON object with:
        paths: list of source paths (strings) relative to ``DATA_DIR/json``.
        target_folder: destination folder name (non-empty string).

    Returns:
        ``{"ok": True, "message": ...}`` on success, or an
        ``(error dict, status)`` tuple: 400 for an invalid request, 500 when
        a filesystem operation fails.
    """
    data = request.get_json()
    if not isinstance(data, dict):
        return {"ok": False, "error": "Expected a JSON object"}, 400
    paths = data.get('paths')
    target_folder = data.get('target_folder')
    if not isinstance(paths, list) or not all(isinstance(p, str) for p in paths):
        return {"ok": False, "error": "'paths' must be a list of strings"}, 400
    if not isinstance(target_folder, str) or not target_folder:
        return {"ok": False, "error": "'target_folder' must be a non-empty string"}, 400
    try:
        do_copy_to_folder(paths, target_folder)
    except ValueError as e:
        return {"ok": False, "error": str(e)}, 400
    except OSError as e:
        # e.g. a source file that does not exist; report it instead of
        # returning an HTML traceback.
        return {"ok": False, "error": str(e)}, 500
    return {
        "ok": True,
        "message": "Datapoints copied to folder",
    }
def do_qrtool(image, cmd):
    """Run ``qrtool <cmd>`` on an image and return the tool's standard output.

    Args:
        image: Object exposing ``save(path)``; it is asked to write itself to
            a temporary ``.jpg`` path.
            NOTE(review): presumably a PIL image or an uploaded file object;
            confirm against callers.
        cmd: Sub-command passed to ``qrtool`` as its first argument.

    Returns:
        The tool's stdout decoded as UTF-8. stderr and the exit code are
        discarded.
    """
    # Reserve a unique file name; the tool reads the file by path, so it has
    # to outlive this handle (hence delete=False).
    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as f:
        tmp_path = f.name
    try:
        image.save(tmp_path)
        tool_path = os.path.abspath("../alg/qrtool")
        argv = ["./qrtool", cmd, tmp_path]
        print(argv)
        sp = subprocess.Popen(
            argv,
            cwd=os.path.dirname(tool_path),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        stdout, _ = sp.communicate()
        return stdout.decode('utf-8')
    finally:
        # Remove the temp file; it was previously left behind on every call.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
def calc_sharpness(image_data_url):
    """Score the sharpness of an image supplied as a data URL.

    The score is the mean Sobel gradient magnitude of the blurred grayscale
    image, divided by the pixel count and multiplied by 1000.
    """
    image = image_data_url_to_image(image_data_url)
    # Convert to grayscale.
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    # Gaussian blur.
    smoothed = cv2.GaussianBlur(gray, (5, 5), 0)
    # Sobel gradients along x and y.
    grad_x = cv2.Sobel(smoothed, cv2.CV_64F, 1, 0, ksize=3)
    grad_y = cv2.Sobel(smoothed, cv2.CV_64F, 0, 1, ksize=3)
    # Gradient magnitude.
    magnitude = np.sqrt(grad_x**2 + grad_y**2)
    # Sharpness: mean magnitude normalised by the number of pixels.
    pixel_count = image.shape[0] * image.shape[1]
    return np.mean(magnitude) / pixel_count * 1000
def image_data_url_to_image(image_data_url):
    """Decode an image data URL with OpenCV (``cv2.IMREAD_COLOR``)."""
    binary, _ = image_data_url_to_binary(image_data_url)
    buffer = np.frombuffer(binary, np.uint8)
    return cv2.imdecode(buffer, cv2.IMREAD_COLOR)
def get_image_data_url(path):
    """Encode an image file as a base64 data URL.

    Paths ending in ``.png`` are labelled ``image/png``; everything else is
    labelled ``image/jpeg``.
    """
    mimetype = "image/png" if path.endswith(".png") else "image/jpeg"
    with open(path, "rb") as fh:
        payload = base64.b64encode(fh.read()).decode('utf-8')
    return f"data:{mimetype};base64,{payload}"
def get_image_dot_area(image):
    """Run ``qrtool dot`` on an image and return the resulting dot-area image.

    Args:
        image: Image written to a temporary ``.jpg`` with ``cv2.imwrite``.

    Returns:
        A data URL built from ``<temp file>.dot.jpg`` when the tool exits
        with status 0, otherwise ``None``.
    """
    # Reserve a unique file name; the tool reads the file by path, so it has
    # to outlive this handle (hence delete=False).
    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as f:
        src_path = f.name
    dot_path = src_path + ".dot.jpg"
    try:
        cv2.imwrite(src_path, image)
        tool_path = os.path.abspath("../alg/qrtool")
        cmd = ["./qrtool", "dot", src_path]
        if subprocess.call(cmd, cwd=os.path.dirname(tool_path)) != 0:
            return None
        return get_image_data_url(dot_path)
    finally:
        # Both the input and the tool's output were previously left behind in
        # the temp directory on every call.
        for leftover in (src_path, dot_path):
            try:
                os.remove(leftover)
            except OSError:
                pass
def analyze_datapoint(data):
    """Annotate a datapoint dict in place with dot-area analysis results.

    Sets ``data["analyze_result"]`` to a dict and ``data["dot_area"]`` to the
    value returned by ``get_image_dot_area``. When a dot area was produced,
    its sharpness is stored under ``analyze_result["dot_area_sharpness"]``.

    Returns:
        The same ``data`` object.
    """
    decoded = image_data_url_to_image(data['image_data_url'])
    results = {}
    data["analyze_result"] = results
    dot_area = get_image_dot_area(decoded)
    data["dot_area"] = dot_area
    if dot_area:
        results["dot_area_sharpness"] = calc_sharpness(dot_area)
    return data
@app.route('/api/analyze', methods=['POST'])
def analyze():
    """Analyze the posted datapoint and send it back with the results added."""
    payload = request.get_json()
    analyze_datapoint(payload)
    return payload
@app.route('/api/roi-verify', methods=['POST'])
def roi_verify():
    """Classify an uploaded side-by-side image.

    Expects a multipart upload with a file field named ``side_by_side``.

    NOTE(review): ``Image``, ``preprocess_image``, ``predict`` and ``model``
    are not defined in this file; presumably they are provided by
    ``from roi_lib import *``. Confirm.

    Returns:
        A dict with ``ok``, ``predicted_class`` and ``probabilities``.
    """
    upload = request.files['side_by_side']
    # Reserve a unique file name for the upload (delete=False so it can be
    # reopened by path below).
    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as f:
        tmp_path = f.name
    try:
        upload.save(tmp_path)
        img = Image.open(tmp_path)
        # Feed the preprocessed image to the model. Previously the result of
        # preprocess_image() was discarded and a name that is never assigned
        # in this file (``image_tensor``) was passed to predict().
        image_tensor = preprocess_image(img)
        predicted_class, probabilities = predict(model, image_tensor)
    finally:
        # Remove the temp file; it was previously left behind on every call.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
    return {
        "ok": True,
        "predicted_class": predicted_class,
        "probabilities": probabilities,
    }
if __name__ == '__main__':
    # The Werkzeug interactive debugger lets anyone who can reach the server
    # execute arbitrary Python, so it must not be enabled unconditionally on a
    # server bound to all interfaces. Opt in with FLASK_DEBUG=1 for local
    # development.
    debug = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes', 'on')
    app.run(host='0.0.0.0', port=26966, debug=debug)