Files
lzwcai-terminal-temi/test.py
Sucan 3def989a8b feat: 添加权限管理并增强导航功能
- 新增 PermissionManager 用于检查和请求 Temi 机器人权限
- 在 AndroidManifest 中声明所需的权限元数据
- 为 NavController 添加充电功能和改进的巡逻逻辑
- 扩展 MQTT 命令支持,包括充电和可配置巡逻
- 添加 Python 测试脚本用于 MQTT 流式文本测试
- 使用任务状态跟踪替代原有的接待模式标志
2026-03-12 16:37:59 +08:00

101 lines
3.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import paho.mqtt.client as mqtt
import json
import time
import sys
import random
# --- MQTT 配置 ---
# 重要:请将这里替换为您的 MQTT 代理的 IP 地址
BROKER_IP = "192.168.2.236"
BROKER_PORT = 1883
MQTT_TOPIC = "robot/cmd"
MQTT_USERNAME = "lzwc"
MQTT_PASSWORD = "Lzwc@4187."
# --- 要进行流式传输的文本 ---
# 您可以更改此文本
TEXT_TO_STREAM = "你好,我是一个先进的人工智能助理。我的设计目标是理解并生成自然语言,从而能够与人类进行流畅的对话。我可以回答问题、提供信息、撰写文章,甚至进行一些基础的编程任务。这个流式传输演示旨在展示我如何将长篇回复分解成小的数据块,并实时发送给客户端,从而创造出一种更具互动性和即时性的体验。希望这次演示能够清晰地展示我的能力。"
# --- 流媒体参数 ---
# 每个数据块中发送的字符长度范围
MIN_CHUNK_SIZE = 2
MAX_CHUNK_SIZE = 8
# 块之间的延迟(秒)
STREAM_DELAY = 0.1
def on_connect(client, userdata, flags, rc):
"""客户端连接到代理时的回调。"""
if rc == 0:
print("成功连接到 MQTT 代理!")
else:
print(f"连接失败, 返回码 {rc}\n")
sys.exit(1)
def on_publish(client, userdata, mid):
"""消息发布时的回调。"""
# 默认注释掉,因为可能会产生大量输出
# print(f"已发布消息mid: {mid}")
pass
def create_mqtt_client():
"""创建并配置 MQTT 客户端。"""
client = mqtt.Client()
client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
client.on_connect = on_connect
client.on_publish = on_publish
return client
def stream_text(client, text_to_stream):
"""将给定的文本以数据块的形式流式传输到 MQTT 主题。"""
print(f"开始向主题 '{MQTT_TOPIC}' 流式传输文本:")
print(f"文本: {text_to_stream}")
index = 0
while index < len(text_to_stream):
# 生成一个随机的块大小
chunk_size = random.randint(MIN_CHUNK_SIZE, MAX_CHUNK_SIZE)
# 获取块,确保不会超出文本长度
chunk = text_to_stream[index:index + chunk_size]
index += len(chunk)
# 创建 JSON 负载
payload = {
"action": "stream",
"text": chunk,
"lang": "zh" # 假设是中文,如果需要可以更改
}
payload_json = json.dumps(payload, ensure_ascii=False)
# 发布消息
result = client.publish(MQTT_TOPIC, payload_json)
# 检查发布是否成功
if result.rc != mqtt.MQTT_ERR_SUCCESS:
print(f"发送消息失败: {mqtt.error_string(result.rc)}")
else:
print(f"已发送块: '{chunk}'")
# 等待一小段时间以模拟流式传输
time.sleep(STREAM_DELAY)
print("\n流式传输完成。")
def main():
"""运行 MQTT 流式脚本的主函数。"""
client = create_mqtt_client()
try:
client.connect(BROKER_IP, BROKER_PORT)
except Exception as e:
print(f"连接到代理 {BROKER_IP}:{BROKER_PORT} 时出错。请检查 IP 地址。")
print(f"错误: {e}")
sys.exit(1)
stream_text(client, TEXT_TO_STREAM)
client.disconnect()
print("已从 MQTT 代理断开。")
if __name__ == "__main__":
main()