themblem/alg/mq_worker.cpp
2024-09-01 21:51:50 +01:00

170 lines
4.7 KiB
C++

#include <cstdint>
#include <exception>
#include <iostream>
#include <sstream>
#include <string>
#include <thread>
#include <vector>
#include <json/json.h>
#include "base64.h"
#include "mq_worker.h"
#include <pulsar/Client.h>
using namespace pulsar;
using namespace std;
// One unit of work taken from the queue, as filled in by parse_message()
// from the JSON payload of a received message.
struct MqMessage {
    std::string space;           // JSON field "space"
    std::string path;            // JSON field "path"
    std::string result_topic;    // JSON field "result_topic"; mq_worker() publishes the reply there
    std::vector<uint8_t> bytes;  // JSON field "data_b64" after base64 decoding
};
// Result of handling one MqMessage; serialised by response_to_json().
//
// The scalar members carry default initializers so that a
// default-constructed Response never exposes indeterminate values
// (previously `succeeded` and `size` were left uninitialized).
struct Response {
    std::string path;             // path of the request this response belongs to
    bool succeeded = false;       // true only when the handler reported success
    std::string result_path;      // path reported for the produced output file
    std::vector<uint8_t> output;  // raw bytes of the produced output file
    size_t size = 0;              // size value reported under "size" in the reply
    std::string error;            // error description; omitted from the JSON when empty
};
// Decode the JSON payload of a queue message into an MqMessage.
//
// Reads the fields "space", "path", "result_topic" and "data_b64"
// (the latter is base64-decoded into MqMessage::bytes).
//
// @param str  raw JSON text of the message.
// @return     the decoded message. If the text is not valid JSON, or its
//             root is not a JSON object, the problem is logged to stderr
//             and an MqMessage with all fields empty is returned. A field
//             that is missing, or is an array/object, is treated as empty.
static
MqMessage parse_message(const std::string& str) {
    MqMessage msg;

    Json::CharReaderBuilder builder;
    Json::Value parsed;
    std::string errs;
    std::istringstream jsonStream(str);
    if (!Json::parseFromStream(builder, jsonStream, &parsed, &errs)) {
        std::cerr << "Failed to parse message JSON: " << errs << std::endl;
        return msg;
    }
    if (!parsed.isObject()) {
        std::cerr << "Message JSON is not an object" << std::endl;
        return msg;
    }

    // Look fields up through a const reference so that a missing key is
    // not inserted into the document as a side effect.
    const Json::Value& root = parsed;
    auto text_of = [&root](const char* key) -> std::string {
        const Json::Value& field = root[key];
        // asString() fails on arrays and objects; treat those as absent.
        if (!field.isConvertibleTo(Json::stringValue)) {
            return std::string();
        }
        return field.asString();
    };

    msg.space = text_of("space");
    msg.path = text_of("path");
    msg.result_topic = text_of("result_topic");
    msg.bytes = base64_decode(text_of("data_b64"));
    return msg;
}
static
std::string response_to_json(const Response& msg) {
Json::Value root;
root["path"] = msg.path;
root["succeeded"] = msg.succeeded;
root["size"] = msg.size;
if (msg.error.size()) {
root["error"] = msg.error;
}
Json::Value ofs;
Json::Value of;
of["path"] = msg.result_path;
of["data_b64"] = base64_encode(msg.output.data(), msg.output.size());
ofs.append(of);
root["output_files"] = ofs;
Json::StreamWriterBuilder builder;
std::string str = Json::writeString(builder, root);
return str;
}
int mq_worker(const char *topic, const char *worker_name, handler_fn handle_image) {
Client client("pulsar://localhost:6650");
Producer producer;
string prev_producer_topic;
Consumer consumer;
ConsumerConfiguration config;
config.setConsumerType(ConsumerShared);
config.setSubscriptionInitialPosition(InitialPositionEarliest);
Result result = client.subscribe(topic, worker_name, config, consumer);
if (result != ResultOk) {
cout << "Failed to subscribe: " << result << endl;
return -1;
}
Message mq_msg;
int processed = 0;
int failed = 0;
while (1) {
consumer.receive(mq_msg);
auto payload = mq_msg.getDataAsString();
auto msg = parse_message(payload);
if (processed % 1000 == 0) {
cout << processed << ": " << msg.path << " " << msg.bytes.size() << endl;
}
Response resp;
resp.path = msg.path;
resp.size = msg.bytes.size();
resp.succeeded = true;
int r = handle_image(msg.path,
msg.bytes,
resp.result_path,
resp.output);
if (r) {
resp.succeeded = false;
resp.error = string("error ") + to_string(r);
}
auto reply = response_to_json(resp);
if (prev_producer_topic != msg.result_topic) {
Result result = client.createProducer(msg.result_topic, producer);
if (result != ResultOk) {
cerr << "Error creating producer: " << result << endl;
return -1;
}
prev_producer_topic = msg.result_topic;
}
Message result_msg = MessageBuilder().setContent(reply).build();
Result result = producer.send(result_msg);
if (result != ResultOk) {
cerr << "Error sending reply: " << result << endl;
consumer.negativeAcknowledge(mq_msg);
failed++;
} else {
processed++;
consumer.acknowledge(mq_msg);
}
if (processed % 1000 == 0) {
cout << "processed: " << processed << ", failed: " << failed << endl;
}
}
client.close();
return 0;
}
#if 0
static
int test_pulsar_worker() {
Client client("pulsar://localhost:6650");
Producer producer;
Result result = client.createProducer("persistent://public/default/my-topic", producer);
if (result != ResultOk) {
std::cout << "Error creating producer: " << result << std::endl;
return -1;
}
// Send 100 messages synchronously
int ctr = 0;
while (ctr < 100) {
std::string content = "msg" + std::to_string(ctr);
Message msg = MessageBuilder().setContent(content).setProperty("x", "1").build();
Result result = producer.send(msg);
if (result != ResultOk) {
std::cout << "The message " << content << " could not be sent, received code: " << result << std::endl;
} else {
std::cout << "The message " << content << " sent successfully" << std::endl;
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
ctr++;
}
std::cout << "Finished producing synchronously!" << std::endl;
client.close();
return 0;
}
#endif