#include "mqtt.hpp" #include "logger.hpp" #include #include namespace custom { MqttClient::MqttClient(const std::string& broker, int port, const std::string& clientId) : clientId_(clientId), connected_(false), reconnecting_(false), processorRunning_(false), shouldReconnect_(true) { std::stringstream ss; ss << "tcp://" << broker << ":" << port; serverURI_ = ss.str(); connOpts_ = MQTTClient_connectOptions_initializer; connOpts_.keepAliveInterval = 20; connOpts_.cleansession = 1; connOpts_.reliable = 0; // 异步发送 int rc = MQTTClient_create(&client_, serverURI_.c_str(), clientId_.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr); if (rc != MQTTCLIENT_SUCCESS) { LOG_ERROR("Failed to create MQTT client, return code: " + std::to_string(rc)); return; } MQTTClient_setCallbacks(client_, this, connectionLostCallback, messageArrivedCallback, deliveryCompleteCallback); } MqttClient::~MqttClient() { shouldReconnect_ = false; stopMessageProcessor(); disconnect(); if (reconnectThread_.joinable()) { reconnectThread_.join(); } // 销毁 MQTT 客户端 if (client_) { MQTTClient_destroy(&client_); } } bool MqttClient::connect(const std::string& username, const std::string& password) { username_ = username; password_ = password; // 验证字符串是否为有效UTF-8(实际上确保为ASCII以避免问题) auto isValidMqttString = [](const std::string& str) -> bool { for (unsigned char c : str) { if (c > 127) { // 非ASCII字符 return false; } if (c < 32 && c != '\t' && c != '\n' && c != '\r') { // 控制字符(除了常见的空白字符) return false; } } return true; }; if (!isValidMqttString(username_)) { LOG_ERROR("Username contains invalid characters"); return false; } if (!isValidMqttString(password_)) { LOG_ERROR("Password contains invalid characters"); return false; } connOpts_ = MQTTClient_connectOptions_initializer; connOpts_.keepAliveInterval = 20; connOpts_.cleansession = 1; connOpts_.reliable = 0; if (!username_.empty()) { connOpts_.username = username_.c_str(); if (!password_.empty()) { connOpts_.password = password_.c_str(); } } int rc = MQTTClient_connect(client_, &connOpts_); if (rc != MQTTCLIENT_SUCCESS) { LOG_ERROR("Failed to connect to MQTT broker, return code: " + std::to_string(rc)); switch (rc) { case 1: LOG_ERROR("Connection refused: unacceptable protocol version"); break; case 2: LOG_ERROR("Connection refused: identifier rejected"); break; case 3: LOG_ERROR("Connection refused: server unavailable"); break; case 4: LOG_ERROR("Connection refused: bad user name or password (UTF-8 encoding issue)"); break; case 5: LOG_ERROR("Connection refused: not authorized"); break; default: LOG_ERROR("Connection error: code " + std::to_string(rc)); break; } return false; } connected_ = true; LOG_INFO("Connected to MQTT broker successfully"); return true; } void MqttClient::disconnect() { if (client_ && MQTTClient_isConnected(client_)) { LOG_INFO("Disconnecting from MQTT broker"); int rc = MQTTClient_disconnect(client_, 5000); // 5 seconds timeout if (rc != MQTTCLIENT_SUCCESS) { LOG_ERROR("Failed to disconnect from MQTT broker, return code: " + std::to_string(rc)); } connected_ = false; } } bool MqttClient::isConnected() const { return connected_ && client_ && MQTTClient_isConnected(client_); } bool MqttClient::subscribe(const std::string& topic, int qos) { if (!isConnected()) { LOG_ERROR("Cannot subscribe: not connected to MQTT broker"); return false; } LOG_INFO("Subscribing to topic: " + topic); int rc = MQTTClient_subscribe(client_, topic.c_str(), qos); if (rc != MQTTCLIENT_SUCCESS) { LOG_ERROR("Failed to subscribe to topic: " + topic + ", return code: " + std::to_string(rc)); return false; } return true; } bool MqttClient::unsubscribe(const std::string& topic) { if (!isConnected()) { LOG_ERROR("Cannot unsubscribe: not connected to MQTT broker"); return false; } LOG_INFO("Unsubscribing from topic: " + topic); int rc = MQTTClient_unsubscribe(client_, topic.c_str()); if (rc != MQTTCLIENT_SUCCESS) { LOG_ERROR("Failed to unsubscribe from topic: " + topic + ", return code: " + std::to_string(rc)); return false; } return true; } bool MqttClient::publish(const std::string& topic, const std::string& payload, int qos, bool retain) { if (!isConnected()) { LOG_ERROR("Cannot publish: not connected to MQTT broker"); return false; } MQTTClient_message message = MQTTClient_message_initializer; message.payload = const_cast(payload.c_str()); message.payloadlen = payload.length(); message.qos = qos; message.retained = retain ? 1 : 0; MQTTClient_deliveryToken token; int rc = MQTTClient_publishMessage(client_, topic.c_str(), &message, &token); if (rc != MQTTCLIENT_SUCCESS) { LOG_ERROR("Failed to publish message to topic: " + topic + ", return code: " + std::to_string(rc)); return false; } return true; } bool MqttClient::publishJson(const std::string& topic, const nlohmann::json& json, int qos, bool retain) { return publish(topic, json.dump(), qos, retain); } void MqttClient::setMessageCb(MessageCb cb) { messageCb_ = cb; } void MqttClient::setConnectionCb(ConnectionCb cb) { connectionCb_ = cb; } void MqttClient::startMessageProcessor() { if (processorRunning_) { return; } processorRunning_ = true; messageProcessor_ = std::thread(&MqttClient::processMessageQueue, this); } void MqttClient::stopMessageProcessor() { if (!processorRunning_) { return; } processorRunning_ = false; queueCondition_.notify_all(); if (messageProcessor_.joinable()) { messageProcessor_.join(); } LOG_INFO("MQTT message processor stopped"); } void MqttClient::processMessageQueue() { while (processorRunning_) { std::unique_lock lock(queueMutex_); queueCondition_.wait(lock, [this] { return !messageQueue_.empty() || !processorRunning_; }); while (!messageQueue_.empty() && processorRunning_) { auto message = messageQueue_.front(); messageQueue_.pop(); lock.unlock(); if (messageCb_) { try { messageCb_(message.topic, message.payload); } catch (const std::exception& e) { LOG_ERROR("Exception in message callback: " + std::string(e.what())); } } lock.lock(); } } } int MqttClient::messageArrivedCallback(void* context, char* topicName, int topicLen, MQTTClient_message* message) { MqttClient* client = static_cast(context); std::string topic(topicName); std::string payload(static_cast(message->payload), message->payloadlen); QueuedMessage queuedMsg; queuedMsg.topic = topic; queuedMsg.payload = payload; { std::lock_guard lock(client->queueMutex_); client->messageQueue_.push(queuedMsg); } client->queueCondition_.notify_one(); MQTTClient_freeMessage(&message); MQTTClient_free(topicName); return 1; } void MqttClient::connectionLostCallback(void* context, char* cause) { MqttClient* client = static_cast(context); client->handleConnectionLost(); std::string causeStr = cause ? std::string(cause) : "Unknown reason"; LOG_WARN("MQTT connection lost: " + causeStr); } void MqttClient::deliveryCompleteCallback(void* context, MQTTClient_deliveryToken dt) { } void MqttClient::handleConnectionLost() { connected_ = false; if (connectionCb_) { connectionCb_(false); } if (shouldReconnect_ && !reconnecting_) { reconnecting_ = true; reconnectThread_ = std::thread(&MqttClient::reconnectWorker, this); } } void MqttClient::handleConnectionEstablished() { connected_ = true; reconnecting_ = false; LOG_INFO("MQTT connection established"); if (connectionCb_) { connectionCb_(true); } } void MqttClient::reconnectWorker() { while (shouldReconnect_ && !connected_) { LOG_INFO("Attempting to reconnect to MQTT broker..."); if (connect(username_, password_)) { LOG_INFO("Successfully reconnected to MQTT broker"); break; } std::this_thread::sleep_for(std::chrono::seconds(reconnectDelay_)); } reconnecting_ = false; } } // namespace custom