6 机械臂使用场景案例
本章节呈现了经典的机械臂使用案例,以展示产品在富有代表性的场景中的应用。这包括了机械臂在不同领域的典型应用,突显了产品的多功能性和适用性。通过这些案例,用户可以深入了解机械臂在实际应用中的灵活性和效能,为他们在特定场景中的应用提供参考。
移动抓取木块案例
相关依赖文件可在 https://github.com/elephantrobotics/mercury_demo 下载
import time
from pymycobot import Mercury
from uvc_camera import UVCCamera
from marker_utils import *
import stag
# 夹爪工具长度
Tool_LEN = 175
# 相机中心到法兰距离
Camera_LEN = 78
np.set_printoptions(suppress=True, formatter={'float_kind': '{:.2f}'.format})
# 相机配置文件
camera_params = np.load("src/camera_params.npz")
mtx, dist = camera_params["mtx"], camera_params["dist"]
# 二维码大小
MARKER_SIZE = 32
# 设置左臂端口
ml = Mercury("/dev/left_arm")
# 将旋转矩阵转为欧拉角
def CvtRotationMatrixToEulerAngle(pdtRotationMatrix):
pdtEulerAngle = np.zeros(3)
pdtEulerAngle[2] = np.arctan2(pdtRotationMatrix[1, 0], pdtRotationMatrix[0, 0])
fCosRoll = np.cos(pdtEulerAngle[2])
fSinRoll = np.sin(pdtEulerAngle[2])
pdtEulerAngle[1] = np.arctan2(-pdtRotationMatrix[2, 0],
(fCosRoll * pdtRotationMatrix[0, 0]) + (fSinRoll * pdtRotationMatrix[1, 0]))
pdtEulerAngle[0] = np.arctan2((fSinRoll * pdtRotationMatrix[0, 2]) - (fCosRoll * pdtRotationMatrix[1, 2]),
(-fSinRoll * pdtRotationMatrix[0, 1]) + (fCosRoll * pdtRotationMatrix[1, 1]))
return pdtEulerAngle
# 将欧拉角转为旋转矩阵
def CvtEulerAngleToRotationMatrix(ptrEulerAngle):
ptrSinAngle = np.sin(ptrEulerAngle)
ptrCosAngle = np.cos(ptrEulerAngle)
ptrRotationMatrix = np.zeros((3, 3))
ptrRotationMatrix[0, 0] = ptrCosAngle[2] * ptrCosAngle[1]
ptrRotationMatrix[0, 1] = ptrCosAngle[2] * ptrSinAngle[1] * ptrSinAngle[0] - ptrSinAngle[2] * ptrCosAngle[0]
ptrRotationMatrix[0, 2] = ptrCosAngle[2] * ptrSinAngle[1] * ptrCosAngle[0] + ptrSinAngle[2] * ptrSinAngle[0]
ptrRotationMatrix[1, 0] = ptrSinAngle[2] * ptrCosAngle[1]
ptrRotationMatrix[1, 1] = ptrSinAngle[2] * ptrSinAngle[1] * ptrSinAngle[0] + ptrCosAngle[2] * ptrCosAngle[0]
ptrRotationMatrix[1, 2] = ptrSinAngle[2] * ptrSinAngle[1] * ptrCosAngle[0] - ptrCosAngle[2] * ptrSinAngle[0]
ptrRotationMatrix[2, 0] = -ptrSinAngle[1]
ptrRotationMatrix[2, 1] = ptrCosAngle[1] * ptrSinAngle[0]
ptrRotationMatrix[2, 2] = ptrCosAngle[1] * ptrCosAngle[0]
return ptrRotationMatrix
# 坐标转换为齐次变换矩阵,(x,y,z,rx,ry,rz)单位rad
def Transformation_matrix(coord):
position_robot = coord[:3]
pose_robot = coord[3:]
# 将欧拉角转为旋转矩阵
RBT = CvtEulerAngleToRotationMatrix(pose_robot)
PBT = np.array([[position_robot[0]],
[position_robot[1]],
[position_robot[2]]])
temp = np.concatenate((RBT, PBT), axis=1)
array_1x4 = np.array([[0, 0, 0, 1]])
# 将两个数组按行拼接起来
matrix = np.concatenate((temp, array_1x4), axis=0)
return matrix
def Eyes_in_hand_left(coord, camera):
# 相机坐标
Position_Camera = np.transpose(camera[:3])
# 机械臂坐标矩阵
Matrix_BT = Transformation_matrix(coord)
# 手眼矩阵
Matrix_TC = np.array([[0, -1, 0, Camera_LEN],
[1, 0, 0, 0],
[0, 0, 1, -Tool_LEN],
[0, 0, 0, 1]])
# 物体坐标(相机系)
Position_Camera = np.append(Position_Camera, 1)
# 物体坐标(基坐标系)
Position_B = Matrix_BT @ Matrix_TC @ Position_Camera
return Position_B
# 等待机械臂运行结束
def waitl():
time.sleep(0.2)
while (ml.is_moving()):
time.sleep(0.03)
# 获取物体坐标(相机系)
def calc_markers_base_position(corners: NDArray, ids: T.List, marker_size: int, mtx: NDArray, dist: NDArray) -> T.List:
if len(corners) == 0:
return []
# 通过二维码角点获取物体旋转向量和平移向量
rvecs, tvecs = solve_marker_pnp(corners, marker_size, mtx, dist)
for i, tvec, rvec in zip(ids, tvecs, rvecs):
tvec = tvec.squeeze().tolist()
rvec = rvec.squeeze().tolist()
rotvector = np.array([[rvec[0], rvec[1], rvec[2]]])
# 将旋转向量转为旋转矩阵
Rotation = cv2.Rodrigues(rotvector)[0]
# 将旋转矩阵转为欧拉角
Euler = CvtRotationMatrixToEulerAngle(Rotation)
# 物体坐标(相机系)
target_coords = np.array([tvec[0], tvec[1], tvec[2], Euler[0], Euler[1], Euler[2]])
return target_coords
if __name__ == "__main__":
# 设置摄像头id
camera = UVCCamera(5, mtx, dist)
# 打开摄像头
camera.capture()
# 设置左臂观察点
origin_anglesL = [-44.24, 15.56, 0.0, -102.59, 65.28, 52.06, 23.49]
# 设置夹爪运动模式
ml.set_gripper_mode(0)
# 设置工具坐标系
ml.set_tool_reference([0, 0, Tool_LEN, 0, 0, 0])
# 将末端坐标系设置为工具
ml.set_end_type(1)
# 设置移动速度
sp = 40
# 移动到观测点
ml.send_angles(origin_anglesL, sp)
# 等待机械臂运动结束
waitl()
# 刷新相机界面
camera.update_frame()
# 获取当前帧
frame = camera.color_frame()
# 获取画面中二维码的角度和id
(corners, ids, rejected_corners) = stag.detectMarkers(frame, 11)
# 获取物的坐标(相机系)
marker_pos_pack = calc_markers_base_position(corners, ids, MARKER_SIZE, mtx, dist)
# 获取机械臂当前坐标
cur_coords = np.array(ml.get_base_coords())
# 将角度值转为弧度值
cur_bcl = cur_coords.copy()
cur_bcl[-3:] *= (np.pi / 180)
# 通过矩阵变化将物体坐标(相机系)转成(基坐标系)
fact_bcl = Eyes_in_hand_left(cur_bcl, marker_pos_pack)
target_coords = cur_coords.copy()
target_coords[0] = fact_bcl[0]
target_coords[1] = fact_bcl[1]
target_coords[2] = fact_bcl[2] + 50
# 机械臂移动到二维码上方
ml.send_base_coords(target_coords, 30)
# 等待机械臂运动结束
waitl()
# 打开夹爪
ml.set_gripper_value(100, 100)
# 机械臂沿z轴向下移动
ml.send_base_coord(3, fact_bcl[2], 10)
# 等待机械臂运动结束
waitl()
# 闭合夹爪
ml.set_gripper_value(20, 100)
# 等待夹爪闭合
time.sleep(2)
# 抬起夹爪
ml.send_base_coord(3, fact_bcl[2] + 50, 10)
脚本及手套控制灵巧手案例
案例1:脚本控制灵巧手代码
import time
from pymycobot import Mercury
# 连接左臂(或手)
ml = Mercury("/dev/left_arm")
ml.set_movement_type(3)
ml.set_vr_mode(1)
# 手指姿态(开/闭)
banana = [
[0, 10000, 10000, 10000, 10000, 65535], # 手张开
[31000, 25000, 26000, 24000, 24000, 65535], # 手握拳
[0, 10000, 10000, 10000, 10000, 65535] # 再张开
]
class Command:
ID = 2
Code = 16
Address_High = 4
Address_LOW = 111
Len = 12
Number_High = 0
Number_LOW = 6
Value_High = 0
Value_LOW = 0
Joint_High = 0
Joint_LOW = 0
cmd_list = [ID, Code, Address_High, Address_LOW, Number_High, Number_LOW, Len,
Joint_High, Joint_LOW,
Joint_High, Joint_LOW,
Joint_High, Joint_LOW,
Joint_High, Joint_LOW,
Joint_High, Joint_LOW,
Joint_High, Joint_LOW,
]
cmd_list = Command().cmd_list
def byte_deal(*args):
result = []
for value in args:
high_byte = (value >> 8) & 0xFF
low_byte = value & 0xFF
result.extend([high_byte, low_byte])
return result
def crc16_modbus(data: bytes) -> bytes:
crc = 0xFFFF
for pos in data:
crc ^= pos
for _ in range(8):
if (crc & 0x0001) != 0:
crc >>= 1
crc ^= 0xA001
else:
crc >>= 1
return crc.to_bytes(2, byteorder='little')
def set_hand_joints_value(value, a1=None):
tmp = []
for i in range(len(value)):
tmp.extend(byte_deal(value[i]))
for i in range(len(tmp)):
cmd_list[i + 7] = tmp[i]
send_data = bytes(cmd_list) + crc16_modbus(cmd_list)
send_data = send_data.hex().upper()
hex_list = [int(send_data[i:i + 2], 16) for i in range(0, len(send_data), 2)]
if a1 is not None:
a1.tool_serial_write_data(hex_list)
else:
print("发送指令:", send_data)
# 简单循环测试张开与闭合
def main():
print("开始测试手指动作...")
while True:
set_hand_joints_value(banana[0], ml)
print("手张开")
time.sleep(2)
set_hand_joints_value(banana[1], ml)
print("手握拳")
time.sleep(2)
if __name__ == "__main__":
main()
案例2:手套控制灵巧手代码
import asyncio
import platform
from time import time
from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
import numpy as np
import pyZlBus2 as zlb
from pymycobot import Mercury
# BLE UUID
CmdCtrl_WriteNotify_UUID = "AEC91001-6E7A-4BC2-9A4C-4CDA7A728F58"
UploadData_Notify_UUID = "AEC91002-6E7A-4BC2-9A4C-4CDA7A728F58"
TxData_Notify_UUID = "AEC91003-6E7A-4BC2-9A4C-4CDA7A728F58"
DEBUG = 0
class ZlBusSdk:
def __init__(self, advNameStr: str = '', searchStr: str = 'zl', bleScannerPass: bool = False):
if platform.system() == 'Windows':
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
self.advNameStr = advNameStr
self.searchStr = searchStr
self.bFound = False
self.bQuit = False
self.blueAddr = ""
self.findAddr = {}
self.client = None
self.scannerPass = bleScannerPass
self.pkt = zlb.ZlBusUnPack(tracker_nums=1, user_id=0xFF, fifoMaxSize=10)
self.pkt.setFlowIdFormat(zlb.api.e_TK_FlowIdFormat.TK_FLOW_ID_FORMAT_8)
self.notification_handler = self.default_notification_handler
def detection_callback(self, device, advertisement_data):
if self.advNameStr != '':
if device.name == self.advNameStr:
self.blueAddr = device.address
self.bFound = True
elif device and device.name and (
device.name.startswith(self.searchStr) or device.name.startswith(self.searchStr.upper())):
if device.address not in self.findAddr:
self.findAddr[device.address] = device.name
print(device.name, " | ", device.address)
if not self.bFound:
self.blueAddr = device.address
self.bFound = True
def default_notification_handler(self, characteristic: BleakGATTCharacteristic, data: bytearray):
if DEBUG:
print("low level rev data:", data)
async def find(self, timeout: float = 8.0):
try:
print(' 搜索Ble设备...')
self.findAddr.clear()
self.bFound = False
self.client = None
endTime = time() + timeout
async with BleakScanner(detection_callback=self.detection_callback) as scanner:
if platform.system() == 'Windows' and self.scannerPass == False:
await scanner.start()
while not self.bFound and time() < endTime:
await asyncio.sleep(0.1)
if self.bFound:
print(f'搜索到设备 MacAddress: {self.blueAddr}')
else:
print('搜索Ble设备超时 [Error]')
print('未搜索到设备 [Error]')
except Exception as e:
print(f'搜索Ble设备异常 [Error] -> {e}')
async def find_multiple(self, timeout: float = 8.0):
"""搜索多个设备"""
try:
print(' 搜索所有Ble设备...')
self.findAddr.clear()
self.bFound = False
self.client = None
endTime = time() + timeout
async with BleakScanner(detection_callback=self.detection_callback) as scanner:
if platform.system() == 'Windows' and self.scannerPass == False:
await scanner.start()
while time() < endTime:
await asyncio.sleep(0.1)
if len(self.findAddr) > 0:
print(f'搜索到 {len(self.findAddr)} 个设备')
return self.findAddr
else:
print('未搜索到设备 [Error]')
return {}
except Exception as e:
print(f'搜索Ble设备异常 [Error] -> {e}')
return {}
async def listenData(self):
try:
print(f' 连接BLE设备 {self.blueAddr}...')
async with BleakClient(self.blueAddr) as self.client:
print(f' 连接BLE设备成功: {self.blueAddr}')
await asyncio.sleep(0.2)
await self.client.start_notify(UploadData_Notify_UUID, self.notification_handler)
await asyncio.sleep(0.1)
await self.client.start_notify(CmdCtrl_WriteNotify_UUID, self.notification_handler)
await asyncio.sleep(0.1)
await self.client.start_notify(TxData_Notify_UUID, self.notification_handler)
await asyncio.sleep(0.5)
await self.client.write_gatt_char(CmdCtrl_WriteNotify_UUID, zlb.api.ul_modifyDataFormat(
zlb.api.e_Upload_DataFormat.NEW_UPLOAD_DATA_TIME |
zlb.api.e_Upload_DataFormat.NEW_UPLOAD_DATA_QUATERNION |
zlb.api.e_Upload_DataFormat.NEW_UPLOAD_DATA_ACC |
zlb.api.e_Upload_DataFormat.NEW_UPLOAD_DATA_GYRO |
zlb.api.e_Upload_DataFormat.NEW_UPLOAD_DATA_LIN_ACC |
zlb.api.e_Upload_DataFormat.NEW_UPLOAD_DATA_ADCx), response=True)
await asyncio.sleep(0.1)
await self.client.write_gatt_char(CmdCtrl_WriteNotify_UUID, zlb.api.ul_getDataFormat(), response=True)
await asyncio.sleep(0.1)
await self.client.write_gatt_char(CmdCtrl_WriteNotify_UUID, zlb.api.hl_configOutDataPort(
zlb.api.e_DataOutPort.TK_RF_PORT), response=True)
await asyncio.sleep(0.1)
print(f"蓝牙设备监听中: {self.blueAddr}")
while not self.bQuit:
await asyncio.sleep(0.1)
except Exception as e:
print(f'连接BLE设备异常 [Error] -> {e}')
def stop(self):
self.bQuit = True
# ====== 机械手部分 ======
class Command():
ID = 2
Code = 16
Address_High = 4
Address_LOW = 111
Len = 12
Number_High = 0
Number_LOW = 6
Value_High = 0
Value_LOW = 0
Joint_High = 0
Joint_LOW = 0
cmd_list = [ID, Code, Address_High, Address_LOW, Number_High, Number_LOW, Len,
Joint_High, Joint_LOW,
Joint_High, Joint_LOW,
Joint_High, Joint_LOW,
Joint_High, Joint_LOW,
Joint_High, Joint_LOW,
Joint_High, Joint_LOW,
]
def byte_deal(*args):
result = []
for value in args:
high_byte = (value >> 8) & 0xFF
low_byte = value & 0xFF
result.extend([high_byte, low_byte])
return result
def crc16_modbus(data: bytes) -> bytes:
crc = 0xFFFF
for pos in data:
crc ^= pos
for _ in range(8):
if (crc & 0x0001) != 0:
crc >>= 1
crc ^= 0xA001
else:
crc >>= 1
return crc.to_bytes(2, byteorder='little')
def set_hand_joints_value(value, robot_hand):
cmd_list = Command().cmd_list.copy()
tmp = []
for i in range(len(value)):
tmp.extend(byte_deal(value[i]))
for i in range(len(tmp)):
cmd_list[i + 7] = tmp[i]
send_data = bytes(cmd_list) + crc16_modbus(cmd_list)
send_data = send_data.hex().upper()
hex_list = [int(send_data[i:i + 2], 16) for i in range(0, len(send_data), 2)]
if robot_hand is not None:
robot_hand.tool_serial_write_data(hex_list)
else:
print(send_data)
# ====== 控制器主类 ======
class GloveToRobotHandController:
def __init__(self, glove_mac_address: str, hand_type: str = 'left'):
"""
hand_type: 'left' 或 'right'
"""
self.hand_type = hand_type
self.glove_sdk = ZlBusSdk('', 'zl', True)
self.glove_sdk.blueAddr = glove_mac_address
self.glove_sdk.bFound = True
# 根据手的类型初始化机械手
if hand_type == 'left':
self.robot_hand = Mercury("/dev/left_arm")
print(f"初始化左手机械臂: /dev/left_arm")
else:
self.robot_hand = Mercury("/dev/right_arm")
print(f"初始化右手机械臂: /dev/right_arm")
self.robot_hand.set_movement_type(1)
self.robot_hand.set_vr_mode(1)
self.current_glove_values = [0] * 6
self.glove_open_ref = [0] * 6
self.glove_close_ref = [100] * 6
self.robot_open_values = [0, 0, 0, 0, 0, 0]
self.robot_close_values = [65535, 65535, 65535, 65535, 65535, 2000]
self._sample_hook = None
self.data_queue = asyncio.Queue(maxsize=50)
def on_glove_data_received(self, ant_values):
if hasattr(self, "_sample_hook") and self._sample_hook is not None:
self._sample_hook(ant_values)
return
try:
self.data_queue.put_nowait(ant_values)
except asyncio.QueueFull:
try:
self.data_queue.get_nowait()
except asyncio.QueueEmpty:
pass
self.data_queue.put_nowait(ant_values)
async def process_data_loop(self):
while True:
ant_values = await self.data_queue.get()
while not self.data_queue.empty():
ant_values = self.data_queue.get_nowait()
self.current_glove_values = ant_values[:6]
robot_hand_values = self.map_glove_to_robot()
self.control_robot_hand(robot_hand_values)
def map_glove_to_robot(self):
mapped_values = []
for i in range(6):
glove_min = min(self.glove_open_ref[i], self.glove_close_ref[i])
glove_max = max(self.glove_open_ref[i], self.glove_close_ref[i])
if glove_max == glove_min:
ratio = 0.0
else:
ratio = (self.current_glove_values[i] - glove_min) / (glove_max - glove_min)
ratio = max(0.0, min(1.0, ratio))
robot_min = min(self.robot_open_values[i], self.robot_close_values[i])
robot_max = max(self.robot_open_values[i], self.robot_close_values[i])
robot_value = int(robot_min + ratio * (robot_max - robot_min))
mapped_values.append(robot_value)
return mapped_values
def control_robot_hand(self, values):
print(f"[{self.hand_type.upper()}] 发命令到机械手", values, time())
try:
set_hand_joints_value(values, self.robot_hand)
except Exception as e:
pass
async def calibrate(self):
print(f"\n========== 开始校准 {self.hand_type.upper()} 手 ==========")
# 张开手采样
print(f"请张开 {self.hand_type.upper()} 手掌,2秒后将自动记录张开值,请保持动作!")
await asyncio.sleep(2)
open_samples = []
def open_sample_hook(ant_values):
if len(ant_values) == 6:
open_samples.append(list(ant_values))
self._sample_hook = open_sample_hook
await asyncio.sleep(2)
self._sample_hook = None
if open_samples:
avg = np.mean(open_samples, axis=0)
self.glove_open_ref = [int(x) for x in avg]
print(f"[{self.hand_type.upper()}] 已记录张开手掌参考值:", self.glove_open_ref)
else:
print(f"[{self.hand_type.upper()}] 未采集到张开手数据,校准失败。")
return False
# 握紧手采样
print(f"请握紧 {self.hand_type.upper()} 拳头,2秒后将自动记录握紧值,请保持动作!")
await asyncio.sleep(2)
close_samples = []
def close_sample_hook(ant_values):
if len(ant_values) == 6:
close_samples.append(list(ant_values))
self._sample_hook = close_sample_hook
await asyncio.sleep(2)
self._sample_hook = None
if close_samples:
avg = np.mean(close_samples, axis=0)
self.glove_close_ref = [int(x) for x in avg]
print(f"[{self.hand_type.upper()}] 已记录握紧拳头参考值:", self.glove_close_ref)
else:
print(f"[{self.hand_type.upper()}] 未采集到握紧手数据,校准失败。")
return False
print(f"[{self.hand_type.upper()}] 校准完成!\n")
return True
def stop(self):
self.glove_sdk.stop()
async def main():
# ====== 1. 搜索所有设备 ======
print("=" * 60)
print("开始搜索BLE手套设备...")
print("=" * 60)
scanner = ZlBusSdk('', 'zl', True)
devices = await scanner.find_multiple(timeout=10.0)
if len(devices) < 2:
print(f"错误:只找到 {len(devices)} 个设备,需要至少2个设备(左手和右手)")
if len(devices) == 1:
print("提示:请确保两个手套都已开机并处于可连接状态")
return
# ====== 2. 让用户选择左右手设备 ======
print("\n找到以下设备:")
device_list = list(devices.items())
for idx, (addr, name) in enumerate(device_list):
print(f" [{idx + 1}] {name} | {addr}")
print("\n请选择左手设备 (输入序号):")
left_idx = int(input("左手设备序号: ")) - 1
left_mac = device_list[left_idx][0]
left_name = device_list[left_idx][1]
print("\n请选择右手设备 (输入序号):")
right_idx = int(input("右手设备序号: ")) - 1
right_mac = device_list[right_idx][0]
right_name = device_list[right_idx][1]
print(f"\n已选择:")
print(f" 左手: {left_name} ({left_mac})")
print(f" 右手: {right_name} ({right_mac})")
# ====== 3. 创建左右手控制器 ======
left_controller = GloveToRobotHandController(left_mac, 'left')
right_controller = GloveToRobotHandController(right_mac, 'right')
# ====== 4. 设置BLE回调 ======
def create_callback(controller):
def callback(characteristic, data):
if len(data) >= 14 and data[0] == 0xAA and data[1] == 0x15:
ant_values = []
for i in range(6):
start = 12 + i * 2
if start + 2 <= len(data):
val = int.from_bytes(data[start:start + 2], byteorder='little', signed=False)
ant_values.append(val)
else:
ant_values.append(0)
controller.on_glove_data_received(ant_values)
return callback
left_controller.glove_sdk.notification_handler = create_callback(left_controller)
right_controller.glove_sdk.notification_handler = create_callback(right_controller)
# ====== 5. 开启BLE监听(异步同时启动) ======
print("\n正在连接BLE设备...")
left_ble_task = asyncio.create_task(left_controller.glove_sdk.listenData())
right_ble_task = asyncio.create_task(right_controller.glove_sdk.listenData())
# 等待连接建立
await asyncio.sleep(2)
# ====== 6. 依次校准左手、右手 ======
print("\n" + "=" * 60)
print("开始校准流程")
print("=" * 60)
# 先校准左手
left_success = await left_controller.calibrate()
if not left_success:
print("左手校准失败,程序退出")
left_controller.stop()
right_controller.stop()
return
# 再校准右手
right_success = await right_controller.calibrate()
if not right_success:
print("右手校准失败,程序退出")
left_controller.stop()
right_controller.stop()
return
# ====== 7. 启动数据处理循环(异步同时处理) ======
print("\n" + "=" * 60)
print("校准完成,开始实时控制双手机械臂")
print("=" * 60)
left_process_task = asyncio.create_task(left_controller.process_data_loop())
right_process_task = asyncio.create_task(right_controller.process_data_loop())
print("正在实时映射手套到机械手动作,按 Ctrl+C 退出")
try:
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
print("\n程序被用户中断")
finally:
left_controller.stop()
right_controller.stop()
print("程序已退出")
if __name__ == '__main__':
asyncio.run(main())