6 机械臂使用场景案例

本章节呈现了经典的机械臂使用案例,以展示产品在富有代表性的场景中的应用。这包括了机械臂在不同领域的典型应用,突显了产品的多功能性和适用性。通过这些案例,用户可以深入了解机械臂在实际应用中的灵活性和效能,为他们在特定场景中的应用提供参考。

移动抓取木块案例

相关依赖文件可在 https://github.com/elephantrobotics/mercury_demo 下载

import time
from pymycobot import Mercury
from uvc_camera import UVCCamera
from marker_utils import *
import stag

# 夹爪工具长度
Tool_LEN = 175
# 相机中心到法兰距离
Camera_LEN = 78
np.set_printoptions(suppress=True, formatter={'float_kind': '{:.2f}'.format})
# 相机配置文件
camera_params = np.load("src/camera_params.npz")
mtx, dist = camera_params["mtx"], camera_params["dist"]
# 二维码大小
MARKER_SIZE = 32
# 设置左臂端口
ml = Mercury("/dev/left_arm")


# 将旋转矩阵转为欧拉角
def CvtRotationMatrixToEulerAngle(pdtRotationMatrix):
    pdtEulerAngle = np.zeros(3)
    pdtEulerAngle[2] = np.arctan2(pdtRotationMatrix[1, 0], pdtRotationMatrix[0, 0])
    fCosRoll = np.cos(pdtEulerAngle[2])
    fSinRoll = np.sin(pdtEulerAngle[2])
    pdtEulerAngle[1] = np.arctan2(-pdtRotationMatrix[2, 0],
                                  (fCosRoll * pdtRotationMatrix[0, 0]) + (fSinRoll * pdtRotationMatrix[1, 0]))
    pdtEulerAngle[0] = np.arctan2((fSinRoll * pdtRotationMatrix[0, 2]) - (fCosRoll * pdtRotationMatrix[1, 2]),
                                  (-fSinRoll * pdtRotationMatrix[0, 1]) + (fCosRoll * pdtRotationMatrix[1, 1]))
    return pdtEulerAngle


# 将欧拉角转为旋转矩阵
def CvtEulerAngleToRotationMatrix(ptrEulerAngle):
    ptrSinAngle = np.sin(ptrEulerAngle)
    ptrCosAngle = np.cos(ptrEulerAngle)
    ptrRotationMatrix = np.zeros((3, 3))
    ptrRotationMatrix[0, 0] = ptrCosAngle[2] * ptrCosAngle[1]
    ptrRotationMatrix[0, 1] = ptrCosAngle[2] * ptrSinAngle[1] * ptrSinAngle[0] - ptrSinAngle[2] * ptrCosAngle[0]
    ptrRotationMatrix[0, 2] = ptrCosAngle[2] * ptrSinAngle[1] * ptrCosAngle[0] + ptrSinAngle[2] * ptrSinAngle[0]
    ptrRotationMatrix[1, 0] = ptrSinAngle[2] * ptrCosAngle[1]
    ptrRotationMatrix[1, 1] = ptrSinAngle[2] * ptrSinAngle[1] * ptrSinAngle[0] + ptrCosAngle[2] * ptrCosAngle[0]
    ptrRotationMatrix[1, 2] = ptrSinAngle[2] * ptrSinAngle[1] * ptrCosAngle[0] - ptrCosAngle[2] * ptrSinAngle[0]
    ptrRotationMatrix[2, 0] = -ptrSinAngle[1]
    ptrRotationMatrix[2, 1] = ptrCosAngle[1] * ptrSinAngle[0]
    ptrRotationMatrix[2, 2] = ptrCosAngle[1] * ptrCosAngle[0]
    return ptrRotationMatrix


# 坐标转换为齐次变换矩阵,(x,y,z,rx,ry,rz)单位rad
def Transformation_matrix(coord):
    position_robot = coord[:3]
    pose_robot = coord[3:]
    # 将欧拉角转为旋转矩阵
    RBT = CvtEulerAngleToRotationMatrix(pose_robot)
    PBT = np.array([[position_robot[0]],
                    [position_robot[1]],
                    [position_robot[2]]])
    temp = np.concatenate((RBT, PBT), axis=1)
    array_1x4 = np.array([[0, 0, 0, 1]])
    # 将两个数组按行拼接起来
    matrix = np.concatenate((temp, array_1x4), axis=0)
    return matrix


def Eyes_in_hand_left(coord, camera):
    # 相机坐标
    Position_Camera = np.transpose(camera[:3])
    # 机械臂坐标矩阵
    Matrix_BT = Transformation_matrix(coord)
    # 手眼矩阵
    Matrix_TC = np.array([[0, -1, 0, Camera_LEN],
                          [1, 0, 0, 0],
                          [0, 0, 1, -Tool_LEN],
                          [0, 0, 0, 1]])
    # 物体坐标(相机系)
    Position_Camera = np.append(Position_Camera, 1)
    # 物体坐标(基坐标系)
    Position_B = Matrix_BT @ Matrix_TC @ Position_Camera
    return Position_B


# 等待机械臂运行结束
def waitl():
    time.sleep(0.2)
    while (ml.is_moving()):
        time.sleep(0.03)


# 获取物体坐标(相机系)
def calc_markers_base_position(corners: NDArray, ids: T.List, marker_size: int, mtx: NDArray, dist: NDArray) -> T.List:
    if len(corners) == 0:
        return []
    # 通过二维码角点获取物体旋转向量和平移向量
    rvecs, tvecs = solve_marker_pnp(corners, marker_size, mtx, dist)
    for i, tvec, rvec in zip(ids, tvecs, rvecs):
        tvec = tvec.squeeze().tolist()
        rvec = rvec.squeeze().tolist()
        rotvector = np.array([[rvec[0], rvec[1], rvec[2]]])
        # 将旋转向量转为旋转矩阵
        Rotation = cv2.Rodrigues(rotvector)[0]
        # 将旋转矩阵转为欧拉角
        Euler = CvtRotationMatrixToEulerAngle(Rotation)
        # 物体坐标(相机系)
        target_coords = np.array([tvec[0], tvec[1], tvec[2], Euler[0], Euler[1], Euler[2]])
    return target_coords


if __name__ == "__main__":
    # 设置摄像头id
    camera = UVCCamera(5, mtx, dist)
    # 打开摄像头
    camera.capture()
    # 设置左臂观察点
    origin_anglesL = [-44.24, 15.56, 0.0, -102.59, 65.28, 52.06, 23.49]
    # 设置夹爪运动模式
    ml.set_gripper_mode(0)
    # 设置工具坐标系
    ml.set_tool_reference([0, 0, Tool_LEN, 0, 0, 0])
    # 将末端坐标系设置为工具
    ml.set_end_type(1)
    # 设置移动速度
    sp = 40
    # 移动到观测点
    ml.send_angles(origin_anglesL, sp)
    # 等待机械臂运动结束
    waitl()
    # 刷新相机界面
    camera.update_frame()
    # 获取当前帧
    frame = camera.color_frame()
    # 获取画面中二维码的角度和id
    (corners, ids, rejected_corners) = stag.detectMarkers(frame, 11)
    # 获取物的坐标(相机系)
    marker_pos_pack = calc_markers_base_position(corners, ids, MARKER_SIZE, mtx, dist)
    # 获取机械臂当前坐标
    cur_coords = np.array(ml.get_base_coords())
    # 将角度值转为弧度值
    cur_bcl = cur_coords.copy()
    cur_bcl[-3:] *= (np.pi / 180)
    # 通过矩阵变化将物体坐标(相机系)转成(基坐标系)
    fact_bcl = Eyes_in_hand_left(cur_bcl, marker_pos_pack)
    target_coords = cur_coords.copy()
    target_coords[0] = fact_bcl[0]
    target_coords[1] = fact_bcl[1]
    target_coords[2] = fact_bcl[2] + 50
    # 机械臂移动到二维码上方
    ml.send_base_coords(target_coords, 30)
    # 等待机械臂运动结束
    waitl()
    # 打开夹爪
    ml.set_gripper_value(100, 100)
    # 机械臂沿z轴向下移动
    ml.send_base_coord(3, fact_bcl[2], 10)
    # 等待机械臂运动结束
    waitl()
    # 闭合夹爪
    ml.set_gripper_value(20, 100)
    # 等待夹爪闭合
    time.sleep(2)
    # 抬起夹爪
    ml.send_base_coord(3, fact_bcl[2] + 50, 10)

脚本及手套控制灵巧手案例

案例1:脚本控制灵巧手代码

import time
from pymycobot import Mercury

# 连接左臂(或手)
ml = Mercury("/dev/left_arm")
ml.set_movement_type(3)
ml.set_vr_mode(1)

# 手指姿态(开/闭)
banana = [
    [0, 10000, 10000, 10000, 10000, 65535],       # 手张开
    [31000, 25000, 26000, 24000, 24000, 65535],   # 手握拳
    [0, 10000, 10000, 10000, 10000, 65535]        # 再张开
]

class Command:
    ID = 2
    Code = 16
    Address_High = 4
    Address_LOW = 111
    Len = 12
    Number_High = 0
    Number_LOW = 6
    Value_High = 0
    Value_LOW = 0
    Joint_High = 0
    Joint_LOW = 0
    cmd_list = [ID, Code, Address_High, Address_LOW, Number_High, Number_LOW, Len,
                Joint_High, Joint_LOW,
                Joint_High, Joint_LOW,
                Joint_High, Joint_LOW,
                Joint_High, Joint_LOW,
                Joint_High, Joint_LOW,
                Joint_High, Joint_LOW,
                ]

cmd_list = Command().cmd_list

def byte_deal(*args):
    result = []
    for value in args:
        high_byte = (value >> 8) & 0xFF
        low_byte = value & 0xFF
        result.extend([high_byte, low_byte])
    return result

def crc16_modbus(data: bytes) -> bytes:
    crc = 0xFFFF
    for pos in data:
        crc ^= pos
        for _ in range(8):
            if (crc & 0x0001) != 0:
                crc >>= 1
                crc ^= 0xA001
            else:
                crc >>= 1
    return crc.to_bytes(2, byteorder='little')

def set_hand_joints_value(value, a1=None):
    tmp = []
    for i in range(len(value)):
        tmp.extend(byte_deal(value[i]))
    for i in range(len(tmp)):
        cmd_list[i + 7] = tmp[i]
    send_data = bytes(cmd_list) + crc16_modbus(cmd_list)
    send_data = send_data.hex().upper()
    hex_list = [int(send_data[i:i + 2], 16) for i in range(0, len(send_data), 2)]
    if a1 is not None:
        a1.tool_serial_write_data(hex_list)
    else:
        print("发送指令:", send_data)

# 简单循环测试张开与闭合
def main():
    print("开始测试手指动作...")
    while True:
        set_hand_joints_value(banana[0], ml)
        print("手张开")
        time.sleep(2)
        set_hand_joints_value(banana[1], ml)
        print("手握拳")
        time.sleep(2)

if __name__ == "__main__":
    main()

案例2:手套控制灵巧手代码

import asyncio
import platform
from time import time
from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
import numpy as np
import pyZlBus2 as zlb
from pymycobot import Mercury

# BLE UUID
CmdCtrl_WriteNotify_UUID = "AEC91001-6E7A-4BC2-9A4C-4CDA7A728F58"
UploadData_Notify_UUID = "AEC91002-6E7A-4BC2-9A4C-4CDA7A728F58"
TxData_Notify_UUID = "AEC91003-6E7A-4BC2-9A4C-4CDA7A728F58"

DEBUG = 0


class ZlBusSdk:
    def __init__(self, advNameStr: str = '', searchStr: str = 'zl', bleScannerPass: bool = False):
        if platform.system() == 'Windows':
            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
        self.advNameStr = advNameStr
        self.searchStr = searchStr
        self.bFound = False
        self.bQuit = False
        self.blueAddr = ""
        self.findAddr = {}
        self.client = None
        self.scannerPass = bleScannerPass
        self.pkt = zlb.ZlBusUnPack(tracker_nums=1, user_id=0xFF, fifoMaxSize=10)
        self.pkt.setFlowIdFormat(zlb.api.e_TK_FlowIdFormat.TK_FLOW_ID_FORMAT_8)
        self.notification_handler = self.default_notification_handler

    def detection_callback(self, device, advertisement_data):
        if self.advNameStr != '':
            if device.name == self.advNameStr:
                self.blueAddr = device.address
                self.bFound = True
        elif device and device.name and (
                device.name.startswith(self.searchStr) or device.name.startswith(self.searchStr.upper())):
            if device.address not in self.findAddr:
                self.findAddr[device.address] = device.name
                print(device.name, " | ", device.address)
                if not self.bFound:
                    self.blueAddr = device.address
                    self.bFound = True

    def default_notification_handler(self, characteristic: BleakGATTCharacteristic, data: bytearray):
        if DEBUG:
            print("low level rev data:", data)

    async def find(self, timeout: float = 8.0):
        try:
            print(' 搜索Ble设备...')
            self.findAddr.clear()
            self.bFound = False
            self.client = None
            endTime = time() + timeout
            async with BleakScanner(detection_callback=self.detection_callback) as scanner:
                if platform.system() == 'Windows' and self.scannerPass == False:
                    await scanner.start()
                while not self.bFound and time() < endTime:
                    await asyncio.sleep(0.1)
            if self.bFound:
                print(f'搜索到设备 MacAddress: {self.blueAddr}')
            else:
                print('搜索Ble设备超时 [Error]')
                print('未搜索到设备 [Error]')
        except Exception as e:
            print(f'搜索Ble设备异常 [Error] -> {e}')

    async def find_multiple(self, timeout: float = 8.0):
        """搜索多个设备"""
        try:
            print(' 搜索所有Ble设备...')
            self.findAddr.clear()
            self.bFound = False
            self.client = None
            endTime = time() + timeout
            async with BleakScanner(detection_callback=self.detection_callback) as scanner:
                if platform.system() == 'Windows' and self.scannerPass == False:
                    await scanner.start()
                while time() < endTime:
                    await asyncio.sleep(0.1)
            if len(self.findAddr) > 0:
                print(f'搜索到 {len(self.findAddr)} 个设备')
                return self.findAddr
            else:
                print('未搜索到设备 [Error]')
                return {}
        except Exception as e:
            print(f'搜索Ble设备异常 [Error] -> {e}')
            return {}

    async def listenData(self):
        try:
            print(f' 连接BLE设备 {self.blueAddr}...')
            async with BleakClient(self.blueAddr) as self.client:
                print(f'    连接BLE设备成功: {self.blueAddr}')
                await asyncio.sleep(0.2)
                await self.client.start_notify(UploadData_Notify_UUID, self.notification_handler)
                await asyncio.sleep(0.1)
                await self.client.start_notify(CmdCtrl_WriteNotify_UUID, self.notification_handler)
                await asyncio.sleep(0.1)
                await self.client.start_notify(TxData_Notify_UUID, self.notification_handler)
                await asyncio.sleep(0.5)
                await self.client.write_gatt_char(CmdCtrl_WriteNotify_UUID, zlb.api.ul_modifyDataFormat(
                    zlb.api.e_Upload_DataFormat.NEW_UPLOAD_DATA_TIME |
                    zlb.api.e_Upload_DataFormat.NEW_UPLOAD_DATA_QUATERNION |
                    zlb.api.e_Upload_DataFormat.NEW_UPLOAD_DATA_ACC |
                    zlb.api.e_Upload_DataFormat.NEW_UPLOAD_DATA_GYRO |
                    zlb.api.e_Upload_DataFormat.NEW_UPLOAD_DATA_LIN_ACC |
                    zlb.api.e_Upload_DataFormat.NEW_UPLOAD_DATA_ADCx), response=True)
                await asyncio.sleep(0.1)
                await self.client.write_gatt_char(CmdCtrl_WriteNotify_UUID, zlb.api.ul_getDataFormat(), response=True)
                await asyncio.sleep(0.1)
                await self.client.write_gatt_char(CmdCtrl_WriteNotify_UUID, zlb.api.hl_configOutDataPort(
                    zlb.api.e_DataOutPort.TK_RF_PORT), response=True)
                await asyncio.sleep(0.1)
                print(f"蓝牙设备监听中: {self.blueAddr}")
                while not self.bQuit:
                    await asyncio.sleep(0.1)
        except Exception as e:
            print(f'连接BLE设备异常 [Error] -> {e}')

    def stop(self):
        self.bQuit = True


# ====== 机械手部分 ======
class Command():
    ID = 2
    Code = 16
    Address_High = 4
    Address_LOW = 111
    Len = 12
    Number_High = 0
    Number_LOW = 6
    Value_High = 0
    Value_LOW = 0
    Joint_High = 0
    Joint_LOW = 0
    cmd_list = [ID, Code, Address_High, Address_LOW, Number_High, Number_LOW, Len,
                Joint_High, Joint_LOW,
                Joint_High, Joint_LOW,
                Joint_High, Joint_LOW,
                Joint_High, Joint_LOW,
                Joint_High, Joint_LOW,
                Joint_High, Joint_LOW,
                ]


def byte_deal(*args):
    result = []
    for value in args:
        high_byte = (value >> 8) & 0xFF
        low_byte = value & 0xFF
        result.extend([high_byte, low_byte])
    return result


def crc16_modbus(data: bytes) -> bytes:
    crc = 0xFFFF
    for pos in data:
        crc ^= pos
        for _ in range(8):
            if (crc & 0x0001) != 0:
                crc >>= 1
                crc ^= 0xA001
            else:
                crc >>= 1
    return crc.to_bytes(2, byteorder='little')


def set_hand_joints_value(value, robot_hand):
    cmd_list = Command().cmd_list.copy()
    tmp = []
    for i in range(len(value)):
        tmp.extend(byte_deal(value[i]))
    for i in range(len(tmp)):
        cmd_list[i + 7] = tmp[i]
    send_data = bytes(cmd_list) + crc16_modbus(cmd_list)
    send_data = send_data.hex().upper()
    hex_list = [int(send_data[i:i + 2], 16) for i in range(0, len(send_data), 2)]
    if robot_hand is not None:
        robot_hand.tool_serial_write_data(hex_list)
    else:
        print(send_data)


# ====== 控制器主类 ======
class GloveToRobotHandController:
    def __init__(self, glove_mac_address: str, hand_type: str = 'left'):
        """
        hand_type: 'left' 或 'right'
        """
        self.hand_type = hand_type
        self.glove_sdk = ZlBusSdk('', 'zl', True)
        self.glove_sdk.blueAddr = glove_mac_address
        self.glove_sdk.bFound = True

        # 根据手的类型初始化机械手
        if hand_type == 'left':
            self.robot_hand = Mercury("/dev/left_arm")
            print(f"初始化左手机械臂: /dev/left_arm")
        else:
            self.robot_hand = Mercury("/dev/right_arm")
            print(f"初始化右手机械臂: /dev/right_arm")

        self.robot_hand.set_movement_type(1)
        self.robot_hand.set_vr_mode(1)

        self.current_glove_values = [0] * 6
        self.glove_open_ref = [0] * 6
        self.glove_close_ref = [100] * 6
        self.robot_open_values = [0, 0, 0, 0, 0, 0]
        self.robot_close_values = [65535, 65535, 65535, 65535, 65535, 2000]
        self._sample_hook = None
        self.data_queue = asyncio.Queue(maxsize=50)

    def on_glove_data_received(self, ant_values):
        if hasattr(self, "_sample_hook") and self._sample_hook is not None:
            self._sample_hook(ant_values)
            return
        try:
            self.data_queue.put_nowait(ant_values)
        except asyncio.QueueFull:
            try:
                self.data_queue.get_nowait()
            except asyncio.QueueEmpty:
                pass
            self.data_queue.put_nowait(ant_values)

    async def process_data_loop(self):
        while True:
            ant_values = await self.data_queue.get()
            while not self.data_queue.empty():
                ant_values = self.data_queue.get_nowait()
            self.current_glove_values = ant_values[:6]
            robot_hand_values = self.map_glove_to_robot()
            self.control_robot_hand(robot_hand_values)

    def map_glove_to_robot(self):
        mapped_values = []
        for i in range(6):
            glove_min = min(self.glove_open_ref[i], self.glove_close_ref[i])
            glove_max = max(self.glove_open_ref[i], self.glove_close_ref[i])
            if glove_max == glove_min:
                ratio = 0.0
            else:
                ratio = (self.current_glove_values[i] - glove_min) / (glove_max - glove_min)
                ratio = max(0.0, min(1.0, ratio))
            robot_min = min(self.robot_open_values[i], self.robot_close_values[i])
            robot_max = max(self.robot_open_values[i], self.robot_close_values[i])
            robot_value = int(robot_min + ratio * (robot_max - robot_min))
            mapped_values.append(robot_value)
        return mapped_values

    def control_robot_hand(self, values):
        print(f"[{self.hand_type.upper()}] 发命令到机械手", values, time())
        try:
            set_hand_joints_value(values, self.robot_hand)
        except Exception as e:
            pass

    async def calibrate(self):
        print(f"\n========== 开始校准 {self.hand_type.upper()} 手 ==========")

        # 张开手采样
        print(f"请张开 {self.hand_type.upper()} 手掌,2秒后将自动记录张开值,请保持动作!")
        await asyncio.sleep(2)
        open_samples = []

        def open_sample_hook(ant_values):
            if len(ant_values) == 6:
                open_samples.append(list(ant_values))

        self._sample_hook = open_sample_hook
        await asyncio.sleep(2)
        self._sample_hook = None
        if open_samples:
            avg = np.mean(open_samples, axis=0)
            self.glove_open_ref = [int(x) for x in avg]
            print(f"[{self.hand_type.upper()}] 已记录张开手掌参考值:", self.glove_open_ref)
        else:
            print(f"[{self.hand_type.upper()}] 未采集到张开手数据,校准失败。")
            return False

        # 握紧手采样
        print(f"请握紧 {self.hand_type.upper()} 拳头,2秒后将自动记录握紧值,请保持动作!")
        await asyncio.sleep(2)
        close_samples = []

        def close_sample_hook(ant_values):
            if len(ant_values) == 6:
                close_samples.append(list(ant_values))

        self._sample_hook = close_sample_hook
        await asyncio.sleep(2)
        self._sample_hook = None
        if close_samples:
            avg = np.mean(close_samples, axis=0)
            self.glove_close_ref = [int(x) for x in avg]
            print(f"[{self.hand_type.upper()}] 已记录握紧拳头参考值:", self.glove_close_ref)
        else:
            print(f"[{self.hand_type.upper()}] 未采集到握紧手数据,校准失败。")
            return False

        print(f"[{self.hand_type.upper()}] 校准完成!\n")
        return True

    def stop(self):
        self.glove_sdk.stop()


async def main():
    # ====== 1. 搜索所有设备 ======
    print("=" * 60)
    print("开始搜索BLE手套设备...")
    print("=" * 60)

    scanner = ZlBusSdk('', 'zl', True)
    devices = await scanner.find_multiple(timeout=10.0)

    if len(devices) < 2:
        print(f"错误:只找到 {len(devices)} 个设备,需要至少2个设备(左手和右手)")
        if len(devices) == 1:
            print("提示:请确保两个手套都已开机并处于可连接状态")
        return

    # ====== 2. 让用户选择左右手设备 ======
    print("\n找到以下设备:")
    device_list = list(devices.items())
    for idx, (addr, name) in enumerate(device_list):
        print(f"  [{idx + 1}] {name} | {addr}")

    print("\n请选择左手设备 (输入序号):")
    left_idx = int(input("左手设备序号: ")) - 1
    left_mac = device_list[left_idx][0]
    left_name = device_list[left_idx][1]

    print("\n请选择右手设备 (输入序号):")
    right_idx = int(input("右手设备序号: ")) - 1
    right_mac = device_list[right_idx][0]
    right_name = device_list[right_idx][1]

    print(f"\n已选择:")
    print(f"  左手: {left_name} ({left_mac})")
    print(f"  右手: {right_name} ({right_mac})")

    # ====== 3. 创建左右手控制器 ======
    left_controller = GloveToRobotHandController(left_mac, 'left')
    right_controller = GloveToRobotHandController(right_mac, 'right')

    # ====== 4. 设置BLE回调 ======
    def create_callback(controller):
        def callback(characteristic, data):
            if len(data) >= 14 and data[0] == 0xAA and data[1] == 0x15:
                ant_values = []
                for i in range(6):
                    start = 12 + i * 2
                    if start + 2 <= len(data):
                        val = int.from_bytes(data[start:start + 2], byteorder='little', signed=False)
                        ant_values.append(val)
                    else:
                        ant_values.append(0)
                controller.on_glove_data_received(ant_values)

        return callback

    left_controller.glove_sdk.notification_handler = create_callback(left_controller)
    right_controller.glove_sdk.notification_handler = create_callback(right_controller)

    # ====== 5. 开启BLE监听(异步同时启动) ======
    print("\n正在连接BLE设备...")
    left_ble_task = asyncio.create_task(left_controller.glove_sdk.listenData())
    right_ble_task = asyncio.create_task(right_controller.glove_sdk.listenData())

    # 等待连接建立
    await asyncio.sleep(2)

    # ====== 6. 依次校准左手、右手 ======
    print("\n" + "=" * 60)
    print("开始校准流程")
    print("=" * 60)

    # 先校准左手
    left_success = await left_controller.calibrate()
    if not left_success:
        print("左手校准失败,程序退出")
        left_controller.stop()
        right_controller.stop()
        return

    # 再校准右手
    right_success = await right_controller.calibrate()
    if not right_success:
        print("右手校准失败,程序退出")
        left_controller.stop()
        right_controller.stop()
        return

    # ====== 7. 启动数据处理循环(异步同时处理) ======
    print("\n" + "=" * 60)
    print("校准完成,开始实时控制双手机械臂")
    print("=" * 60)

    left_process_task = asyncio.create_task(left_controller.process_data_loop())
    right_process_task = asyncio.create_task(right_controller.process_data_loop())

    print("正在实时映射手套到机械手动作,按 Ctrl+C 退出")
    try:
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        print("\n程序被用户中断")
    finally:
        left_controller.stop()
        right_controller.stop()
        print("程序已退出")


if __name__ == '__main__':
    asyncio.run(main())

← 上一章 | 下一页 →

results matching ""

    No results matching ""