170 lines
4.7 KiB
C++
170 lines
4.7 KiB
C++
#include <thread>
|
|
#include <iostream>
|
|
#include <json/json.h>
|
|
#include "base64.h"
|
|
#include "mq_worker.h"
|
|
|
|
#include <pulsar/Client.h>
|
|
using namespace pulsar;
|
|
using namespace std;
|
|
|
|
/// One work request taken from the queue, as produced by parse_message().
struct MqMessage {
    std::string space;              ///< Value of the "space" JSON member.
    std::string path;               ///< Value of the "path" JSON member.
    std::string result_topic;       ///< Value of the "result_topic" JSON member; the reply is published there.
    std::vector<uint8_t> bytes;     ///< Payload decoded from the base64 "data_b64" JSON member.
};
|
|
|
|
/// Outcome of handling one request; serialized by response_to_json().
///
/// The scalar members carry default initializers so that a default-constructed
/// Response never exposes indeterminate values.
struct Response {
    std::string path;               ///< Path of the request this response answers.
    bool succeeded = false;         ///< True when the handler reported success.
    std::string result_path;        ///< Path reported for the produced output file.
    std::vector<uint8_t> output;    ///< Bytes of the produced output file.
    size_t size = 0;                ///< Size in bytes of the request payload.
    std::string error;              ///< Error text; left empty when there is no error.
};
|
|
|
|
static
|
|
MqMessage parse_message(const std::string& str) {
|
|
Json::CharReaderBuilder builder;
|
|
|
|
Json::Value root;
|
|
std::istringstream jsonStream(str);
|
|
Json::parseFromStream(builder, jsonStream, &root, nullptr);
|
|
|
|
MqMessage msg;
|
|
msg.space = root["space"].asString();
|
|
msg.path = root["path"].asString();
|
|
msg.result_topic = root["result_topic"].asString();
|
|
msg.bytes = base64_decode(root["data_b64"].asString());
|
|
|
|
return msg;
|
|
}
|
|
|
|
static
|
|
std::string response_to_json(const Response& msg) {
|
|
Json::Value root;
|
|
root["path"] = msg.path;
|
|
root["succeeded"] = msg.succeeded;
|
|
root["size"] = msg.size;
|
|
if (msg.error.size()) {
|
|
root["error"] = msg.error;
|
|
}
|
|
Json::Value ofs;
|
|
Json::Value of;
|
|
of["path"] = msg.result_path;
|
|
of["data_b64"] = base64_encode(msg.output.data(), msg.output.size());
|
|
ofs.append(of);
|
|
root["output_files"] = ofs;
|
|
|
|
Json::StreamWriterBuilder builder;
|
|
std::string str = Json::writeString(builder, root);
|
|
|
|
return str;
|
|
}
|
|
|
|
int mq_worker(const char *topic, const char *worker_name, handler_fn handle_image) {
|
|
Client client("pulsar://localhost:6650");
|
|
|
|
Producer producer;
|
|
string prev_producer_topic;
|
|
|
|
Consumer consumer;
|
|
ConsumerConfiguration config;
|
|
config.setConsumerType(ConsumerShared);
|
|
config.setSubscriptionInitialPosition(InitialPositionEarliest);
|
|
Result result = client.subscribe(topic, worker_name, config, consumer);
|
|
if (result != ResultOk) {
|
|
cout << "Failed to subscribe: " << result << endl;
|
|
return -1;
|
|
}
|
|
|
|
Message mq_msg;
|
|
int processed = 0;
|
|
int failed = 0;
|
|
while (1) {
|
|
consumer.receive(mq_msg);
|
|
auto payload = mq_msg.getDataAsString();
|
|
auto msg = parse_message(payload);
|
|
if (processed % 1000 == 0) {
|
|
cout << processed << ": " << msg.path << " " << msg.bytes.size() << endl;
|
|
}
|
|
Response resp;
|
|
resp.path = msg.path;
|
|
resp.size = msg.bytes.size();
|
|
resp.succeeded = true;
|
|
int r = handle_image(msg.path,
|
|
msg.bytes,
|
|
resp.result_path,
|
|
resp.output);
|
|
if (r) {
|
|
resp.succeeded = false;
|
|
resp.error = string("error ") + to_string(r);
|
|
}
|
|
auto reply = response_to_json(resp);
|
|
|
|
if (prev_producer_topic != msg.result_topic) {
|
|
Result result = client.createProducer(msg.result_topic, producer);
|
|
if (result != ResultOk) {
|
|
cerr << "Error creating producer: " << result << endl;
|
|
return -1;
|
|
}
|
|
prev_producer_topic = msg.result_topic;
|
|
}
|
|
Message result_msg = MessageBuilder().setContent(reply).build();
|
|
Result result = producer.send(result_msg);
|
|
if (result != ResultOk) {
|
|
cerr << "Error sending reply: " << result << endl;
|
|
consumer.negativeAcknowledge(mq_msg);
|
|
failed++;
|
|
} else {
|
|
processed++;
|
|
consumer.acknowledge(mq_msg);
|
|
}
|
|
if (processed % 1000 == 0) {
|
|
cout << "processed: " << processed << ", failed: " << failed << endl;
|
|
}
|
|
}
|
|
|
|
client.close();
|
|
return 0;
|
|
|
|
}
|
|
|
|
#if 0
|
|
static
|
|
int test_pulsar_worker() {
|
|
Client client("pulsar://localhost:6650");
|
|
|
|
Producer producer;
|
|
|
|
Result result = client.createProducer("persistent://public/default/my-topic", producer);
|
|
if (result != ResultOk) {
|
|
std::cout << "Error creating producer: " << result << std::endl;
|
|
return -1;
|
|
}
|
|
|
|
// Send 100 messages synchronously
|
|
int ctr = 0;
|
|
while (ctr < 100) {
|
|
std::string content = "msg" + std::to_string(ctr);
|
|
Message msg = MessageBuilder().setContent(content).setProperty("x", "1").build();
|
|
Result result = producer.send(msg);
|
|
if (result != ResultOk) {
|
|
std::cout << "The message " << content << " could not be sent, received code: " << result << std::endl;
|
|
} else {
|
|
std::cout << "The message " << content << " sent successfully" << std::endl;
|
|
}
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
|
ctr++;
|
|
}
|
|
|
|
std::cout << "Finished producing synchronously!" << std::endl;
|
|
|
|
client.close();
|
|
return 0;
|
|
}
|
|
#endif
|