#!/usr/bin/env python3
"""Fetch labeled scan data from the themblem API into a local data directory.

For each scan that has labels, this script downloads the frame image, renders
the reference (standard) QR image from the scan's code, extracts the QR crop
from both images, and writes a fetch-state.json marker so that completed (or
failed) scans are not needlessly re-fetched on the next run.
"""
import argparse
import datetime
import io
import json
import multiprocessing as mp
import os
import random
import shutil
from collections import defaultdict
from urllib.parse import quote

import requests
from loguru import logger
from PIL import Image
from tqdm import tqdm

from ossclient import *
from common import *

# Root directory for downloaded scan data; overridden by --data-dir in main().
data_dir = 'data'


class ScanDataFetcher(object):
    """Downloads scan metadata and images, skipping already-fetched scans."""

    def __init__(self):
        # NOTE(security): hard-coded API token kept as fallback for backward
        # compatibility; prefer supplying it via the SCAN_FETCH_TOKEN env var
        # so the secret does not have to live in source control.
        self.token = os.environ.get(
            'SCAN_FETCH_TOKEN', '3ebd8c33-f46e-4b06-bda8-4c0f5f5eb530')

    def make_headers(self):
        """Return the Authorization header for the themblem API."""
        return {
            'Authorization': f'Token {self.token}'
        }

    def load_local_scan_data(self):
        """Index scans already fetched under data_dir/scans.

        Returns:
            dict mapping scan id -> saved metadata dict. Only directories
            that contain both fetch-state.json and metadata.json count as
            fetched (a missing fetch-state.json means the fetch never
            completed and the scan should be retried).
        """
        ret = {}
        scans_dir = os.path.join(data_dir, 'scans')
        os.makedirs(scans_dir, exist_ok=True)
        for scan_id in os.listdir(scans_dir):
            scan_dir = os.path.join(scans_dir, scan_id)
            if not os.path.isdir(scan_dir):
                continue
            fetch_state_path = os.path.join(scan_dir, 'fetch-state.json')
            if not os.path.exists(fetch_state_path):
                continue
            metadata_path = os.path.join(scan_dir, 'metadata.json')
            if not os.path.exists(metadata_path):
                continue
            # Context manager: don't leak the file handle (the original
            # json.load(open(...)) relied on GC to close it).
            with open(metadata_path) as f:
                md = json.load(f)
            ret[md['id']] = md
        return ret

    def fetch(self, sample_rate=None, scan_ids=None):
        """Fetch every scan whose labels are new or changed locally.

        Args:
            sample_rate: optional float in (0, 1]; randomly sample that
                fraction of the backlog instead of fetching all of it.
            scan_ids: optional iterable of scan ids; restrict fetching to
                those ids when given.
        """
        local_scan_data = self.load_local_scan_data()
        logger.info(f'local_scan_data: {len(local_scan_data)}')
        url = 'https://themblem.com/api/v1/scan-data-labels/'
        r = requests.get(url, headers=self.make_headers(), timeout=30)
        # Fail loudly on auth/server errors instead of a confusing
        # JSON-decode failure below.
        r.raise_for_status()
        data = r.json()
        fetch_backlog = []
        scan_ids_set = set(scan_ids) if scan_ids else None
        for item in data['items']:
            # Skip items missing any field the fetch pipeline needs.
            if ('code' not in item or 'id' not in item
                    or not item.get('labels')
                    or 'image' not in item or not item.get('image')):
                continue
            if scan_ids_set and item['id'] not in scan_ids_set:
                continue
            if item['id'] in local_scan_data:
                # Already fetched; re-fetch only when labels changed since.
                local_labels = local_scan_data[item['id']]['labels']
                if local_labels == item['labels']:
                    continue
            fetch_backlog.append(item)
        if sample_rate:
            fetch_backlog = random.sample(
                fetch_backlog, int(len(fetch_backlog) * sample_rate))
        logger.info(f'fetch_backlog: {len(fetch_backlog)}')
        counts = defaultdict(int)
        # Context manager ensures worker processes are cleaned up even if an
        # exception interrupts the iteration below.
        with mp.Pool(min(8, mp.cpu_count())) as pool:
            for result in tqdm(
                    pool.imap_unordered(self.fetch_one_scan, fetch_backlog),
                    total=len(fetch_backlog)):
                counts[result] += 1
        logger.info(f'counts: {counts}')

    def fetch_one_scan(self, scan):
        """Fetch one scan; on failure record the error in fetch-state.json.

        Returns:
            'ok' on success, 'error' on failure, so the caller can tally
            outcomes per worker result.
        """
        try:
            self.do_fetch_one_scan(scan)
            return 'ok'
        except Exception as e:
            scan_dir = os.path.join(data_dir, 'scans', str(scan['id']))
            # do_fetch_one_scan may have failed before creating the scan
            # directory; make sure it exists so recording the error cannot
            # itself raise and mask the original exception.
            os.makedirs(scan_dir, exist_ok=True)
            fetch_state_path = os.path.join(scan_dir, 'fetch-state.json')
            with open(fetch_state_path, 'w') as f:
                json.dump({
                    'status': 'error',
                    'timestamp': datetime.datetime.now().isoformat(),
                    'scan_id': scan['id'],
                    'labels': scan.get('labels', ''),
                    'error': str(e)
                }, f, indent=2)
            return 'error'

    def do_fetch_one_scan(self, scan):
        """Download and derive all artifacts for a single scan.

        Produces, under data_dir/scans/<id>/: frame.jpg (downloaded),
        std.jpg (rendered from the scan code), metadata.json, frame-qr.jpg
        and std-qr.jpg (QR crops), and finally fetch-state.json marking
        completion. Each artifact is skipped if it already exists, so a
        partially-failed fetch resumes where it left off.

        Raises:
            requests.HTTPError: on a bad image-download status.
            Exception: whatever the image/QR helpers raise on bad data.
        """
        scan_dir = os.path.join(data_dir, 'scans', str(scan['id']))
        os.makedirs(scan_dir, exist_ok=True)
        # fetch-state.json is written last on success, so its presence means
        # this scan is already fully fetched.
        fetch_state_path = os.path.join(scan_dir, 'fetch-state.json')
        if os.path.exists(fetch_state_path):
            return
        metadata_path = os.path.join(scan_dir, 'metadata.json')
        metadata_str = json.dumps(scan, indent=2)
        frame_img_url = (f'https://themblem.com/api/v1/oss-image/'
                         f'?token={self.token}&name={quote(scan["image"])}')
        frame_img_file = os.path.join(scan_dir, 'frame.jpg')
        if not os.path.exists(frame_img_file):
            r = requests.get(frame_img_url, timeout=30)
            r.raise_for_status()  # Raise an exception for bad status codes
            with open(frame_img_file, 'wb') as f:
                f.write(r.content)
        std_img_file = os.path.join(scan_dir, 'std.jpg')
        if not os.path.exists(std_img_file):
            # get_qr_image_bytes / extract_qr come from the star-imported
            # project helpers (common/ossclient).
            std_img = Image.open(io.BytesIO(get_qr_image_bytes(scan['code'])))
            std_img.save(std_img_file)
        with open(metadata_path, 'w') as f:
            f.write(metadata_str)
        frame_qr_img_file = os.path.join(scan_dir, 'frame-qr.jpg')
        if not os.path.exists(frame_qr_img_file):
            frame_img = Image.open(frame_img_file)
            _, frame_qr = extract_qr(frame_img)
            frame_qr.save(frame_qr_img_file)
        std_qr_img_file = os.path.join(scan_dir, 'std-qr.jpg')
        if not os.path.exists(std_qr_img_file):
            std_img = Image.open(std_img_file)
            _, std_qr = extract_qr(std_img)
            std_qr.save(std_qr_img_file)
        # Create fetch-state.json to mark successful completion
        fetch_state = {
            'status': 'completed',
            'timestamp': datetime.datetime.now().isoformat(),
            'scan_id': scan['id'],
            'labels': scan.get('labels', ''),
        }
        with open(fetch_state_path, 'w') as f:
            json.dump(fetch_state, f, indent=2)


def parse_args():
    """Parse command-line arguments for the fetcher."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--data-dir', type=str, default='data')
    parser.add_argument('--sample-rate', '-r', type=float)
    parser.add_argument('--scan-ids', type=str,
                        help='Comma-separated list of scan IDs to fetch')
    return parser.parse_args()


def main():
    """Entry point: configure data_dir and run the fetch."""
    args = parse_args()
    global data_dir
    data_dir = args.data_dir
    fetcher = ScanDataFetcher()
    logger.info('fetch')
    scan_ids = ([int(x.strip()) for x in args.scan_ids.split(',')]
                if args.scan_ids else None)
    fetcher.fetch(args.sample_rate, scan_ids)


if __name__ == "__main__":
    main()