Add initial project structure with CMake configuration, source files, and README documentation for Unitree GO2 Custom Controller
This commit is contained in:
265
src/mqtt.cpp
Normal file
265
src/mqtt.cpp
Normal file
@@ -0,0 +1,265 @@
|
||||
#include "mqtt.hpp"
|
||||
#include "logger.hpp"
|
||||
#include <sstream>
|
||||
|
||||
namespace custom {
|
||||
|
||||
MqttClient::MqttClient(const std::string& broker, int port, const std::string& clientId)
|
||||
: broker_(broker), port_(port), clientId_(clientId),
|
||||
connected_(false), reconnecting_(false), processorRunning_(false),
|
||||
shouldReconnect_(true) {
|
||||
|
||||
std::stringstream ss;
|
||||
ss << "tcp://" << broker_ << ":" << port_;
|
||||
std::string serverURI = ss.str();
|
||||
|
||||
client_ = std::make_unique<mqtt::async_client>(serverURI, clientId_);
|
||||
callback_ = std::make_unique<Callback>(this);
|
||||
client_->set_callback(*callback_);
|
||||
}
|
||||
|
||||
MqttClient::~MqttClient() {
|
||||
shouldReconnect_ = false;
|
||||
stopMessageProcessor();
|
||||
disconnect();
|
||||
|
||||
if (reconnectThread_.joinable()) {
|
||||
reconnectThread_.join();
|
||||
}
|
||||
}
|
||||
|
||||
bool MqttClient::connect(const std::string& username, const std::string& password) {
|
||||
try {
|
||||
username_ = username;
|
||||
password_ = password;
|
||||
|
||||
mqtt::connect_options connOpts;
|
||||
connOpts.set_keep_alive_interval(20);
|
||||
connOpts.set_clean_session(true);
|
||||
connOpts.set_automatic_reconnect(true);
|
||||
|
||||
if (!username_.empty()) {
|
||||
connOpts.set_user_name(username_);
|
||||
if (!password_.empty()) {
|
||||
connOpts.set_password(password_);
|
||||
}
|
||||
}
|
||||
|
||||
LOG_INFO("Connecting to MQTT broker: " + broker_ + ":" + std::to_string(port_));
|
||||
|
||||
auto token = client_->connect(connOpts, nullptr, *callback_);
|
||||
token->wait_for(std::chrono::seconds(10));
|
||||
|
||||
if (token->is_complete() && client_->is_connected()) {
|
||||
connected_ = true;
|
||||
LOG_INFO("Connected to MQTT broker successfully");
|
||||
return true;
|
||||
} else {
|
||||
LOG_ERROR("Failed to connect to MQTT broker");
|
||||
return false;
|
||||
}
|
||||
|
||||
} catch (const mqtt::exception& e) {
|
||||
LOG_ERROR("MQTT connection exception: " + std::string(e.what()));
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
void MqttClient::disconnect() {
|
||||
try {
|
||||
if (client_ && client_->is_connected()) {
|
||||
LOG_INFO("Disconnecting from MQTT broker");
|
||||
auto token = client_->disconnect();
|
||||
token->wait_for(std::chrono::seconds(5));
|
||||
connected_ = false;
|
||||
}
|
||||
} catch (const mqtt::exception& e) {
|
||||
LOG_ERROR("MQTT disconnect exception: " + std::string(e.what()));
|
||||
}
|
||||
}
|
||||
|
||||
bool MqttClient::isConnected() const {
|
||||
return connected_ && client_ && client_->is_connected();
|
||||
}
|
||||
|
||||
bool MqttClient::subscribe(const std::string& topic, int qos) {
|
||||
try {
|
||||
if (!isConnected()) {
|
||||
LOG_ERROR("Cannot subscribe: not connected to MQTT broker");
|
||||
return false;
|
||||
}
|
||||
|
||||
LOG_INFO("Subscribing to topic: " + topic);
|
||||
auto token = client_->subscribe(topic, qos, nullptr, *callback_);
|
||||
token->wait_for(std::chrono::seconds(5));
|
||||
|
||||
return token->is_complete();
|
||||
|
||||
} catch (const mqtt::exception& e) {
|
||||
LOG_ERROR("MQTT subscribe exception: " + std::string(e.what()));
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
bool MqttClient::unsubscribe(const std::string& topic) {
|
||||
try {
|
||||
if (!isConnected()) {
|
||||
LOG_ERROR("Cannot unsubscribe: not connected to MQTT broker");
|
||||
return false;
|
||||
}
|
||||
|
||||
LOG_INFO("Unsubscribing from topic: " + topic);
|
||||
auto token = client_->unsubscribe(topic, nullptr, *callback_);
|
||||
token->wait_for(std::chrono::seconds(5));
|
||||
|
||||
return token->is_complete();
|
||||
|
||||
} catch (const mqtt::exception& e) {
|
||||
LOG_ERROR("MQTT unsubscribe exception: " + std::string(e.what()));
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
bool MqttClient::publish(const std::string& topic, const std::string& payload, int qos, bool retain) {
|
||||
try {
|
||||
if (!isConnected()) {
|
||||
LOG_ERROR("Cannot publish: not connected to MQTT broker");
|
||||
return false;
|
||||
}
|
||||
|
||||
auto msg = mqtt::make_message(topic, payload);
|
||||
msg->set_qos(qos);
|
||||
msg->set_retained(retain);
|
||||
|
||||
auto token = client_->publish(msg, nullptr, *callback_);
|
||||
// Don't wait for completion to avoid blocking
|
||||
|
||||
return true;
|
||||
|
||||
} catch (const mqtt::exception& e) {
|
||||
LOG_ERROR("MQTT publish exception: " + std::string(e.what()));
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
bool MqttClient::publishJson(const std::string& topic, const nlohmann::json& json, int qos, bool retain) {
|
||||
return publish(topic, json.dump(), qos, retain);
|
||||
}
|
||||
|
||||
void MqttClient::setMessageCallback(MessageCallback callback) {
|
||||
messageCallback_ = callback;
|
||||
}
|
||||
|
||||
void MqttClient::setConnectionCallback(ConnectionCallback callback) {
|
||||
connectionCallback_ = callback;
|
||||
}
|
||||
|
||||
void MqttClient::startMessageProcessor() {
|
||||
if (processorRunning_) {
|
||||
return;
|
||||
}
|
||||
|
||||
processorRunning_ = true;
|
||||
messageProcessor_ = std::thread(&MqttClient::processMessageQueue, this);
|
||||
LOG_INFO("MQTT message processor started");
|
||||
}
|
||||
|
||||
void MqttClient::stopMessageProcessor() {
|
||||
if (!processorRunning_) {
|
||||
return;
|
||||
}
|
||||
|
||||
processorRunning_ = false;
|
||||
queueCondition_.notify_all();
|
||||
|
||||
if (messageProcessor_.joinable()) {
|
||||
messageProcessor_.join();
|
||||
}
|
||||
|
||||
LOG_INFO("MQTT message processor stopped");
|
||||
}
|
||||
|
||||
void MqttClient::processMessageQueue() {
|
||||
while (processorRunning_) {
|
||||
std::unique_lock<std::mutex> lock(queueMutex_);
|
||||
queueCondition_.wait(lock, [this] {
|
||||
return !messageQueue_.empty() || !processorRunning_;
|
||||
});
|
||||
|
||||
while (!messageQueue_.empty() && processorRunning_) {
|
||||
auto message = messageQueue_.front();
|
||||
messageQueue_.pop();
|
||||
lock.unlock();
|
||||
|
||||
if (messageCallback_) {
|
||||
try {
|
||||
messageCallback_(message.topic, message.payload);
|
||||
} catch (const std::exception& e) {
|
||||
LOG_ERROR("Exception in message callback: " + std::string(e.what()));
|
||||
}
|
||||
}
|
||||
|
||||
lock.lock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void MqttClient::handleConnectionLost() {
|
||||
connected_ = false;
|
||||
LOG_WARN("MQTT connection lost");
|
||||
|
||||
if (connectionCallback_) {
|
||||
connectionCallback_(false);
|
||||
}
|
||||
|
||||
if (shouldReconnect_ && !reconnecting_) {
|
||||
reconnecting_ = true;
|
||||
reconnectThread_ = std::thread(&MqttClient::handleConnectionLost, this);
|
||||
}
|
||||
}
|
||||
|
||||
void MqttClient::handleConnectionEstablished() {
|
||||
connected_ = true;
|
||||
reconnecting_ = false;
|
||||
LOG_INFO("MQTT connection established");
|
||||
|
||||
if (connectionCallback_) {
|
||||
connectionCallback_(true);
|
||||
}
|
||||
}
|
||||
|
||||
// Callback implementation
|
||||
void MqttClient::Callback::connected(const std::string& cause) {
|
||||
client_->handleConnectionEstablished();
|
||||
}
|
||||
|
||||
void MqttClient::Callback::connection_lost(const std::string& cause) {
|
||||
LOG_WARN("MQTT connection lost: " + cause);
|
||||
client_->handleConnectionLost();
|
||||
}
|
||||
|
||||
void MqttClient::Callback::message_arrived(mqtt::const_message_ptr msg) {
|
||||
QueuedMessage queuedMsg;
|
||||
queuedMsg.topic = msg->get_topic();
|
||||
queuedMsg.payload = msg->to_string();
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(client_->queueMutex_);
|
||||
client_->messageQueue_.push(queuedMsg);
|
||||
}
|
||||
client_->queueCondition_.notify_one();
|
||||
}
|
||||
|
||||
void MqttClient::Callback::delivery_complete(mqtt::delivery_token_ptr token) {
|
||||
// Message delivered successfully
|
||||
}
|
||||
|
||||
void MqttClient::Callback::on_failure(const mqtt::token& tok) {
|
||||
LOG_ERROR("MQTT operation failed");
|
||||
}
|
||||
|
||||
void MqttClient::Callback::on_success(const mqtt::token& tok) {
|
||||
// Operation completed successfully
|
||||
}
|
||||
|
||||
} // namespace custom
|
||||
Reference in New Issue
Block a user