themblem/alg/worker.py
2024-09-01 21:51:50 +01:00

68 lines
2.2 KiB
Python
Executable File

#!/usr/bin/env python3
import os
import uuid
import json
import base64
import tempfile
import subprocess
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
worker_id = str(uuid.uuid4())
producer = client.create_producer('estor')
result_producer = None
consumer = client.subscribe('roi', f'roi-worker-{worker_id}')
def import_file(fname, content):
print('Import file', fname, len(content))
producer.send(fname.encode() + b'\0' + content)
def make_roi_path(orig):
comps = [x for x in orig.split('/') if x.strip()]
new_comps = comps[:3] + ['roi'] + comps[4:]
return '/' + '/'.join(new_comps)
def handle_qr(fname, content, result_topic):
with tempfile.NamedTemporaryFile(suffix=".jpg") as tf:
tf.write(content)
tf.flush()
cmd = ['./qrtool', 'roi', tf.name]
subprocess.check_call(cmd)
newfile = fname + ".roi.jpg"
with open(tf.name + ".roi.jpg", 'rb') as f:
roi_data = f.read()
global result_producer
if not result_producer or result_producer.topic() != result_topic:
result_producer = client.create_producer(result_topic)
roi_path = make_roi_path(fname)
resp = {
'path': fname,
'succeeded': True,
'output_files': [{
'path': roi_path,
'data_b64': base64.b64encode(roi_data).decode(),
}],
'size': len(content),
}
result_producer.send(json.dumps(resp).encode())
def roi_worker():
while True:
msg = consumer.receive()
try:
body = msg.data()
print("Received message id='{}'".format(msg.message_id()))
payload = json.loads(body)
fname = payload['path']
content = base64.b64decode(payload['data_b64'])
handle_qr(fname, content, payload['result_topic'])
consumer.acknowledge(msg)
except Exception as e:
print(e)
consumer.negative_acknowledge(msg)
# import_file("/emblem/batches/test-batch/import/test.jpg", open('/etc/fstab', 'rb').read())
roi_worker()