From 6a3431d741d597687142dd7f0fb61aa600c2a894 Mon Sep 17 00:00:00 2001 From: tanjianbin <632190820@qq.com> Date: Mon, 20 Apr 2026 19:51:36 +0800 Subject: [PATCH] =?UTF-8?q?fix(MqttManager):=20=E9=98=B2=E6=AD=A2MQTT?= =?UTF-8?q?=E5=AE=A2=E6=88=B7=E7=AB=AF=E9=87=8D=E5=A4=8D=E8=BF=9E=E6=8E=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 添加 `isConnecting` 标志位以跟踪连接状态,避免在连接过程中发起新的连接请求。 在连接失败时安全重置客户端,并在连接完成或丢失时正确更新状态标志。 --- .../lzwcai_terminal_temi/MqttManager.kt | 35 +++++++++++++++++-- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/app/src/main/java/com/example/lzwcai_terminal_temi/MqttManager.kt b/app/src/main/java/com/example/lzwcai_terminal_temi/MqttManager.kt index 92f06d2..d3bae7a 100644 --- a/app/src/main/java/com/example/lzwcai_terminal_temi/MqttManager.kt +++ b/app/src/main/java/com/example/lzwcai_terminal_temi/MqttManager.kt @@ -36,6 +36,8 @@ class MqttManager( private val job = SupervisorJob() private val scope = CoroutineScope(Dispatchers.IO + job) private var reconnectJob: Job? = null + @Volatile + private var isConnecting: Boolean = false private val prefs = appContext.getSharedPreferences("app_prefs", Context.MODE_PRIVATE) private val agentDempIdKey = "agent_demp_id" @@ -57,10 +59,15 @@ class MqttManager( private var danceJob: Job? = null init { + createMqttClient() + } + + private fun createMqttClient() { try { mqttClient = MqttClient(brokerUri, clientId, MemoryPersistence()) mqttClient?.setCallback(object : MqttCallbackExtended { override fun connectComplete(reconnect: Boolean, serverURI: String?) { + isConnecting = false Log.i(TAG, "MQTT connection complete. Reconnect: $reconnect") subscribeTopic("robot/cmd") subscribeTopic("soul2user") @@ -68,6 +75,7 @@ class MqttManager( } override fun connectionLost(cause: Throwable?) { + isConnecting = false Log.e(TAG, "Connection lost: ${cause?.message}") updateConnectionStatus(false) scheduleReconnect() @@ -96,6 +104,10 @@ class MqttManager( updateConnectionStatus(true) return@launch } + if (isConnecting) { + Log.d(TAG, "MQTT connect skipped: already connecting.") + return@launch + } val username = prefs.getString(HttpManager.PREF_KEY_MQTT_USERNAME, "").orEmpty().trim() val password = prefs.getString(HttpManager.PREF_KEY_MQTT_PASSWORD, "").orEmpty() if (username.isEmpty() || password.isEmpty()) { @@ -103,8 +115,12 @@ class MqttManager( updateConnectionStatus(false) return@launch } + if (mqttClient == null) { + createMqttClient() + } Log.i(TAG, "Attempting to connect to MQTT broker at $brokerUri") try { + isConnecting = true val options = MqttConnectOptions().apply { isAutomaticReconnect = false isCleanSession = true @@ -114,8 +130,10 @@ class MqttManager( this.password = password.toCharArray() } mqttClient?.connect(options) - } catch (e: MqttException) { - Log.e(TAG, "Initial connection failed: ${e.message}") + } catch (t: Throwable) { + Log.e(TAG, "Initial connection failed", t) + isConnecting = false + resetClientSafely() updateConnectionStatus(false) scheduleReconnect() } @@ -137,6 +155,7 @@ class MqttManager( scope.launch { try { reconnectJob?.cancel() + isConnecting = false if (mqttClient?.isConnected == true) { mqttClient?.disconnect() Log.i(TAG, "Disconnected from MQTT broker.") @@ -151,7 +170,7 @@ class MqttManager( fun shutdown() { reconnectJob?.cancel() - job.cancel() + isConnecting = false try { if (mqttClient?.isConnected == true) { mqttClient?.disconnect() @@ -165,6 +184,16 @@ class MqttManager( } } + private fun resetClientSafely() { + try { + mqttClient?.close() + } catch (_: Exception) { + } finally { + mqttClient = null + } + createMqttClient() + } + private fun updateConnectionStatus(connected: Boolean) { Handler(Looper.getMainLooper()).post { statusListener(connected)