feat: 添加权限管理并增强导航功能
- 新增 PermissionManager 用于检查和请求 Temi 机器人权限 - 在 AndroidManifest 中声明所需的权限元数据 - 为 NavController 添加充电功能和改进的巡逻逻辑 - 扩展 MQTT 命令支持,包括充电和可配置巡逻 - 添加 Python 测试脚本用于 MQTT 流式文本测试 - 使用任务状态跟踪替代原有的接待模式标志
This commit is contained in:
100
test.py
Normal file
100
test.py
Normal file
@@ -0,0 +1,100 @@
|
||||
import paho.mqtt.client as mqtt
|
||||
import json
|
||||
import time
|
||||
import sys
|
||||
import random
|
||||
|
||||
# --- MQTT 配置 ---
|
||||
# 重要:请将这里替换为您的 MQTT 代理的 IP 地址
|
||||
BROKER_IP = "192.168.2.236"
|
||||
BROKER_PORT = 1883
|
||||
MQTT_TOPIC = "robot/cmd"
|
||||
MQTT_USERNAME = "lzwc"
|
||||
MQTT_PASSWORD = "Lzwc@4187."
|
||||
|
||||
# --- 要进行流式传输的文本 ---
|
||||
# 您可以更改此文本
|
||||
TEXT_TO_STREAM = "你好,我是一个先进的人工智能助理。我的设计目标是理解并生成自然语言,从而能够与人类进行流畅的对话。我可以回答问题、提供信息、撰写文章,甚至进行一些基础的编程任务。这个流式传输演示旨在展示我如何将长篇回复分解成小的数据块,并实时发送给客户端,从而创造出一种更具互动性和即时性的体验。希望这次演示能够清晰地展示我的能力。"
|
||||
|
||||
# --- 流媒体参数 ---
|
||||
# 每个数据块中发送的字符长度范围
|
||||
MIN_CHUNK_SIZE = 2
|
||||
MAX_CHUNK_SIZE = 8
|
||||
# 块之间的延迟(秒)
|
||||
STREAM_DELAY = 0.1
|
||||
|
||||
def on_connect(client, userdata, flags, rc):
|
||||
"""客户端连接到代理时的回调。"""
|
||||
if rc == 0:
|
||||
print("成功连接到 MQTT 代理!")
|
||||
else:
|
||||
print(f"连接失败, 返回码 {rc}\n")
|
||||
sys.exit(1)
|
||||
|
||||
def on_publish(client, userdata, mid):
|
||||
"""消息发布时的回调。"""
|
||||
# 默认注释掉,因为可能会产生大量输出
|
||||
# print(f"已发布消息,mid: {mid}")
|
||||
pass
|
||||
|
||||
def create_mqtt_client():
|
||||
"""创建并配置 MQTT 客户端。"""
|
||||
client = mqtt.Client()
|
||||
client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
|
||||
client.on_connect = on_connect
|
||||
client.on_publish = on_publish
|
||||
return client
|
||||
|
||||
def stream_text(client, text_to_stream):
|
||||
"""将给定的文本以数据块的形式流式传输到 MQTT 主题。"""
|
||||
print(f"开始向主题 '{MQTT_TOPIC}' 流式传输文本:")
|
||||
print(f"文本: {text_to_stream}")
|
||||
|
||||
index = 0
|
||||
while index < len(text_to_stream):
|
||||
# 生成一个随机的块大小
|
||||
chunk_size = random.randint(MIN_CHUNK_SIZE, MAX_CHUNK_SIZE)
|
||||
# 获取块,确保不会超出文本长度
|
||||
chunk = text_to_stream[index:index + chunk_size]
|
||||
index += len(chunk)
|
||||
|
||||
# 创建 JSON 负载
|
||||
payload = {
|
||||
"action": "stream",
|
||||
"text": chunk,
|
||||
"lang": "zh" # 假设是中文,如果需要可以更改
|
||||
}
|
||||
payload_json = json.dumps(payload, ensure_ascii=False)
|
||||
|
||||
# 发布消息
|
||||
result = client.publish(MQTT_TOPIC, payload_json)
|
||||
|
||||
# 检查发布是否成功
|
||||
if result.rc != mqtt.MQTT_ERR_SUCCESS:
|
||||
print(f"发送消息失败: {mqtt.error_string(result.rc)}")
|
||||
else:
|
||||
print(f"已发送块: '{chunk}'")
|
||||
|
||||
# 等待一小段时间以模拟流式传输
|
||||
time.sleep(STREAM_DELAY)
|
||||
|
||||
print("\n流式传输完成。")
|
||||
|
||||
def main():
|
||||
"""运行 MQTT 流式脚本的主函数。"""
|
||||
client = create_mqtt_client()
|
||||
|
||||
try:
|
||||
client.connect(BROKER_IP, BROKER_PORT)
|
||||
except Exception as e:
|
||||
print(f"连接到代理 {BROKER_IP}:{BROKER_PORT} 时出错。请检查 IP 地址。")
|
||||
print(f"错误: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
stream_text(client, TEXT_TO_STREAM)
|
||||
|
||||
client.disconnect()
|
||||
print("已从 MQTT 代理断开。")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user