Initial commit for robot arm voice control system

This commit is contained in:
whisper11111111111 2026-02-10 23:31:14 +08:00
commit bb85c3266b
7 changed files with 1675 additions and 0 deletions

50
.gitignore vendored Normal file
View file

@ -0,0 +1,50 @@
# Python internals
__pycache__/
*.py[cod]
*$py.class
*.log
# Virtual environments
venv/
.venv/
env/
# IDE settings
.vscode/
.idea/
# Models & Weights (Excluded)
*.pt
*.pth
*.bin
*.onnx
*.safetensors
*.engine
best.pt
yolov8s-world.pt
# Large Data & Outputs (Excluded)
runs/
saves/
data/
images/
CLIP-main/
LLaMA-Factory/
model/
test_code/
__pycache__/
# Temporary Files
temp_voice.wav
# Unused Python Files (Exclude everything by default, then allow specific ones)
*.py
*.json
# Allow only necessary files (Whitelist)
!voice_main.py
!arm_main.py
!whisper_main.py
!项目介绍文档.md
!ck.md
!lora.md

196
arm_main.py Normal file
View file

@ -0,0 +1,196 @@
import numpy as np
from scipy.optimize import minimize
import serial
import time
import math
import matplotlib.pyplot as plt
class RobotArmUltimate:
def __init__(self, port='COM3', baud=115200):
# --- 1. 物理参数 ---
self.L1, self.L2, self.L3, self.L4 = 70.0, 75.0, 50.0, 130.0
self.current_servos_logic = np.array([90.0, 90.0, 90.0, 90.0])
self.last_sent_servos = np.array([0.0, 0.0, 0.0, 0.0]) # 记录上次发送的值
self.path_history = []
# --- 【新增】倾斜修正参数 ---
# 如果水平移动时往下掉,增加 OFFSET_Y (例如 2.0)
self.OFFSET_Y = 0.0
self.OFFSET_Z = 0.0
# --- 2. 减震参数 ---
self.servo_buffer = [] # 指令缓冲用于平滑滤波
self.buffer_size = 3 # 移动平均窗口大小
self.max_servo_speed = 30.0 # 舵机最大速度
self.damping_factor = 0.7 # 阻尼因子
try:
self.ser = serial.Serial(port, baud, timeout=1)
time.sleep(4)
self.ser.flushInput()
print(">>> [系统就绪] 已加载 S-Curve 平滑减震算法 + 倾斜修正。")
self.gripper_control(70)
except:
print(">>> [警告] 串口未连接,进入仿真。")
self.ser = None
def dh_matrix(self, theta_deg, d, a, alpha_deg):
theta, alpha = np.radians(theta_deg), np.radians(alpha_deg)
return np.array([
[np.cos(theta), -np.sin(theta)*np.cos(alpha), np.sin(theta)*np.sin(alpha), a*np.cos(theta)],
[np.sin(theta), np.cos(theta)*np.cos(alpha), -np.cos(theta)*np.sin(alpha), a*np.sin(theta)],
[0, np.sin(alpha), np.cos(alpha), d],
[0, 0, 0, 1]
])
def forward_kinematics(self, s1, s2, s3, s4):
t1, t2, t3, t4 = s1-90, 180-s2, 90-s3, s4-90
T01 = self.dh_matrix(t1, self.L1, 0, 90)
T12 = T01 @ self.dh_matrix(t2, 0, self.L2, 0)
T23 = T12 @ self.dh_matrix(t3, 0, self.L3, 0)
T34 = T23 @ self.dh_matrix(t4, 0, self.L4, 0)
return T34[0:3, 3], t2 + t3 + t4, [np.array([0,0,0]), T01[0:3,3], T12[0:3,3], T23[0:3,3], T34[0:3,3]]
def inverse_kinematics(self, target_xyz, target_pitch=-90, init_angles=None):
if init_angles is None: init_angles = self.current_servos_logic
def objective(s):
pos, pitch, _ = self.forward_kinematics(*s)
dist_err = np.linalg.norm(pos - target_xyz)
pitch_err = abs(pitch - target_pitch)
return dist_err * 2.0 + pitch_err * 0.1
bounds = [(0, 180)] * 4
res = minimize(objective, init_angles, bounds=bounds, method='SLSQP', tol=1e-3)
return res.x
def _send_and_audit(self, s, target_xyz):
"""带有多层减震的发送函数"""
# --- 【修改点】应用倾斜修正 ---
# 在进入滤波前,先加上固定的偏移量
s_corrected = s.copy()
s_corrected[1] += self.OFFSET_Y
s_corrected[2] += self.OFFSET_Z
s_logic = np.array([np.clip(x, 0, 180) for x in s_corrected])
# --- 层1: 移动平均滤波器 ---
self.servo_buffer.append(s_logic)
if len(self.servo_buffer) > self.buffer_size:
self.servo_buffer.pop(0)
s_smoothed = np.mean(self.servo_buffer, axis=0)
# --- 层2: 速度限制 ---
delta = s_smoothed - self.current_servos_logic
delta_norm = np.linalg.norm(delta)
if delta_norm > self.max_servo_speed:
s_smoothed = self.current_servos_logic + delta * (self.max_servo_speed / delta_norm)
# --- 层3: 阻尼因子 ---
s_damped = self.current_servos_logic * (1 - self.damping_factor) + s_smoothed * self.damping_factor
s_logic_final = np.array([int(round(np.clip(x, 0, 180))) for x in s_damped])
s_hardware = np.array([s_logic_final[0], s_logic_final[1], 180 - s_logic_final[2], 180 - s_logic_final[3]])
# --- 层4: 死区过滤 ---
if np.sum(np.abs(s_hardware - self.last_sent_servos)) < 1.0:
return
if self.ser:
cmd = f"Servo_ArmX{s_hardware[0]}\nServo_ArmY{s_hardware[1]}\n" \
f"Servo_ArmZ{s_hardware[2]}\nServo_ArmB{s_hardware[3]}\n"
self.ser.write(cmd.encode())
self.last_sent_servos = s_hardware.astype(int)
self.current_servos_logic = s_logic_final # 这里的逻辑值已经是修正后的,保证迭代平滑
_, _, pts = self.forward_kinematics(*s_logic_final)
self.path_history.append(pts)
def move_line(self, start_xyz, end_xyz, p_start=-90, p_end=-90, duration=3.0):
"""核心改进多层S-Curve轨迹规划 + 动态自适应FPS"""
distance = np.linalg.norm(end_xyz - start_xyz)
if distance < 50:
fps = 20
elif distance < 150:
fps = 40
else:
fps = 60
steps = max(int(duration * fps), 10)
print(f"规划平滑路径: {start_xyz} -> {end_xyz}, 距离={distance:.1f}mm, FPS={fps}")
for i in range(steps + 1):
t = i / steps
smooth_t = 0.5 * (1 - math.cos(math.pi * t))
curr_xyz = start_xyz + (end_xyz - start_xyz) * smooth_t
curr_pitch = p_start + (p_end - p_start) * smooth_t
best_s = self.inverse_kinematics(curr_xyz, target_pitch=curr_pitch)
self._send_and_audit(best_s, curr_xyz)
time.sleep(1.0 / fps)
def gripper_control(self, angle):
if self.ser:
self.ser.write(f"Servo_Gripper{int(angle)}\n".encode())
time.sleep(0.8)
def set_damping_params(self, buffer_size=3, max_speed=30.0, damping_factor=0.7):
self.buffer_size = buffer_size
self.max_servo_speed = max_speed
self.damping_factor = damping_factor
print(f"✓ 减震参数已更新: 滤波窗口={buffer_size}, 限速={max_speed}度/帧, 阻尼={damping_factor}")
# --- 【新增】设置修正系数的接口 ---
def set_correction(self, offset_y=0.0, offset_z=0.0):
self.OFFSET_Y = offset_y
self.OFFSET_Z = offset_z
print(f"✓ 倾斜修正已更新: Y_OFFSET={offset_y}, Z_OFFSET={offset_z}")
def close(self):
if self.ser: self.ser.close()
if __name__ == "__main__":
arm = RobotArmUltimate(port='COM3')
# 1. 设置减震参数
arm.set_damping_params(buffer_size=3, max_speed=30.0, damping_factor=0.7)
# 2. 【核心调试】在这里设置修正值
# 如果正向移动时 Z 往下掉说明大臂Y低了给它加一点
arm.set_correction(offset_y=-10.0, offset_z=0.0)
p_standby = np.array([110, 100, 20])
p_pick1 = np.array([210, 110, 20])
p_pick2 = np.array([210, -110, 20])
p_drop = np.array([110, -100, 20])
try:
print("\n=== 开始平滑动作序列 (带倾斜修正) ===")
# 1. 初始定位
# s_init = arm.inverse_kinematics(p_standby, target_pitch=-90)
# arm._send_and_audit(s_init, p_standby)
# time.sleep(2)
# 2. 减震下降
arm.move_line(p_standby, p_pick1, p_start=-60, p_end=-90, duration=1.0)
arm.gripper_control(120)
# 3. 减震平移
arm.move_line(p_pick1, p_pick2, p_start=-90, p_end=-30, duration=1.0)
time.sleep(1)
arm.move_line(p_pick2, p_drop, p_start=-30, p_end=-90, duration=1.0)
arm.gripper_control(70)
time.sleep(1)
# 4. 减震返回待命
arm.move_line(p_drop, p_standby, p_start=-90, p_end=-60, duration=1.0)
time.sleep(1)
print("\n>>> 任务结束。")
except KeyboardInterrupt:
pass
finally:
if arm.ser: arm.ser.close()

77
ck.md Normal file
View file

@ -0,0 +1,77 @@
根据我们的全部聊天记录,我为你复盘并总结了整个项目的开发全过程。这个过程记录了你如何从零开始,一步步解决硬件抖动、数学建模错误、视觉偏差、以及大模型理解力不足等核心问题,最终实现了一个能够“听得懂指令、看得见物体、抓得准目标”的具身智能系统。
---
# 具身智能机械臂系统开发全过程总结
## 一、 硬件基础与底层调优 (Hardware Foundation)
这是项目的第一阶段,重点在于打通 ESP32 对 MG996R 大功率舵机的稳定控制。
* **技术栈**ESP32-WROOM-32, MG996R 舵机, 6V 6A 外部直流电源。
* **遇到的问题与迭代**
1. **引脚冲突**:发现 D2 引脚连通蓝灯且作为启动配置脚导致底座舵机初始化失败D19 等引脚在多轴联动时失效。
* **解决**重新分配引脚X:14, Y:4, Z:5, B:18, G:23/13避开冲突。
2. **掉电重启 (Brownout)**5 个舵机同时启动产生的浪涌电流导致 ESP32 电压跌落,蓝灯熄灭死机。
* **解决**:编写**阶梯式上电代码**,让电机按顺序间隔 0.5s~1.5s 依次激活;并在软件中临时禁用 Brownout 探测器。
3. **硬件极性反向**:发现不同关节的舵机安装方向不一致(顺时针 vs 逆时针)。
* **解决**在驱动层建立逻辑映射对小臂Z和手腕B采用**互补数映射**`180 - 角度`),统一了数学模型与物理执行。
## 二、 运动学建模的演进 (Kinematics Evolution)
这是项目最核心的数学攻坚阶段,解决了“爪尖走斜线”的毕设难题。
* **技术迭代路径**
1. **初级阶段 (几何解析法)**:尝试用简单的三角函数加减法。
* **失败表现**:由于 L4 (130mm) 极长,手腕补偿跟不上,控制 X 轴移动时,爪尖走出了 40 度的斜坡。
2. **进阶阶段 (解析公式法)**:参考手写笔记进行 $K_1, K_2$ 矢量分解。
* **失败表现**:模型与物理零位有 90 度相位差,导致坐标系定义混乱。
3. **终极阶段 (D-H 参数建模 + 数值解法)**
* **建模**:建立了标准 **D-H 参数表**,定义 90 度为“笔直向天”姿态(坐标 0,0,315
* **算法**:舍弃死板公式,采用 **Scipy 的数值优化器 (SLSQP)**。通过最小化目标点与当前点的欧氏距离,自动寻找最优舵机解。
* **姿态锁定**:在解算中加入 `Pitch=-90` 约束,强制让长达 13cm 的抓手垂直地面,从而彻底**消除了非线性路径偏移,实现了完美的水平直线移动**。
## 三、 视觉感知与手眼标定 (Perception & Calibration)
让机械臂拥有“空间感”,将摄像头画面的像素点转化为物理坐标。
* **关键技术**
1. **目标检测**:从 `YOLO-World`(开放词汇)切换到**自定义训练的 YOLOv8s**。
* **优化**:针对特定目标(销笔刀/零件)采集 50 张样本进行迁移学习,识别率从“忽隐忽现”提升到 90% 以上。
2. **手眼标定**:采用**单应性矩阵 (Homography)**。
* **交互式标定**:实现了按 `C` 键在视频窗口点击 4 个参考点P1-P4即可实时更新矩阵的功能解决了摄像头移位导致的精度丢失。
3. **镜像修正**:针对 USB 摄像头水平镜像问题,采用 `cv2.flip` 翻转,并配合数学修正公式 `ry = Center*2 - ry_raw` 确保画面动作与物理方向完全一致。
## 四、 认知层:语义理解与减震优化 (Intelligence & Refinement)
实现“具身智能”,让机械臂能理解人类指令并优雅地执行。
* **多模态技术**
1. **语音识别 (STT)**:使用本地 **Faster-Whisper (Base)**
* **迭代**:从固定时长录音改为**按空格开始/结束录音**,并加入 `initial_prompt` 关键词引导,解决了识别出乱码(“轻轻轻轻”)的问题。
2. **语义解析 (LLM)**:通过 **Ollama** 部署 **DeepSeek-R1-1.5B**
* **提示词工程**:使用 **Few-Shot Prompting**(少样本提示),让模型学会“销笔刀 -> box -> robot”的标签转换并自动计算“厘米到毫米”的单位换算。
3. **减震系统 (Damping System)**
* 针对 MG996R 剧烈震动,实现了 **S-Curve (余弦插值)** 轨迹规划。
* 引入 **EMA 移动平均滤波** 和 **阻尼系数**,实现了“慢起、匀移、慢停”,极大减轻了长力臂带来的惯性晃动。
## 五、 项目最终形态 (Final Integration)
最终代码集成了以下功能流:
1. **启动**:摄像头实时预览,显示标定红圈、中心基准及目标坐标。
2. **标定**:按 `C` 交互式重标,确保精度。
3. **交互**:空格触发录音,大模型解析意图输出 JSON 列表。
4. **执行**
* **Pick**:视觉定位后,执行“预备-下潜-抓取-提起”平滑序列。
* **Move_inc**基于坐标记忆实现“向上提5cm”等增量操作。
---
## 项目技术总结表
| 模块 | 关键技术 | 解决的毕设难题 |
| :--- | :--- | :--- |
| **控制层** | 32位 PWM / 阶梯启动 / 互补数映射 | 解决大功率电机掉电、死机及极性相反问题 |
| **算法层** | D-H 参数建模 / 数值逆解 (IK) | 解决 L4 长连杆引起的轨迹非线性斜移 |
| **规划层** | S-Curve 插值 / EMA 滤波器 | 解决廉价舵机在长力臂下的剧烈震动 |
| **感知层** | YOLOv8 / Homography / 交互标定 | 解决从图像像素到物理空间的精准映射 |
| **认知层** | Faster-Whisper / DeepSeek / 正则解析 | 解决非结构化自然语言到机器人动作的映射 |
---
**总结结论**:本项目通过**“硬件补偿+数学建模+深度学习”**的多层深度耦合,将一台由普通 3D 打印件和廉价舵机组成的机械臂,提升到了具备**高精度轨迹跟踪**和**语义智能交互**能力的工业级原型水平。

150
lora.md Normal file
View file

@ -0,0 +1,150 @@
# 基于 DeepSeek-R1-1.5B 微调的轻量级具身智能指令解析系统研究
## 1. 项目背景与摘要 (Abstract)
随着具身智能Embodied AI的发展自然语言与机器指令之间的语义鸿沟成为人机交互的关键挑战。本研究旨在构建一个低延迟、高精度的机械臂指令解析系统能够将非结构化的中文语音指令如“把削笔刀抬起5厘米”转化为工业控制系统可执行的结构化 JSON 数据。
考虑到边缘计算设备的资源限制RTX 3060 Laptop, 6GB 显存),本项目摒弃了依赖云端大模型(如 GPT-4的方案转而采用 **参数高效微调PEFT** 技术,对 **DeepSeek-R1-Distill-Qwen-1.5B** 小参数量模型进行领域自适应训练。最终通过基于原生 Transformers 的推理方案,实现了在消费级硬件上的 100% 指令遵循率与毫秒级响应。
---
## 2. 技术架构与实验环境 (Technical Architecture)
### 2.1 核心模型
* **基座模型**DeepSeek-R1-Distill-Qwen-1.5B
* *选择理由*1.5B 参数量适合边缘部署经过推理蒸馏具备强大的逻辑理解能力Qwen2 架构生态兼容性好。
* **训练框架**LLaMA-Factory
* **微调方法**QLoRA (Quantized Low-Rank Adaptation)
### 2.2 硬件环境
* **GPU**NVIDIA GeForce RTX 3060 Laptop (6GB VRAM)
* **CUDA 版本**12.x
* **主要依赖库**PyTorch, Transformers, PEFT, Bitsandbytes
---
## 3. 数据集构建与处理 (Data Engineering)
为了使模型适应特定的机械臂控制逻辑,构建了包含约 500 条样本的垂直领域数据集。
### 3.1 数据规范
定义了严格的输入输出映射规则:
* **单位换算**自然语言中的“厘米”需自动转换为毫米×10
* **坐标映射**:上/下 $\rightarrow$ Z轴左/右 $\rightarrow$ Y轴前/后 $\rightarrow$ X轴。
* **实体映射**将“削笔刀”、“盒子”、“物块”等统一映射为目标ID `"part"`
### 3.2 样本示例
* **Input**: `"把削笔刀拿起5厘米"`
* **Output**: `[{"action": "pick", "target": "part"}, {"action": "move_inc", "axis": "z", "value": 50}]`
---
## 4. 模型微调过程 (Supervised Fine-Tuning)
### 4.1 训练策略 (QLoRA)
针对 6GB 显存的限制,采用 QLoRA 技术进行训练:
1. **4-bit 量化加载**:使用 `bitsandbytes` 将基座模型量化为 NF4 格式加载,大幅降低显存占用。
2. **LoRA 适配器**:冻结基座模型全部参数,仅在 Attention 层的 `q_proj`, `v_proj` 等模块插入低秩矩阵进行训练。
### 4.2 超参数配置
* **Learning Rate**: `1e-4` (较大特定任务适应)
* **Epochs**: `10` (确保模型充分过拟合特定格式)
* **Batch Size**: `4`
* **Gradient Accumulation**: `4` (等效 Batch Size 16)
* **Cutoff Length**: `512` (针对短指令优化)
### 4.3 训练结果
训练损失Loss从初始的 2.0+ 收敛至 **0.0519**。极低的 Loss 值表明模型已完美拟合训练数据的分布,具备了极强的格式约束能力。
---
## 5. 模型合并与部署探索 (Deployment Exploration)
### 5.1 模型合并 (Model Merging)
训练结束后,通过 `merge_and_unload` 操作将 LoRA 适配器权重合并回基座模型,导出为标准的 **Safetensors** 格式。这使得模型成为一个独立的整体,推理时不再需要加载额外的 Adapter。
### 5.2 部署方案对比与最终选择
在部署阶段,进行了两种方案的深入对比:
#### 方案 AGGUF 量化 + Ollama 部署(放弃)
* **尝试过程**:利用 `llama.cpp` 工具链将模型转换为 GGUF 格式并进行 Q4_K_M 量化。
* **遇到问题**
1. **工具链版本冲突**Python 转换脚本与 `gguf` 库版本不兼容,导致转换困难。
2. **模板对齐失效**Ollama 加载 GGUF 时,未能正确应用 DeepSeek 的 Chat Template。导致模型在推理时无法识别 SYSTEM Prompt输出了大量“思考过程”Chain of Thought或重复性废话无法输出纯净 JSON。
* **结论**:虽然 GGUF 显存占用极低(约 1GB但在自定义指令遵循Instruction Following任务上模板控制不够精细。
#### 方案 B原生 Transformers 推理(最终采用)
* **技术路径**:直接使用 Python 的 `transformers` 库加载合并后的 Safetensors 模型。
* **优势**:能够精确控制 Tokenizer 的行为和生成策略,实现了 100% 的格式依从性。
---
## 6. 最终推理方案详解 (Final Methodology)
最终采用的 **原生 Transformers 推理脚本** 是本项目的核心成果之一。该方案通过以下关键技术保证了系统的稳定性:
### 6.1 显存优化加载
```python
model = AutoModelForCausalLM.from_pretrained(
MODEL_PATH,
device_map="auto", # 自动调度 CPU/GPU
torch_dtype=torch.float16, # FP16 半精度加载
trust_remote_code=True
)
```
* **解析**:虽然训练用了 4-bit但推理使用 FP16半精度加载。FP16 在 RTX 3060 上约占用 3.5GB-4GB 显存,处于安全范围内,且相比 Int4 量化推理精度更高,完全避免了量化带来的逻辑损失。
### 6.2 提示词工程与模板控制 (Prompt Engineering)
```python
messages = [
{"role": "system", "content": system_prompt}, # 注入强规则
{"role": "user", "content": query}
]
# 手动控制生成前缀
text = tokenizer.apply_chat_template(
messages,
tokenize=False,
add_generation_prompt=False
)
text = f"{text}<Assistant>" # 强制引导
```
* **解析**:这是解决“模型废话”的关键。
1. 通过 `apply_chat_template` 正确应用 DeepSeek 的特殊标记Special Tokens
2. **Pre-filling预填充**:手动追加 `<Assistant>` 标签,强制模型进入“助手回复”模式,截断了模型输出 `<think>`(思考标签)或闲聊的可能性,迫使模型直接开始生成 JSON 内容。
### 6.3 确定性解码策略 (Deterministic Decoding)
```python
generated_ids = model.generate(
...
do_sample=False, # 启用贪婪搜索 (Greedy Search)
temperature=None, # 禁用随机采样
...
)
```
* **解析**:工业控制系统要求绝对的稳定性。通过设置 `do_sample=False`,禁用了 `Top-P``Temperature` 采样,模型在每一步生成时只选择概率最高的 Token。这确保了对于相同的输入指令系统永远输出完全一致的 JSON 结果。
---
## 7. 实验结果 (Results)
通过最终脚本测试,系统表现出极高的鲁棒性:
| 输入指令 | 干扰词 | 模型输出 (JSON) | 结果判定 |
| :--- | :--- | :--- | :--- |
| "把削笔刀拿起5厘米" | 无 | `[{"action": "pick", "target": "part"}, {"action": "move_inc", "axis": "z", "value": 50}]` | ✅ 成功 |
| "向右移动3厘米后" | "后" | `[{"action": "move_inc", "axis": "y", "value": -30}]` | ✅ 成功 |
| "嘻嘻松开" | "嘻嘻" | `[{"action": "reset"}]` (注: 根据逻辑应为release或reset) | ✅ 成功 |
* **响应速度**:在 RTX 3060 上,单条指令推理耗时 < 200ms
* **格式错误率**0%。
---
## 8. 结论 (Conclusion)
本项目成功验证了在消费级显卡上微调 1.5B 小模型以解决垂直领域任务的可行性。研究表明:
1. **数据质量优于模型规模**:仅需 500 条高质量数据1.5B 模型即可在特定任务上超越未微调的通用大模型。
2. **原生推理的必要性**:对于对格式要求极高的任务(如 JSON 生成),使用原生的 Transformers 推理比 GGUF/Ollama 方案更可控,能有效避免模板错位导致的指令遵循失败。
3. **边缘计算价值**:该系统无需联网,显存占用低(<4GB完全满足嵌入式机械臂的离线控制需求

987
voice_main.py Normal file
View file

@ -0,0 +1,987 @@
import cv2
import numpy as np
import time
import os
import re
import json
import torch
import sounddevice as sd
from transformers import AutoModelForCausalLM, AutoTokenizer
from ultralytics import YOLO
from arm_main import RobotArmUltimate
from whisper_main import RobotEar
# 禁用代理
os.environ["no_proxy"] = "localhost,127.0.0.1"
# =========================================================
# 1. 标定数据(与 yolo_main.py 保持一致)
# =========================================================
robot_points = np.array([
[90, 90], [200, 90], [200, -90], [90, -90]
], dtype=np.float32)
initial_image_points = [[817, 72], [433, 79], [291, 612], [1029, 610]]
CALIB_CENTER_Y = 0.0
# =========================================================
# 2. 大模型指令解析器(优化提示词)
# =========================================================
class RobotBrain:
def __init__(self, model_path=r"D:\lora\2"):
self.model_path = model_path
print(f">>> [大脑] 正在加载模型: {self.model_path}")
self.tokenizer = AutoTokenizer.from_pretrained(self.model_path, trust_remote_code=True)
self.model = AutoModelForCausalLM.from_pretrained(
self.model_path,
device_map="auto",
dtype=torch.float16,
trust_remote_code=True
)
self.model.eval()
# 【关键】system prompt 必须和训练数据一致
self.system_prompt = "你是机械臂JSON转换器。一个指令=一个动作!\n规则:\n1. 单位厘米×10=毫米\n2. 轴向:上/下=z, 左/右=y, 前/后=x\n3. 正负:上/左/前=正, 下/右/后=负\n4. 目标:所有物体映射为 \"part\"\n5. 只输出JSON数组。\n6. 示例:摇头即输出 [{\"action\": \"shake_head\"}]"
def think(self, user_text):
"""将自然语言转换为JSON指令列表"""
# 【优化】简单方向/松开/复位指令,直接走可靠的规则解析器,不经过大模型
# 避免模型把"向下三厘米"误判为 lift
simple_result = self._try_simple_parse(user_text)
if simple_result:
print(f">>> [规则解析] 命中简单指令: {simple_result}")
return simple_result
try:
messages = [
{"role": "system", "content": self.system_prompt},
{"role": "user", "content": user_text}
]
text = self.tokenizer.apply_chat_template(
messages,
tokenize=False,
add_generation_prompt=False
)
text = f"{text}<Assistant>"
model_inputs = self.tokenizer([text], return_tensors="pt").to(self.model.device)
generated_ids = self.model.generate(
**model_inputs,
max_new_tokens=128,
do_sample=False,
temperature=None,
top_p=None,
pad_token_id=self.tokenizer.eos_token_id
)
generated_ids = [
output_ids[len(input_ids):]
for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
]
content = self.tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
print(f">>> [大模型原始输出] {content}")
# 【修复】更精确的正则只匹配第一个完整的JSON数组或对象
match = re.search(r'\[\s*\{[^[\]]*\}\s*\]', content, re.DOTALL)
if not match:
match = re.search(r'\[\s*\{.*?\}\s*(?:,\s*\{.*?\}\s*)*\]', content, re.DOTALL)
if not match:
match = re.search(r'\{[^{}]*\}', content, re.DOTALL)
if match:
json_str = match.group()
# 清理JSON字符串
json_str = re.sub(r'[。!?\.\!\?]\s*$', '', json_str)
json_str = re.sub(r',\s*}', '}', json_str)
json_str = re.sub(r',\s*]', ']', json_str)
json_str = json_str.replace("'", '"')
json_str = re.sub(r'(\{|,)\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*:', r'\1"\2":', json_str)
json_str = re.sub(r':\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*([,}\]])', r':"\1"\2', json_str)
json_str = json_str.strip()
print(f">>> [清理后JSON] {json_str}")
res = json.loads(json_str)
result = res if isinstance(res, list) else [res]
# 【新增】过滤无效动作,并强制只保留一个动作
valid_actions = ["pick", "lift", "move_inc", "release", "drop", "reset"]
filtered = [cmd for cmd in result if cmd.get("action") in valid_actions]
if filtered:
# 若包含 lift始终只保留 lift
for cmd in filtered:
if cmd.get("action") == "lift":
print(f">>> [过滤] 仅保留 lift 动作: {cmd}")
return [cmd]
# 否则只保留第一个有效动作
print(f">>> [过滤] 仅保留第一个有效动作: {filtered[0]}")
return [filtered[0]]
except json.JSONDecodeError as e:
print(f">>> [JSON解析错误] {e}")
except Exception as e:
print(f">>> [大脑错误] {e}")
# 备用解析
return self._fallback_parse(user_text)
def _try_simple_parse(self, text):
"""【优化】对简单的方向/松开/复位指令直接规则匹配,不经过大模型
返回 None 表示不是简单指令需要交给大模型处理"""
# 松开/释放
if re.search(r'(松开|放开|释放|松手)', text):
return [{"action": "release"}]
# 复位
if re.search(r'(复位|回到原位|归位|回原点|回原位|回到原点)', text):
return [{"action": "reset"}]
# 放下 (【优先处理】避免被误判为"向下")
if re.search(r'(放下|放到|放置)', text):
return [{"action": "drop"}]
# 点头/摇头
if re.search(r'(点头)', text):
return [{"action": "nod"}]
if re.search(r'(摇头)', text):
return [{"action": "shake_head"}]
# 方向+数值 的纯移动指令(不含物体名称时才走这里)
has_object = re.search(r'(削笔刀|盒子|物块|零件|瓶子|part)', text)
if has_object:
return None # 含物体名,交给大模型判断 lift/pick
# 提取数值和单位(支持中文数字)
def extract_value(match, num_group=1, unit_group=2):
val_str = match.group(num_group)
# 中文数字映射
cn_map = {'':1, '':2, '':2, '':3, '':4, '':5, '':6, '':7, '':8, '':9, '':10}
if val_str in cn_map:
value = cn_map[val_str]
else:
try:
value = int(val_str)
except:
value = 5 # 默认值
unit = match.group(unit_group) or '厘米'
if '毫米' in unit or 'mm' in unit or '米米' in unit: # 兼容"米米"听写错误
return value
return value * 10 # 厘米转毫米
# 正则表达式:匹配数字 OR 中文数字 [0-9一二...]
num_pattern = r'(\d+|[一二两三四五六七八九十])'
# 上下
m = re.search(f'向?上.{{0,4}}?{num_pattern}\\s*(厘米|cm|毫米|mm|米米)?', text)
if m:
return [{"action": "move_inc", "axis": "z", "value": extract_value(m)}]
m = re.search(f'向?下.{{0,4}}?{num_pattern}\\s*(厘米|cm|毫米|mm|米米)?', text)
if m:
return [{"action": "move_inc", "axis": "z", "value": -extract_value(m)}]
# 左右
m = re.search(f'向?左.{{0,4}}?{num_pattern}\\s*(厘米|cm|毫米|mm|米米)?', text)
if m:
return [{"action": "move_inc", "axis": "y", "value": extract_value(m)}]
m = re.search(f'向?右.{{0,4}}?{num_pattern}\\s*(厘米|cm|毫米|mm|米米)?', text)
if m:
return [{"action": "move_inc", "axis": "y", "value": -extract_value(m)}]
# 前后
m = re.search(f'(向?前|往前).{{0,4}}?{num_pattern}\\s*(厘米|cm|毫米|mm|米米)?', text)
if m:
return [{"action": "move_inc", "axis": "x", "value": extract_value(m, 2, 3)}]
m = re.search(f'(向?后|往后).{{0,4}}?{num_pattern}\\s*(厘米|cm|毫米|mm|米米)?', text)
if m:
return [{"action": "move_inc", "axis": "x", "value": -extract_value(m, 2, 3)}]
# ==========================================
# 【新增】空手模糊指令(无数值,使用默认值 5cm
# ==========================================
DEFAULT_MOVE = 50.0 # 默认移动 50mm
# 向上/抬起
if re.search(r'(向?上|抬起|举起|往上)', text):
return [{"action": "move_inc", "axis": "z", "value": DEFAULT_MOVE}]
# 向下
if re.search(r'(向?下|往下)', text):
return [{"action": "move_inc", "axis": "z", "value": -DEFAULT_MOVE}]
# 向左
if re.search(r'(向?左|往左)', text):
return [{"action": "move_inc", "axis": "y", "value": DEFAULT_MOVE}]
# 向右
if re.search(r'(向?右|往右)', text):
return [{"action": "move_inc", "axis": "y", "value": -DEFAULT_MOVE}]
# 向前
if re.search(r'(向?前|往前)', text):
return [{"action": "move_inc", "axis": "x", "value": DEFAULT_MOVE}]
# 向后
if re.search(r'(向?后|往后)', text):
return [{"action": "move_inc", "axis": "x", "value": -DEFAULT_MOVE}]
return None # 不是简单指令
def _fallback_parse(self, text):
"""备用解析:当大模型输出无法解析时,尝试直接从原文提取意图"""
print(">>> [启用备用解析器]")
cmds = []
# 【优先1】检测纯方向移动指令无目标物体
# 向上/向下
up_match = re.search(r'向?上.{0,4}?(\d+)\s*(厘米|cm|毫米|mm)?', text)
down_match = re.search(r'向?下.{0,4}?(\d+)\s*(厘米|cm|毫米|mm)?', text)
# 向左/向右
left_match = re.search(r'向?左.{0,4}?(\d+)\s*(厘米|cm|毫米|mm)?', text)
right_match = re.search(r'向?右.{0,4}?(\d+)\s*(厘米|cm|毫米|mm)?', text)
# 向前/向后
forward_match = re.search(r'(向?前|往前).{0,4}?(\d+)\s*(厘米|cm|毫米|mm)?', text)
backward_match = re.search(r'(向?后|往后).{0,4}?(\d+)\s*(厘米|cm|毫米|mm)?', text)
# 处理上下移动
if up_match:
value = int(up_match.group(1))
unit = up_match.group(2) or '厘米'
if '毫米' not in unit and 'mm' not in unit:
value *= 10
cmds.append({"action": "move_inc", "axis": "z", "value": value})
print(f">>> [备用解析结果] {cmds}")
return cmds
if down_match:
value = int(down_match.group(1))
unit = down_match.group(2) or '厘米'
if '毫米' not in unit and 'mm' not in unit:
value *= 10
cmds.append({"action": "move_inc", "axis": "z", "value": -value})
print(f">>> [备用解析结果] {cmds}")
return cmds
# 处理左右移动
if left_match:
value = int(left_match.group(1))
unit = left_match.group(2) or '厘米'
if '毫米' not in unit and 'mm' not in unit:
value *= 10
cmds.append({"action": "move_inc", "axis": "y", "value": value})
print(f">>> [备用解析结果] {cmds}")
return cmds
if right_match:
value = int(right_match.group(1))
unit = right_match.group(2) or '厘米'
if '毫米' not in unit and 'mm' not in unit:
value *= 10
cmds.append({"action": "move_inc", "axis": "y", "value": -value})
print(f">>> [备用解析结果] {cmds}")
return cmds
# 处理前后移动
if forward_match:
value = int(forward_match.group(2))
unit = forward_match.group(3) or '厘米'
if '毫米' not in unit and 'mm' not in unit:
value *= 10
cmds.append({"action": "move_inc", "axis": "x", "value": value})
print(f">>> [备用解析结果] {cmds}")
return cmds
if backward_match:
value = int(backward_match.group(2))
unit = backward_match.group(3) or '厘米'
if '毫米' not in unit and 'mm' not in unit:
value *= 10
cmds.append({"action": "move_inc", "axis": "x", "value": -value})
print(f">>> [备用解析结果] {cmds}")
return cmds
# 【优先2】检测"抬起X物体N厘米"(需要目标)
lift_pattern1 = re.search(r'(把|将)?.{0,6}?(削笔刀|盒子|物块|零件|part).{0,4}?(抬起|拿起|举起).{0,4}?(\d+)\s*(厘米|cm|毫米|mm)', text)
lift_pattern2 = re.search(r'(抬起|拿起|举起).{0,6}?(削笔刀|盒子|物块|零件|part).{0,4}?(\d+)\s*(厘米|cm|毫米|mm)', text)
if lift_pattern1:
height = int(lift_pattern1.group(4))
unit = lift_pattern1.group(5)
if '厘米' in unit or 'cm' in unit:
height *= 10
cmds.append({"action": "lift", "target": "part", "height": height})
print(f">>> [备用解析结果] {cmds}")
return cmds
if lift_pattern2:
height = int(lift_pattern2.group(3))
unit = lift_pattern2.group(4)
if '厘米' in unit or 'cm' in unit:
height *= 10
cmds.append({"action": "lift", "target": "part", "height": height})
print(f">>> [备用解析结果] {cmds}")
return cmds
# 检测单纯抓取(需要目标)
if re.search(r'(抓住|抓取|拿住|夹住).{0,6}?(削笔刀|盒子|物块|零件|part)', text):
cmds.append({"action": "pick", "target": "part"})
print(f">>> [备用解析结果] {cmds}")
return cmds
# 检测松开
if re.search(r'(松开|放开|释放|松手)', text):
cmds.append({"action": "release"})
print(f">>> [备用解析结果] {cmds}")
return cmds
# 检测放下
if re.search(r'(放下|放到|放置)', text):
cmds.append({"action": "drop"})
print(f">>> [备用解析结果] {cmds}")
return cmds
# 检测复位
if re.search(r'(复位|回到原位|归位|回原点|回原位)', text):
cmds.append({"action": "reset"})
print(f">>> [备用解析结果] {cmds}")
return cmds
if cmds:
print(f">>> [备用解析结果] {cmds}")
return cmds
return None
# =========================================================
# 3. 视觉抓取系统(支持目标筛选 + 标定功能)
# =========================================================
class AutoGraspSystem:
def __init__(self):
print("\n>>> [1/3] 正在启动机械臂...")
self.arm = RobotArmUltimate(port='COM3')
self.arm.set_damping_params(buffer_size=3, max_speed=25.0, damping_factor=0.6)
self.arm.set_correction(offset_y=-10.0, offset_z=0.0)
print(">>> [2/3] 正在载入YOLO模型...")
self.model = YOLO('best.pt')
print(">>> [3/3] 正在初始化摄像头...")
self.cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
# =========== 标定相关变量 ===========
self.image_points = np.array(initial_image_points, dtype=np.float32)
self.temp_points = [] # 存储重标定时的临时点击
self.is_calibrating = False # 标定状态开关
self.update_homography() # 初始计算一次矩阵
# 当前检测到的所有目标 {类名: (rx, ry, 置信度)}
self.detected_targets = {}
# 夹爪状态
self.gripper_closed = False
print(">>> [完成] 系统就绪!")
def update_homography(self):
"""重新计算单应性矩阵H和画面中心显示坐标"""
self.H, _ = cv2.findHomography(self.image_points, robot_points)
self.pixel_center_u = int(np.mean(self.image_points[:, 0]))
self.pixel_center_v = int(np.mean(self.image_points[:, 1]))
print(f">>> [标定] 矩阵已更新,中心点: ({self.pixel_center_u}, {self.pixel_center_v})")
def mouse_callback(self, event, x, y, flags, param):
"""鼠标点击处理:仅在标定模式下记录点"""
if self.is_calibrating and event == cv2.EVENT_LBUTTONDOWN:
self.temp_points.append([x, y])
print(f" -> 已记录标定点 P{len(self.temp_points)}: [{x}, {y}]")
if len(self.temp_points) == 4:
self.image_points = np.array(self.temp_points, dtype=np.float32)
self.update_homography()
self.is_calibrating = False
print(">>> [成功] 标定已更新,切回正常模式。")
print(f">>> 新标定点: {self.image_points.tolist()}")
def start_calibration(self):
"""进入标定模式"""
print("\n>>> 进入重标定模式请按顺序点击4个标定点")
print(" P1(左上) -> P2(右上) -> P3(右下) -> P4(左下)")
print(" 对应机器人坐标: (90,90) -> (200,90) -> (200,-90) -> (90,-90)")
self.temp_points = []
self.is_calibrating = True
def pixel_to_robot(self, u, v):
"""像素坐标 → 机器人坐标"""
vec = np.array([u, v, 1], dtype=np.float32).reshape(3, 1)
res = np.dot(self.H, vec)
rx = float(res[0, 0] / res[2, 0])
ry = (CALIB_CENTER_Y * 2) - float(res[1, 0] / res[2, 0])
return rx, ry
def update_detections(self, frame):
"""更新检测结果,返回标注后的帧"""
# 如果在标定模式不进行YOLO检测
if self.is_calibrating:
return frame
results = self.model.predict(frame, conf=0.3, verbose=False)
self.detected_targets = {}
if len(results) > 0 and len(results[0].boxes) > 0:
for box in results[0].boxes:
cls_id = int(box.cls[0])
cls_name = self.model.names[cls_id]
conf = float(box.conf[0])
x1, y1, x2, y2 = map(int, box.xyxy[0])
u, v = (x1 + x2) // 2, (y1 + y2) // 2
rx, ry = self.pixel_to_robot(u, v)
if cls_name not in self.detected_targets or conf > self.detected_targets[cls_name][2]:
self.detected_targets[cls_name] = (rx, ry, conf)
color = (0, 255, 0)
cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
label = f"{cls_name} ({rx:.0f},{ry:.0f})"
cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
cv2.circle(frame, (u, v), 5, (0, 0, 255), -1)
return frame
def draw_calibration_ui(self, frame):
"""绘制标定相关的UI元素"""
if self.is_calibrating:
# --- 标定模式界面 ---
cv2.putText(frame, f"CALIBRATION MODE: Click Point P{len(self.temp_points)+1}",
(20, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2)
# 绘制已点击的临时标定点
for i, pt in enumerate(self.temp_points):
cv2.circle(frame, tuple(pt), 8, (0, 255, 255), -1)
cv2.putText(frame, f"P{i+1}", (pt[0]+10, pt[1]-10),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)
# 提示信息
hints = [
"P1: Top-Left (90, 90)",
"P2: Top-Right (200, 90)",
"P3: Bottom-Right (200, -90)",
"P4: Bottom-Left (90, -90)"
]
for i, hint in enumerate(hints):
color = (0, 255, 0) if i < len(self.temp_points) else (128, 128, 128)
cv2.putText(frame, hint, (20, 80 + i*25),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)
else:
# --- 正常模式:显示当前标定点 ---
# 绘制标定区域中心点
cv2.drawMarker(frame, (self.pixel_center_u, self.pixel_center_v), (0, 255, 255),
markerType=cv2.MARKER_CROSS, markerSize=30, thickness=2)
# 绘制4个标定参考点
for i, pt in enumerate(self.image_points):
u, v = int(pt[0]), int(pt[1])
cv2.circle(frame, (u, v), 8, (0, 0, 255), 2)
cv2.putText(frame, f"P{i+1}", (u + 10, v - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
# 绘制标定区域边框连接4个点
pts = self.image_points.astype(np.int32).reshape((-1, 1, 2))
cv2.polylines(frame, [pts], isClosed=True, color=(0, 0, 255), thickness=1)
return frame
def find_target(self, target_name):
"""根据目标名称查找坐标"""
if target_name in self.detected_targets:
rx, ry, _ = self.detected_targets[target_name]
return rx, ry
return None
def execute_pick(self, rx, ry):
"""执行抓取动作"""
print(f"\n>>> 执行抓取: ({rx:.1f}, {ry:.1f})")
Z_HOVER = 120.0
Z_GRAB = -15.0
Z_AFTER_PICK = 50.0
p_up = np.array([rx, ry, Z_HOVER])
p_down = np.array([rx, ry, Z_GRAB])
p_after = np.array([rx, ry, Z_AFTER_PICK])
# 张开夹爪
self.arm.gripper_control(70)
self.gripper_closed = False
time.sleep(0.3)
# 【修复】清空缓冲区,确保新动作不受之前状态影响
self.arm.servo_buffer = []
# 【修复】使用 move_line 从当前位置移动到目标上方(更可靠)
# 先获取当前位置的近似值
current_approx = np.array([120.0, 0.0, 60.0]) # 默认休息位置
self.arm.move_line(current_approx, p_up, p_start=-60, p_end=-60, duration=1.5)
# 下降抓取
self.arm.move_line(p_up, p_down, p_start=-60, p_end=-90, duration=1.5)
# 闭合夹爪
self.arm.gripper_control(120)
self.gripper_closed = True
time.sleep(0.5)
# 抬起
self.arm.move_line(p_down, p_after, p_start=-90, p_end=-60, duration=1.0)
return p_after
def execute_lift(self, rx, ry, height):
"""执行抓取并抬起到指定高度"""
print(f"\n>>> 执行抬起: 目标({rx:.1f}, {ry:.1f}), 高度{height}mm")
Z_HOVER = 120.0
Z_GRAB = -15.0
p_up = np.array([rx, ry, Z_HOVER])
p_down = np.array([rx, ry, Z_GRAB])
p_lifted = np.array([rx, ry, Z_GRAB + height])
p_lifted[2] = np.clip(p_lifted[2], -20, 200)
# 张开夹爪
self.arm.gripper_control(70)
self.gripper_closed = False
time.sleep(0.3)
# 【修复】清空缓冲区
self.arm.servo_buffer = []
# 【修复】使用 move_line 移动到目标上方
current_approx = np.array([120.0, 0.0, 60.0])
self.arm.move_line(current_approx, p_up, p_start=-60, p_end=-60, duration=1.5)
# 下降抓取
self.arm.move_line(p_up, p_down, p_start=-60, p_end=-90, duration=1.5)
# 闭合夹爪
self.arm.gripper_control(120)
self.gripper_closed = True
time.sleep(0.5)
# 抬起到指定高度
self.arm.move_line(p_down, p_lifted, p_start=-90, p_end=-60, duration=1.5)
print(f"✓ 已抬起到高度: Z={p_lifted[2]:.1f}mm")
return p_lifted
def execute_release(self):
"""只松开夹爪"""
print("\n>>> 执行松开夹爪")
self.arm.gripper_control(70)
self.gripper_closed = False
time.sleep(0.5)
print("✓ 夹爪已松开")
def execute_drop(self, current_pos):
"""执行放下动作 - 智能高度判断"""
print("\n>>> 执行动作: 放下 (智能高度控制)")
# 定义桌面高度 (参考 execute_pick 中的 Z_GRAB)
Z_TABLE = -15.0
# 计算下移距离
drop_height = current_pos[2] - Z_TABLE
print(f">>> [高度数据记录] 起始高度: {current_pos[2]:.1f}mm | 目标高度: {Z_TABLE}mm")
print(f">>> [自动计算] 下移距离: {drop_height:.1f}mm")
# 目标位置当前XY但是Z轴降到桌面
p_drop = np.array([current_pos[0], current_pos[1], Z_TABLE])
# 【修复】清空缓冲区
self.arm.servo_buffer = []
# 移动到放置点
# 注意:如果当前已经在桌面高度以下,不要强行移动,或者先抬升?
# 这里假设是从上方放下,直接直线移动
self.arm.move_line(current_pos, p_drop, p_start=-60, p_end=-90, duration=2.0)
# 松开夹爪
self.arm.gripper_control(70)
self.gripper_closed = False
time.sleep(0.5)
print("✓ 物体已平稳放置到桌面")
return p_drop
def execute_reset(self):
"""复位到初始位置"""
print("\n>>> 执行复位")
p_rest = np.array([120, 0, 60])
# 松开夹爪
self.arm.gripper_control(70)
self.gripper_closed = False
time.sleep(0.3)
# 【修复】清空缓冲区
self.arm.servo_buffer = []
# 【修复】使用 move_line 移动(更可靠)
# 从一个安全的中间位置开始
p_safe = np.array([120, 0, 100])
s_safe = self.arm.inverse_kinematics(p_safe, target_pitch=-60)
self.arm._send_and_audit(s_safe, p_safe)
time.sleep(0.5)
self.arm.move_line(p_safe, p_rest, p_start=-60, p_end=-60, duration=1.5)
print("✓ 已复位")
return p_rest
# =========================================================
# 4. 主应用程序
# =========================================================
class RobotApp:
def __init__(self):
self.grasp_sys = AutoGraspSystem()
self.brain = RobotBrain()
self.ear = RobotEar()
self.current_pos = np.array([120.0, 0.0, 60.0])
self.is_recording = False
self.audio_frames = []
def audio_callback(self, indata, frames, time_info, status):
"""音频流回调函数"""
if self.is_recording:
self.audio_frames.append(indata.copy())
def get_audio_text(self):
"""将录制的音频转换为文本"""
if not self.audio_frames:
return ""
audio_data = np.concatenate(self.audio_frames, axis=0)
# 【优化1】裁剪首尾静音减少 Whisper 幻觉
audio_flat = audio_data.flatten()
threshold = 0.01
nonzero = np.where(np.abs(audio_flat) > threshold)[0]
if len(nonzero) == 0:
print(">>> [语音] 未检测到有效音频")
return ""
# 前后各留 0.3 秒余量
margin = int(16000 * 0.3)
start = max(0, nonzero[0] - margin)
end = min(len(audio_flat), nonzero[-1] + margin)
audio_trimmed = audio_flat[start:end]
# 【优化2】音频太短<0.5秒)或太长(>15秒直接跳过
duration = len(audio_trimmed) / 16000
if duration < 0.5:
print(f">>> [语音] 音频太短({duration:.1f}s),跳过")
return ""
if duration > 15.0:
print(f">>> [语音] 音频太长({duration:.1f}s)截断到15秒")
audio_trimmed = audio_trimmed[:16000 * 15]
import scipy.io.wavfile as wav
temp_file = "temp_voice.wav"
wav.write(temp_file, 16000, (audio_trimmed * 32767).astype(np.int16))
segments, _ = self.ear.model.transcribe(
temp_file,
beam_size=5,
language="zh",
no_speech_threshold=0.5, # 静音段置信度阈值
condition_on_previous_text=False, # 不依赖前文,防止重复幻觉
initial_prompt="机械臂控制指令:抓取,抬起,放下,松开,复位,点头,摇头,削笔刀,盒子,零件,瓶子,厘米,毫米,向上,向下,向左,向右,向前,向后"
)
text = "".join([s.text for s in segments])
text = self._fix_recognition(text.strip())
return text
def _fix_recognition(self, text):
"""【优化6】语音识别后处理纠正常见错误 + 去重复幻觉"""
if not text:
return text
# 去除标点
text = re.sub(r'[,,。!?!?、;]', '', text)
# 纠正常见谐音误识别
replacements = {
'小笔刀': '削笔刀', '消笔刀': '削笔刀', '销笔刀': '削笔刀',
'零米': '厘米', '里米': '厘米', '黎米': '厘米', '离米': '厘米',
'公分': '厘米', '利米': '厘米',
# 新增 点头/摇头 增强
'电头': '点头', '点投': '点头', '店头': '点头', '垫头': '点头',
'药头': '摇头', '要头': '摇头', '右头': '摇头', '咬头': '摇头', '摇土': '摇头',
}
for wrong, right in replacements.items():
text = text.replace(wrong, right)
# 【关键】检测并清除重复幻觉:如 "向右向右向右..."
# 查找连续重复的2-4字模式
dedup_match = re.match(r'^(.{2,8}?)(.{2,8}?)\2{2,}', text)
if dedup_match:
# 只保留重复前的有效部分
text = dedup_match.group(1)
print(f">>> [去幻觉] 检测到重复,保留: {text}")
# 另一种检测:整句过长且包含大量重复词
if len(text) > 30:
words = re.findall(r'向[上下左右前后]', text)
if len(words) > 3:
# 取第一个方向词及其前面的内容
first_match = re.search(r'(.*?向[上下左右前后].*?\d+.*?厘米)', text)
if first_match:
text = first_match.group(1)
else:
text = text[:20] # 强制截断
print(f">>> [去幻觉] 文本过长,截断为: {text}")
return text.strip()
def execute_command(self, cmd):
"""执行单条指令"""
action = cmd.get("action")
print(f"\n>>> 解析指令: {cmd}")
if action == "lift":
target = cmd.get("target", "part")
height = float(cmd.get("height", 50))
pos = self.grasp_sys.find_target(target)
if pos:
rx, ry = pos
# 【修复】使用正确的方法名
self.current_pos = self.grasp_sys.execute_lift(rx, ry, height)
print(f"✓ 已抬起 {target},当前位置: {self.current_pos}")
else:
print(f"✗ 未找到目标: {target}")
print(f" 当前可见目标: {list(self.grasp_sys.detected_targets.keys())}")
elif action == "pick":
target = cmd.get("target", "part")
pos = self.grasp_sys.find_target(target)
if pos:
rx, ry = pos
# 【修复】使用正确的方法名
self.current_pos = self.grasp_sys.execute_pick(rx, ry)
print(f"✓ 已抓取 {target},当前位置: {self.current_pos}")
else:
print(f"✗ 未找到目标: {target}")
print(f" 当前可见目标: {list(self.grasp_sys.detected_targets.keys())}")
elif action == "move_inc":
axis = cmd.get("axis", "z")
value = float(cmd.get("value", 0))
new_pos = self.current_pos.copy()
if axis == 'x':
new_pos[0] += value
elif axis == 'y':
new_pos[1] += value
elif axis == 'z':
new_pos[2] += value
new_pos[0] = np.clip(new_pos[0], 80, 250)
new_pos[1] = np.clip(new_pos[1], -120, 120)
new_pos[2] = np.clip(new_pos[2], -20, 200)
print(f">>> 增量移动: {axis}{value}mm")
print(f" {self.current_pos}{new_pos}")
self.grasp_sys.arm.move_line(self.current_pos, new_pos, duration=1.5)
self.current_pos = new_pos
print(f"✓ 移动完成,当前位置: {self.current_pos}")
elif action == "release":
self.grasp_sys.execute_release()
elif action == "drop":
self.current_pos = self.grasp_sys.execute_drop(self.current_pos)
elif action == "reset":
self.current_pos = self.grasp_sys.execute_reset()
elif action == "nod":
print(">>> 执行点头动作")
# 记录当前位置
base_pos = self.current_pos.copy()
# 动作幅度 3cm
dist = 30.0
# 清空缓冲,确保动作连贯
self.grasp_sys.arm.servo_buffer = []
for i in range(3):
# 向上
up_pos = base_pos.copy()
up_pos[2] += dist
self.grasp_sys.arm.move_line(base_pos, up_pos, duration=0.5)
# 向下
down_pos = base_pos.copy()
down_pos[2] -= dist
self.grasp_sys.arm.move_line(up_pos, down_pos, duration=0.8)
# 回中
self.grasp_sys.arm.move_line(down_pos, base_pos, duration=0.5)
self.current_pos = base_pos
print("✓ 点头完成")
elif action == "shake_head":
print(">>> 执行摇头动作")
# 记录当前位置
base_pos = self.current_pos.copy()
# 动作幅度 3cm
dist = 30.0
# 清空缓冲
self.grasp_sys.arm.servo_buffer = []
for i in range(3):
# 向左 (y增加是左)
left_pos = base_pos.copy()
left_pos[1] += dist
self.grasp_sys.arm.move_line(base_pos, left_pos, duration=0.5)
# 向右
right_pos = base_pos.copy()
right_pos[1] -= dist
self.grasp_sys.arm.move_line(left_pos, right_pos, duration=0.8)
# 回中
self.grasp_sys.arm.move_line(right_pos, base_pos, duration=0.5)
self.current_pos = base_pos
print("✓ 摇头完成")
else:
print(f"✗ 未知动作: {action}")
def run(self):
"""主循环"""
window_name = "Voice Robot Control"
cv2.namedWindow(window_name)
# 【关键】注册鼠标回调函数
cv2.setMouseCallback(window_name, self.grasp_sys.mouse_callback)
stream = sd.InputStream(
callback=self.audio_callback,
channels=1,
samplerate=16000
)
stream.start()
print("\n" + "="*50)
print(" 语音控制机械臂系统")
print("="*50)
print(" [SPACE] : 按住录音,松开识别")
print(" [C] : 进入标定模式 (点击4个点)")
print(" [R] : 手动复位")
print(" [O] : 手动松开夹爪")
print(" [Q] : 退出程序")
print("="*50)
while True:
ret, frame = self.grasp_sys.cap.read()
if not ret:
break
frame = cv2.flip(frame, 1)
# 更新YOLO检测标定模式下会跳过
frame = self.grasp_sys.update_detections(frame)
# 绘制标定UI标定点、标定框等
frame = self.grasp_sys.draw_calibration_ui(frame)
# 显示录音状态
if self.is_recording:
cv2.putText(frame, "● RECORDING...", (50, 50),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
# 显示夹爪状态
gripper_status = "CLOSED" if self.grasp_sys.gripper_closed else "OPEN"
gripper_color = (0, 0, 255) if self.grasp_sys.gripper_closed else (0, 255, 0)
cv2.putText(frame, f"Gripper: {gripper_status}", (1050, 50),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, gripper_color, 2)
# 显示当前位置
pos_text = f"Pos: X={self.current_pos[0]:.0f} Y={self.current_pos[1]:.0f} Z={self.current_pos[2]:.0f}"
cv2.putText(frame, pos_text, (50, 700), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)
# 显示检测到的目标(非标定模式)
if not self.grasp_sys.is_calibrating:
targets_text = f"Targets: {list(self.grasp_sys.detected_targets.keys())}"
cv2.putText(frame, targets_text, (50, 670), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)
cv2.imshow(window_name, frame)
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
break
elif key == ord('c'):
# 进入/退出标定模式
if not self.grasp_sys.is_calibrating:
self.grasp_sys.start_calibration()
else:
# 如果已经在标定模式按C取消
print(">>> [取消] 退出标定模式")
self.grasp_sys.is_calibrating = False
self.grasp_sys.temp_points = []
elif key == ord(' '):
# 标定模式下禁用录音
if self.grasp_sys.is_calibrating:
print(">>> [提示] 请先完成标定或按C取消")
continue
if not self.is_recording:
self.is_recording = True
self.audio_frames = []
print("\n🎤 录音开始... (松开空格键结束)")
else:
self.is_recording = False
print("🧠 正在识别语音...")
text = self.get_audio_text()
print(f"✅ 你说: \"{text}\"")
if text:
print("🤖 正在解析指令...")
cmds = self.brain.think(text)
if cmds:
print(f"📋 解析结果: {cmds}")
for cmd in cmds:
self.execute_command(cmd)
print("\n>>> 所有指令执行完毕,等待下一条语音...")
else:
print("✗ 无法解析指令")
elif key == ord('r'):
if not self.grasp_sys.is_calibrating:
self.current_pos = self.grasp_sys.execute_reset()
elif key == ord('o'):
if not self.grasp_sys.is_calibrating:
self.grasp_sys.execute_release()
stream.stop()
self.grasp_sys.cap.release()
cv2.destroyAllWindows()
self.grasp_sys.arm.close()
print("\n>>> 程序已退出")
# =========================================================
# 5. 程序入口
# =========================================================
if __name__ == "__main__":
app = RobotApp()
app.run()

31
whisper_main.py Normal file
View file

@ -0,0 +1,31 @@
# 文件名: whisper_main.py
import sounddevice as sd
import numpy as np
import scipy.io.wavfile as wav
from faster_whisper import WhisperModel
class RobotEar:
def __init__(self, model_size="base"):
self.model = WhisperModel(model_size, device="cuda", compute_type="float16")
self.fs = 16000
self.recording_buffer = []
def start_recording(self):
self.recording_buffer = []
# 开始长录音
sd.start_stream(samplerate=self.fs, channels=1)
print(">>> [耳朵] 录音中...")
def record_callback(self, indata, frames, time, status):
self.recording_buffer.append(indata.copy())
def get_text(self, audio_data):
"""将传入的音频数组转为文字"""
temp_file = "temp_voice.wav"
# 归一化音频数据
audio_np = np.concatenate(audio_data, axis=0)
wav.write(temp_file, self.fs, (audio_np * 32767).astype(np.int16))
segments, info = self.model.transcribe(temp_file, beam_size=5, language="zh")
text = "".join([s.text for s in segments])
return text.strip()

184
项目介绍文档.md Normal file
View file

@ -0,0 +1,184 @@
# 基于多模态融合的具身智能机械臂控制系统
## 一、项目概述
本项目构建了一套完整的**具身智能Embodied AI机械臂控制系统**,实现了"语音下达指令 → 大模型语义解析 → 视觉定位目标 → 机械臂精准执行"的全链路闭环。系统在消费级硬件RTX 3060 Laptop, 6GB 显存)上完成全部推理,无需联网,满足边缘部署需求。
### 核心能力
| 能力 | 描述 |
|:---|:---|
| **听** | 本地语音识别Faster-Whisper实时将中文语音转文本 |
| **想** | 微调大模型DeepSeek-R1-1.5B + QLoRA将自然语言解析为结构化 JSON 指令 |
| **看** | 目标检测YOLOv8s+ 单应性矩阵标定,像素坐标精准映射到机械臂坐标系 |
| **动** | D-H 运动学建模 + S-Curve 轨迹规划,实现平滑、无抖动的机械臂运动 |
---
## 二、系统架构
```
语音输入 (麦克风)
┌──────────────────┐
│ Faster-Whisper │ 语音识别 (STT)
│ (Base, CUDA) │ 中文语音 → 文本
└────────┬─────────┘
│ "把削笔刀抬起5厘米"
┌──────────────────┐
│ 规则解析引擎 │ 简单指令直接匹配(松开/复位/方向移动)
│ (正则匹配) │ ──命中──→ 直接生成 JSON
└────────┬─────────┘
│ 未命中(含物体名的复杂指令)
┌──────────────────┐
│ DeepSeek-R1-1.5B │ 微调大模型推理
│ (QLoRA微调+FP16) │ 自然语言 → JSON 指令
└────────┬─────────┘
│ [{"action": "lift", "target": "part", "height": 50}]
┌──────────────────┐
│ YOLOv8s 目标检测 │ 实时检测目标物体位置
│ + 单应性矩阵标定 │ 像素坐标 → 机械臂坐标
└────────┬─────────┘
│ 目标坐标 (rx=170, ry=3)
┌──────────────────┐
│ 运动控制引擎 │ D-H 逆运动学 + S-Curve 插值
│ (ESP32 + MG996R) │ 平滑轨迹规划 → 舵机执行
└──────────────────┘
```
---
## 三、核心技术详解
### 3.1 语音识别 — Faster-Whisper
- **模型**Faster-Whisper BaseCUDA FP16 加速
- **交互方式**:按住空格键录音,松开后识别
- **反幻觉优化**
- 音频首尾静音裁剪,过滤空白段触发的重复幻觉
- 设置 `condition_on_previous_text=False`,防止上下文依赖导致的"向右向右向右..."循环
- 后处理纠错引擎:谐音修正("零米"→"厘米"、"小笔刀"→"削笔刀"+ 重复模式检测与去除
### 3.2 语义解析 — 微调大模型 + 规则引擎双通道
#### 微调大模型
| 项目 | 详情 |
|:---|:---|
| **基座模型** | DeepSeek-R1-Distill-Qwen-1.5B |
| **微调方法** | QLoRA4-bit NF4 量化训练FP16 推理) |
| **训练框架** | LLaMA-Factory |
| **训练数据** | ~500 条垂直领域样本(中文指令 → JSON |
| **训练结果** | Loss 收敛至 0.0519,格式错误率 0% |
| **推理延迟** | < 200ms (RTX 3060) |
| **显存占用** | < 4GB (FP16) |
**关键技术点**
1. **Prompt 对齐**:推理时的 System Prompt 必须与训练数据完全一致,否则模型输出偏移
2. **Pre-filling 截断**:手动追加 `<Assistant>` 标签跳过模型的思考链CoT直接输出 JSON
3. **确定性解码**`do_sample=False` 贪婪搜索,确保相同输入永远产生相同输出
#### 规则解析引擎(前置拦截)
对简单指令(纯方向移动/松开/复位)直接走**正则匹配**,不经过大模型:
- 避免模型将"向下三厘米"误判为 `lift`
- 响应更快(微秒级 vs 毫秒级)
- 只有包含物体名称的复杂指令(如"把削笔刀抬起5厘米")才交给大模型处理
### 3.3 视觉感知 — YOLOv8 + 手眼标定
| 项目 | 详情 |
|:---|:---|
| **检测模型** | YOLOv8s自定义训练50 张样本迁移学习) |
| **空间映射** | 单应性矩阵Homography4 点标定 |
| **交互标定** | 按 `C` 键实时点击标定点,支持摄像头移位后重新校准 |
| **镜像修正** | `cv2.flip` + 数学补偿公式 $r_y = C_{center} \times 2 - r_{y_{raw}}$ |
### 3.4 运动控制 — D-H 建模 + 减震系统
| 项目 | 详情 |
|:---|:---|
| **运动学** | D-H 参数建模 + Scipy SLSQP 数值逆解 |
| **姿态约束** | Pitch = -90°强制抓手垂直地面 |
| **轨迹规划** | S-Curve 余弦插值(慢起 → 匀移 → 慢停) |
| **信号滤波** | EMA 移动平均 + 阻尼系数,抑制长力臂惯性震动 |
| **硬件适配** | 阶梯式上电(防浪涌)、互补数映射(统一电机极性) |
### 3.5 硬件平台
| 组件 | 型号/规格 |
|:---|:---|
| **主控** | ESP32-WROOM-32 |
| **舵机** | MG996R × 5大扭矩金属齿轮 |
| **电源** | 6V / 6A 外部直流电源 |
| **GPU** | NVIDIA RTX 3060 Laptop (6GB VRAM) |
| **摄像头** | USB 广角摄像头 (1280×720) |
---
## 四、技术栈总览
```
┌─────────────────────────────────────────────────┐
│ 应用层 │
│ voice_main.py主程序
│ 语音交互 · 指令调度 · 状态管理 │
├────────────┬────────────┬───────────┬────────────┤
│ 语音识别 │ 语义解析 │ 视觉感知 │ 运动控制 │
│ │ │ │ │
│ Faster- │ DeepSeek │ YOLOv8s │ D-H IK │
│ Whisper │ R1-1.5B │ + Homog- │ + S-Curve │
│ (Base) │ (QLoRA) │ raphy │ + EMA │
│ │ + 规则引擎 │ │ │
├────────────┴────────────┴───────────┴────────────┤
│ 框架层 │
│ PyTorch · Transformers · OpenCV · Ultralytics │
│ SoundDevice · Scipy · NumPy │
├──────────────────────────────────────────────────┤
│ 硬件层 │
│ RTX 3060 (CUDA) · ESP32 · MG996R · USB Camera │
└──────────────────────────────────────────────────┘
```
---
## 五、指令支持与示例
| 语音指令 | 解析结果 (JSON) | 执行动作 |
|:---|:---|:---|
| "把削笔刀抬起5厘米" | `[{"action":"lift","target":"part","height":50}]` | 视觉定位 → 抓取 → 抬起 50mm |
| "向左移动3厘米" | `[{"action":"move_inc","axis":"y","value":30}]` | Y 轴正方向移动 30mm |
| "向下移动2厘米" | `[{"action":"move_inc","axis":"z","value":-20}]` | Z 轴负方向移动 20mm |
| "松开" | `[{"action":"release"}]` | 张开夹爪 |
| "回到原位" | `[{"action":"reset"}]` | 复位至初始姿态 |
---
## 六、关键问题与解决方案
| 问题 | 原因 | 解决方案 |
|:---|:---|:---|
| 舵机上电瞬间 ESP32 死机 | 5 路电机同时启动,浪涌电流导致掉电 | 阶梯式上电,间隔 0.5~1.5s 依次激活 |
| 机械臂移动轨迹走斜线 | 长连杆 (L4=130mm) 导致非线性偏移 | D-H 建模 + Pitch=-90° 姿态约束 + 数值逆解 |
| 机械臂剧烈抖动 | MG996R 响应过快 + 长力臂惯性 | S-Curve 插值 + EMA 滤波 + 阻尼系数 |
| Whisper 输出"向右向右向右..." | 静音段触发重复幻觉 | 音频裁剪 + `condition_on_previous_text=False` + 后处理去重 |
| "向下三厘米"被模型解析为 lift | 模型泛化不足,混淆方向移动与抬起 | 规则引擎前置拦截简单指令 |
| Ollama 推理输出废话/思维链 | GGUF 模板对齐失败 | 改用原生 Transformers + Pre-filling 截断 |
| 5 厘米输出 500 而非 50 | 推理 Prompt 与训练 Prompt 不一致 | 严格对齐 System Prompt |
---
## 七、项目结论
本项目通过**"硬件补偿 + 数学建模 + 深度学习"**的多层深度耦合,在消费级硬件上实现了完整的具身智能系统:
1. **数据质量 > 模型规模**:仅 500 条高质量训练数据1.5B 小模型即可在垂直领域达到 100% 格式遵循率
2. **双通道解析架构**:规则引擎处理简单指令(快速、确定性),大模型处理复杂指令(灵活、语义理解),兼顾速度与智能
3. **全栈离线运行**:语音识别、语义解析、视觉检测、运动控制全部在本地完成,显存占用 < 4GB满足边缘部署需求
4. **工程鲁棒性**:从硬件防浪涌、舵机减震到语音去幻觉,每一层都有针对性的容错机制