#include #include #include #include "base64.h" #include "mq_worker.h" #include using namespace pulsar; using namespace std; // Define a simple struct with string fields struct MqMessage { string space; string path; string result_topic; vector bytes; }; struct Response { string path; bool succeeded; string result_path; vector output; size_t size; string error; }; static MqMessage parse_message(const std::string& str) { Json::CharReaderBuilder builder; Json::Value root; std::istringstream jsonStream(str); Json::parseFromStream(builder, jsonStream, &root, nullptr); MqMessage msg; msg.space = root["space"].asString(); msg.path = root["path"].asString(); msg.result_topic = root["result_topic"].asString(); msg.bytes = base64_decode(root["data_b64"].asString()); return msg; } static std::string response_to_json(const Response& msg) { Json::Value root; root["path"] = msg.path; root["succeeded"] = msg.succeeded; root["size"] = msg.size; if (msg.error.size()) { root["error"] = msg.error; } Json::Value ofs; Json::Value of; of["path"] = msg.result_path; of["data_b64"] = base64_encode(msg.output.data(), msg.output.size()); ofs.append(of); root["output_files"] = ofs; Json::StreamWriterBuilder builder; std::string str = Json::writeString(builder, root); return str; } int mq_worker(const char *topic, const char *worker_name, handler_fn handle_image) { Client client("pulsar://localhost:6650"); Producer producer; string prev_producer_topic; Consumer consumer; ConsumerConfiguration config; config.setConsumerType(ConsumerShared); config.setSubscriptionInitialPosition(InitialPositionEarliest); Result result = client.subscribe(topic, worker_name, config, consumer); if (result != ResultOk) { cout << "Failed to subscribe: " << result << endl; return -1; } Message mq_msg; int processed = 0; int failed = 0; while (1) { consumer.receive(mq_msg); auto payload = mq_msg.getDataAsString(); auto msg = parse_message(payload); if (processed % 1000 == 0) { cout << processed << ": " << msg.path << " " << msg.bytes.size() << endl; } Response resp; resp.path = msg.path; resp.size = msg.bytes.size(); resp.succeeded = true; int r = handle_image(msg.path, msg.bytes, resp.result_path, resp.output); if (r) { resp.succeeded = false; resp.error = string("error ") + to_string(r); } auto reply = response_to_json(resp); if (prev_producer_topic != msg.result_topic) { Result result = client.createProducer(msg.result_topic, producer); if (result != ResultOk) { cerr << "Error creating producer: " << result << endl; return -1; } prev_producer_topic = msg.result_topic; } Message result_msg = MessageBuilder().setContent(reply).build(); Result result = producer.send(result_msg); if (result != ResultOk) { cerr << "Error sending reply: " << result << endl; consumer.negativeAcknowledge(mq_msg); failed++; } else { processed++; consumer.acknowledge(mq_msg); } if (processed % 1000 == 0) { cout << "processed: " << processed << ", failed: " << failed << endl; } } client.close(); return 0; } #if 0 static int test_pulsar_worker() { Client client("pulsar://localhost:6650"); Producer producer; Result result = client.createProducer("persistent://public/default/my-topic", producer); if (result != ResultOk) { std::cout << "Error creating producer: " << result << std::endl; return -1; } // Send 100 messages synchronously int ctr = 0; while (ctr < 100) { std::string content = "msg" + std::to_string(ctr); Message msg = MessageBuilder().setContent(content).setProperty("x", "1").build(); Result result = producer.send(msg); if (result != ResultOk) { std::cout << "The message " << content << " could not be sent, received code: " << result << std::endl; } else { std::cout << "The message " << content << " sent successfully" << std::endl; } std::this_thread::sleep_for(std::chrono::milliseconds(100)); ctr++; } std::cout << "Finished producing synchronously!" << std::endl; client.close(); return 0; } #endif