Scripts control dexterous hands and gloves control dexterous hands
Case 1:Script Control of Robotic Hand
import time
from pymycobot import Mercury
# Connect to left arm (or hand)
ml = Mercury("/dev/left_arm")
ml.set_movement_type(3)
ml.set_vr_mode(1)
# Finger poses (open/close)
banana = [
[0, 10000, 10000, 10000, 10000, 65535], # Hand open
[31000, 25000, 26000, 24000, 24000, 65535], # Fist closed
[0, 10000, 10000, 10000, 10000, 65535] # Open again
]
class Command:
ID = 2
Code = 16
Address_High = 4
Address_LOW = 111
Len = 12
Number_High = 0
Number_LOW = 6
Value_High = 0
Value_LOW = 0
Joint_High = 0
Joint_LOW = 0
cmd_list = [ID, Code, Address_High, Address_LOW, Number_High, Number_LOW, Len,
Joint_High, Joint_LOW,
Joint_High, Joint_LOW,
Joint_High, Joint_LOW,
Joint_High, Joint_LOW,
Joint_High, Joint_LOW,
Joint_High, Joint_LOW,
]
cmd_list = Command().cmd_list
def byte_deal(*args):
result = []
for value in args:
high_byte = (value >> 8) & 0xFF
low_byte = value & 0xFF
result.extend([high_byte, low_byte])
return result
def crc16_modbus(data: bytes) -> bytes:
crc = 0xFFFF
for pos in data:
crc ^= pos
for _ in range(8):
if (crc & 0x0001) != 0:
crc >>= 1
crc ^= 0xA001
else:
crc >>= 1
return crc.to_bytes(2, byteorder='little')
def set_hand_joints_value(value, a1=None):
tmp = []
for i in range(len(value)):
tmp.extend(byte_deal(value[i]))
for i in range(len(tmp)):
cmd_list[i + 7] = tmp[i]
send_data = bytes(cmd_list) + crc16_modbus(cmd_list)
send_data = send_data.hex().upper()
hex_list = [int(send_data[i:i + 2], 16) for i in range(0, len(send_data), 2)]
if a1 is not None:
a1.tool_serial_write_data(hex_list)
else:
print("Sending command:", send_data)
# Simple loop test for opening and closing fingers
def main():
print("Starting finger movement test...")
while True:
set_hand_joints_value(banana[0], ml)
print("Hand opened")
time.sleep(2)
set_hand_joints_value(banana[1], ml)
print("Fist closed")
time.sleep(2)
if __name__ == "__main__":
main()
Case 2:Glove-Controlled Robotic Hand
import asyncio
import platform
from time import time
from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
import numpy as np
import pyZlBus2 as zlb
from pymycobot import Mercury
# BLE UUIDs
CmdCtrl_WriteNotify_UUID = "AEC91001-6E7A-4BC2-9A4C-4CDA7A728F58"
UploadData_Notify_UUID = "AEC91002-6E7A-4BC2-9A4C-4CDA7A728F58"
TxData_Notify_UUID = "AEC91003-6E7A-4BC2-9A4C-4CDA7A728F58"
DEBUG = 0
class ZlBusSdk:
def __init__(self, advNameStr: str = '', searchStr: str = 'zl', bleScannerPass: bool = False):
if platform.system() == 'Windows':
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
self.advNameStr = advNameStr
self.searchStr = searchStr
self.bFound = False
self.bQuit = False
self.blueAddr = ""
self.findAddr = {}
self.client = None
self.scannerPass = bleScannerPass
self.pkt = zlb.ZlBusUnPack(tracker_nums=1, user_id=0xFF, fifoMaxSize=10)
self.pkt.setFlowIdFormat(zlb.api.e_TK_FlowIdFormat.TK_FLOW_ID_FORMAT_8)
self.notification_handler = self.default_notification_handler
def detection_callback(self, device, advertisement_data):
if self.advNameStr != '':
if device.name == self.advNameStr:
self.blueAddr = device.address
self.bFound = True
elif device and device.name and (
device.name.startswith(self.searchStr) or device.name.startswith(self.searchStr.upper())):
if device.address not in self.findAddr:
self.findAddr[device.address] = device.name
print(device.name, " | ", device.address)
if not self.bFound:
self.blueAddr = device.address
self.bFound = True
def default_notification_handler(self, characteristic: BleakGATTCharacteristic, data: bytearray):
if DEBUG:
print("Low-level received data:", data)
async def find(self, timeout: float = 8.0):
try:
print(' Searching for BLE devices...')
self.findAddr.clear()
self.bFound = False
self.client = None
endTime = time() + timeout
async with BleakScanner(detection_callback=self.detection_callback) as scanner:
if platform.system() == 'Windows' and self.scannerPass == False:
await scanner.start()
while not self.bFound and time() < endTime:
await asyncio.sleep(0.1)
if self.bFound:
print(f'BLE device found - MacAddress: {self.blueAddr}')
else:
print('BLE device search timed out [Error]')
print('No device found [Error]')
except Exception as e:
print(f'Exception during BLE device search [Error] -> {e}')
async def find_multiple(self, timeout: float = 8.0):
"""Search for multiple devices"""
try:
print(' Searching for all BLE devices...')
self.findAddr.clear()
self.bFound = False
self.client = None
endTime = time() + timeout
async with BleakScanner(detection_callback=self.detection_callback) as scanner:
if platform.system() == 'Windows' and self.scannerPass == False:
await scanner.start()
while time() < endTime:
await asyncio.sleep(0.1)
if len(self.findAddr) > 0:
print(f'Found {len(self.findAddr)} device(s)')
return self.findAddr
else:
print('No devices found [Error]')
return {}
except Exception as e:
print(f'Exception during BLE device search [Error] -> {e}')
return {}
async def listenData(self):
try:
print(f' Connecting to BLE device {self.blueAddr}...')
async with BleakClient(self.blueAddr) as self.client:
print(f' BLE connection successful: {self.blueAddr}')
await asyncio.sleep(0.2)
await self.client.start_notify(UploadData_Notify_UUID, self.notification_handler)
await asyncio.sleep(0.1)
await self.client.start_notify(CmdCtrl_WriteNotify_UUID, self.notification_handler)
await asyncio.sleep(0.1)
await self.client.start_notify(TxData_Notify_UUID, self.notification_handler)
await asyncio.sleep(0.5)
await self.client.write_gatt_char(CmdCtrl_WriteNotify_UUID, zlb.api.ul_modifyDataFormat(
zlb.api.e_Upload_DataFormat.NEW_UPLOAD_DATA_TIME |
zlb.api.e_Upload_DataFormat.NEW_UPLOAD_DATA_QUATERNION |
zlb.api.e_Upload_DataFormat.NEW_UPLOAD_DATA_ACC |
zlb.api.e_Upload_DataFormat.NEW_UPLOAD_DATA_GYRO |
zlb.api.e_Upload_DataFormat.NEW_UPLOAD_DATA_LIN_ACC |
zlb.api.e_Upload_DataFormat.NEW_UPLOAD_DATA_ADCx), response=True)
await asyncio.sleep(0.1)
await self.client.write_gatt_char(CmdCtrl_WriteNotify_UUID, zlb.api.ul_getDataFormat(), response=True)
await asyncio.sleep(0.1)
await self.client.write_gatt_char(CmdCtrl_WriteNotify_UUID, zlb.api.hl_configOutDataPort(
zlb.api.e_DataOutPort.TK_RF_PORT), response=True)
await asyncio.sleep(0.1)
print(f"Listening for BLE device: {self.blueAddr}")
while not self.bQuit:
await asyncio.sleep(0.1)
except Exception as e:
print(f'Exception connecting to BLE device [Error] -> {e}')
def stop(self):
self.bQuit = True
# ====== Robotic Hand Control Section ======
class Command:
ID = 2
Code = 16
Address_High = 4
Address_LOW = 111
Len = 12
Number_High = 0
Number_LOW = 6
Value_High = 0
Value_LOW = 0
Joint_High = 0
Joint_LOW = 0
cmd_list = [ID, Code, Address_High, Address_LOW, Number_High, Number_LOW, Len,
Joint_High, Joint_LOW,
Joint_High, Joint_LOW,
Joint_High, Joint_LOW,
Joint_High, Joint_LOW,
Joint_High, Joint_LOW,
Joint_High, Joint_LOW,
]
def byte_deal(*args):
result = []
for value in args:
high_byte = (value >> 8) & 0xFF
low_byte = value & 0xFF
result.extend([high_byte, low_byte])
return result
def crc16_modbus(data: bytes) -> bytes:
crc = 0xFFFF
for pos in data:
crc ^= pos
for _ in range(8):
if (crc & 0x0001) != 0:
crc >>= 1
crc ^= 0xA001
else:
crc >>= 1
return crc.to_bytes(2, byteorder='little')
def set_hand_joints_value(value, robot_hand):
cmd_list = Command().cmd_list.copy()
tmp = []
for i in range(len(value)):
tmp.extend(byte_deal(value[i]))
for i in range(len(tmp)):
cmd_list[i + 7] = tmp[i]
send_data = bytes(cmd_list) + crc16_modbus(cmd_list)
send_data = send_data.hex().upper()
hex_list = [int(send_data[i:i + 2], 16) for i in range(0, len(send_data), 2)]
if robot_hand is not None:
robot_hand.tool_serial_write_data(hex_list)
else:
print(send_data)
# ====== Main Controller Class ======
class GloveToRobotHandController:
def __init__(self, glove_mac_address: str, hand_type: str = 'left'):
"""
hand_type: 'left' or 'right'
"""
self.hand_type = hand_type
self.glove_sdk = ZlBusSdk('', 'zl', True)
self.glove_sdk.blueAddr = glove_mac_address
self.glove_sdk.bFound = True
# Initialize robotic hand based on hand type
if hand_type == 'left':
self.robot_hand = Mercury("/dev/left_arm")
print(f"Initialized left robotic arm: /dev/left_arm")
else:
self.robot_hand = Mercury("/dev/right_arm")
print(f"Initialized right robotic arm: /dev/right_arm")
self.robot_hand.set_movement_type(1)
self.robot_hand.set_vr_mode(1)
self.current_glove_values = [0] * 6
self.glove_open_ref = [0] * 6
self.glove_close_ref = [100] * 6
self.robot_open_values = [0, 0, 0, 0, 0, 0]
self.robot_close_values = [65535, 65535, 65535, 65535, 65535, 2000]
self._sample_hook = None
self.data_queue = asyncio.Queue(maxsize=50)
def on_glove_data_received(self, ant_values):
if hasattr(self, "_sample_hook") and self._sample_hook is not None:
self._sample_hook(ant_values)
return
try:
self.data_queue.put_nowait(ant_values)
except asyncio.QueueFull:
try:
self.data_queue.get_nowait()
except asyncio.QueueEmpty:
pass
self.data_queue.put_nowait(ant_values)
async def process_data_loop(self):
while True:
ant_values = await self.data_queue.get()
while not self.data_queue.empty():
ant_values = self.data_queue.get_nowait()
self.current_glove_values = ant_values[:6]
robot_hand_values = self.map_glove_to_robot()
self.control_robot_hand(robot_hand_values)
def map_glove_to_robot(self):
mapped_values = []
for i in range(6):
glove_min = min(self.glove_open_ref[i], self.glove_close_ref[i])
glove_max = max(self.glove_open_ref[i], self.glove_close_ref[i])
if glove_max == glove_min:
ratio = 0.0
else:
ratio = (self.current_glove_values[i] - glove_min) / (glove_max - glove_min)
ratio = max(0.0, min(1.0, ratio))
robot_min = min(self.robot_open_values[i], self.robot_close_values[i])
robot_max = max(self.robot_open_values[i], self.robot_close_values[i])
robot_value = int(robot_min + ratio * (robot_max - robot_min))
mapped_values.append(robot_value)
return mapped_values
def control_robot_hand(self, values):
print(f"[{self.hand_type.upper()}] Sending command to robotic hand", values, time())
try:
set_hand_joints_value(values, self.robot_hand)
except Exception as e:
pass
async def calibrate(self):
print(f"\n========== Starting calibration for {self.hand_type.upper()} hand ==========")
# Sample open hand gesture
print(f"Please open your {self.hand_type.upper()} hand. Reference values will be recorded after 2 seconds. Hold the pose!")
await asyncio.sleep(2)
open_samples = []
def open_sample_hook(ant_values):
if len(ant_values) == 6:
open_samples.append(list(ant_values))
self._sample_hook = open_sample_hook
await asyncio.sleep(2)
self._sample_hook = None
if open_samples:
avg = np.mean(open_samples, axis=0)
self.glove_open_ref = [int(x) for x in avg]
print(f"[{self.hand_type.upper()}] Open-hand reference values recorded:", self.glove_open_ref)
else:
print(f"[{self.hand_type.upper()}] No open-hand data collected. Calibration failed.")
return False
# Sample closed fist gesture
print(f"Please close your {self.hand_type.upper()} fist. Reference values will be recorded after 2 seconds. Hold the pose!")
await asyncio.sleep(2)
close_samples = []
def close_sample_hook(ant_values):
if len(ant_values) == 6:
close_samples.append(list(ant_values))
self._sample_hook = close_sample_hook
await asyncio.sleep(2)
self._sample_hook = None
if close_samples:
avg = np.mean(close_samples, axis=0)
self.glove_close_ref = [int(x) for x in avg]
print(f"[{self.hand_type.upper()}] Closed-fist reference values recorded:", self.glove_close_ref)
else:
print(f"[{self.hand_type.upper()}] No closed-fist data collected. Calibration failed.")
return False
print(f"[{self.hand_type.upper()}] Calibration completed!\n")
return True
def stop(self):
self.glove_sdk.stop()
async def main():
# ====== 1. Search for all available devices ======
print("=" * 60)
print("Searching for BLE glove devices...")
print("=" * 60)
scanner = ZlBusSdk('', 'zl', True)
devices = await scanner.find_multiple(timeout=10.0)
if len(devices) < 2:
print(f"Error: Only {len(devices)} device(s) found. At least two devices (left and right) are required.")
if len(devices) == 1:
print("Tip: Please ensure both gloves are powered on and discoverable.")
return
# ====== 2. Let user select left and right devices ======
print("\nDetected devices:")
device_list = list(devices.items())
for idx, (addr, name) in enumerate(device_list):
print(f" [{idx + 1}] {name} | {addr}")
print("\nSelect the LEFT hand device (enter index):")
left_idx = int(input("Left hand index: ")) - 1
left_mac = device_list[left_idx][0]
left_name = device_list[left_idx][1]
print("\nSelect the RIGHT hand device (enter index):")
right_idx = int(input("Right hand index: ")) - 1
right_mac = device_list[right_idx][0]
right_name = device_list[right_idx][1]
print(f"\nSelection confirmed:")
print(f" Left: {left_name} ({left_mac})")
print(f" Right: {right_name} ({right_mac})")
# ====== 3. Create controllers for left and right hands ======
left_controller = GloveToRobotHandController(left_mac, 'left')
right_controller = GloveToRobotHandController(right_mac, 'right')
# ====== 4. Set up BLE notification callbacks ======
def create_callback(controller):
def callback(characteristic, data):
if len(data) >= 14 and data[0] == 0xAA and data[1] == 0x15:
ant_values = []
for i in range(6):
start = 12 + i * 2
if start + 2 <= len(data):
val = int.from_bytes(data[start:start + 2], byteorder='little', signed=False)
ant_values.append(val)
else:
ant_values.append(0)
controller.on_glove_data_received(ant_values)
return callback
left_controller.glove_sdk.notification_handler = create_callback(left_controller)
right_controller.glove_sdk.notification_handler = create_callback(right_controller)
# ====== 5. Start BLE listening (asynchronously) ======
print("\nConnecting to BLE devices...")
left_ble_task = asyncio.create_task(left_controller.glove_sdk.listenData())
right_ble_task = asyncio.create_task(right_controller.glove_sdk.listenData())
# Wait for connections to establish
await asyncio.sleep(2)
# ====== 6. Calibrate left hand, then right hand ======
print("\n" + "=" * 60)
print("Starting calibration procedure")
print("=" * 60)
# Calibrate left hand first
left_success = await left_controller.calibrate()
if not left_success:
print("Left hand calibration failed. Exiting program.")
left_controller.stop()
right_controller.stop()
return
# Then calibrate right hand
right_success = await right_controller.calibrate()
if not right_success:
print("Right hand calibration failed. Exiting program.")
left_controller.stop()
right_controller.stop()
return
# ====== 7. Start data processing loops (asynchronously) ======
print("\n" + "=" * 60)
print("Calibration complete. Starting real-time control of dual robotic arms.")
print("=" * 60)
left_process_task = asyncio.create_task(left_controller.process_data_loop())
right_process_task = asyncio.create_task(right_controller.process_data_loop())
print("Mapping glove gestures to robotic hand movements in real time. Press Ctrl+C to exit.")
try:
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
print("\nProgram interrupted by user")
finally:
left_controller.stop()
right_controller.stop()
print("Program exited")
if __name__ == '__main__':
asyncio.run(main())
← Previous Page | Next Chapter →