Integrate Paho MQTT C library into project. Updated CMakeLists.txt to find PahoMqttC package and added fallback for manual linking. Refactored mqtt.hpp and mqtt.cpp to utilize Paho MQTT C API, replacing previous async client implementation. Added Windows setup guide for MQTT library installation.

This commit is contained in:
2025-09-08 19:53:39 +08:00
parent c655cd8c91
commit 7a9a31afd4
4 changed files with 254 additions and 156 deletions

View File

@@ -1,21 +1,35 @@
#include "mqtt.hpp"
#include "logger.hpp"
#include <sstream>
#include <cstring>
namespace custom {
MqttClient::MqttClient(const std::string& broker, int port, const std::string& clientId)
: broker_(broker), port_(port), clientId_(clientId),
connected_(false), reconnecting_(false), processorRunning_(false),
shouldReconnect_(true) {
: clientId_(clientId), connected_(false), reconnecting_(false),
processorRunning_(false), shouldReconnect_(true) {
std::stringstream ss;
ss << "tcp://" << broker_ << ":" << port_;
std::string serverURI = ss.str();
ss << "tcp://" << broker << ":" << port;
serverURI_ = ss.str();
client_ = std::make_unique<mqtt::async_client>(serverURI, clientId_);
callback_ = std::make_unique<Callback>(this);
client_->set_callback(*callback_);
// 初始化连接选项
connOpts_ = MQTTClient_connectOptions_initializer;
connOpts_.keepAliveInterval = 20;
connOpts_.cleansession = 1;
connOpts_.reliable = 0; // 异步发送
// 创建 MQTT 客户端
int rc = MQTTClient_create(&client_, serverURI_.c_str(), clientId_.c_str(),
MQTTCLIENT_PERSISTENCE_NONE, nullptr);
if (rc != MQTTCLIENT_SUCCESS) {
LOG_ERROR("Failed to create MQTT client, return code: " + std::to_string(rc));
return;
}
// 设置回调函数
MQTTClient_setCallbacks(client_, this, connectionLostCallback,
messageArrivedCallback, deliveryCompleteCallback);
}
MqttClient::~MqttClient() {
@@ -26,120 +40,105 @@ MqttClient::~MqttClient() {
if (reconnectThread_.joinable()) {
reconnectThread_.join();
}
}
bool MqttClient::connect(const std::string& username, const std::string& password) {
try {
username_ = username;
password_ = password;
mqtt::connect_options connOpts;
connOpts.set_keep_alive_interval(20);
connOpts.set_clean_session(true);
connOpts.set_automatic_reconnect(true);
if (!username_.empty()) {
connOpts.set_user_name(username_);
if (!password_.empty()) {
connOpts.set_password(password_);
}
}
LOG_INFO("Connecting to MQTT broker: " + broker_ + ":" + std::to_string(port_));
auto token = client_->connect(connOpts, nullptr, *callback_);
token->wait_for(std::chrono::seconds(10));
if (token->is_complete() && client_->is_connected()) {
connected_ = true;
LOG_INFO("Connected to MQTT broker successfully");
return true;
} else {
LOG_ERROR("Failed to connect to MQTT broker");
return false;
}
} catch (const mqtt::exception& e) {
LOG_ERROR("MQTT connection exception: " + std::string(e.what()));
return false;
// 销毁 MQTT 客户端
if (client_) {
MQTTClient_destroy(&client_);
}
}
void MqttClient::disconnect() {
try {
if (client_ && client_->is_connected()) {
LOG_INFO("Disconnecting from MQTT broker");
auto token = client_->disconnect();
token->wait_for(std::chrono::seconds(5));
connected_ = false;
bool MqttClient::connect(const std::string& username, const std::string& password) {
username_ = username;
password_ = password;
// 设置用户名和密码
if (!username_.empty()) {
connOpts_.username = username_.c_str();
if (!password_.empty()) {
connOpts_.password = password_.c_str();
}
} catch (const mqtt::exception& e) {
LOG_ERROR("MQTT disconnect exception: " + std::string(e.what()));
}
LOG_INFO("Connecting to MQTT broker: " + serverURI_);
int rc = MQTTClient_connect(client_, &connOpts_);
if (rc != MQTTCLIENT_SUCCESS) {
LOG_ERROR("Failed to connect to MQTT broker, return code: " + std::to_string(rc));
return false;
}
connected_ = true;
LOG_INFO("Connected to MQTT broker successfully");
return true;
}
void MqttClient::disconnect() {
if (client_ && MQTTClient_isConnected(client_)) {
LOG_INFO("Disconnecting from MQTT broker");
int rc = MQTTClient_disconnect(client_, 5000); // 5 seconds timeout
if (rc != MQTTCLIENT_SUCCESS) {
LOG_ERROR("Failed to disconnect from MQTT broker, return code: " + std::to_string(rc));
}
connected_ = false;
}
}
bool MqttClient::isConnected() const {
return connected_ && client_ && client_->is_connected();
return connected_ && client_ && MQTTClient_isConnected(client_);
}
bool MqttClient::subscribe(const std::string& topic, int qos) {
try {
if (!isConnected()) {
LOG_ERROR("Cannot subscribe: not connected to MQTT broker");
return false;
}
LOG_INFO("Subscribing to topic: " + topic);
auto token = client_->subscribe(topic, qos, nullptr, *callback_);
token->wait_for(std::chrono::seconds(5));
return token->is_complete();
} catch (const mqtt::exception& e) {
LOG_ERROR("MQTT subscribe exception: " + std::string(e.what()));
if (!isConnected()) {
LOG_ERROR("Cannot subscribe: not connected to MQTT broker");
return false;
}
LOG_INFO("Subscribing to topic: " + topic);
int rc = MQTTClient_subscribe(client_, topic.c_str(), qos);
if (rc != MQTTCLIENT_SUCCESS) {
LOG_ERROR("Failed to subscribe to topic: " + topic + ", return code: " + std::to_string(rc));
return false;
}
return true;
}
bool MqttClient::unsubscribe(const std::string& topic) {
try {
if (!isConnected()) {
LOG_ERROR("Cannot unsubscribe: not connected to MQTT broker");
return false;
}
LOG_INFO("Unsubscribing from topic: " + topic);
auto token = client_->unsubscribe(topic, nullptr, *callback_);
token->wait_for(std::chrono::seconds(5));
return token->is_complete();
} catch (const mqtt::exception& e) {
LOG_ERROR("MQTT unsubscribe exception: " + std::string(e.what()));
if (!isConnected()) {
LOG_ERROR("Cannot unsubscribe: not connected to MQTT broker");
return false;
}
LOG_INFO("Unsubscribing from topic: " + topic);
int rc = MQTTClient_unsubscribe(client_, topic.c_str());
if (rc != MQTTCLIENT_SUCCESS) {
LOG_ERROR("Failed to unsubscribe from topic: " + topic + ", return code: " + std::to_string(rc));
return false;
}
return true;
}
bool MqttClient::publish(const std::string& topic, const std::string& payload, int qos, bool retain) {
try {
if (!isConnected()) {
LOG_ERROR("Cannot publish: not connected to MQTT broker");
return false;
}
auto msg = mqtt::make_message(topic, payload);
msg->set_qos(qos);
msg->set_retained(retain);
auto token = client_->publish(msg, nullptr, *callback_);
// Don't wait for completion to avoid blocking
return true;
} catch (const mqtt::exception& e) {
LOG_ERROR("MQTT publish exception: " + std::string(e.what()));
if (!isConnected()) {
LOG_ERROR("Cannot publish: not connected to MQTT broker");
return false;
}
MQTTClient_message message = MQTTClient_message_initializer;
message.payload = const_cast<char*>(payload.c_str());
message.payloadlen = payload.length();
message.qos = qos;
message.retained = retain ? 1 : 0;
MQTTClient_deliveryToken token;
int rc = MQTTClient_publishMessage(client_, topic.c_str(), &message, &token);
if (rc != MQTTCLIENT_SUCCESS) {
LOG_ERROR("Failed to publish message to topic: " + topic + ", return code: " + std::to_string(rc));
return false;
}
return true;
}
bool MqttClient::publishJson(const std::string& topic, const nlohmann::json& json, int qos, bool retain) {
@@ -204,9 +203,43 @@ void MqttClient::processMessageQueue() {
}
}
// C 风格回调函数实现
int MqttClient::messageArrivedCallback(void* context, char* topicName, int topicLen, MQTTClient_message* message) {
MqttClient* client = static_cast<MqttClient*>(context);
std::string topic(topicName);
std::string payload(static_cast<char*>(message->payload), message->payloadlen);
QueuedMessage queuedMsg;
queuedMsg.topic = topic;
queuedMsg.payload = payload;
{
std::lock_guard<std::mutex> lock(client->queueMutex_);
client->messageQueue_.push(queuedMsg);
}
client->queueCondition_.notify_one();
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1; // 成功处理消息
}
void MqttClient::connectionLostCallback(void* context, char* cause) {
MqttClient* client = static_cast<MqttClient*>(context);
client->handleConnectionLost();
std::string causeStr = cause ? std::string(cause) : "Unknown reason";
LOG_WARN("MQTT connection lost: " + causeStr);
}
void MqttClient::deliveryCompleteCallback(void* context, MQTTClient_deliveryToken dt) {
// 消息发送完成回调,目前不需要特殊处理
}
void MqttClient::handleConnectionLost() {
connected_ = false;
LOG_WARN("MQTT connection lost");
if (connectionCallback_) {
connectionCallback_(false);
@@ -214,7 +247,7 @@ void MqttClient::handleConnectionLost() {
if (shouldReconnect_ && !reconnecting_) {
reconnecting_ = true;
reconnectThread_ = std::thread(&MqttClient::handleConnectionLost, this);
reconnectThread_ = std::thread(&MqttClient::reconnectWorker, this);
}
}
@@ -228,38 +261,18 @@ void MqttClient::handleConnectionEstablished() {
}
}
// Callback implementation
void MqttClient::Callback::connected(const std::string& cause) {
client_->handleConnectionEstablished();
}
void MqttClient::Callback::connection_lost(const std::string& cause) {
LOG_WARN("MQTT connection lost: " + cause);
client_->handleConnectionLost();
}
void MqttClient::Callback::message_arrived(mqtt::const_message_ptr msg) {
QueuedMessage queuedMsg;
queuedMsg.topic = msg->get_topic();
queuedMsg.payload = msg->to_string();
{
std::lock_guard<std::mutex> lock(client_->queueMutex_);
client_->messageQueue_.push(queuedMsg);
void MqttClient::reconnectWorker() {
while (shouldReconnect_ && !connected_) {
LOG_INFO("Attempting to reconnect to MQTT broker...");
if (connect(username_, password_)) {
LOG_INFO("Successfully reconnected to MQTT broker");
break;
}
std::this_thread::sleep_for(std::chrono::seconds(reconnectDelay_));
}
client_->queueCondition_.notify_one();
}
void MqttClient::Callback::delivery_complete(mqtt::delivery_token_ptr token) {
// Message delivered successfully
}
void MqttClient::Callback::on_failure(const mqtt::token& tok) {
LOG_ERROR("MQTT operation failed");
}
void MqttClient::Callback::on_success(const mqtt::token& tok) {
// Operation completed successfully
reconnecting_ = false;
}
} // namespace custom