merge dev: consolidate all fixes + config.py refactor

Merges all fix/feat branches:
- fix/exception-handling-and-deque  (closes #1, #6)
- fix/whisper-dead-code-and-import  (closes #2)
- fix/current-pos-tracking-and-requirements (closes #3, #4)
- feat/i18n-english-first           (closes #5)
- refactor: config.py magic constants (closes #7)
This commit is contained in:
m1ngsama 2026-02-20 21:04:34 +08:00
commit 4eb716c8ad
6 changed files with 576 additions and 599 deletions

1
.gitignore vendored
View file

@ -45,6 +45,7 @@ temp_voice.wav
!voice_main.py
!arm_main.py
!whisper_main.py
!config.py
!项目介绍文档.md
!ck.md
!lora.md

View file

@ -1,37 +1,42 @@
import numpy as np
from scipy.optimize import minimize
import serial
import time
from collections import deque
import math
import time
import matplotlib.pyplot as plt
import numpy as np
import serial
from scipy.optimize import minimize
import config
class RobotArmUltimate:
def __init__(self, port='COM3', baud=115200):
# --- 1. 物理参数 ---
def __init__(self, port=config.SERIAL_PORT, baud=config.SERIAL_BAUD):
# --- 1. Physical parameters (mm) ---
self.L1, self.L2, self.L3, self.L4 = 70.0, 75.0, 50.0, 130.0
self.current_servos_logic = np.array([90.0, 90.0, 90.0, 90.0])
self.last_sent_servos = np.array([0.0, 0.0, 0.0, 0.0]) # 记录上次发送的值
self.last_sent_servos = np.array([0.0, 0.0, 0.0, 0.0])
self.path_history = []
# --- 【新增】倾斜修正参数 ---
# 如果水平移动时往下掉,增加 OFFSET_Y (例如 2.0)
self.OFFSET_Y = 0.0
self.OFFSET_Z = 0.0
# --- 2. 减震参数 ---
self.servo_buffer = [] # 指令缓冲用于平滑滤波
self.buffer_size = 3 # 移动平均窗口大小
self.max_servo_speed = 30.0 # 舵机最大速度
self.damping_factor = 0.7 # 阻尼因子
# --- 2. Tilt correction offsets ---
# Increase OFFSET_Y if end-effector droops during horizontal moves
self.OFFSET_Y = 0.0
self.OFFSET_Z = 0.0
# --- 3. Damping parameters ---
self.buffer_size = config.DAMPING_BUFFER_SIZE
self.servo_buffer = deque(maxlen=self.buffer_size)
self.max_servo_speed = config.DAMPING_MAX_SPEED # max servo speed (deg/frame)
self.damping_factor = config.DAMPING_FACTOR
try:
self.ser = serial.Serial(port, baud, timeout=1)
time.sleep(4)
time.sleep(4)
self.ser.flushInput()
print(">>> [系统就绪] 已加载 S-Curve 平滑减震算法 + 倾斜修正。")
print("[System Ready] S-Curve smoothing + tilt correction loaded.")
self.gripper_control(70)
except:
print(">>> [警告] 串口未连接,进入仿真。")
except serial.SerialException as e:
print(f"[Warning] Serial connection failed ({e}), entering simulation mode.")
self.ser = None
def dh_matrix(self, theta_deg, d, a, alpha_deg):
@ -63,61 +68,57 @@ class RobotArmUltimate:
return res.x
def _send_and_audit(self, s, target_xyz):
"""带有多层减震的发送函数"""
# --- 【修改点】应用倾斜修正 ---
# 在进入滤波前,先加上固定的偏移量
"""Send servo command through multi-layer damping pipeline."""
# Apply tilt correction before filtering
s_corrected = s.copy()
s_corrected[1] += self.OFFSET_Y
s_corrected[2] += self.OFFSET_Z
s_logic = np.array([np.clip(x, 0, 180) for x in s_corrected])
# --- 层1: 移动平均滤波器 ---
# Layer 1: moving average filter (deque auto-discards oldest)
self.servo_buffer.append(s_logic)
if len(self.servo_buffer) > self.buffer_size:
self.servo_buffer.pop(0)
s_smoothed = np.mean(self.servo_buffer, axis=0)
# --- 层2: 速度限制 ---
# Layer 2: speed limit
delta = s_smoothed - self.current_servos_logic
delta_norm = np.linalg.norm(delta)
if delta_norm > self.max_servo_speed:
s_smoothed = self.current_servos_logic + delta * (self.max_servo_speed / delta_norm)
# --- 层3: 阻尼因子 ---
# Layer 3: damping factor
s_damped = self.current_servos_logic * (1 - self.damping_factor) + s_smoothed * self.damping_factor
s_logic_final = np.array([int(round(np.clip(x, 0, 180))) for x in s_damped])
s_hardware = np.array([s_logic_final[0], s_logic_final[1], 180 - s_logic_final[2], 180 - s_logic_final[3]])
# --- 层4: 死区过滤 ---
# Layer 4: dead-zone filter - skip if total delta < 1 degree
if np.sum(np.abs(s_hardware - self.last_sent_servos)) < 1.0:
return
return
if self.ser:
cmd = f"Servo_ArmX{s_hardware[0]}\nServo_ArmY{s_hardware[1]}\n" \
f"Servo_ArmZ{s_hardware[2]}\nServo_ArmB{s_hardware[3]}\n"
cmd = (f"Servo_ArmX{s_hardware[0]}\nServo_ArmY{s_hardware[1]}\n"
f"Servo_ArmZ{s_hardware[2]}\nServo_ArmB{s_hardware[3]}\n")
self.ser.write(cmd.encode())
self.last_sent_servos = s_hardware.astype(int)
self.current_servos_logic = s_logic_final # 这里的逻辑值已经是修正后的,保证迭代平滑
self.current_servos_logic = s_logic_final
_, _, pts = self.forward_kinematics(*s_logic_final)
self.path_history.append(pts)
def move_line(self, start_xyz, end_xyz, p_start=-90, p_end=-90, duration=3.0):
"""核心改进多层S-Curve轨迹规划 + 动态自适应FPS"""
"""S-Curve trajectory planning with adaptive FPS based on distance."""
distance = np.linalg.norm(end_xyz - start_xyz)
if distance < 50:
fps = 20
fps = 20
elif distance < 150:
fps = 40
fps = 40
else:
fps = 60
steps = max(int(duration * fps), 10)
print(f"规划平滑路径: {start_xyz} -> {end_xyz}, 距离={distance:.1f}mm, FPS={fps}")
fps = 60
steps = max(int(duration * fps), 10)
print(f"[Path] {start_xyz} -> {end_xyz}, dist={distance:.1f}mm, fps={fps}")
for i in range(steps + 1):
t = i / steps
@ -137,60 +138,53 @@ class RobotArmUltimate:
def set_damping_params(self, buffer_size=3, max_speed=30.0, damping_factor=0.7):
self.buffer_size = buffer_size
self.servo_buffer = deque(maxlen=buffer_size)
self.max_servo_speed = max_speed
self.damping_factor = damping_factor
print(f"✓ 减震参数已更新: 滤波窗口={buffer_size}, 限速={max_speed}度/帧, 阻尼={damping_factor}")
print(f"[OK] Damping params updated: window={buffer_size}, speed_limit={max_speed}deg/frame, damping={damping_factor}")
# --- 【新增】设置修正系数的接口 ---
def set_correction(self, offset_y=0.0, offset_z=0.0):
self.OFFSET_Y = offset_y
self.OFFSET_Z = offset_z
print(f"✓ 倾斜修正已更新: Y_OFFSET={offset_y}, Z_OFFSET={offset_z}")
print(f"[OK] Tilt correction updated: Y_OFFSET={offset_y}, Z_OFFSET={offset_z}")
def close(self):
if self.ser: self.ser.close()
if __name__ == "__main__":
arm = RobotArmUltimate(port='COM3')
# 1. 设置减震参数
arm.set_damping_params(buffer_size=3, max_speed=30.0, damping_factor=0.7)
arm = RobotArmUltimate() # uses config.SERIAL_PORT / SERIAL_BAUD
# 2. 【核心调试】在这里设置修正值
# 如果正向移动时 Z 往下掉说明大臂Y低了给它加一点
arm.set_correction(offset_y=-10.0, offset_z=0.0)
p_standby = np.array([110, 100, 20])
p_pick1 = np.array([210, 110, 20])
p_pick2 = np.array([210, -110, 20])
p_drop = np.array([110, -100, 20])
arm.set_damping_params(
buffer_size=config.DAMPING_BUFFER_SIZE,
max_speed=config.DAMPING_MAX_SPEED,
damping_factor=config.DAMPING_FACTOR,
)
arm.set_correction(offset_y=config.OFFSET_Y, offset_z=config.OFFSET_Z)
p_standby = np.array([110, 100, 20])
p_pick1 = np.array([210, 110, 20])
p_pick2 = np.array([210, -110, 20])
p_drop = np.array([110, -100, 20])
try:
print("\n=== 开始平滑动作序列 (带倾斜修正) ===")
# 1. 初始定位
# s_init = arm.inverse_kinematics(p_standby, target_pitch=-90)
# arm._send_and_audit(s_init, p_standby)
# time.sleep(2)
print("\n=== Smooth motion sequence (with tilt correction) ===")
# 2. 减震下降
arm.move_line(p_standby, p_pick1, p_start=-60, p_end=-90, duration=1.0)
arm.gripper_control(120)
# 3. 减震平移
arm.move_line(p_pick1, p_pick2, p_start=-90, p_end=-30, duration=1.0)
time.sleep(1)
arm.move_line(p_pick2, p_drop, p_start=-30, p_end=-90, duration=1.0)
arm.gripper_control(70)
time.sleep(1)
# 4. 减震返回待命
arm.move_line(p_drop, p_standby, p_start=-90, p_end=-60, duration=1.0)
time.sleep(1)
print("\n>>> 任务结束。")
print("\n>>> Done.")
except KeyboardInterrupt:
pass
finally:
if arm.ser: arm.ser.close()
if arm.ser:
arm.ser.close()

52
config.py Normal file
View file

@ -0,0 +1,52 @@
# config.py — hardware constants and motion parameters
# All values can be overridden via environment variables.
import os
# ---------------------------------------------------------------------------
# Serial
# ---------------------------------------------------------------------------
SERIAL_PORT = os.environ.get("ROBOT_PORT", "COM3")
SERIAL_BAUD = int(os.environ.get("ROBOT_BAUD", "115200"))
# ---------------------------------------------------------------------------
# Model paths
# ---------------------------------------------------------------------------
LLM_MODEL_PATH = os.environ.get("LLM_MODEL_PATH", r"D:\lora\2")
YOLO_MODEL_PATH = os.environ.get("YOLO_MODEL_PATH", "best.pt")
# ---------------------------------------------------------------------------
# Z-axis key heights (mm), relative to robot base
# ---------------------------------------------------------------------------
Z_HOVER = float(os.environ.get("Z_HOVER", "120.0")) # safe transit height
Z_GRAB = float(os.environ.get("Z_GRAB", "-15.0")) # table / grasp surface
Z_AFTER_PICK = float(os.environ.get("Z_AFTER_PICK", "50.0")) # clearance after grasp
# ---------------------------------------------------------------------------
# Rest (home) position [x, y, z] in mm
# Exposed as three scalars so config.py has no numpy dependency.
# Callers: np.array([config.REST_X, config.REST_Y, config.REST_Z])
# ---------------------------------------------------------------------------
REST_X = float(os.environ.get("REST_X", "120.0"))
REST_Y = float(os.environ.get("REST_Y", "0.0"))
REST_Z = float(os.environ.get("REST_Z", "60.0"))
# ---------------------------------------------------------------------------
# Workspace hard limits (mm) — movements are clipped to these bounds
# ---------------------------------------------------------------------------
WS_X = (float(os.environ.get("WS_X_MIN", "80.0")), float(os.environ.get("WS_X_MAX", "250.0")))
WS_Y = (float(os.environ.get("WS_Y_MIN", "-120.0")), float(os.environ.get("WS_Y_MAX", "120.0")))
WS_Z = (float(os.environ.get("WS_Z_MIN", "-20.0")), float(os.environ.get("WS_Z_MAX", "200.0")))
# ---------------------------------------------------------------------------
# Damping / smoothing
# ---------------------------------------------------------------------------
DAMPING_BUFFER_SIZE = int(os.environ.get("DAMPING_BUFFER", "3"))
DAMPING_MAX_SPEED = float(os.environ.get("DAMPING_MAX_SPEED", "25.0")) # deg/frame
DAMPING_FACTOR = float(os.environ.get("DAMPING_FACTOR", "0.6"))
# ---------------------------------------------------------------------------
# Tilt correction offsets (servo degrees)
# Tune OFFSET_Y > 0 if the end-effector droops during horizontal moves.
# ---------------------------------------------------------------------------
OFFSET_Y = float(os.environ.get("OFFSET_Y", "-10.0"))
OFFSET_Z = float(os.environ.get("OFFSET_Z", "0.0"))

33
requirements.txt Normal file
View file

@ -0,0 +1,33 @@
# Robot Arm Voice Control System - Dependencies
# Python 3.10+ required
#
# GPU acceleration (CUDA 11.8 / 12.x) is strongly recommended.
# Install PyTorch with CUDA from: https://pytorch.org/get-started/locally/
# Example (CUDA 12.1):
# pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121
# --- Core scientific stack ---
numpy>=1.24
scipy>=1.11
# --- Serial communication ---
pyserial>=3.5
# --- Computer vision ---
opencv-python>=4.8
ultralytics>=8.0 # YOLOv8
# --- Audio ---
sounddevice>=0.4.6
# --- Speech recognition ---
faster-whisper>=1.0.0 # NOTE: this is faster-whisper, NOT openai-whisper
# --- LLM inference ---
torch>=2.0 # install with CUDA wheels from pytorch.org
transformers>=4.40
accelerate>=0.27
peft>=0.10 # LoRA adapter loading
# --- Visualization (arm_main.py demo) ---
matplotlib>=3.7

File diff suppressed because it is too large Load diff

View file

@ -1,31 +1,27 @@
# 文件名: whisper_main.py
import sounddevice as sd
import numpy as np
import scipy.io.wavfile as wav
import sounddevice as sd
from faster_whisper import WhisperModel
class RobotEar:
"""Speech recognition module backed by faster-whisper."""
def __init__(self, model_size="base"):
self.model = WhisperModel(model_size, device="cuda", compute_type="float16")
self.fs = 16000
self.recording_buffer = []
def start_recording(self):
self.recording_buffer = []
# 开始长录音
sd.start_stream(samplerate=self.fs, channels=1)
print(">>> [耳朵] 录音中...")
def record_callback(self, indata, frames, time, status):
self.recording_buffer.append(indata.copy())
def get_text(self, audio_data):
"""将传入的音频数组转为文字"""
"""Transcribe audio frames to text.
Args:
audio_data: list of numpy arrays captured from sounddevice InputStream.
Returns:
Transcribed string (stripped).
"""
temp_file = "temp_voice.wav"
# 归一化音频数据
audio_np = np.concatenate(audio_data, axis=0)
wav.write(temp_file, self.fs, (audio_np * 32767).astype(np.int16))
segments, info = self.model.transcribe(temp_file, beam_size=5, language="zh")
text = "".join([s.text for s in segments])
return text.strip()
segments, _ = self.model.transcribe(temp_file, beam_size=5, language="zh")
return "".join(s.text for s in segments).strip()