68 lines
2.2 KiB
Python
Executable File
68 lines
2.2 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import os
|
|
import uuid
|
|
import json
|
|
import base64
|
|
import tempfile
|
|
import subprocess
|
|
import pulsar
|
|
|
|
client = pulsar.Client('pulsar://localhost:6650')
|
|
|
|
worker_id = str(uuid.uuid4())
|
|
producer = client.create_producer('estor')
|
|
result_producer = None
|
|
consumer = client.subscribe('roi', f'roi-worker-{worker_id}')
|
|
|
|
def import_file(fname, content):
|
|
print('Import file', fname, len(content))
|
|
producer.send(fname.encode() + b'\0' + content)
|
|
|
|
def make_roi_path(orig):
|
|
comps = [x for x in orig.split('/') if x.strip()]
|
|
new_comps = comps[:3] + ['roi'] + comps[4:]
|
|
return '/' + '/'.join(new_comps)
|
|
|
|
def handle_qr(fname, content, result_topic):
|
|
with tempfile.NamedTemporaryFile(suffix=".jpg") as tf:
|
|
tf.write(content)
|
|
tf.flush()
|
|
cmd = ['./qrtool', 'roi', tf.name]
|
|
subprocess.check_call(cmd)
|
|
newfile = fname + ".roi.jpg"
|
|
with open(tf.name + ".roi.jpg", 'rb') as f:
|
|
roi_data = f.read()
|
|
global result_producer
|
|
if not result_producer or result_producer.topic() != result_topic:
|
|
result_producer = client.create_producer(result_topic)
|
|
roi_path = make_roi_path(fname)
|
|
resp = {
|
|
'path': fname,
|
|
'succeeded': True,
|
|
'output_files': [{
|
|
'path': roi_path,
|
|
'data_b64': base64.b64encode(roi_data).decode(),
|
|
}],
|
|
'size': len(content),
|
|
}
|
|
result_producer.send(json.dumps(resp).encode())
|
|
|
|
def roi_worker():
|
|
while True:
|
|
msg = consumer.receive()
|
|
try:
|
|
body = msg.data()
|
|
print("Received message id='{}'".format(msg.message_id()))
|
|
payload = json.loads(body)
|
|
fname = payload['path']
|
|
content = base64.b64decode(payload['data_b64'])
|
|
handle_qr(fname, content, payload['result_topic'])
|
|
consumer.acknowledge(msg)
|
|
except Exception as e:
|
|
print(e)
|
|
consumer.negative_acknowledge(msg)
|
|
|
|
# import_file("/emblem/batches/test-batch/import/test.jpg", open('/etc/fstab', 'rb').read())
|
|
roi_worker()
|