Implement MQTT functionality in CustomRobot class. Added methods for MQTT initialization, message handling, and connection management. Updated configuration to remove multimedia settings and streamline MQTT-related parameters. Enhanced logging for MQTT operations and ensured proper cleanup during shutdown.

This commit is contained in:
2025-09-08 18:35:02 +08:00
parent 926d88972d
commit 90228e4ab9
4 changed files with 174 additions and 43 deletions

View File

@@ -2,13 +2,17 @@
#include <thread>
#include <chrono>
#include <functional>
#include <ctime>
#include <random>
#include <sstream>
#include <iomanip>
#include <nlohmann/json.hpp>
#include <unitree/robot/channel/channel_factory.hpp>
namespace custom {
CustomRobot::CustomRobot()
: initialized_(false), running_(false) {
// 创建并加载配置
config_.loadDefaults();
try {
@@ -21,6 +25,15 @@ CustomRobot::CustomRobot()
CustomRobot::~CustomRobot() {
LOG_INFO("Shutting down CustomRobot");
// Cleanup MQTT client
if (mqttClient_) {
mqttClient_->stopMessageProcessor();
mqttClient_->disconnect();
mqttClient_.reset();
LOG_INFO("MQTT client disconnected and cleaned up");
}
if (controller_) {
controller_->shutdown();
controller_.reset();
@@ -48,6 +61,12 @@ bool CustomRobot::initialize() {
rsc_ = std::make_unique<unitree::robot::go2::RobotStateClient>();
rsc_->SetTimeout(3.0);
rsc_->Init();
// Initialize MQTT client
if (!initializeMqtt()) {
LOG_ERROR("Failed to initialize MQTT client");
return false;
}
} catch (const std::exception& e) {
LOG_ERROR("Failed to initialize : " + std::string(e.what()));
return false;
@@ -157,4 +176,149 @@ bool CustomRobot::SetReportFreq(int32_t interval, int32_t duration) {
}
}
bool CustomRobot::initializeMqtt() {
try {
std::string broker = config_->mqtt_broker;
int port = config_->mqtt_port;
std::string clientId = generateRandomClientId();
mqttClient_ = std::make_unique<MqttClient>(broker, port, clientId);
using namespace std::placeholders;
mqttClient_->setMessageCallback(std::bind(&CustomRobot::onMqttMessage, this, _1, _2));
mqttClient_->setConnectionCallback(std::bind(&CustomRobot::onMqttConnection, this, _1));
mqttClient_->startMessageProcessor();
if (!mqttClient_->connect()) {
LOG_ERROR("Failed to connect to MQTT broker");
return false;
}
std::string controlTopic = config_->topic_prefix + "/" + config_->topic_cmd;
mqttClient_->subscribe(controlTopic, 1);
LOG_INFO("MQTT client initialized successfully");
return true;
} catch (const std::exception& e) {
LOG_ERROR("Exception during MQTT initialization: " + std::string(e.what()));
return false;
}
}
void CustomRobot::onMqttMessage(const std::string& topic, const std::string& payload) {
try {
LOG_INFO("Received MQTT message on topic: " + topic + ", payload: " + payload);
// nlohmann::json message = nlohmann::json::parse(payload);
// std::string controlTopic = config_->topic_prefix + "/" + config_->topic_cmd;
// if (topic == controlTopic) {
// if (message.contains("command")) {
// std::string command = message["command"];
// if (command == "stand" && controller_) {
// controller_->standUp();
// // Publish status response
// nlohmann::json response;
// response["status"] = "executed";
// response["command"] = command;
// response["timestamp"] = std::time(nullptr);
// std::string statusTopic = config_->topic_prefix + "/" + config_->topic_state;
// mqttClient_->publishJson(statusTopic, response);
// // Publish updated status
// publishStatus();
// } else if (command == "sit" && controller_) {
// controller_->sitDown();
// nlohmann::json response;
// response["status"] = "executed";
// response["command"] = command;
// response["timestamp"] = std::time(nullptr);
// std::string statusTopic = config_->topic_prefix + "/" + config_->topic_state;
// mqttClient_->publishJson(statusTopic, response);
// // Publish updated status
// publishStatus();
// }
// } else {
// // Unknown command, send error response
// nlohmann::json response;
// response["status"] = "error";
// response["message"] = "Unknown command: " + command;
// response["timestamp"] = std::time(nullptr);
// std::string statusTopic = config_->topic_prefix + "/" + config_->topic_state;
// mqttClient_->publishJson(statusTopic, response);
// // Publish updated status
// publishStatus();
// }
// }
} catch (const std::exception& e) {
LOG_ERROR("Error processing MQTT message: " + std::string(e.what()));
}
}
void CustomRobot::onMqttConnection(bool connected) {
if (connected) {
LOG_INFO("MQTT client connected to broker");
nlohmann::json status;
status["status"] = "online";
status["timestamp"] = std::time(nullptr);
std::string statusTopic = config_->topic_prefix + "/" + config_->topic_state;
mqttClient_->publishJson(statusTopic, status, 1, true);
publishStatus();
} else {
LOG_WARN("MQTT client disconnected from broker");
}
}
void CustomRobot::publishStatus() {
if (!mqttClient_ || !mqttClient_->isConnected()) {
return;
}
try {
nlohmann::json status;
status["initialized"] = initialized_.load();
status["running"] = running_.load();
status["mqtt_connected"] = mqttClient_->isConnected();
status["timestamp"] = std::time(nullptr);
if (controller_) {
status["controller_connected"] = controller_->isConnected();
}
std::string statusTopic = config_->topic_prefix + "/" + config_->topic_state;
mqttClient_->publishJson(statusTopic, status, 1, true);
LOG_DEBUG("Published status to topic: " + statusTopic);
} catch (const std::exception& e) {
LOG_ERROR("Failed to publish status: " + std::string(e.what()));
}
}
std::string CustomRobot::generateRandomClientId() const {
std::string baseId = config_->mqtt_client_id;
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(1000, 9999);
std::stringstream ss;
ss << baseId << "_" << std::time(nullptr) << "_" << dis(gen);
return ss.str();
}
} // namespace custom