From 7a9a31afd42b4540aa672e0373eea6dd2dcf93e6 Mon Sep 17 00:00:00 2001 From: Sucan126 <632190820@qq.com> Date: Mon, 8 Sep 2025 19:53:39 +0800 Subject: [PATCH] Integrate Paho MQTT C library into project. Updated CMakeLists.txt to find PahoMqttC package and added fallback for manual linking. Refactored mqtt.hpp and mqtt.cpp to utilize Paho MQTT C API, replacing previous async client implementation. Added Windows setup guide for MQTT library installation. --- CMakeLists.txt | 14 ++- MQTT_SETUP_WINDOWS.md | 85 +++++++++++++ include/mqtt.hpp | 34 ++---- src/mqtt.cpp | 277 ++++++++++++++++++++++-------------------- 4 files changed, 254 insertions(+), 156 deletions(-) create mode 100644 MQTT_SETUP_WINDOWS.md diff --git a/CMakeLists.txt b/CMakeLists.txt index 198cafb..d69e634 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -9,13 +9,21 @@ set(CMAKE_CXX_STANDARD_REQUIRED ON) find_package(PkgConfig REQUIRED) find_package(Threads REQUIRED) -# Find installed unitree_sdk2 +# Try to find PahoMqttC, fallback to manual linking if not found +find_package(PahoMqttC QUIET) +if(NOT PahoMqttC_FOUND) + message(STATUS "PahoMqttC package not found, using manual library linking") + find_library(PAHO_MQTT_C_LIBRARY NAMES paho-mqtt3c REQUIRED) + find_path(PAHO_MQTT_C_INCLUDE_DIR NAMES MQTTClient.h REQUIRED) +endif() + find_package(unitree_sdk2 REQUIRED) # Include directories include_directories( ${CMAKE_CURRENT_SOURCE_DIR}/include ${CMAKE_CURRENT_SOURCE_DIR}/include/nlohmann + $<$>:${PAHO_MQTT_C_INCLUDE_DIR}> ) # Source files @@ -25,7 +33,7 @@ set(SOURCES src/custom_robot.cpp src/config.cpp src/logger.cpp - # src/mqtt.cpp # Disabled MQTT for now + src/mqtt.cpp ) # Create executable with simple name @@ -35,6 +43,8 @@ add_executable(main ${SOURCES}) target_link_libraries(main unitree_sdk2 Threads::Threads + $<$:PahoMqttC::PahoMqttC> + $<$>:${PAHO_MQTT_C_LIBRARY}> ) # Set output directory diff --git a/MQTT_SETUP_WINDOWS.md b/MQTT_SETUP_WINDOWS.md new file mode 100644 index 0000000..9e88693 --- /dev/null +++ b/MQTT_SETUP_WINDOWS.md @@ -0,0 +1,85 @@ +# Windows MQTT库安装指南 + +## 方法1: 使用vcpkg (推荐) + +### 1. 安装vcpkg +```cmd +git clone https://github.com/Microsoft/vcpkg.git +cd vcpkg +.\bootstrap-vcpkg.bat +``` + +### 2. 安装MQTT库 +```cmd +.\vcpkg install paho-mqtt:x64-windows +.\vcpkg install paho-mqttpp3:x64-windows +.\vcpkg install nlohmann-json:x64-windows +``` + +### 3. 集成到CMake +```cmd +.\vcpkg integrate install +``` + +### 4. 修改CMake命令 +在构建时添加vcpkg工具链: +```cmd +mkdir build +cd build +cmake .. -DCMAKE_TOOLCHAIN_FILE=[vcpkg根目录]/scripts/buildsystems/vcpkg.cmake +cmake --build . +``` + +## 方法2: 手动编译 + +### 1. 编译Paho MQTT C库 +```cmd +git clone https://github.com/eclipse/paho.mqtt.c.git +cd paho.mqtt.c +mkdir build +cd build +cmake .. -DPAHO_BUILD_STATIC=TRUE -DPAHO_BUILD_SHARED=FALSE +cmake --build . --config Release +cmake --install . --prefix C:/mqtt/c +``` + +### 2. 编译Paho MQTT C++库 +```cmd +git clone https://github.com/eclipse/paho.mqtt.cpp.git +cd paho.mqtt.cpp +mkdir build +cd build +cmake .. -DPAHO_MQTT_C_PATH=C:/mqtt/c -DPAHO_BUILD_STATIC=TRUE +cmake --build . --config Release +cmake --install . --prefix C:/mqtt/cpp +``` + +### 3. 修改CMakeLists.txt +添加库路径: +```cmake +set(CMAKE_PREFIX_PATH "C:/mqtt/cpp;C:/mqtt/c") +find_package(PahoMqttCpp REQUIRED) +``` + +## 验证安装 + +运行以下命令验证: +```cmd +cd build +cmake .. +``` + +如果没有错误,说明MQTT库已正确配置。 + +## 使用示例 + +在您的代码中导入MQTT: +```cpp +#include +#include "mqtt.hpp" + +// 创建MQTT客户端 +auto mqttClient = std::make_unique("localhost", 1883, "test_client"); +mqttClient->connect(); +mqttClient->subscribe("test/topic"); +``` diff --git a/include/mqtt.hpp b/include/mqtt.hpp index ffc4e4d..ea52477 100644 --- a/include/mqtt.hpp +++ b/include/mqtt.hpp @@ -9,7 +9,7 @@ #include #include #include -#include +#include // Paho MQTT C library namespace custom { @@ -39,34 +39,22 @@ public: void stopMessageProcessor(); private: - class Callback : public virtual mqtt::callback, - public virtual mqtt::iaction_listener { - public: - Callback(MqttClient* client) : client_(client) {} - - void connected(const std::string& cause) override; - void connection_lost(const std::string& cause) override; - void message_arrived(mqtt::const_message_ptr msg) override; - void delivery_complete(mqtt::delivery_token_ptr token) override; - - void on_failure(const mqtt::token& tok) override; - void on_success(const mqtt::token& tok) override; - - private: - MqttClient* client_; - }; - struct QueuedMessage { std::string topic; std::string payload; }; + // C callback functions (static) + static int messageArrivedCallback(void* context, char* topicName, int topicLen, MQTTClient_message* message); + static void connectionLostCallback(void* context, char* cause); + static void deliveryCompleteCallback(void* context, MQTTClient_deliveryToken dt); + void processMessageQueue(); void handleConnectionLost(); void handleConnectionEstablished(); + void reconnectWorker(); - std::unique_ptr client_; - std::unique_ptr callback_; + MQTTClient client_; MessageCallback messageCallback_; ConnectionCallback connectionCallback_; @@ -82,8 +70,7 @@ private: std::atomic processorRunning_; // Connection parameters - std::string broker_; - int port_; + std::string serverURI_; std::string clientId_; std::string username_; std::string password_; @@ -92,6 +79,9 @@ private: std::thread reconnectThread_; std::atomic shouldReconnect_; int reconnectDelay_ = 5; // seconds + + // Connection options + MQTTClient_connectOptions connOpts_; }; } // namespace custom diff --git a/src/mqtt.cpp b/src/mqtt.cpp index bdf4c55..2f6fd38 100644 --- a/src/mqtt.cpp +++ b/src/mqtt.cpp @@ -1,21 +1,35 @@ #include "mqtt.hpp" #include "logger.hpp" #include +#include namespace custom { MqttClient::MqttClient(const std::string& broker, int port, const std::string& clientId) - : broker_(broker), port_(port), clientId_(clientId), - connected_(false), reconnecting_(false), processorRunning_(false), - shouldReconnect_(true) { + : clientId_(clientId), connected_(false), reconnecting_(false), + processorRunning_(false), shouldReconnect_(true) { std::stringstream ss; - ss << "tcp://" << broker_ << ":" << port_; - std::string serverURI = ss.str(); + ss << "tcp://" << broker << ":" << port; + serverURI_ = ss.str(); - client_ = std::make_unique(serverURI, clientId_); - callback_ = std::make_unique(this); - client_->set_callback(*callback_); + // 初始化连接选项 + connOpts_ = MQTTClient_connectOptions_initializer; + connOpts_.keepAliveInterval = 20; + connOpts_.cleansession = 1; + connOpts_.reliable = 0; // 异步发送 + + // 创建 MQTT 客户端 + int rc = MQTTClient_create(&client_, serverURI_.c_str(), clientId_.c_str(), + MQTTCLIENT_PERSISTENCE_NONE, nullptr); + if (rc != MQTTCLIENT_SUCCESS) { + LOG_ERROR("Failed to create MQTT client, return code: " + std::to_string(rc)); + return; + } + + // 设置回调函数 + MQTTClient_setCallbacks(client_, this, connectionLostCallback, + messageArrivedCallback, deliveryCompleteCallback); } MqttClient::~MqttClient() { @@ -26,120 +40,105 @@ MqttClient::~MqttClient() { if (reconnectThread_.joinable()) { reconnectThread_.join(); } -} - -bool MqttClient::connect(const std::string& username, const std::string& password) { - try { - username_ = username; - password_ = password; - - mqtt::connect_options connOpts; - connOpts.set_keep_alive_interval(20); - connOpts.set_clean_session(true); - connOpts.set_automatic_reconnect(true); - - if (!username_.empty()) { - connOpts.set_user_name(username_); - if (!password_.empty()) { - connOpts.set_password(password_); - } - } - - LOG_INFO("Connecting to MQTT broker: " + broker_ + ":" + std::to_string(port_)); - - auto token = client_->connect(connOpts, nullptr, *callback_); - token->wait_for(std::chrono::seconds(10)); - - if (token->is_complete() && client_->is_connected()) { - connected_ = true; - LOG_INFO("Connected to MQTT broker successfully"); - return true; - } else { - LOG_ERROR("Failed to connect to MQTT broker"); - return false; - } - - } catch (const mqtt::exception& e) { - LOG_ERROR("MQTT connection exception: " + std::string(e.what())); - return false; + + // 销毁 MQTT 客户端 + if (client_) { + MQTTClient_destroy(&client_); } } -void MqttClient::disconnect() { - try { - if (client_ && client_->is_connected()) { - LOG_INFO("Disconnecting from MQTT broker"); - auto token = client_->disconnect(); - token->wait_for(std::chrono::seconds(5)); - connected_ = false; +bool MqttClient::connect(const std::string& username, const std::string& password) { + username_ = username; + password_ = password; + + // 设置用户名和密码 + if (!username_.empty()) { + connOpts_.username = username_.c_str(); + if (!password_.empty()) { + connOpts_.password = password_.c_str(); } - } catch (const mqtt::exception& e) { - LOG_ERROR("MQTT disconnect exception: " + std::string(e.what())); + } + + LOG_INFO("Connecting to MQTT broker: " + serverURI_); + + int rc = MQTTClient_connect(client_, &connOpts_); + if (rc != MQTTCLIENT_SUCCESS) { + LOG_ERROR("Failed to connect to MQTT broker, return code: " + std::to_string(rc)); + return false; + } + + connected_ = true; + LOG_INFO("Connected to MQTT broker successfully"); + return true; +} + +void MqttClient::disconnect() { + if (client_ && MQTTClient_isConnected(client_)) { + LOG_INFO("Disconnecting from MQTT broker"); + int rc = MQTTClient_disconnect(client_, 5000); // 5 seconds timeout + if (rc != MQTTCLIENT_SUCCESS) { + LOG_ERROR("Failed to disconnect from MQTT broker, return code: " + std::to_string(rc)); + } + connected_ = false; } } bool MqttClient::isConnected() const { - return connected_ && client_ && client_->is_connected(); + return connected_ && client_ && MQTTClient_isConnected(client_); } bool MqttClient::subscribe(const std::string& topic, int qos) { - try { - if (!isConnected()) { - LOG_ERROR("Cannot subscribe: not connected to MQTT broker"); - return false; - } - - LOG_INFO("Subscribing to topic: " + topic); - auto token = client_->subscribe(topic, qos, nullptr, *callback_); - token->wait_for(std::chrono::seconds(5)); - - return token->is_complete(); - - } catch (const mqtt::exception& e) { - LOG_ERROR("MQTT subscribe exception: " + std::string(e.what())); + if (!isConnected()) { + LOG_ERROR("Cannot subscribe: not connected to MQTT broker"); return false; } + + LOG_INFO("Subscribing to topic: " + topic); + int rc = MQTTClient_subscribe(client_, topic.c_str(), qos); + if (rc != MQTTCLIENT_SUCCESS) { + LOG_ERROR("Failed to subscribe to topic: " + topic + ", return code: " + std::to_string(rc)); + return false; + } + + return true; } bool MqttClient::unsubscribe(const std::string& topic) { - try { - if (!isConnected()) { - LOG_ERROR("Cannot unsubscribe: not connected to MQTT broker"); - return false; - } - - LOG_INFO("Unsubscribing from topic: " + topic); - auto token = client_->unsubscribe(topic, nullptr, *callback_); - token->wait_for(std::chrono::seconds(5)); - - return token->is_complete(); - - } catch (const mqtt::exception& e) { - LOG_ERROR("MQTT unsubscribe exception: " + std::string(e.what())); + if (!isConnected()) { + LOG_ERROR("Cannot unsubscribe: not connected to MQTT broker"); return false; } + + LOG_INFO("Unsubscribing from topic: " + topic); + int rc = MQTTClient_unsubscribe(client_, topic.c_str()); + if (rc != MQTTCLIENT_SUCCESS) { + LOG_ERROR("Failed to unsubscribe from topic: " + topic + ", return code: " + std::to_string(rc)); + return false; + } + + return true; } bool MqttClient::publish(const std::string& topic, const std::string& payload, int qos, bool retain) { - try { - if (!isConnected()) { - LOG_ERROR("Cannot publish: not connected to MQTT broker"); - return false; - } - - auto msg = mqtt::make_message(topic, payload); - msg->set_qos(qos); - msg->set_retained(retain); - - auto token = client_->publish(msg, nullptr, *callback_); - // Don't wait for completion to avoid blocking - - return true; - - } catch (const mqtt::exception& e) { - LOG_ERROR("MQTT publish exception: " + std::string(e.what())); + if (!isConnected()) { + LOG_ERROR("Cannot publish: not connected to MQTT broker"); return false; } + + MQTTClient_message message = MQTTClient_message_initializer; + message.payload = const_cast(payload.c_str()); + message.payloadlen = payload.length(); + message.qos = qos; + message.retained = retain ? 1 : 0; + + MQTTClient_deliveryToken token; + int rc = MQTTClient_publishMessage(client_, topic.c_str(), &message, &token); + if (rc != MQTTCLIENT_SUCCESS) { + LOG_ERROR("Failed to publish message to topic: " + topic + ", return code: " + std::to_string(rc)); + return false; + } + + return true; } bool MqttClient::publishJson(const std::string& topic, const nlohmann::json& json, int qos, bool retain) { @@ -204,9 +203,43 @@ void MqttClient::processMessageQueue() { } } +// C 风格回调函数实现 +int MqttClient::messageArrivedCallback(void* context, char* topicName, int topicLen, MQTTClient_message* message) { + MqttClient* client = static_cast(context); + + std::string topic(topicName); + std::string payload(static_cast(message->payload), message->payloadlen); + + QueuedMessage queuedMsg; + queuedMsg.topic = topic; + queuedMsg.payload = payload; + + { + std::lock_guard lock(client->queueMutex_); + client->messageQueue_.push(queuedMsg); + } + client->queueCondition_.notify_one(); + + MQTTClient_freeMessage(&message); + MQTTClient_free(topicName); + + return 1; // 成功处理消息 +} + +void MqttClient::connectionLostCallback(void* context, char* cause) { + MqttClient* client = static_cast(context); + client->handleConnectionLost(); + + std::string causeStr = cause ? std::string(cause) : "Unknown reason"; + LOG_WARN("MQTT connection lost: " + causeStr); +} + +void MqttClient::deliveryCompleteCallback(void* context, MQTTClient_deliveryToken dt) { + // 消息发送完成回调,目前不需要特殊处理 +} + void MqttClient::handleConnectionLost() { connected_ = false; - LOG_WARN("MQTT connection lost"); if (connectionCallback_) { connectionCallback_(false); @@ -214,7 +247,7 @@ void MqttClient::handleConnectionLost() { if (shouldReconnect_ && !reconnecting_) { reconnecting_ = true; - reconnectThread_ = std::thread(&MqttClient::handleConnectionLost, this); + reconnectThread_ = std::thread(&MqttClient::reconnectWorker, this); } } @@ -228,38 +261,18 @@ void MqttClient::handleConnectionEstablished() { } } -// Callback implementation -void MqttClient::Callback::connected(const std::string& cause) { - client_->handleConnectionEstablished(); -} - -void MqttClient::Callback::connection_lost(const std::string& cause) { - LOG_WARN("MQTT connection lost: " + cause); - client_->handleConnectionLost(); -} - -void MqttClient::Callback::message_arrived(mqtt::const_message_ptr msg) { - QueuedMessage queuedMsg; - queuedMsg.topic = msg->get_topic(); - queuedMsg.payload = msg->to_string(); - - { - std::lock_guard lock(client_->queueMutex_); - client_->messageQueue_.push(queuedMsg); +void MqttClient::reconnectWorker() { + while (shouldReconnect_ && !connected_) { + LOG_INFO("Attempting to reconnect to MQTT broker..."); + + if (connect(username_, password_)) { + LOG_INFO("Successfully reconnected to MQTT broker"); + break; + } + + std::this_thread::sleep_for(std::chrono::seconds(reconnectDelay_)); } - client_->queueCondition_.notify_one(); -} - -void MqttClient::Callback::delivery_complete(mqtt::delivery_token_ptr token) { - // Message delivered successfully -} - -void MqttClient::Callback::on_failure(const mqtt::token& tok) { - LOG_ERROR("MQTT operation failed"); -} - -void MqttClient::Callback::on_success(const mqtt::token& tok) { - // Operation completed successfully + reconnecting_ = false; } } // namespace custom