fix(MqttManager): 防止MQTT客户端重复连接
添加 `isConnecting` 标志位以跟踪连接状态,避免在连接过程中发起新的连接请求。 在连接失败时安全重置客户端,并在连接完成或丢失时正确更新状态标志。
This commit is contained in:
@@ -36,6 +36,8 @@ class MqttManager(
|
|||||||
private val job = SupervisorJob()
|
private val job = SupervisorJob()
|
||||||
private val scope = CoroutineScope(Dispatchers.IO + job)
|
private val scope = CoroutineScope(Dispatchers.IO + job)
|
||||||
private var reconnectJob: Job? = null
|
private var reconnectJob: Job? = null
|
||||||
|
@Volatile
|
||||||
|
private var isConnecting: Boolean = false
|
||||||
private val prefs = appContext.getSharedPreferences("app_prefs", Context.MODE_PRIVATE)
|
private val prefs = appContext.getSharedPreferences("app_prefs", Context.MODE_PRIVATE)
|
||||||
private val agentDempIdKey = "agent_demp_id"
|
private val agentDempIdKey = "agent_demp_id"
|
||||||
|
|
||||||
@@ -57,10 +59,15 @@ class MqttManager(
|
|||||||
private var danceJob: Job? = null
|
private var danceJob: Job? = null
|
||||||
|
|
||||||
init {
|
init {
|
||||||
|
createMqttClient()
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun createMqttClient() {
|
||||||
try {
|
try {
|
||||||
mqttClient = MqttClient(brokerUri, clientId, MemoryPersistence())
|
mqttClient = MqttClient(brokerUri, clientId, MemoryPersistence())
|
||||||
mqttClient?.setCallback(object : MqttCallbackExtended {
|
mqttClient?.setCallback(object : MqttCallbackExtended {
|
||||||
override fun connectComplete(reconnect: Boolean, serverURI: String?) {
|
override fun connectComplete(reconnect: Boolean, serverURI: String?) {
|
||||||
|
isConnecting = false
|
||||||
Log.i(TAG, "MQTT connection complete. Reconnect: $reconnect")
|
Log.i(TAG, "MQTT connection complete. Reconnect: $reconnect")
|
||||||
subscribeTopic("robot/cmd")
|
subscribeTopic("robot/cmd")
|
||||||
subscribeTopic("soul2user")
|
subscribeTopic("soul2user")
|
||||||
@@ -68,6 +75,7 @@ class MqttManager(
|
|||||||
}
|
}
|
||||||
|
|
||||||
override fun connectionLost(cause: Throwable?) {
|
override fun connectionLost(cause: Throwable?) {
|
||||||
|
isConnecting = false
|
||||||
Log.e(TAG, "Connection lost: ${cause?.message}")
|
Log.e(TAG, "Connection lost: ${cause?.message}")
|
||||||
updateConnectionStatus(false)
|
updateConnectionStatus(false)
|
||||||
scheduleReconnect()
|
scheduleReconnect()
|
||||||
@@ -96,6 +104,10 @@ class MqttManager(
|
|||||||
updateConnectionStatus(true)
|
updateConnectionStatus(true)
|
||||||
return@launch
|
return@launch
|
||||||
}
|
}
|
||||||
|
if (isConnecting) {
|
||||||
|
Log.d(TAG, "MQTT connect skipped: already connecting.")
|
||||||
|
return@launch
|
||||||
|
}
|
||||||
val username = prefs.getString(HttpManager.PREF_KEY_MQTT_USERNAME, "").orEmpty().trim()
|
val username = prefs.getString(HttpManager.PREF_KEY_MQTT_USERNAME, "").orEmpty().trim()
|
||||||
val password = prefs.getString(HttpManager.PREF_KEY_MQTT_PASSWORD, "").orEmpty()
|
val password = prefs.getString(HttpManager.PREF_KEY_MQTT_PASSWORD, "").orEmpty()
|
||||||
if (username.isEmpty() || password.isEmpty()) {
|
if (username.isEmpty() || password.isEmpty()) {
|
||||||
@@ -103,8 +115,12 @@ class MqttManager(
|
|||||||
updateConnectionStatus(false)
|
updateConnectionStatus(false)
|
||||||
return@launch
|
return@launch
|
||||||
}
|
}
|
||||||
|
if (mqttClient == null) {
|
||||||
|
createMqttClient()
|
||||||
|
}
|
||||||
Log.i(TAG, "Attempting to connect to MQTT broker at $brokerUri")
|
Log.i(TAG, "Attempting to connect to MQTT broker at $brokerUri")
|
||||||
try {
|
try {
|
||||||
|
isConnecting = true
|
||||||
val options = MqttConnectOptions().apply {
|
val options = MqttConnectOptions().apply {
|
||||||
isAutomaticReconnect = false
|
isAutomaticReconnect = false
|
||||||
isCleanSession = true
|
isCleanSession = true
|
||||||
@@ -114,8 +130,10 @@ class MqttManager(
|
|||||||
this.password = password.toCharArray()
|
this.password = password.toCharArray()
|
||||||
}
|
}
|
||||||
mqttClient?.connect(options)
|
mqttClient?.connect(options)
|
||||||
} catch (e: MqttException) {
|
} catch (t: Throwable) {
|
||||||
Log.e(TAG, "Initial connection failed: ${e.message}")
|
Log.e(TAG, "Initial connection failed", t)
|
||||||
|
isConnecting = false
|
||||||
|
resetClientSafely()
|
||||||
updateConnectionStatus(false)
|
updateConnectionStatus(false)
|
||||||
scheduleReconnect()
|
scheduleReconnect()
|
||||||
}
|
}
|
||||||
@@ -137,6 +155,7 @@ class MqttManager(
|
|||||||
scope.launch {
|
scope.launch {
|
||||||
try {
|
try {
|
||||||
reconnectJob?.cancel()
|
reconnectJob?.cancel()
|
||||||
|
isConnecting = false
|
||||||
if (mqttClient?.isConnected == true) {
|
if (mqttClient?.isConnected == true) {
|
||||||
mqttClient?.disconnect()
|
mqttClient?.disconnect()
|
||||||
Log.i(TAG, "Disconnected from MQTT broker.")
|
Log.i(TAG, "Disconnected from MQTT broker.")
|
||||||
@@ -151,7 +170,7 @@ class MqttManager(
|
|||||||
|
|
||||||
fun shutdown() {
|
fun shutdown() {
|
||||||
reconnectJob?.cancel()
|
reconnectJob?.cancel()
|
||||||
job.cancel()
|
isConnecting = false
|
||||||
try {
|
try {
|
||||||
if (mqttClient?.isConnected == true) {
|
if (mqttClient?.isConnected == true) {
|
||||||
mqttClient?.disconnect()
|
mqttClient?.disconnect()
|
||||||
@@ -165,6 +184,16 @@ class MqttManager(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun resetClientSafely() {
|
||||||
|
try {
|
||||||
|
mqttClient?.close()
|
||||||
|
} catch (_: Exception) {
|
||||||
|
} finally {
|
||||||
|
mqttClient = null
|
||||||
|
}
|
||||||
|
createMqttClient()
|
||||||
|
}
|
||||||
|
|
||||||
private fun updateConnectionStatus(connected: Boolean) {
|
private fun updateConnectionStatus(connected: Boolean) {
|
||||||
Handler(Looper.getMainLooper()).post {
|
Handler(Looper.getMainLooper()).post {
|
||||||
statusListener(connected)
|
statusListener(connected)
|
||||||
|
|||||||
Reference in New Issue
Block a user