#!/usr/bin/env python3 import os import uuid import json import base64 import tempfile import subprocess import pulsar client = pulsar.Client('pulsar://localhost:6650') worker_id = str(uuid.uuid4()) producer = client.create_producer('estor') result_producer = None consumer = client.subscribe('roi', f'roi-worker-{worker_id}') def import_file(fname, content): print('Import file', fname, len(content)) producer.send(fname.encode() + b'\0' + content) def make_roi_path(orig): comps = [x for x in orig.split('/') if x.strip()] new_comps = comps[:3] + ['roi'] + comps[4:] return '/' + '/'.join(new_comps) def handle_qr(fname, content, result_topic): with tempfile.NamedTemporaryFile(suffix=".jpg") as tf: tf.write(content) tf.flush() cmd = ['./qrtool', 'roi', tf.name] subprocess.check_call(cmd) newfile = fname + ".roi.jpg" with open(tf.name + ".roi.jpg", 'rb') as f: roi_data = f.read() global result_producer if not result_producer or result_producer.topic() != result_topic: result_producer = client.create_producer(result_topic) roi_path = make_roi_path(fname) resp = { 'path': fname, 'succeeded': True, 'output_files': [{ 'path': roi_path, 'data_b64': base64.b64encode(roi_data).decode(), }], 'size': len(content), } result_producer.send(json.dumps(resp).encode()) def roi_worker(): while True: msg = consumer.receive() try: body = msg.data() print("Received message id='{}'".format(msg.message_id())) payload = json.loads(body) fname = payload['path'] content = base64.b64decode(payload['data_b64']) handle_qr(fname, content, payload['result_topic']) consumer.acknowledge(msg) except Exception as e: print(e) consumer.negative_acknowledge(msg) # import_file("/emblem/batches/test-batch/import/test.jpg", open('/etc/fstab', 'rb').read()) roi_worker()