- 新增 PermissionManager 用于检查和请求 Temi 机器人权限 - 在 AndroidManifest 中声明所需的权限元数据 - 为 NavController 添加充电功能和改进的巡逻逻辑 - 扩展 MQTT 命令支持,包括充电和可配置巡逻 - 添加 Python 测试脚本用于 MQTT 流式文本测试 - 使用任务状态跟踪替代原有的接待模式标志
101 lines
3.3 KiB
Python
101 lines
3.3 KiB
Python
import paho.mqtt.client as mqtt
|
||
import json
|
||
import time
|
||
import sys
|
||
import random
|
||
|
||
# --- MQTT 配置 ---
|
||
# 重要:请将这里替换为您的 MQTT 代理的 IP 地址
|
||
BROKER_IP = "192.168.2.236"
|
||
BROKER_PORT = 1883
|
||
MQTT_TOPIC = "robot/cmd"
|
||
MQTT_USERNAME = "lzwc"
|
||
MQTT_PASSWORD = "Lzwc@4187."
|
||
|
||
# --- 要进行流式传输的文本 ---
|
||
# 您可以更改此文本
|
||
TEXT_TO_STREAM = "你好,我是一个先进的人工智能助理。我的设计目标是理解并生成自然语言,从而能够与人类进行流畅的对话。我可以回答问题、提供信息、撰写文章,甚至进行一些基础的编程任务。这个流式传输演示旨在展示我如何将长篇回复分解成小的数据块,并实时发送给客户端,从而创造出一种更具互动性和即时性的体验。希望这次演示能够清晰地展示我的能力。"
|
||
|
||
# --- 流媒体参数 ---
|
||
# 每个数据块中发送的字符长度范围
|
||
MIN_CHUNK_SIZE = 2
|
||
MAX_CHUNK_SIZE = 8
|
||
# 块之间的延迟(秒)
|
||
STREAM_DELAY = 0.1
|
||
|
||
def on_connect(client, userdata, flags, rc):
|
||
"""客户端连接到代理时的回调。"""
|
||
if rc == 0:
|
||
print("成功连接到 MQTT 代理!")
|
||
else:
|
||
print(f"连接失败, 返回码 {rc}\n")
|
||
sys.exit(1)
|
||
|
||
def on_publish(client, userdata, mid):
|
||
"""消息发布时的回调。"""
|
||
# 默认注释掉,因为可能会产生大量输出
|
||
# print(f"已发布消息,mid: {mid}")
|
||
pass
|
||
|
||
def create_mqtt_client():
|
||
"""创建并配置 MQTT 客户端。"""
|
||
client = mqtt.Client()
|
||
client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
|
||
client.on_connect = on_connect
|
||
client.on_publish = on_publish
|
||
return client
|
||
|
||
def stream_text(client, text_to_stream):
|
||
"""将给定的文本以数据块的形式流式传输到 MQTT 主题。"""
|
||
print(f"开始向主题 '{MQTT_TOPIC}' 流式传输文本:")
|
||
print(f"文本: {text_to_stream}")
|
||
|
||
index = 0
|
||
while index < len(text_to_stream):
|
||
# 生成一个随机的块大小
|
||
chunk_size = random.randint(MIN_CHUNK_SIZE, MAX_CHUNK_SIZE)
|
||
# 获取块,确保不会超出文本长度
|
||
chunk = text_to_stream[index:index + chunk_size]
|
||
index += len(chunk)
|
||
|
||
# 创建 JSON 负载
|
||
payload = {
|
||
"action": "stream",
|
||
"text": chunk,
|
||
"lang": "zh" # 假设是中文,如果需要可以更改
|
||
}
|
||
payload_json = json.dumps(payload, ensure_ascii=False)
|
||
|
||
# 发布消息
|
||
result = client.publish(MQTT_TOPIC, payload_json)
|
||
|
||
# 检查发布是否成功
|
||
if result.rc != mqtt.MQTT_ERR_SUCCESS:
|
||
print(f"发送消息失败: {mqtt.error_string(result.rc)}")
|
||
else:
|
||
print(f"已发送块: '{chunk}'")
|
||
|
||
# 等待一小段时间以模拟流式传输
|
||
time.sleep(STREAM_DELAY)
|
||
|
||
print("\n流式传输完成。")
|
||
|
||
def main():
|
||
"""运行 MQTT 流式脚本的主函数。"""
|
||
client = create_mqtt_client()
|
||
|
||
try:
|
||
client.connect(BROKER_IP, BROKER_PORT)
|
||
except Exception as e:
|
||
print(f"连接到代理 {BROKER_IP}:{BROKER_PORT} 时出错。请检查 IP 地址。")
|
||
print(f"错误: {e}")
|
||
sys.exit(1)
|
||
|
||
stream_text(client, TEXT_TO_STREAM)
|
||
|
||
client.disconnect()
|
||
print("已从 MQTT 代理断开。")
|
||
|
||
if __name__ == "__main__":
|
||
main()
|