Files
lzwc-terminal-unitreeGo2/src/mqtt.cpp

320 lines
9.4 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#include "mqtt.hpp"
#include "logger.hpp"
#include <cstring>
#include <sstream>
#include <utility>
namespace custom {
// Constructs the client wrapper: builds the "tcp://<broker>:<port>" server URI,
// fills in default connection options and creates the underlying Paho client
// handle with this object registered as the callback context.
//
// No network connection is opened here; call connect() afterwards.
//
// @param broker    Host name or address of the MQTT broker.
// @param port      TCP port of the broker.
// @param clientId  Client identifier passed to MQTTClient_create.
//
// If the handle cannot be created the error is logged and client_ is left
// null, so later calls that test `client_` see a well-defined value.
MqttClient::MqttClient(const std::string& broker, int port, const std::string& clientId)
    : clientId_(clientId), connected_(false), reconnecting_(false),
      processorRunning_(false), shouldReconnect_(true) {
    // Give the handle a defined value before anything can fail; the destructor
    // and the other methods branch on `client_`.
    client_ = nullptr;

    // std::to_string is locale-independent, so the port never picks up
    // digit grouping from a non-"C" global locale.
    serverURI_ = "tcp://" + broker + ":" + std::to_string(port);

    connOpts_ = MQTTClient_connectOptions_initializer;
    connOpts_.keepAliveInterval = 20;
    connOpts_.cleansession = 1;
    connOpts_.reliable = 0; // original note: asynchronous send

    const int rc = MQTTClient_create(&client_, serverURI_.c_str(), clientId_.c_str(),
                                     MQTTCLIENT_PERSISTENCE_NONE, nullptr);
    if (rc != MQTTCLIENT_SUCCESS) {
        LOG_ERROR("Failed to create MQTT client, return code: " + std::to_string(rc));
        client_ = nullptr;
        return;
    }

    // Register the static trampolines; `this` comes back as the context pointer.
    const int cbRc = MQTTClient_setCallbacks(client_, this, connectionLostCallback,
                                             messageArrivedCallback, deliveryCompleteCallback);
    if (cbRc != MQTTCLIENT_SUCCESS) {
        LOG_ERROR("Failed to set MQTT callbacks, return code: " + std::to_string(cbRc));
    }
}
// Shuts the client down: stops automatic reconnection, stops the message
// processor thread, waits for any reconnect worker, disconnects from the
// broker and finally destroys the Paho client handle.
MqttClient::~MqttClient() {
    // Tell the reconnect loop to exit at its next check.
    shouldReconnect_ = false;
    stopMessageProcessor();

    // Join the reconnect worker BEFORE disconnecting. If the worker is in the
    // middle of connect() it could otherwise establish a connection after
    // disconnect() has already decided there was nothing to close.
    if (reconnectThread_.joinable()) {
        reconnectThread_.join();
    }
    disconnect();

    // Destroy the MQTT client handle.
    if (client_) {
        MQTTClient_destroy(&client_);
    }
}
// Connects to the broker configured in the constructor.
//
// The credentials are stored in username_/password_ (reconnectWorker() reuses
// them) and must consist only of 7-bit ASCII characters, with tab, LF and CR
// being the only control characters accepted.
//
// @param username  User name; when empty no credentials are sent at all.
// @param password  Password; only sent when the user name is non-empty.
// @return true on a successful connection, false on invalid credentials,
//         a missing client handle or a failed connection attempt (logged).
bool MqttClient::connect(const std::string& username, const std::string& password) {
    // The arguments may alias the members (reconnectWorker passes them back in);
    // std::string self-assignment is well defined.
    username_ = username;
    password_ = password;

    // Validate that the string is usable as an MQTT UTF-8 string; in practice
    // this restricts it to ASCII to avoid encoding problems.
    auto isValidMqttString = [](const std::string& str) -> bool {
        for (unsigned char c : str) {
            if (c > 127) { // non-ASCII character
                return false;
            }
            if (c < 32 && c != '\t' && c != '\n' && c != '\r') { // control character (other than common whitespace)
                return false;
            }
        }
        return true;
    };
    if (!isValidMqttString(username_)) {
        LOG_ERROR("Username contains invalid characters");
        return false;
    }
    if (!isValidMqttString(password_)) {
        LOG_ERROR("Password contains invalid characters");
        return false;
    }

    // Client creation may have failed in the constructor; never hand a null
    // handle to the Paho library.
    if (!client_) {
        LOG_ERROR("Cannot connect: MQTT client was not created");
        return false;
    }

    connOpts_ = MQTTClient_connectOptions_initializer;
    connOpts_.keepAliveInterval = 20;
    connOpts_.cleansession = 1;
    connOpts_.reliable = 0;
    if (!username_.empty()) {
        // The pointers reference the member strings, which outlive the call.
        connOpts_.username = username_.c_str();
        if (!password_.empty()) {
            connOpts_.password = password_.c_str();
        }
    }

    const int rc = MQTTClient_connect(client_, &connOpts_);
    if (rc != MQTTCLIENT_SUCCESS) {
        LOG_ERROR("Failed to connect to MQTT broker, return code: " + std::to_string(rc));
        // Positive codes 1-5 are the broker's connection-refused reasons.
        switch (rc) {
            case 1:
                LOG_ERROR("Connection refused: unacceptable protocol version");
                break;
            case 2:
                LOG_ERROR("Connection refused: identifier rejected");
                break;
            case 3:
                LOG_ERROR("Connection refused: server unavailable");
                break;
            case 4:
                LOG_ERROR("Connection refused: bad user name or password (UTF-8 encoding issue)");
                break;
            case 5:
                LOG_ERROR("Connection refused: not authorized");
                break;
            default:
                LOG_ERROR("Connection error: code " + std::to_string(rc));
                break;
        }
        return false;
    }

    connected_ = true;
    LOG_INFO("Connected to MQTT broker successfully");
    return true;
}
// Closes the broker connection if one is currently open; otherwise does nothing.
// A failed disconnect is logged, and the connected flag is cleared either way.
void MqttClient::disconnect() {
    const bool linkIsUp = client_ && MQTTClient_isConnected(client_);
    if (!linkIsUp) {
        return;
    }

    LOG_INFO("Disconnecting from MQTT broker");
    const int status = MQTTClient_disconnect(client_, 5000); // timeout argument of 5000 (5 seconds)
    if (status != MQTTCLIENT_SUCCESS) {
        LOG_ERROR("Failed to disconnect from MQTT broker, return code: " + std::to_string(status));
    }
    connected_ = false;
}
// Reports whether the client is connected: the local flag must be set, the
// handle must exist and the Paho library must also report a live connection.
bool MqttClient::isConnected() const {
    if (!connected_) {
        return false;
    }
    if (!client_) {
        return false;
    }
    return MQTTClient_isConnected(client_) != 0;
}
// Subscribes to `topic` with the requested QoS.
// @return true when the subscription request succeeded; false when the client
//         is not connected or the library reported an error (both are logged).
bool MqttClient::subscribe(const std::string& topic, int qos) {
    if (!isConnected()) {
        LOG_ERROR("Cannot subscribe: not connected to MQTT broker");
        return false;
    }

    LOG_INFO("Subscribing to topic: " + topic);
    const int status = MQTTClient_subscribe(client_, topic.c_str(), qos);
    if (status == MQTTCLIENT_SUCCESS) {
        return true;
    }

    LOG_ERROR("Failed to subscribe to topic: " + topic + ", return code: " + std::to_string(status));
    return false;
}
// Removes the subscription for `topic`.
// @return true when the request succeeded; false when the client is not
//         connected or the library reported an error (both are logged).
bool MqttClient::unsubscribe(const std::string& topic) {
    if (!isConnected()) {
        LOG_ERROR("Cannot unsubscribe: not connected to MQTT broker");
        return false;
    }

    LOG_INFO("Unsubscribing from topic: " + topic);
    const int status = MQTTClient_unsubscribe(client_, topic.c_str());
    if (status == MQTTCLIENT_SUCCESS) {
        return true;
    }

    LOG_ERROR("Failed to unsubscribe from topic: " + topic + ", return code: " + std::to_string(status));
    return false;
}
bool MqttClient::publish(const std::string& topic, const std::string& payload, int qos, bool retain) {
if (!isConnected()) {
LOG_ERROR("Cannot publish: not connected to MQTT broker");
return false;
}
MQTTClient_message message = MQTTClient_message_initializer;
message.payload = const_cast<char*>(payload.c_str());
message.payloadlen = payload.length();
message.qos = qos;
message.retained = retain ? 1 : 0;
MQTTClient_deliveryToken token;
int rc = MQTTClient_publishMessage(client_, topic.c_str(), &message, &token);
if (rc != MQTTCLIENT_SUCCESS) {
LOG_ERROR("Failed to publish message to topic: " + topic + ", return code: " + std::to_string(rc));
return false;
}
return true;
}
// Serializes `json` with dump() and publishes the resulting text via publish().
// @return whatever publish() returns for the serialized payload.
bool MqttClient::publishJson(const std::string& topic, const nlohmann::json& json, int qos, bool retain) {
    const std::string serialized = json.dump();
    return publish(topic, serialized, qos, retain);
}
// Stores the handler that processMessageQueue() invokes with (topic, payload)
// for each queued message.
void MqttClient::setMessageCb(MessageCb cb) {
    this->messageCb_ = cb;
}
// Stores the handler that is invoked with the new connection state
// (false from handleConnectionLost(), true from handleConnectionEstablished()).
void MqttClient::setConnectionCb(ConnectionCb cb) {
    this->connectionCb_ = cb;
}
void MqttClient::startMessageProcessor() {
if (processorRunning_) {
return;
}
processorRunning_ = true;
messageProcessor_ = std::thread(&MqttClient::processMessageQueue, this);
LOG_INFO("MQTT message processor started");
}
void MqttClient::stopMessageProcessor() {
if (!processorRunning_) {
return;
}
processorRunning_ = false;
queueCondition_.notify_all();
if (messageProcessor_.joinable()) {
messageProcessor_.join();
}
LOG_INFO("MQTT message processor stopped");
}
// Thread body of the message processor: waits for queued messages and hands
// each one to messageCb_ until processorRunning_ becomes false. Messages still
// queued when the processor is stopped are left in the queue.
void MqttClient::processMessageQueue() {
    while (processorRunning_) {
        std::unique_lock<std::mutex> lock(queueMutex_);
        queueCondition_.wait(lock, [this] {
            return !messageQueue_.empty() || !processorRunning_;
        });

        while (!messageQueue_.empty() && processorRunning_) {
            // Move the message out instead of copying topic and payload.
            auto message = std::move(messageQueue_.front());
            messageQueue_.pop();

            // Run the user callback without the lock so the Paho thread can
            // keep enqueuing while a slow handler executes.
            lock.unlock();
            if (messageCb_) {
                try {
                    messageCb_(message.topic, message.payload);
                } catch (const std::exception& e) {
                    LOG_ERROR("Exception in message callback: " + std::string(e.what()));
                } catch (...) {
                    // Anything escaping a thread entry point calls
                    // std::terminate, so non-standard exceptions are contained too.
                    LOG_ERROR("Unknown exception in message callback");
                }
            }
            lock.lock();
        }
    }
}
// Paho "message arrived" trampoline. Copies the topic and payload into a
// QueuedMessage, appends it to the owning client's queue, wakes the processor
// thread and releases the library-owned buffers.
//
// @param context    The MqttClient registered in the constructor.
// @param topicName  Topic buffer owned by the library (freed here).
// @param topicLen   Topic length when the library supplies one; 0 otherwise.
// @param message    Message owned by the library (freed here).
// @return 1, telling the library the message was handled.
int MqttClient::messageArrivedCallback(void* context, char* topicName, int topicLen, MQTTClient_message* message) {
    MqttClient* client = static_cast<MqttClient*>(context);

    QueuedMessage queuedMsg;
    // The Paho documentation states topicLen is 0 for an ordinary
    // NUL-terminated topic and holds the real length only when the topic has
    // embedded NULs, where relying on the terminator would truncate it.
    if (topicLen > 0) {
        queuedMsg.topic.assign(topicName, static_cast<std::string::size_type>(topicLen));
    } else {
        queuedMsg.topic.assign(topicName);
    }
    // An empty message may carry no payload buffer at all; never build a string
    // from a null pointer.
    if (message->payload != nullptr && message->payloadlen > 0) {
        queuedMsg.payload.assign(static_cast<const char*>(message->payload),
                                 static_cast<std::string::size_type>(message->payloadlen));
    }

    {
        std::lock_guard<std::mutex> lock(client->queueMutex_);
        client->messageQueue_.push(std::move(queuedMsg));
    }
    client->queueCondition_.notify_one();

    // Both buffers were copied above and must be handed back to the library.
    MQTTClient_freeMessage(&message);
    MQTTClient_free(topicName);
    return 1;
}
void MqttClient::connectionLostCallback(void* context, char* cause) {
MqttClient* client = static_cast<MqttClient*>(context);
client->handleConnectionLost();
std::string causeStr = cause ? std::string(cause) : "Unknown reason";
LOG_WARN("MQTT connection lost: " + causeStr);
}
// Paho "delivery complete" trampoline. Intentionally does nothing: delivery
// tokens are not tracked by this class.
void MqttClient::deliveryCompleteCallback(void* context, MQTTClient_deliveryToken dt) {
    (void)context;
    (void)dt;
}
// Reacts to a dropped connection: clears the connected flag, notifies the
// connection callback with `false` and, when automatic reconnection is enabled
// and no worker is already active, starts the reconnect worker thread.
void MqttClient::handleConnectionLost() {
    connected_ = false;
    if (connectionCb_) {
        connectionCb_(false);
    }

    if (shouldReconnect_ && !reconnecting_) {
        reconnecting_ = true;
        // A worker from an earlier outage has already finished its loop
        // (reconnecting_ was false) but its std::thread object is still
        // joinable. Assigning over a joinable std::thread calls
        // std::terminate, so reap it first; the join returns promptly.
        if (reconnectThread_.joinable()) {
            reconnectThread_.join();
        }
        reconnectThread_ = std::thread(&MqttClient::reconnectWorker, this);
    }
}
// Records that the connection is up: sets the connected flag, clears the
// reconnecting flag, logs the event and notifies the connection callback
// with `true` when one is registered.
void MqttClient::handleConnectionEstablished() {
    connected_ = true;
    reconnecting_ = false;
    LOG_INFO("MQTT connection established");

    if (!connectionCb_) {
        return;
    }
    connectionCb_(true);
}
void MqttClient::reconnectWorker() {
while (shouldReconnect_ && !connected_) {
LOG_INFO("Attempting to reconnect to MQTT broker...");
if (connect(username_, password_)) {
LOG_INFO("Successfully reconnected to MQTT broker");
break;
}
std::this_thread::sleep_for(std::chrono::seconds(reconnectDelay_));
}
reconnecting_ = false;
}
} // namespace custom