Pure-python wrapper for libusb-1.0 providing comprehensive USB device access with support for all transfer types
—
Synchronous transfers provide blocking I/O operations for control, bulk, and interrupt endpoints. These methods are simpler to use than asynchronous transfers but block the calling thread until completion or timeout. They're ideal for simple request-response patterns and low-throughput applications.
Control transfers handle device configuration, status queries, and vendor-specific commands using the standard USB control transfer format with setup packet and optional data phase.
class USBDeviceHandle:
def controlRead(self, request_type, request, value, index, length, timeout=0):
"""
Perform synchronous control read transfer.
Args:
request_type (int): Request type bitmask (TYPE_* | RECIPIENT_* | ENDPOINT_IN)
request (int): Request ID (vendor-specific or standard)
value (int): Request-specific value parameter
index (int): Request-specific index parameter (often interface/endpoint)
length (int): Maximum number of bytes to read
timeout (int): Timeout in milliseconds (0 = no timeout)
Returns:
bytes: Received data (may be shorter than length)
Raises:
USBErrorTimeout: Transfer timed out
USBError: Other transfer errors
"""
def controlWrite(self, request_type, request, value, index, data, timeout=0):
"""
Perform synchronous control write transfer.
Args:
request_type (int): Request type bitmask (TYPE_* | RECIPIENT_* | ENDPOINT_OUT)
request (int): Request ID (vendor-specific or standard)
value (int): Request-specific value parameter
index (int): Request-specific index parameter (often interface/endpoint)
data (bytes): Data to send (use writable buffer like bytearray to avoid copies)
timeout (int): Timeout in milliseconds (0 = no timeout)
Returns:
int: Number of bytes actually sent
Raises:
USBErrorTimeout: Transfer timed out
USBError: Other transfer errors
"""Bulk transfers provide reliable, large-volume data transfer for endpoints that can tolerate variable latency but require error-free delivery.
class USBDeviceHandle:
def bulkRead(self, endpoint, length, timeout=0):
"""
Perform synchronous bulk read transfer.
Args:
endpoint (int): Bulk IN endpoint address (with ENDPOINT_IN bit set)
length (int): Maximum number of bytes to read
timeout (int): Timeout in milliseconds (0 = no timeout)
Returns:
bytes: Received data (may be shorter than length)
Raises:
USBErrorTimeout: Transfer timed out (exception has .received property)
USBError: Other transfer errors
"""
def bulkWrite(self, endpoint, data, timeout=0):
"""
Perform synchronous bulk write transfer.
Args:
endpoint (int): Bulk OUT endpoint address (with ENDPOINT_OUT bit set)
data (bytes): Data to send (use writable buffer like bytearray to avoid copies)
timeout (int): Timeout in milliseconds (0 = no timeout)
Returns:
int: Number of bytes actually sent
Raises:
USBErrorTimeout: Transfer timed out (exception has .transferred property)
USBError: Other transfer errors
"""Interrupt transfers provide low-latency, periodic data transfer with guaranteed maximum latency for time-sensitive applications like input devices.
class USBDeviceHandle:
def interruptRead(self, endpoint, length, timeout=0):
"""
Perform synchronous interrupt read transfer.
Args:
endpoint (int): Interrupt IN endpoint address (with ENDPOINT_IN bit set)
length (int): Maximum number of bytes to read
timeout (int): Timeout in milliseconds (0 = no timeout)
Returns:
bytes: Received data (may be shorter than length)
Raises:
USBErrorTimeout: Transfer timed out (exception has .received property)
USBError: Other transfer errors
"""
def interruptWrite(self, endpoint, data, timeout=0):
"""
Perform synchronous interrupt write transfer.
Args:
endpoint (int): Interrupt OUT endpoint address (with ENDPOINT_OUT bit set)
data (bytes): Data to send (use writable buffer like bytearray to avoid copies)
timeout (int): Timeout in milliseconds (0 = no timeout)
Returns:
int: Number of bytes actually sent
Raises:
USBErrorTimeout: Transfer timed out (exception has .transferred property)
USBError: Other transfer errors
"""import usb1
with usb1.USBContext() as context:
device = context.getByVendorIDAndProductID(0x1234, 0x5678)
if device:
with device.open() as handle:
# Get device status (standard request)
try:
status = handle.controlRead(
request_type=usb1.TYPE_STANDARD | usb1.RECIPIENT_DEVICE | usb1.ENDPOINT_IN,
request=0x00, # GET_STATUS
value=0,
index=0,
length=2,
timeout=1000
)
print(f"Device status: {int.from_bytes(status, 'little')}")
except usb1.USBErrorTimeout:
print("Control request timed out")
except usb1.USBError as e:
print(f"Control request failed: {e}")import usb1
def send_vendor_command(handle, command, data=None):
"""Send vendor-specific control command."""
try:
if data is None:
# Vendor command with no data (status/query)
response = handle.controlRead(
request_type=usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE | usb1.ENDPOINT_IN,
request=command,
value=0,
index=0,
length=64, # Maximum expected response
timeout=5000
)
return response
else:
# Vendor command with data
sent = handle.controlWrite(
request_type=usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE | usb1.ENDPOINT_OUT,
request=command,
value=0,
index=0,
data=data,
timeout=5000
)
return sent
except usb1.USBErrorTimeout:
print(f"Vendor command {command} timed out")
return None
except usb1.USBError as e:
print(f"Vendor command {command} failed: {e}")
return None
with usb1.USBContext() as context:
device = context.getByVendorIDAndProductID(0x1234, 0x5678)
if device:
with device.open() as handle:
# Query device version
version = send_vendor_command(handle, 0x01)
if version:
print(f"Device version: {version.hex()}")
# Send configuration data
config_data = b'\x42\x00\x01\x02'
result = send_vendor_command(handle, 0x02, config_data)
if result is not None:
print(f"Configuration sent, {result} bytes")import usb1
import time
def bulk_transfer_example(handle):
"""Demonstrate bulk transfer with error handling."""
bulk_out_ep = 0x02 # OUT endpoint
bulk_in_ep = 0x82 # IN endpoint (0x02 | ENDPOINT_IN)
try:
# Send data via bulk transfer
send_data = b"Hello, USB device!" * 10 # Some test data
sent = handle.bulkWrite(bulk_out_ep, send_data, timeout=5000)
print(f"Sent {sent} of {len(send_data)} bytes")
# Read response via bulk transfer
response = handle.bulkRead(bulk_in_ep, 1024, timeout=5000)
print(f"Received {len(response)} bytes: {response[:50]}...")
return True
except usb1.USBErrorTimeout as e:
if hasattr(e, 'transferred'):
print(f"Bulk write timed out after {e.transferred} bytes")
elif hasattr(e, 'received'):
print(f"Bulk read timed out with {len(e.received)} bytes received")
else:
print("Bulk transfer timed out")
return False
except usb1.USBErrorPipe:
print("Bulk endpoint stalled - clearing halt")
handle.clearHalt(bulk_out_ep)
handle.clearHalt(bulk_in_ep)
return False
except usb1.USBError as e:
print(f"Bulk transfer error: {e}")
return False
with usb1.USBContext() as context:
device = context.getByVendorIDAndProductID(0x1234, 0x5678)
if device:
with device.open() as handle:
with handle.claimInterface(0):
success = bulk_transfer_example(handle)
print(f"Bulk transfer {'succeeded' if success else 'failed'}")import usb1
import struct
def read_interrupt_data(handle, endpoint, packet_size, duration=10):
"""
Read interrupt data continuously for specified duration.
Typical for HID devices like mice, keyboards.
"""
print(f"Reading interrupt data for {duration} seconds...")
start_time = time.time()
packet_count = 0
while time.time() - start_time < duration:
try:
# Read interrupt data (non-blocking with short timeout)
data = handle.interruptRead(endpoint, packet_size, timeout=100)
packet_count += 1
# Parse HID data (example for mouse)
if len(data) >= 3:
buttons, dx, dy = struct.unpack('Bbb', data[:3])
if buttons or dx or dy: # Only print if there's activity
print(f"Packet {packet_count}: buttons=0x{buttons:02x}, dx={dx}, dy={dy}")
except usb1.USBErrorTimeout:
# Timeout is normal for interrupt endpoints with no data
continue
except usb1.USBError as e:
print(f"Interrupt read error: {e}")
break
print(f"Read {packet_count} interrupt packets")
with usb1.USBContext() as context:
# Find HID device (mouse example)
for device in context.getDeviceIterator(skip_on_error=True):
if device.getDeviceClass() == 3: # HID class
print(f"Found HID device: {device.getVendorID():04x}:{device.getProductID():04x}")
try:
with device.open() as handle:
# Detach kernel driver if needed (Linux)
if handle.kernelDriverActive(0):
handle.detachKernelDriver(0)
with handle.claimInterface(0):
# Read from interrupt IN endpoint (typically 0x81)
read_interrupt_data(handle, 0x81, 8, duration=5)
except usb1.USBError as e:
print(f"Error accessing HID device: {e}")
breakimport usb1
import time
def high_speed_bulk_transfer(handle, endpoint_out, endpoint_in, chunk_size=64*1024):
"""
Demonstrate high-performance bulk transfer with large buffers.
"""
# Use bytearray for zero-copy operations
send_buffer = bytearray(chunk_size)
# Fill with test pattern
for i in range(len(send_buffer)):
send_buffer[i] = i & 0xFF
print(f"Starting high-speed bulk transfer, chunk size: {chunk_size}")
start_time = time.time()
total_sent = 0
total_received = 0
try:
for iteration in range(10): # 10 iterations
# Send data
sent = handle.bulkWrite(endpoint_out, send_buffer, timeout=5000)
total_sent += sent
# Receive response
response = handle.bulkRead(endpoint_in, chunk_size, timeout=5000)
total_received += len(response)
# Verify data integrity (example)
if len(response) >= 10:
if response[:10] != send_buffer[:10]:
print(f"Warning: Data mismatch in iteration {iteration}")
if iteration % 2 == 0:
print(f"Iteration {iteration}: sent {sent}, received {len(response)}")
except usb1.USBErrorTimeout as e:
print(f"Transfer timed out: {e}")
if hasattr(e, 'transferred'):
total_sent += e.transferred
if hasattr(e, 'received'):
total_received += len(e.received)
elapsed = time.time() - start_time
print(f"Transfer complete:")
print(f" Time: {elapsed:.2f} seconds")
print(f" Sent: {total_sent} bytes ({total_sent/elapsed/1024:.1f} KB/s)")
print(f" Received: {total_received} bytes ({total_received/elapsed/1024:.1f} KB/s)")
with usb1.USBContext() as context:
device = context.getByVendorIDAndProductID(0x1234, 0x5678)
if device:
with device.open() as handle:
with handle.claimInterface(0):
high_speed_bulk_transfer(handle, 0x02, 0x82)import usb1
import time
def robust_transfer(handle, transfer_func, *args, max_retries=3, retry_delay=0.1):
"""
Perform transfer with automatic retry on certain errors.
"""
for attempt in range(max_retries + 1):
try:
return transfer_func(*args)
except usb1.USBErrorTimeout as e:
print(f"Attempt {attempt + 1}: Timeout")
if attempt == max_retries:
raise
time.sleep(retry_delay)
except usb1.USBErrorPipe as e:
print(f"Attempt {attempt + 1}: Pipe error - clearing halt")
# Extract endpoint from args (depends on transfer type)
if len(args) >= 1 and isinstance(args[0], int):
endpoint = args[0]
handle.clearHalt(endpoint)
if attempt == max_retries:
raise
time.sleep(retry_delay)
except usb1.USBErrorNoDevice:
print("Device disconnected")
raise
except usb1.USBError as e:
print(f"Attempt {attempt + 1}: USB error {e}")
if attempt == max_retries:
raise
time.sleep(retry_delay)
def transfer_with_recovery_example(handle):
"""Example using robust transfer wrapper."""
try:
# Robust bulk read
data = robust_transfer(
handle,
handle.bulkRead,
0x82, # endpoint
1024, # length
5000 # timeout
)
print(f"Successfully read {len(data)} bytes after retries")
# Robust control write
result = robust_transfer(
handle,
handle.controlWrite,
usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE | usb1.ENDPOINT_OUT,
0x01, # request
0, # value
0, # index
b'\x42\x00', # data
5000 # timeout
)
print(f"Successfully sent control command after retries")
except usb1.USBError as e:
print(f"Transfer failed after all retries: {e}")
with usb1.USBContext() as context:
device = context.getByVendorIDAndProductID(0x1234, 0x5678)
if device:
with device.open() as handle:
with handle.claimInterface(0):
transfer_with_recovery_example(handle)Install with Tessl CLI
npx tessl i tessl/pypi-libusb1