themblem/emblem5/ai/fetch-scans.py
2025-10-29 21:27:29 +00:00

150 lines
5.4 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import os
import requests
import json
import multiprocessing as mp
from loguru import logger
import shutil
from PIL import Image
from ossclient import *
from common import *
import io
from tqdm import tqdm
import datetime
# Root directory for fetched scan data; main() replaces it with the --data-dir value.
data_dir = 'data'
class ScanDataFetcher(object):
    """Download labelled scan records from the themblem API into ``data_dir``.

    Every scan gets its own directory ``<data_dir>/scans/<id>/`` containing
    ``metadata.json``, ``frame.jpg``, ``std.jpg``, ``frame-qr.jpg``,
    ``std-qr.jpg`` and a ``fetch-state.json`` marker recording whether the
    download completed or failed.
    """

    # (connect, read) timeouts in seconds applied to every HTTP request so a
    # stalled connection cannot block a worker forever.
    REQUEST_TIMEOUT = (10, 300)

    def __init__(self):
        # NOTE(review): hard-coded API credential committed to source control;
        # it should be loaded from the environment or a secret store and rotated.
        self.token = '3ebd8c33-f46e-4b06-bda8-4c0f5f5eb530'

    def make_headers(self):
        """Return the HTTP headers carrying the API token."""
        return {
            'Authorization': f'Token {self.token}'
        }

    def load_local_scan_data(self):
        """Return ``{scan id: metadata dict}`` for the scans already on disk.

        A scan directory is counted only when it holds both
        ``fetch-state.json`` and ``metadata.json``. The scans directory is
        created when missing. Metadata files that cannot be parsed are skipped
        with a warning instead of aborting the whole run.
        """
        ret = {}
        scans_dir = os.path.join(data_dir, 'scans')
        os.makedirs(scans_dir, exist_ok=True)
        for scan_id in os.listdir(scans_dir):
            scan_dir = os.path.join(scans_dir, scan_id)
            if not os.path.isdir(scan_dir):
                continue
            if not os.path.exists(os.path.join(scan_dir, 'fetch-state.json')):
                continue
            metadata_path = os.path.join(scan_dir, 'metadata.json')
            if not os.path.exists(metadata_path):
                continue
            try:
                # `with` closes the handle; the original leaked one per scan.
                with open(metadata_path) as f:
                    md = json.load(f)
                ret[md['id']] = md
            except (ValueError, KeyError, TypeError) as e:
                logger.warning(f'skipping unreadable metadata {metadata_path}: {e}')
        return ret

    def fetch(self, sample_rate=None):
        """Fetch every remote scan that is missing locally or whose labels changed.

        Args:
            sample_rate: optional fraction of the backlog to fetch; a random
                sample of ``int(len(backlog) * sample_rate)`` items is taken,
                capped at the backlog size.

        Raises:
            requests.HTTPError: when the listing endpoint answers with an
                error status.
        """
        # Imported here so the method does not rely on the star imports
        # (`ossclient`, `common`) happening to re-export these names.
        import random
        from collections import defaultdict

        local_scan_data = self.load_local_scan_data()
        logger.info(f'local_scan_data: {len(local_scan_data)}')
        url = 'https://themblem.com/api/v1/scan-data-labels/'
        response = requests.get(url, headers=self.make_headers(), timeout=self.REQUEST_TIMEOUT)
        # Fail with the HTTP status rather than a KeyError on an error payload.
        response.raise_for_status()
        data = response.json()
        fetch_backlog = []
        for item in data['items']:
            if 'code' not in item or 'id' not in item or not item.get('labels') or 'image' not in item:
                continue
            local = local_scan_data.get(item['id'])
            if local is not None and local.get('labels') == item['labels']:
                continue
            fetch_backlog.append(item)
        if sample_rate:
            # Cap at the backlog size: random.sample raises for larger samples.
            sample_size = min(len(fetch_backlog), int(len(fetch_backlog) * sample_rate))
            fetch_backlog = random.sample(fetch_backlog, sample_size)
        logger.info(f'fetch_backlog: {len(fetch_backlog)}')
        counts = defaultdict(int)
        # The context manager terminates the workers if the loop raises.
        with mp.Pool(mp.cpu_count() * 4) as pool:
            results = pool.imap_unordered(self.fetch_one_scan, fetch_backlog)
            for result in tqdm(results, total=len(fetch_backlog)):
                counts[result] += 1
            logger.info(f'counts: {counts}')
            pool.close()
            pool.join()

    def fetch_one_scan(self, scan):
        """Fetch one scan and return ``'ok'`` or ``'error'``.

        Any exception from the download is recorded in the scan's
        ``fetch-state.json`` (status ``error``) instead of being propagated,
        so one bad scan does not abort the pool.
        """
        try:
            self.do_fetch_one_scan(scan)
            return 'ok'
        except Exception as e:
            scan_dir = os.path.join(data_dir, 'scans', str(scan['id']))
            fetch_state_path = os.path.join(scan_dir, 'fetch-state.json')
            try:
                with open(fetch_state_path, 'w') as f:
                    json.dump({
                        'status': 'error',
                        'timestamp': datetime.datetime.now().isoformat(),
                        'scan_id': scan['id'],
                        'labels': scan.get('labels', ''),
                        'error': str(e)
                    }, f, indent=2)
            except OSError as write_error:
                # Being unable to record the failure must not kill the run.
                logger.error(f'cannot write {fetch_state_path}: {write_error}')
            return 'error'

    @staticmethod
    def _refresh_stale_metadata(metadata_path, scan, metadata_str):
        """Rewrite an existing ``metadata.json`` whose labels differ from *scan*.

        Returns True when the file was rewritten. A missing file is left
        alone; an unparsable one is treated as stale and replaced.
        """
        if not os.path.exists(metadata_path):
            return False
        try:
            with open(metadata_path) as f:
                stale = json.load(f).get('labels') != scan.get('labels')
        except (ValueError, AttributeError):
            stale = True
        if stale:
            with open(metadata_path, 'w') as f:
                f.write(metadata_str)
        return stale

    def do_fetch_one_scan(self, scan):
        """Download the images for *scan* and write its metadata and state files.

        When ``fetch-state.json`` already exists nothing is downloaded again,
        but ``metadata.json`` is refreshed if the labels changed;
        ``fetch-state.json`` itself is left untouched in that case. Otherwise
        the frame image, the standard QR image and both extracted QR crops are
        created (each only if its file is missing) and a ``completed`` state
        is written last.

        Raises:
            requests.HTTPError: when the frame image request fails.
        """
        scan_dir = os.path.join(data_dir, 'scans', str(scan['id']))
        os.makedirs(scan_dir, exist_ok=True)
        fetch_state_path = os.path.join(scan_dir, 'fetch-state.json')
        metadata_path = os.path.join(scan_dir, 'metadata.json')
        metadata_str = json.dumps(scan, indent=2)

        if os.path.exists(fetch_state_path):
            # fetch() re-queues scans whose labels changed; without this the
            # early return meant the new labels were never stored.
            self._refresh_stale_metadata(metadata_path, scan, metadata_str)
            return

        frame_img_url = f'https://themblem.com/api/v1/oss-image/?token={self.token}&name={scan["image"]}'
        frame_img_file = os.path.join(scan_dir, 'frame.jpg')
        if not os.path.exists(frame_img_file):
            frame_response = requests.get(frame_img_url, timeout=self.REQUEST_TIMEOUT)
            # Do not save an HTTP error page as frame.jpg.
            frame_response.raise_for_status()
            with open(frame_img_file, 'wb') as f:
                f.write(frame_response.content)

        std_img_file = os.path.join(scan_dir, 'std.jpg')
        if not os.path.exists(std_img_file):
            std_img = Image.open(io.BytesIO(get_qr_image_bytes(scan['code'])))
            std_img.save(std_img_file)

        with open(metadata_path, 'w') as f:
            f.write(metadata_str)

        frame_qr_img_file = os.path.join(scan_dir, 'frame-qr.jpg')
        if not os.path.exists(frame_qr_img_file):
            # Save inside the `with` in case the crop still references the source.
            with Image.open(frame_img_file) as frame_img:
                _, frame_qr = extract_qr(frame_img)
                frame_qr.save(frame_qr_img_file)

        std_qr_img_file = os.path.join(scan_dir, 'std-qr.jpg')
        if not os.path.exists(std_qr_img_file):
            with Image.open(std_img_file) as std_img:
                _, std_qr = extract_qr(std_img)
                std_qr.save(std_qr_img_file)

        # Written last: its presence marks the scan as already processed.
        fetch_state = {
            'status': 'completed',
            'timestamp': datetime.datetime.now().isoformat(),
            'scan_id': scan['id'],
            'labels': scan.get('labels', ''),
        }
        with open(fetch_state_path, 'w') as f:
            json.dump(fetch_state, f, indent=2)
def parse_args():
    """Parse the command line and return the resulting namespace.

    Options: ``--data-dir`` (string, defaults to ``data``) and
    ``--sample-rate`` / ``-r`` (float, defaults to ``None``).
    """
    cli = argparse.ArgumentParser()
    cli.add_argument(
        '--data-dir',
        default='data',
        type=str,
    )
    cli.add_argument(
        '--sample-rate', '-r',
        type=float,
    )
    namespace = cli.parse_args()
    return namespace
def main():
    """Script entry point.

    Stores the ``--data-dir`` option in the module-level ``data_dir`` and then
    runs a fetch with the requested sample rate.
    """
    global data_dir
    options = parse_args()
    data_dir = options.data_dir
    scan_fetcher = ScanDataFetcher()
    logger.info('fetch')
    scan_fetcher.fetch(options.sample_rate)


# Run the fetch only when executed as a script, not when imported.
if __name__ == "__main__":
    main()