88 lines
2.5 KiB
C++
88 lines
2.5 KiB
C++
#pragma once
|
|
|
|
#include <string>
|
|
#include <functional>
|
|
#include <memory>
|
|
#include <thread>
|
|
#include <atomic>
|
|
#include <queue>
|
|
#include <mutex>
|
|
#include <condition_variable>
|
|
#include <nlohmann/json.hpp>
|
|
#include <MQTTClient.h> // Paho MQTT C library
|
|
|
|
namespace custom {
|
|
|
|
class MqttClient {
|
|
public:
|
|
using MessageCb = std::function<void(const std::string& topic, const std::string& payload)>;
|
|
using ConnectionCb = std::function<void(bool connected)>;
|
|
|
|
MqttClient(const std::string& broker, int port, const std::string& clientId);
|
|
~MqttClient();
|
|
|
|
bool connect(const std::string& username = "", const std::string& password = "");
|
|
void disconnect();
|
|
bool isConnected() const;
|
|
|
|
bool subscribe(const std::string& topic, int qos = 1);
|
|
bool unsubscribe(const std::string& topic);
|
|
|
|
bool publish(const std::string& topic, const std::string& payload, int qos = 1, bool retain = false);
|
|
bool publishJson(const std::string& topic, const nlohmann::json& json, int qos = 1, bool retain = false);
|
|
|
|
void setMessageCb(MessageCb cb);
|
|
void setConnectionCb(ConnectionCb cb);
|
|
|
|
// Message queue for async processing
|
|
void startMessageProcessor();
|
|
void stopMessageProcessor();
|
|
|
|
private:
|
|
struct QueuedMessage {
|
|
std::string topic;
|
|
std::string payload;
|
|
};
|
|
|
|
// C callback functions (static)
|
|
static int messageArrivedCallback(void* context, char* topicName, int topicLen, MQTTClient_message* message);
|
|
static void connectionLostCallback(void* context, char* cause);
|
|
static void deliveryCompleteCallback(void* context, MQTTClient_deliveryToken dt);
|
|
|
|
void processMessageQueue();
|
|
void handleConnectionLost();
|
|
void handleConnectionEstablished();
|
|
void reconnectWorker();
|
|
|
|
MQTTClient client_;
|
|
|
|
MessageCb messageCb_;
|
|
ConnectionCb connectionCb_;
|
|
|
|
std::atomic<bool> connected_;
|
|
std::atomic<bool> reconnecting_;
|
|
|
|
// Message processing
|
|
std::queue<QueuedMessage> messageQueue_;
|
|
std::mutex queueMutex_;
|
|
std::condition_variable queueCondition_;
|
|
std::thread messageProcessor_;
|
|
std::atomic<bool> processorRunning_;
|
|
|
|
// Connection parameters
|
|
std::string serverURI_;
|
|
std::string clientId_;
|
|
std::string username_;
|
|
std::string password_;
|
|
|
|
// Reconnection
|
|
std::thread reconnectThread_;
|
|
std::atomic<bool> shouldReconnect_;
|
|
int reconnectDelay_ = 5; // seconds
|
|
|
|
// Connection options
|
|
MQTTClient_connectOptions connOpts_;
|
|
};
|
|
|
|
} // namespace custom
|