refactor: full cleanup — dead code, audio pipeline, helpers, ino translation

refactor: full cleanup — dead code, audio pipeline, helpers, ino translation
This commit is contained in:
m1ngsama 2026-02-20 21:46:25 +08:00 committed by GitHub
commit 0975d7da37
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 181 additions and 213 deletions

View file

@ -3,7 +3,6 @@ from collections import deque
import math import math
import time import time
import matplotlib.pyplot as plt
import numpy as np import numpy as np
import serial import serial
from scipy.optimize import minimize from scipy.optimize import minimize
@ -16,12 +15,10 @@ class RobotArmUltimate:
self.L1, self.L2, self.L3, self.L4 = 70.0, 75.0, 50.0, 130.0 self.L1, self.L2, self.L3, self.L4 = 70.0, 75.0, 50.0, 130.0
self.current_servos_logic = np.array([90.0, 90.0, 90.0, 90.0]) self.current_servos_logic = np.array([90.0, 90.0, 90.0, 90.0])
self.last_sent_servos = np.array([0.0, 0.0, 0.0, 0.0]) self.last_sent_servos = np.array([0.0, 0.0, 0.0, 0.0])
self.path_history = []
# --- 2. Tilt correction offsets --- # --- 2. Tilt correction offsets (loaded from config; tune via OFFSET_Y/OFFSET_Z env vars) ---
# Increase OFFSET_Y if end-effector droops during horizontal moves self.OFFSET_Y = config.OFFSET_Y
self.OFFSET_Y = 0.0 self.OFFSET_Z = config.OFFSET_Z
self.OFFSET_Z = 0.0
# --- 3. Damping parameters --- # --- 3. Damping parameters ---
self.buffer_size = config.DAMPING_BUFFER_SIZE self.buffer_size = config.DAMPING_BUFFER_SIZE
@ -103,8 +100,6 @@ class RobotArmUltimate:
self.last_sent_servos = s_hardware.astype(int) self.last_sent_servos = s_hardware.astype(int)
self.current_servos_logic = s_logic_final self.current_servos_logic = s_logic_final
_, _, pts = self.forward_kinematics(*s_logic_final)
self.path_history.append(pts)
def move_line(self, start_xyz, end_xyz, p_start=-90, p_end=-90, duration=3.0): def move_line(self, start_xyz, end_xyz, p_start=-90, p_end=-90, duration=3.0):
"""S-Curve trajectory planning with adaptive FPS based on distance.""" """S-Curve trajectory planning with adaptive FPS based on distance."""
@ -154,13 +149,6 @@ class RobotArmUltimate:
if __name__ == "__main__": if __name__ == "__main__":
arm = RobotArmUltimate() # uses config.SERIAL_PORT / SERIAL_BAUD arm = RobotArmUltimate() # uses config.SERIAL_PORT / SERIAL_BAUD
arm.set_damping_params(
buffer_size=config.DAMPING_BUFFER_SIZE,
max_speed=config.DAMPING_MAX_SPEED,
damping_factor=config.DAMPING_FACTOR,
)
arm.set_correction(offset_y=config.OFFSET_Y, offset_z=config.OFFSET_Z)
p_standby = np.array([110, 100, 20]) p_standby = np.array([110, 100, 20])
p_pick1 = np.array([210, 110, 20]) p_pick1 = np.array([210, 110, 20])
p_pick2 = np.array([210, -110, 20]) p_pick2 = np.array([210, -110, 20])

View file

@ -50,3 +50,22 @@ DAMPING_FACTOR = float(os.environ.get("DAMPING_FACTOR", "0.6"))
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
OFFSET_Y = float(os.environ.get("OFFSET_Y", "-10.0")) OFFSET_Y = float(os.environ.get("OFFSET_Y", "-10.0"))
OFFSET_Z = float(os.environ.get("OFFSET_Z", "0.0")) OFFSET_Z = float(os.environ.get("OFFSET_Z", "0.0"))
# ---------------------------------------------------------------------------
# Audio processing
# ---------------------------------------------------------------------------
AUDIO_SILENCE_THRESHOLD = float(os.environ.get("AUDIO_SILENCE_THRESHOLD", "0.01"))
AUDIO_SILENCE_MARGIN = float(os.environ.get("AUDIO_SILENCE_MARGIN", "0.3")) # seconds
AUDIO_MIN_DURATION = float(os.environ.get("AUDIO_MIN_DURATION", "0.5")) # seconds
AUDIO_MAX_DURATION = float(os.environ.get("AUDIO_MAX_DURATION", "15.0")) # seconds
# ---------------------------------------------------------------------------
# Default motion increment for fuzzy directional commands (no explicit distance)
# ---------------------------------------------------------------------------
DEFAULT_MOVE_MM = float(os.environ.get("DEFAULT_MOVE_MM", "50.0"))
# ---------------------------------------------------------------------------
# Gesture animation (nod / shake_head)
# ---------------------------------------------------------------------------
GESTURE_AMPLITUDE = float(os.environ.get("GESTURE_AMPLITUDE", "30.0")) # mm
GESTURE_CYCLES = int(os.environ.get("GESTURE_CYCLES", "3"))

View file

@ -1,45 +1,45 @@
/* /*
* ESP32 PWM ( Core 3.x ) * ESP32 native hardware PWM servo controller (ESP32 Arduino Core 3.x)
* 'ledcSetup' was not declared * Uses the updated ledcAttach/ledcWrite API (replaces deprecated ledcSetup).
*/ */
// --- 1. 引脚定义 --- // --- 1. Pin assignments ---
const int PIN_X = 14; const int PIN_X = 14; // X-axis servo
const int PIN_Y = 4; const int PIN_Y = 4; // Y-axis servo
const int PIN_Z = 5; const int PIN_Z = 5; // Z-axis servo
const int PIN_B = 18; const int PIN_B = 18; // Base rotation servo
const int PIN_G = 23; const int PIN_G = 23; // Gripper servo
// --- 2. PWM 参数 (舵机标准) --- // --- 2. PWM parameters (standard servo) ---
const int freq = 50; // 频率 50Hz (周期 20ms) const int freq = 50; // 50 Hz (20 ms period)
const int resolution = 12; // 分辨率 12位 (数值范围 0-4095) const int resolution = 12; // 12-bit resolution (04095)
// 舵机角度对应的占空比数值 (12位分辨率) // Duty-cycle values for servo angles (12-bit, 50 Hz):
// 0.5ms (0度) -> 约 102 // 0.5 ms ( 0°) → ~102
// 1.5ms (90度) -> 约 307 // 1.5 ms ( 90°) → ~307
// 2.5ms (180度) -> 约 512 // 2.5 ms (180°) → ~512
void setup() { void setup() {
Serial.begin(115200); Serial.begin(115200);
delay(1000); delay(1000);
Serial.println("\n--- 使用 ESP32 Core 3.x LEDC 驱动初始化 ---"); Serial.println("\n--- ESP32 Core 3.x LEDC driver initializing ---");
// 在新版 Core 中,直接使用 ledcAttach(引脚, 频率, 分辨率) // Core 3.x API: ledcAttach(pin, frequency, resolution)
ledcAttach(PIN_X, freq, resolution); ledcAttach(PIN_X, freq, resolution);
ledcAttach(PIN_Y, freq, resolution); ledcAttach(PIN_Y, freq, resolution);
ledcAttach(PIN_Z, freq, resolution); ledcAttach(PIN_Z, freq, resolution);
ledcAttach(PIN_B, freq, resolution); ledcAttach(PIN_B, freq, resolution);
ledcAttach(PIN_G, freq, resolution); ledcAttach(PIN_G, freq, resolution);
// 初始归中 // Center all servos at startup
writeAngle(PIN_X, 90); delay(500); writeAngle(PIN_X, 90); delay(500);
writeAngle(PIN_Y, 90); delay(500); writeAngle(PIN_Y, 90); delay(500);
writeAngle(PIN_Z, 90); delay(500); writeAngle(PIN_Z, 90); delay(500);
writeAngle(PIN_B, 90); delay(500); writeAngle(PIN_B, 90); delay(500);
writeAngle(PIN_G, 90); delay(500); writeAngle(PIN_G, 90); delay(500);
Serial.println("--- 5轴硬件 PWM 初始化完成 ---"); Serial.println("--- 5-axis hardware PWM ready ---");
} }
void loop() { void loop() {
@ -47,42 +47,39 @@ void loop() {
String cmd = Serial.readStringUntil('\n'); String cmd = Serial.readStringUntil('\n');
cmd.trim(); cmd.trim();
// 根据指令解析并控制 if (cmd.startsWith("Servo_ArmX")) writeAngle(PIN_X, extractAngle(cmd, "Servo_ArmX"));
if (cmd.startsWith("Servo_ArmX")) writeAngle(PIN_X, extractAngle(cmd, "Servo_ArmX")); else if (cmd.startsWith("Servo_ArmY")) writeAngle(PIN_Y, extractAngle(cmd, "Servo_ArmY"));
else if (cmd.startsWith("Servo_ArmY")) writeAngle(PIN_Y, extractAngle(cmd, "Servo_ArmY")); else if (cmd.startsWith("Servo_ArmZ")) writeAngle(PIN_Z, extractAngle(cmd, "Servo_ArmZ"));
else if (cmd.startsWith("Servo_ArmZ")) writeAngle(PIN_Z, extractAngle(cmd, "Servo_ArmZ")); else if (cmd.startsWith("Servo_ArmB")) writeAngle(PIN_B, extractAngle(cmd, "Servo_ArmB"));
else if (cmd.startsWith("Servo_ArmB")) writeAngle(PIN_B, extractAngle(cmd, "Servo_ArmB"));
else if (cmd.startsWith("Servo_Gripper")) { else if (cmd.startsWith("Servo_Gripper")) {
int a = extractAngle(cmd, "Servo_Gripper"); int a = extractAngle(cmd, "Servo_Gripper");
if (a != -1) { if (a != -1) {
writeAngle(PIN_G, a); writeAngle(PIN_G, a);
Serial.print("夹爪已转动至: "); Serial.println(a); Serial.print("Gripper moved to: "); Serial.println(a);
} }
} }
} }
} }
/** /**
* * Write a target angle to a servo pin.
* pin: * pin : GPIO pin number
* angle: (0-180) * angle: target angle in degrees (0180, clamped)
*/ */
void writeAngle(int pin, int angle) { void writeAngle(int pin, int angle) {
if (angle < 0) angle = 0; if (angle < 0) angle = 0;
if (angle > 180) angle = 180; if (angle > 180) angle = 180;
// 线性映射计算占空比 // Linear map: 0° → duty 102, 180° → duty 512
// 0度 = 102, 180度 = 512
int duty = (angle * (512 - 102) / 180) + 102; int duty = (angle * (512 - 102) / 180) + 102;
// 在新版 Core 中ledcWrite 直接接收引脚号和数值 // Core 3.x API: ledcWrite takes pin number directly
ledcWrite(pin, duty); ledcWrite(pin, duty);
} }
// 提取命令中的数字 /** Extract the numeric argument after a known command prefix. Returns -1 on failure. */
int extractAngle(String cmd, String prefix) { int extractAngle(String cmd, String prefix) {
int prefixLen = prefix.length(); String valPart = cmd.substring(prefix.length());
String valPart = cmd.substring(prefixLen);
if (valPart.length() > 0) return valPart.toInt(); if (valPart.length() > 0) return valPart.toInt();
return -1; return -1;
} }

View file

@ -5,7 +5,6 @@ import time
import cv2 import cv2
import numpy as np import numpy as np
import scipy.io.wavfile as wav
import sounddevice as sd import sounddevice as sd
import torch import torch
from transformers import AutoModelForCausalLM, AutoTokenizer from transformers import AutoModelForCausalLM, AutoTokenizer
@ -202,20 +201,19 @@ class RobotBrain:
if m: if m:
return [{"action": "move_inc", "axis": "x", "value": -extract_value(m, 2, 3)}] return [{"action": "move_inc", "axis": "x", "value": -extract_value(m, 2, 3)}]
# Fuzzy directional commands without explicit distance — use default 5 cm # Fuzzy directional commands without explicit distance — use config default
DEFAULT_MOVE = 50.0 # mm
if re.search(r'(向?上|抬起|举起|往上)', text): if re.search(r'(向?上|抬起|举起|往上)', text):
return [{"action": "move_inc", "axis": "z", "value": DEFAULT_MOVE}] return [{"action": "move_inc", "axis": "z", "value": config.DEFAULT_MOVE_MM}]
if re.search(r'(向?下|往下)', text): if re.search(r'(向?下|往下)', text):
return [{"action": "move_inc", "axis": "z", "value": -DEFAULT_MOVE}] return [{"action": "move_inc", "axis": "z", "value": -config.DEFAULT_MOVE_MM}]
if re.search(r'(向?左|往左)', text): if re.search(r'(向?左|往左)', text):
return [{"action": "move_inc", "axis": "y", "value": DEFAULT_MOVE}] return [{"action": "move_inc", "axis": "y", "value": config.DEFAULT_MOVE_MM}]
if re.search(r'(向?右|往右)', text): if re.search(r'(向?右|往右)', text):
return [{"action": "move_inc", "axis": "y", "value": -DEFAULT_MOVE}] return [{"action": "move_inc", "axis": "y", "value": -config.DEFAULT_MOVE_MM}]
if re.search(r'(向?前|往前)', text): if re.search(r'(向?前|往前)', text):
return [{"action": "move_inc", "axis": "x", "value": DEFAULT_MOVE}] return [{"action": "move_inc", "axis": "x", "value": config.DEFAULT_MOVE_MM}]
if re.search(r'(向?后|往后)', text): if re.search(r'(向?后|往后)', text):
return [{"action": "move_inc", "axis": "x", "value": -DEFAULT_MOVE}] return [{"action": "move_inc", "axis": "x", "value": -config.DEFAULT_MOVE_MM}]
return None # not a simple command — fall through to LLM return None # not a simple command — fall through to LLM
@ -326,12 +324,6 @@ class AutoGraspSystem:
def __init__(self): def __init__(self):
print("\n[1/3] Initializing robot arm...") print("\n[1/3] Initializing robot arm...")
self.arm = RobotArmUltimate() # uses config.SERIAL_PORT / SERIAL_BAUD self.arm = RobotArmUltimate() # uses config.SERIAL_PORT / SERIAL_BAUD
self.arm.set_damping_params(
buffer_size=config.DAMPING_BUFFER_SIZE,
max_speed=config.DAMPING_MAX_SPEED,
damping_factor=config.DAMPING_FACTOR,
)
self.arm.set_correction(offset_y=config.OFFSET_Y, offset_z=config.OFFSET_Z)
print("[2/3] Loading YOLO model...") print("[2/3] Loading YOLO model...")
self.model = YOLO(config.YOLO_MODEL_PATH) self.model = YOLO(config.YOLO_MODEL_PATH)
@ -463,22 +455,13 @@ class AutoGraspSystem:
return rx, ry return rx, ry
return None return None
def execute_pick(self, rx, ry, current_pos=None): def _approach_and_grasp(self, rx, ry, current_pos):
"""Execute pick action: move to target (rx, ry) and grasp. """Move to hover above target, descend to grab height, close gripper.
Args: Returns the grab position (p_down) so callers can continue from there.
rx, ry: robot-frame target coordinates (mm)
current_pos: actual current end-effector position (mm, [x, y, z]).
Falls back to REST_POS if None.
""" """
if current_pos is None: p_up = np.array([rx, ry, config.Z_HOVER])
current_pos = np.array([config.REST_X, config.REST_Y, config.REST_Z]) p_down = np.array([rx, ry, config.Z_GRAB])
p_up = np.array([rx, ry, config.Z_HOVER])
p_down = np.array([rx, ry, config.Z_GRAB])
p_after = np.array([rx, ry, config.Z_AFTER_PICK])
print(f"\n[Pick] target=({rx:.1f}, {ry:.1f}), from={current_pos}")
self.arm.gripper_control(70) self.arm.gripper_control(70)
self.gripper_closed = False self.gripper_closed = False
@ -493,43 +476,28 @@ class AutoGraspSystem:
self.gripper_closed = True self.gripper_closed = True
time.sleep(0.5) time.sleep(0.5)
self.arm.move_line(p_down, p_after, p_start=-90, p_end=-60, duration=1.0) return p_down
def execute_pick(self, rx, ry, current_pos=None):
"""Grasp target and lift to Z_AFTER_PICK clearance height."""
if current_pos is None:
current_pos = np.array([config.REST_X, config.REST_Y, config.REST_Z])
print(f"\n[Pick] target=({rx:.1f}, {ry:.1f}), from={current_pos}")
p_down = self._approach_and_grasp(rx, ry, current_pos)
p_after = np.array([rx, ry, config.Z_AFTER_PICK])
self.arm.move_line(p_down, p_after, p_start=-90, p_end=-60, duration=1.0)
return p_after return p_after
def execute_lift(self, rx, ry, height, current_pos=None): def execute_lift(self, rx, ry, height, current_pos=None):
"""Execute pick-and-lift: grasp target and raise by `height` mm. """Grasp target and raise by `height` mm above grab point."""
Args:
rx, ry: robot-frame target coordinates (mm)
height: lift distance above grab point (mm)
current_pos: actual current end-effector position (mm, [x, y, z]).
Falls back to REST_POS if None.
"""
if current_pos is None: if current_pos is None:
current_pos = np.array([config.REST_X, config.REST_Y, config.REST_Z]) current_pos = np.array([config.REST_X, config.REST_Y, config.REST_Z])
p_up = np.array([rx, ry, config.Z_HOVER])
p_down = np.array([rx, ry, config.Z_GRAB])
p_lifted = np.array([rx, ry, np.clip(config.Z_GRAB + height, config.WS_Z[0], config.WS_Z[1])])
print(f"\n[Lift] target=({rx:.1f}, {ry:.1f}), height={height}mm, from={current_pos}") print(f"\n[Lift] target=({rx:.1f}, {ry:.1f}), height={height}mm, from={current_pos}")
p_down = self._approach_and_grasp(rx, ry, current_pos)
self.arm.gripper_control(70) p_lifted = np.array([rx, ry, np.clip(config.Z_GRAB + height, config.WS_Z[0], config.WS_Z[1])])
self.gripper_closed = False
time.sleep(0.3)
self.arm.servo_buffer.clear()
self.arm.move_line(current_pos, p_up, p_start=-60, p_end=-60, duration=1.5)
self.arm.move_line(p_up, p_down, p_start=-60, p_end=-90, duration=1.5)
self.arm.gripper_control(120)
self.gripper_closed = True
time.sleep(0.5)
self.arm.move_line(p_down, p_lifted, p_start=-90, p_end=-60, duration=1.5) self.arm.move_line(p_down, p_lifted, p_start=-90, p_end=-60, duration=1.5)
print(f"[OK] Lifted to Z={p_lifted[2]:.1f}mm") print(f"[OK] Lifted to Z={p_lifted[2]:.1f}mm")
return p_lifted return p_lifted
@ -601,80 +569,29 @@ class RobotApp:
def get_audio_text(self): def get_audio_text(self):
"""Transcribe recorded audio frames to text.""" """Transcribe recorded audio frames to text."""
if not self.audio_frames: return self.ear.get_text(self.audio_frames)
return ""
audio_data = np.concatenate(self.audio_frames, axis=0) def _oscillate(self, axis, amplitude, cycles):
"""Oscillate end-effector along `axis` for nod/shake gestures.
# Trim leading/trailing silence to reduce Whisper hallucinations Args:
audio_flat = audio_data.flatten() axis: 'x', 'y', or 'z'
threshold = 0.01 amplitude: half-swing distance in mm
nonzero = np.where(np.abs(audio_flat) > threshold)[0] cycles: number of full back-and-forth cycles
if len(nonzero) == 0: """
print("[Audio] No speech detected") base_pos = self.current_pos.copy()
return "" idx = {'x': 0, 'y': 1, 'z': 2}[axis]
margin = int(16000 * 0.3) # 0.3 s padding on each side self.grasp_sys.arm.servo_buffer.clear()
start = max(0, nonzero[0] - margin)
end = min(len(audio_flat), nonzero[-1] + margin)
audio_trimmed = audio_flat[start:end]
duration = len(audio_trimmed) / 16000 for _ in range(cycles):
if duration < 0.5: pos_a = base_pos.copy(); pos_a[idx] += amplitude
print(f"[Audio] Too short ({duration:.1f}s), skipping") self.grasp_sys.arm.move_line(base_pos, pos_a, duration=0.5)
return "" pos_b = base_pos.copy(); pos_b[idx] -= amplitude
if duration > 15.0: self.grasp_sys.arm.move_line(pos_a, pos_b, duration=0.8)
print(f"[Audio] Too long ({duration:.1f}s), truncating to 15s") self.grasp_sys.arm.move_line(pos_b, base_pos, duration=0.5)
audio_trimmed = audio_trimmed[:16000 * 15]
temp_file = "temp_voice.wav" self.current_pos = base_pos
wav.write(temp_file, 16000, (audio_trimmed * 32767).astype(np.int16))
segments, _ = self.ear.model.transcribe(
temp_file,
beam_size=5,
language="zh",
no_speech_threshold=0.5,
condition_on_previous_text=False, # prevents "向右向右向右..." hallucination loop
# i18n: domain hint for Whisper — Chinese robot command vocabulary
initial_prompt="机械臂控制指令:抓取,抬起,放下,松开,复位,点头,摇头,削笔刀,盒子,零件,瓶子,厘米,毫米,向上,向下,向左,向右,向前,向后"
)
text = "".join(s.text for s in segments)
return self._fix_recognition(text.strip())
def _fix_recognition(self, text):
"""Post-process ASR output: punctuation removal, homophone correction, dedup."""
if not text:
return text
text = re.sub(r'[,,。!?!?、;]', '', text)
# i18n: Chinese homophone correction table (Whisper mishearings → correct words)
replacements = {
'小笔刀': '削笔刀', '消笔刀': '削笔刀', '销笔刀': '削笔刀',
'零米': '厘米', '里米': '厘米', '黎米': '厘米', '离米': '厘米',
'公分': '厘米', '利米': '厘米',
'电头': '点头', '点投': '点头', '店头': '点头', '垫头': '点头',
'药头': '摇头', '要头': '摇头', '右头': '摇头', '咬头': '摇头', '摇土': '摇头',
}
for wrong, right in replacements.items():
text = text.replace(wrong, right)
# Detect and strip repeated-phrase hallucinations like "向右向右向右..."
dedup_match = re.match(r'^(.{2,8}?)(.{2,8}?)\2{2,}', text)
if dedup_match:
text = dedup_match.group(1)
print(f"[Dedup] Repeated hallucination stripped, kept: {text}")
if len(text) > 30:
words = re.findall(r'向[上下左右前后]', text)
if len(words) > 3:
first_match = re.search(r'(.*?向[上下左右前后].*?\d+.*?厘米)', text)
text = first_match.group(1) if first_match else text[:20]
print(f"[Dedup] Overlong text truncated to: {text}")
return text.strip()
def execute_command(self, cmd): def execute_command(self, cmd):
"""Dispatch a single parsed action command to the appropriate executor.""" """Dispatch a single parsed action command to the appropriate executor."""
@ -738,36 +655,12 @@ class RobotApp:
elif action == "nod": elif action == "nod":
print("[Nod] Executing nod motion") print("[Nod] Executing nod motion")
base_pos = self.current_pos.copy() self._oscillate('z', config.GESTURE_AMPLITUDE, config.GESTURE_CYCLES)
dist = 30.0 # 3 cm amplitude
self.grasp_sys.arm.servo_buffer.clear()
for _ in range(3):
up_pos = base_pos.copy(); up_pos[2] += dist
self.grasp_sys.arm.move_line(base_pos, up_pos, duration=0.5)
down_pos = base_pos.copy(); down_pos[2] -= dist
self.grasp_sys.arm.move_line(up_pos, down_pos, duration=0.8)
self.grasp_sys.arm.move_line(down_pos, base_pos, duration=0.5)
self.current_pos = base_pos
print("[OK] Nod complete") print("[OK] Nod complete")
elif action == "shake_head": elif action == "shake_head":
print("[ShakeHead] Executing shake-head motion") print("[ShakeHead] Executing shake-head motion")
base_pos = self.current_pos.copy() self._oscillate('y', config.GESTURE_AMPLITUDE, config.GESTURE_CYCLES)
dist = 30.0 # 3 cm amplitude
self.grasp_sys.arm.servo_buffer.clear()
for _ in range(3):
left_pos = base_pos.copy(); left_pos[1] += dist # +y = left
self.grasp_sys.arm.move_line(base_pos, left_pos, duration=0.5)
right_pos = base_pos.copy(); right_pos[1] -= dist
self.grasp_sys.arm.move_line(left_pos, right_pos, duration=0.8)
self.grasp_sys.arm.move_line(right_pos, base_pos, duration=0.5)
self.current_pos = base_pos
print("[OK] Shake-head complete") print("[OK] Shake-head complete")
else: else:

View file

@ -1,8 +1,11 @@
import re
import numpy as np import numpy as np
import scipy.io.wavfile as wav import scipy.io.wavfile as wav
import sounddevice as sd
from faster_whisper import WhisperModel from faster_whisper import WhisperModel
import config
class RobotEar: class RobotEar:
"""Speech recognition module backed by faster-whisper.""" """Speech recognition module backed by faster-whisper."""
@ -11,17 +14,85 @@ class RobotEar:
self.model = WhisperModel(model_size, device="cuda", compute_type="float16") self.model = WhisperModel(model_size, device="cuda", compute_type="float16")
self.fs = 16000 self.fs = 16000
def get_text(self, audio_data): def get_text(self, audio_frames):
"""Transcribe audio frames to text. """Transcribe audio frames to text with silence trimming and duration guards.
Args: Args:
audio_data: list of numpy arrays captured from sounddevice InputStream. audio_frames: list of numpy arrays captured from sounddevice InputStream.
Returns: Returns:
Transcribed string (stripped). Transcribed string (stripped), or "" if audio is empty/too short/silent.
""" """
if not audio_frames:
return ""
audio_data = np.concatenate(audio_frames, axis=0)
audio_flat = audio_data.flatten()
# Trim leading/trailing silence to reduce Whisper hallucinations
nonzero = np.where(np.abs(audio_flat) > config.AUDIO_SILENCE_THRESHOLD)[0]
if len(nonzero) == 0:
print("[Audio] No speech detected")
return ""
margin = int(self.fs * config.AUDIO_SILENCE_MARGIN)
start = max(0, nonzero[0] - margin)
end = min(len(audio_flat), nonzero[-1] + margin)
audio_trimmed = audio_flat[start:end]
duration = len(audio_trimmed) / self.fs
if duration < config.AUDIO_MIN_DURATION:
print(f"[Audio] Too short ({duration:.1f}s), skipping")
return ""
if duration > config.AUDIO_MAX_DURATION:
print(f"[Audio] Too long ({duration:.1f}s), truncating to {config.AUDIO_MAX_DURATION:.0f}s")
audio_trimmed = audio_trimmed[:int(self.fs * config.AUDIO_MAX_DURATION)]
temp_file = "temp_voice.wav" temp_file = "temp_voice.wav"
audio_np = np.concatenate(audio_data, axis=0) wav.write(temp_file, self.fs, (audio_trimmed * 32767).astype(np.int16))
wav.write(temp_file, self.fs, (audio_np * 32767).astype(np.int16))
segments, _ = self.model.transcribe(temp_file, beam_size=5, language="zh") segments, _ = self.model.transcribe(
return "".join(s.text for s in segments).strip() temp_file,
beam_size=5,
language="zh",
no_speech_threshold=0.5,
condition_on_previous_text=False, # prevents "向右向右向右..." hallucination loop
# i18n: domain hint for Whisper — Chinese robot command vocabulary
initial_prompt="机械臂控制指令:抓取,抬起,放下,松开,复位,点头,摇头,削笔刀,盒子,零件,瓶子,厘米,毫米,向上,向下,向左,向右,向前,向后"
)
text = "".join(s.text for s in segments)
return self._fix_recognition(text.strip())
def _fix_recognition(self, text):
"""Post-process ASR output: punctuation removal, homophone correction, dedup."""
if not text:
return text
text = re.sub(r'[,,。!?!?、;]', '', text)
# i18n: Chinese homophone correction table (Whisper mishearings → correct words)
replacements = {
'小笔刀': '削笔刀', '消笔刀': '削笔刀', '销笔刀': '削笔刀',
'零米': '厘米', '里米': '厘米', '黎米': '厘米', '离米': '厘米',
'公分': '厘米', '利米': '厘米',
'电头': '点头', '点投': '点头', '店头': '点头', '垫头': '点头',
'药头': '摇头', '要头': '摇头', '右头': '摇头', '咬头': '摇头', '摇土': '摇头',
}
for wrong, right in replacements.items():
text = text.replace(wrong, right)
# Detect and strip repeated-phrase hallucinations like "向右向右向右..."
dedup_match = re.match(r'^(.{2,8}?)(.{2,8}?)\2{2,}', text)
if dedup_match:
text = dedup_match.group(1)
print(f"[Dedup] Repeated hallucination stripped, kept: {text}")
if len(text) > 30:
words = re.findall(r'向[上下左右前后]', text)
if len(words) > 3:
first_match = re.search(r'(.*?向[上下左右前后].*?\d+.*?厘米)', text)
text = first_match.group(1) if first_match else text[:20]
print(f"[Dedup] Overlong text truncated to: {text}")
return text.strip()