脚本控制灵巧手和手套控制灵巧手

案例1:脚本控制灵巧手代码

import time
from pymycobot import Mercury

# 连接左臂(或手)
ml = Mercury("/dev/left_arm")
ml.set_movement_type(3)
ml.set_vr_mode(1)

# 手指姿态(开/闭)
banana = [
    [0, 10000, 10000, 10000, 10000, 65535],       # 手张开
    [31000, 25000, 26000, 24000, 24000, 65535],   # 手握拳
    [0, 10000, 10000, 10000, 10000, 65535]        # 再张开
]

class Command:
    ID = 2
    Code = 16
    Address_High = 4
    Address_LOW = 111
    Len = 12
    Number_High = 0
    Number_LOW = 6
    Value_High = 0
    Value_LOW = 0
    Joint_High = 0
    Joint_LOW = 0
    cmd_list = [ID, Code, Address_High, Address_LOW, Number_High, Number_LOW, Len,
                Joint_High, Joint_LOW,
                Joint_High, Joint_LOW,
                Joint_High, Joint_LOW,
                Joint_High, Joint_LOW,
                Joint_High, Joint_LOW,
                Joint_High, Joint_LOW,
                ]

cmd_list = Command().cmd_list

def byte_deal(*args):
    result = []
    for value in args:
        high_byte = (value >> 8) & 0xFF
        low_byte = value & 0xFF
        result.extend([high_byte, low_byte])
    return result

def crc16_modbus(data: bytes) -> bytes:
    crc = 0xFFFF
    for pos in data:
        crc ^= pos
        for _ in range(8):
            if (crc & 0x0001) != 0:
                crc >>= 1
                crc ^= 0xA001
            else:
                crc >>= 1
    return crc.to_bytes(2, byteorder='little')

def set_hand_joints_value(value, a1=None):
    tmp = []
    for i in range(len(value)):
        tmp.extend(byte_deal(value[i]))
    for i in range(len(tmp)):
        cmd_list[i + 7] = tmp[i]
    send_data = bytes(cmd_list) + crc16_modbus(cmd_list)
    send_data = send_data.hex().upper()
    hex_list = [int(send_data[i:i + 2], 16) for i in range(0, len(send_data), 2)]
    if a1 is not None:
        a1.tool_serial_write_data(hex_list)
    else:
        print("发送指令:", send_data)

# 简单循环测试张开与闭合
def main():
    print("开始测试手指动作...")
    while True:
        set_hand_joints_value(banana[0], ml)
        print("手张开")
        time.sleep(2)
        set_hand_joints_value(banana[1], ml)
        print("手握拳")
        time.sleep(2)

if __name__ == "__main__":
    main()

案例2:手套控制灵巧手代码

import asyncio
import platform
from time import time
from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
import numpy as np
import pyZlBus2 as zlb
from pymycobot import Mercury

# BLE UUID
CmdCtrl_WriteNotify_UUID = "AEC91001-6E7A-4BC2-9A4C-4CDA7A728F58"
UploadData_Notify_UUID = "AEC91002-6E7A-4BC2-9A4C-4CDA7A728F58"
TxData_Notify_UUID = "AEC91003-6E7A-4BC2-9A4C-4CDA7A728F58"

DEBUG = 0


class ZlBusSdk:
    def __init__(self, advNameStr: str = '', searchStr: str = 'zl', bleScannerPass: bool = False):
        if platform.system() == 'Windows':
            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
        self.advNameStr = advNameStr
        self.searchStr = searchStr
        self.bFound = False
        self.bQuit = False
        self.blueAddr = ""
        self.findAddr = {}
        self.client = None
        self.scannerPass = bleScannerPass
        self.pkt = zlb.ZlBusUnPack(tracker_nums=1, user_id=0xFF, fifoMaxSize=10)
        self.pkt.setFlowIdFormat(zlb.api.e_TK_FlowIdFormat.TK_FLOW_ID_FORMAT_8)
        self.notification_handler = self.default_notification_handler

    def detection_callback(self, device, advertisement_data):
        if self.advNameStr != '':
            if device.name == self.advNameStr:
                self.blueAddr = device.address
                self.bFound = True
        elif device and device.name and (
                device.name.startswith(self.searchStr) or device.name.startswith(self.searchStr.upper())):
            if device.address not in self.findAddr:
                self.findAddr[device.address] = device.name
                print(device.name, " | ", device.address)
                if not self.bFound:
                    self.blueAddr = device.address
                    self.bFound = True

    def default_notification_handler(self, characteristic: BleakGATTCharacteristic, data: bytearray):
        if DEBUG:
            print("low level rev data:", data)

    async def find(self, timeout: float = 8.0):
        try:
            print(' 搜索Ble设备...')
            self.findAddr.clear()
            self.bFound = False
            self.client = None
            endTime = time() + timeout
            async with BleakScanner(detection_callback=self.detection_callback) as scanner:
                if platform.system() == 'Windows' and self.scannerPass == False:
                    await scanner.start()
                while not self.bFound and time() < endTime:
                    await asyncio.sleep(0.1)
            if self.bFound:
                print(f'搜索到设备 MacAddress: {self.blueAddr}')
            else:
                print('搜索Ble设备超时 [Error]')
                print('未搜索到设备 [Error]')
        except Exception as e:
            print(f'搜索Ble设备异常 [Error] -> {e}')

    async def find_multiple(self, timeout: float = 8.0):
        """搜索多个设备"""
        try:
            print(' 搜索所有Ble设备...')
            self.findAddr.clear()
            self.bFound = False
            self.client = None
            endTime = time() + timeout
            async with BleakScanner(detection_callback=self.detection_callback) as scanner:
                if platform.system() == 'Windows' and self.scannerPass == False:
                    await scanner.start()
                while time() < endTime:
                    await asyncio.sleep(0.1)
            if len(self.findAddr) > 0:
                print(f'搜索到 {len(self.findAddr)} 个设备')
                return self.findAddr
            else:
                print('未搜索到设备 [Error]')
                return {}
        except Exception as e:
            print(f'搜索Ble设备异常 [Error] -> {e}')
            return {}

    async def listenData(self):
        try:
            print(f' 连接BLE设备 {self.blueAddr}...')
            async with BleakClient(self.blueAddr) as self.client:
                print(f'    连接BLE设备成功: {self.blueAddr}')
                await asyncio.sleep(0.2)
                await self.client.start_notify(UploadData_Notify_UUID, self.notification_handler)
                await asyncio.sleep(0.1)
                await self.client.start_notify(CmdCtrl_WriteNotify_UUID, self.notification_handler)
                await asyncio.sleep(0.1)
                await self.client.start_notify(TxData_Notify_UUID, self.notification_handler)
                await asyncio.sleep(0.5)
                await self.client.write_gatt_char(CmdCtrl_WriteNotify_UUID, zlb.api.ul_modifyDataFormat(
                    zlb.api.e_Upload_DataFormat.NEW_UPLOAD_DATA_TIME |
                    zlb.api.e_Upload_DataFormat.NEW_UPLOAD_DATA_QUATERNION |
                    zlb.api.e_Upload_DataFormat.NEW_UPLOAD_DATA_ACC |
                    zlb.api.e_Upload_DataFormat.NEW_UPLOAD_DATA_GYRO |
                    zlb.api.e_Upload_DataFormat.NEW_UPLOAD_DATA_LIN_ACC |
                    zlb.api.e_Upload_DataFormat.NEW_UPLOAD_DATA_ADCx), response=True)
                await asyncio.sleep(0.1)
                await self.client.write_gatt_char(CmdCtrl_WriteNotify_UUID, zlb.api.ul_getDataFormat(), response=True)
                await asyncio.sleep(0.1)
                await self.client.write_gatt_char(CmdCtrl_WriteNotify_UUID, zlb.api.hl_configOutDataPort(
                    zlb.api.e_DataOutPort.TK_RF_PORT), response=True)
                await asyncio.sleep(0.1)
                print(f"蓝牙设备监听中: {self.blueAddr}")
                while not self.bQuit:
                    await asyncio.sleep(0.1)
        except Exception as e:
            print(f'连接BLE设备异常 [Error] -> {e}')

    def stop(self):
        self.bQuit = True


# ====== 机械手部分 ======
class Command():
    ID = 2
    Code = 16
    Address_High = 4
    Address_LOW = 111
    Len = 12
    Number_High = 0
    Number_LOW = 6
    Value_High = 0
    Value_LOW = 0
    Joint_High = 0
    Joint_LOW = 0
    cmd_list = [ID, Code, Address_High, Address_LOW, Number_High, Number_LOW, Len,
                Joint_High, Joint_LOW,
                Joint_High, Joint_LOW,
                Joint_High, Joint_LOW,
                Joint_High, Joint_LOW,
                Joint_High, Joint_LOW,
                Joint_High, Joint_LOW,
                ]


def byte_deal(*args):
    result = []
    for value in args:
        high_byte = (value >> 8) & 0xFF
        low_byte = value & 0xFF
        result.extend([high_byte, low_byte])
    return result


def crc16_modbus(data: bytes) -> bytes:
    crc = 0xFFFF
    for pos in data:
        crc ^= pos
        for _ in range(8):
            if (crc & 0x0001) != 0:
                crc >>= 1
                crc ^= 0xA001
            else:
                crc >>= 1
    return crc.to_bytes(2, byteorder='little')


def set_hand_joints_value(value, robot_hand):
    cmd_list = Command().cmd_list.copy()
    tmp = []
    for i in range(len(value)):
        tmp.extend(byte_deal(value[i]))
    for i in range(len(tmp)):
        cmd_list[i + 7] = tmp[i]
    send_data = bytes(cmd_list) + crc16_modbus(cmd_list)
    send_data = send_data.hex().upper()
    hex_list = [int(send_data[i:i + 2], 16) for i in range(0, len(send_data), 2)]
    if robot_hand is not None:
        robot_hand.tool_serial_write_data(hex_list)
    else:
        print(send_data)


# ====== 控制器主类 ======
class GloveToRobotHandController:
    def __init__(self, glove_mac_address: str, hand_type: str = 'left'):
        """
        hand_type: 'left' 或 'right'
        """
        self.hand_type = hand_type
        self.glove_sdk = ZlBusSdk('', 'zl', True)
        self.glove_sdk.blueAddr = glove_mac_address
        self.glove_sdk.bFound = True

        # 根据手的类型初始化机械手
        if hand_type == 'left':
            self.robot_hand = Mercury("/dev/left_arm")
            print(f"初始化左手机械臂: /dev/left_arm")
        else:
            self.robot_hand = Mercury("/dev/right_arm")
            print(f"初始化右手机械臂: /dev/right_arm")

        self.robot_hand.set_movement_type(1)
        self.robot_hand.set_vr_mode(1)

        self.current_glove_values = [0] * 6
        self.glove_open_ref = [0] * 6
        self.glove_close_ref = [100] * 6
        self.robot_open_values = [0, 0, 0, 0, 0, 0]
        self.robot_close_values = [65535, 65535, 65535, 65535, 65535, 2000]
        self._sample_hook = None
        self.data_queue = asyncio.Queue(maxsize=50)

    def on_glove_data_received(self, ant_values):
        if hasattr(self, "_sample_hook") and self._sample_hook is not None:
            self._sample_hook(ant_values)
            return
        try:
            self.data_queue.put_nowait(ant_values)
        except asyncio.QueueFull:
            try:
                self.data_queue.get_nowait()
            except asyncio.QueueEmpty:
                pass
            self.data_queue.put_nowait(ant_values)

    async def process_data_loop(self):
        while True:
            ant_values = await self.data_queue.get()
            while not self.data_queue.empty():
                ant_values = self.data_queue.get_nowait()
            self.current_glove_values = ant_values[:6]
            robot_hand_values = self.map_glove_to_robot()
            self.control_robot_hand(robot_hand_values)

    def map_glove_to_robot(self):
        mapped_values = []
        for i in range(6):
            glove_min = min(self.glove_open_ref[i], self.glove_close_ref[i])
            glove_max = max(self.glove_open_ref[i], self.glove_close_ref[i])
            if glove_max == glove_min:
                ratio = 0.0
            else:
                ratio = (self.current_glove_values[i] - glove_min) / (glove_max - glove_min)
                ratio = max(0.0, min(1.0, ratio))
            robot_min = min(self.robot_open_values[i], self.robot_close_values[i])
            robot_max = max(self.robot_open_values[i], self.robot_close_values[i])
            robot_value = int(robot_min + ratio * (robot_max - robot_min))
            mapped_values.append(robot_value)
        return mapped_values

    def control_robot_hand(self, values):
        print(f"[{self.hand_type.upper()}] 发命令到机械手", values, time())
        try:
            set_hand_joints_value(values, self.robot_hand)
        except Exception as e:
            pass

    async def calibrate(self):
        print(f"\n========== 开始校准 {self.hand_type.upper()} 手 ==========")

        # 张开手采样
        print(f"请张开 {self.hand_type.upper()} 手掌,2秒后将自动记录张开值,请保持动作!")
        await asyncio.sleep(2)
        open_samples = []

        def open_sample_hook(ant_values):
            if len(ant_values) == 6:
                open_samples.append(list(ant_values))

        self._sample_hook = open_sample_hook
        await asyncio.sleep(2)
        self._sample_hook = None
        if open_samples:
            avg = np.mean(open_samples, axis=0)
            self.glove_open_ref = [int(x) for x in avg]
            print(f"[{self.hand_type.upper()}] 已记录张开手掌参考值:", self.glove_open_ref)
        else:
            print(f"[{self.hand_type.upper()}] 未采集到张开手数据,校准失败。")
            return False

        # 握紧手采样
        print(f"请握紧 {self.hand_type.upper()} 拳头,2秒后将自动记录握紧值,请保持动作!")
        await asyncio.sleep(2)
        close_samples = []

        def close_sample_hook(ant_values):
            if len(ant_values) == 6:
                close_samples.append(list(ant_values))

        self._sample_hook = close_sample_hook
        await asyncio.sleep(2)
        self._sample_hook = None
        if close_samples:
            avg = np.mean(close_samples, axis=0)
            self.glove_close_ref = [int(x) for x in avg]
            print(f"[{self.hand_type.upper()}] 已记录握紧拳头参考值:", self.glove_close_ref)
        else:
            print(f"[{self.hand_type.upper()}] 未采集到握紧手数据,校准失败。")
            return False

        print(f"[{self.hand_type.upper()}] 校准完成!\n")
        return True

    def stop(self):
        self.glove_sdk.stop()


async def main():
    # ====== 1. 搜索所有设备 ======
    print("=" * 60)
    print("开始搜索BLE手套设备...")
    print("=" * 60)

    scanner = ZlBusSdk('', 'zl', True)
    devices = await scanner.find_multiple(timeout=10.0)

    if len(devices) < 2:
        print(f"错误:只找到 {len(devices)} 个设备,需要至少2个设备(左手和右手)")
        if len(devices) == 1:
            print("提示:请确保两个手套都已开机并处于可连接状态")
        return

    # ====== 2. 让用户选择左右手设备 ======
    print("\n找到以下设备:")
    device_list = list(devices.items())
    for idx, (addr, name) in enumerate(device_list):
        print(f"  [{idx + 1}] {name} | {addr}")

    print("\n请选择左手设备 (输入序号):")
    left_idx = int(input("左手设备序号: ")) - 1
    left_mac = device_list[left_idx][0]
    left_name = device_list[left_idx][1]

    print("\n请选择右手设备 (输入序号):")
    right_idx = int(input("右手设备序号: ")) - 1
    right_mac = device_list[right_idx][0]
    right_name = device_list[right_idx][1]

    print(f"\n已选择:")
    print(f"  左手: {left_name} ({left_mac})")
    print(f"  右手: {right_name} ({right_mac})")

    # ====== 3. 创建左右手控制器 ======
    left_controller = GloveToRobotHandController(left_mac, 'left')
    right_controller = GloveToRobotHandController(right_mac, 'right')

    # ====== 4. 设置BLE回调 ======
    def create_callback(controller):
        def callback(characteristic, data):
            if len(data) >= 14 and data[0] == 0xAA and data[1] == 0x15:
                ant_values = []
                for i in range(6):
                    start = 12 + i * 2
                    if start + 2 <= len(data):
                        val = int.from_bytes(data[start:start + 2], byteorder='little', signed=False)
                        ant_values.append(val)
                    else:
                        ant_values.append(0)
                controller.on_glove_data_received(ant_values)

        return callback

    left_controller.glove_sdk.notification_handler = create_callback(left_controller)
    right_controller.glove_sdk.notification_handler = create_callback(right_controller)

    # ====== 5. 开启BLE监听(异步同时启动) ======
    print("\n正在连接BLE设备...")
    left_ble_task = asyncio.create_task(left_controller.glove_sdk.listenData())
    right_ble_task = asyncio.create_task(right_controller.glove_sdk.listenData())

    # 等待连接建立
    await asyncio.sleep(2)

    # ====== 6. 依次校准左手、右手 ======
    print("\n" + "=" * 60)
    print("开始校准流程")
    print("=" * 60)

    # 先校准左手
    left_success = await left_controller.calibrate()
    if not left_success:
        print("左手校准失败,程序退出")
        left_controller.stop()
        right_controller.stop()
        return

    # 再校准右手
    right_success = await right_controller.calibrate()
    if not right_success:
        print("右手校准失败,程序退出")
        left_controller.stop()
        right_controller.stop()
        return

    # ====== 7. 启动数据处理循环(异步同时处理) ======
    print("\n" + "=" * 60)
    print("校准完成,开始实时控制双手机械臂")
    print("=" * 60)

    left_process_task = asyncio.create_task(left_controller.process_data_loop())
    right_process_task = asyncio.create_task(right_controller.process_data_loop())

    print("正在实时映射手套到机械手动作,按 Ctrl+C 退出")
    try:
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        print("\n程序被用户中断")
    finally:
        left_controller.stop()
        right_controller.stop()
        print("程序已退出")


if __name__ == '__main__':
    asyncio.run(main())

← 上一页 | 下一章 →

results matching ""

    No results matching ""