Pure-python wrapper for libusb-1.0 providing comprehensive USB device access with support for all transfer types
—
Asynchronous transfers enable non-blocking USB operations using event-driven callbacks, allowing concurrent transfers and integration with event loops. This approach is essential for high-performance applications, streaming data, and responsive user interfaces.
Create and manage transfer objects that encapsulate USB transfer operations and their associated data buffers.
class USBTransfer:
def close(self):
"""
Close transfer and break reference cycles.
Raises:
ValueError: If called on submitted transfer
"""
def doom(self):
"""
Mark transfer as doomed to prevent resubmission.
Doomed transfers are automatically closed after completion.
"""
def isSubmitted(self):
"""
Check if transfer is currently submitted.
Returns:
bool: True if transfer is submitted and pending
"""
def setCallback(self, callback):
"""
Set callback function for transfer completion.
Args:
callback: Function accepting (transfer) called on completion
"""
def getCallback(self):
"""
Get current callback function.
Returns:
callable or None: Current callback function
"""
def setUserData(self, user_data):
"""
Set user data associated with transfer.
Args:
user_data: Any Python object to associate with transfer
"""
def getUserData(self):
"""
Get user data associated with transfer.
Returns:
Any: User data object
"""

Configure transfers for control endpoint operations with setup packet and optional data phase.
class USBTransfer:
def setControl(self, request_type, request, value, index, buffer_or_len,
callback=None, user_data=None, timeout=0):
"""
Configure transfer for control operation.
Args:
request_type (int): Request type bitmask (TYPE_* | RECIPIENT_* | ENDPOINT_*)
request (int): Request ID
value (int): Request-specific value parameter
index (int): Request-specific index parameter
buffer_or_len: bytes to send or int for receive buffer size
callback: Function called on completion (transfer)
user_data: User data for callback
timeout (int): Timeout in milliseconds (0 = no timeout)
Raises:
ValueError: If transfer is already submitted
DoomedTransferError: If transfer is doomed
"""

Configure transfers for bulk endpoint operations with large data payloads and reliable delivery.
class USBTransfer:
def setBulk(self, endpoint, buffer_or_len, callback=None, user_data=None, timeout=0):
"""
Configure transfer for bulk operation.
Args:
endpoint (int): Bulk endpoint address (includes direction bit)
buffer_or_len: bytes to send or int for receive buffer size
callback: Function called on completion (transfer)
user_data: User data for callback
timeout (int): Timeout in milliseconds (0 = no timeout)
Raises:
ValueError: If transfer is already submitted
DoomedTransferError: If transfer is doomed
Note:
Use writable buffer (bytearray) to avoid memory copies.
"""

Configure transfers for interrupt endpoint operations with guaranteed latency bounds.
class USBTransfer:
def setInterrupt(self, endpoint, buffer_or_len, callback=None, user_data=None, timeout=0):
"""
Configure transfer for interrupt operation.
Args:
endpoint (int): Interrupt endpoint address (includes direction bit)
buffer_or_len: bytes to send or int for receive buffer size
callback: Function called on completion (transfer)
user_data: User data for callback
timeout (int): Timeout in milliseconds (0 = no timeout)
Raises:
ValueError: If transfer is already submitted
DoomedTransferError: If transfer is doomed
Note:
Use writable buffer (bytearray) to avoid memory copies.
"""

Configure transfers for isochronous endpoint operations with time-sensitive streaming data.
class USBTransfer:
def setIsochronous(self, endpoint, buffer_or_len, callback=None, user_data=None,
timeout=0, iso_transfer_length_list=None):
"""
Configure transfer for isochronous operation.
Args:
endpoint (int): Isochronous endpoint address (includes direction bit)
buffer_or_len: bytes to send or int for receive buffer size
callback: Function called on completion (transfer)
user_data: User data for callback
timeout (int): Timeout in milliseconds (0 = no timeout)
iso_transfer_length_list (list[int], optional): Individual packet sizes
Raises:
ValueError: If transfer is already submitted or buffer size issues
DoomedTransferError: If transfer is doomed
TypeError: If transfer wasn't created with iso_packets > 0
Note:
If iso_transfer_length_list is not provided, the buffer is divided evenly.
"""

Submit transfers for execution and manage their lifecycle.
class USBTransfer:
def submit(self):
"""
Submit transfer for asynchronous execution.
Raises:
ValueError: If transfer already submitted or not initialized
DoomedTransferError: If transfer is doomed
USBError: If submission fails
"""
def cancel(self):
"""
Cancel submitted transfer.
Raises:
USBErrorNotFound: If transfer not submitted
Note:
Cancellation is asynchronous - wait for TRANSFER_CANCELLED status.
"""

Query transfer status and retrieve results after completion.
class USBTransfer:
def getType(self):
"""
Get transfer type.
Returns:
int: Transfer type constant (TRANSFER_TYPE_CONTROL, etc.)
"""
def getEndpoint(self):
"""
Get endpoint address.
Returns:
int: Endpoint address with direction bit
"""
def getStatus(self):
"""
Get transfer completion status.
Returns:
int: Status constant (TRANSFER_COMPLETED, TRANSFER_ERROR, etc.)
Note:
Should not be called on submitted transfers.
"""
def getActualLength(self):
"""
Get number of bytes actually transferred.
Returns:
int: Actual transfer length
Note:
Should not be called on submitted transfers.
"""
def getBuffer(self):
"""
Get transfer data buffer.
Returns:
memoryview: Buffer containing transfer data
Note:
Should not be called on submitted transfers.
"""
def setBuffer(self, buffer_or_len):
"""
Replace transfer buffer.
Args:
buffer_or_len: New buffer data or size
Raises:
ValueError: If transfer submitted or invalid for transfer type
Note:
Not allowed for control transfers (use setControl) or for
resizing isochronous transfers (use setIsochronous).
"""

Configure transfer behavior flags for error handling and packet management.
class USBTransfer:
def isShortAnError(self):
"""
Check if short transfers are treated as errors.
Returns:
bool: True if short transfers cause TRANSFER_ERROR status
"""
def setShortIsError(self, state):
"""
Set whether short transfers are treated as errors.
Args:
state (bool): True to treat short transfers as errors
"""
def isZeroPacketAdded(self):
"""
Check if zero-length packet is added for transfers that are
multiples of endpoint packet size.
Returns:
bool: True if zero packet will be added
"""
def setAddZeroPacket(self, state):
"""
Set whether to add zero-length packet for transfers that are
multiples of endpoint packet size.
Args:
state (bool): True to add zero packet
"""

Access individual packet results for isochronous transfers.
class USBTransfer:
def getISOBufferList(self):
"""
Get list of individual isochronous packet buffers.
Returns:
list[bytes]: Buffer for each ISO packet
Raises:
TypeError: If not an isochronous transfer
Note:
Should not be called on submitted transfers.
"""
def getISOSetupList(self):
"""
Get list of individual isochronous packet descriptors.
Returns:
list[dict]: List of dicts with 'length', 'actual_length', 'status'
Raises:
TypeError: If not an isochronous transfer
Note:
Should not be called on submitted transfers (except 'length').
"""
def iterISO(self):
"""
Iterator over isochronous packets yielding (status, buffer) tuples.
Yields:
tuple[int, bytes]: (status, buffer) for each packet
Raises:
TypeError: If not an isochronous transfer
Note:
Buffer is truncated to actual_length. More efficient than
getISOBufferList + getISOSetupList for receive operations.
"""

Simplified callback management for common transfer patterns with automatic resubmission.
class USBTransferHelper:
def __init__(self, transfer=None):
"""
Create transfer callback dispatcher.
Args:
transfer (USBTransfer, optional): Deprecated - transfer to manage
"""
def setEventCallback(self, event, callback):
"""
Set callback for specific transfer event.
Args:
event (int): Event constant (TRANSFER_COMPLETED, TRANSFER_ERROR, etc.)
callback: Function accepting (transfer) and returning bool:
True to resubmit the transfer, False to stop
Raises:
ValueError: If event is not valid
"""
def setDefaultCallback(self, callback):
"""
Set default callback for events without specific callbacks.
Args:
callback: Function accepting (transfer) returning bool
"""
def getEventCallback(self, event, default=None):
"""
Get callback for specific event.
Args:
event (int): Event constant
default: Default value if no callback set
Returns:
callable or default: Callback function or default
"""
def __call__(self, transfer):
"""
Main callback function to set on transfers.
Args:
transfer (USBTransfer): Completed transfer
Note:
Automatically calls appropriate event callback and resubmits
if callback returns True.
"""

Integrate USB event handling with file descriptor polling mechanisms.
class USBPoller:
def __init__(self, context, poller):
"""
Create USB poller for event loop integration.
Args:
context (USBContext): USB context to poll
poller: Poller object with register/unregister/poll methods
Note:
Poller must implement register(fd, events), unregister(fd),
and poll(timeout) returning [(fd, events), ...].
"""
def poll(self, timeout=None):
"""
Poll for events including USB events.
Args:
timeout (float, optional): Timeout in seconds
Returns:
list[tuple]: List of (fd, events) for non-USB events
Note:
USB events are handled internally, only non-USB events returned.
"""
def register(self, fd, events):
"""
Register non-USB file descriptor.
Args:
fd (int): File descriptor
events (int): Event mask (POLLIN, POLLOUT)
Raises:
ValueError: If fd is a USB event fd
"""
def unregister(self, fd):
"""
Unregister non-USB file descriptor.
Args:
fd (int): File descriptor
Raises:
ValueError: If fd is a USB event fd
"""

import usb1
import threading
import time
def transfer_callback(transfer):
    """Report the outcome of a finished transfer on stdout.

    A completed transfer prints its byte count and a hex preview of the first
    20 received bytes; every other status prints a one-line message.
    """
    outcome = transfer.getStatus()

    if outcome == usb1.TRANSFER_COMPLETED:
        received = transfer.getActualLength()
        print(f"Transfer completed: {received} bytes")
        # Copy only the bytes that were actually transferred out of the buffer.
        payload = bytes(transfer.getBuffer()[:received])
        print(f"Received: {payload[:20].hex()}...")
        return

    # Statuses that map to a fixed message; anything else is printed raw.
    fixed_messages = {
        usb1.TRANSFER_TIMED_OUT: "Transfer timed out",
        usb1.TRANSFER_ERROR: "Transfer error",
        usb1.TRANSFER_CANCELLED: "Transfer cancelled",
    }
    print(fixed_messages.get(outcome, f"Transfer status: {outcome}"))
def async_transfer_example():
    """Submit one asynchronous bulk IN read and pump events until it is done.

    Does nothing when the device 0x1234:0x5678 is not found.
    """
    with usb1.USBContext() as context:
        device = context.getByVendorIDAndProductID(0x1234, 0x5678)
        if not device:
            return

        with device.open() as handle, handle.claimInterface(0):
            bulk_read = handle.getTransfer()
            # Bulk IN endpoint 0x82, 1024-byte receive buffer, 5 s timeout.
            bulk_read.setBulk(
                endpoint=0x82,
                buffer_or_len=1024,
                callback=transfer_callback,
                timeout=5000,
            )

            bulk_read.submit()
            print("Transfer submitted")

            # The callback only fires while the context is handling events.
            while bulk_read.isSubmitted():
                context.handleEventsTimeout(1.0)

            bulk_read.close()
async_transfer_example()

import usb1
import threading
class TransferManager:
    """Run a small pool of concurrent bulk IN transfers on one device handle.

    Each transfer carries its index as user data. Completed transfers are
    resubmitted until ``max_completions`` callbacks have been seen or
    ``cleanup()`` has been called.
    """

    def __init__(self, handle, num_transfers=4, max_completions=20):
        """
        Allocate the transfer pool.

        Args:
            handle: Opened device handle providing ``getTransfer()``
            num_transfers (int): Number of transfers to allocate
            max_completions (int): Stop resubmitting once this many
                completion callbacks have been counted
        """
        self.handle = handle
        self.transfers = []
        self.completed_count = 0
        self.max_completions = max_completions
        self.lock = threading.Lock()
        # Set by cleanup() so that late completions are not resubmitted.
        self._stopping = False

        for transfer_id in range(num_transfers):
            transfer = handle.getTransfer()
            transfer.setUserData(transfer_id)  # Store transfer ID
            self.transfers.append(transfer)

    def transfer_callback(self, transfer):
        """
        Count a finished transfer and resubmit it while the run is active.

        Args:
            transfer: The transfer that just finished
        """
        transfer_id = transfer.getUserData()
        status = transfer.getStatus()

        # Decide about resubmission under the lock so the counter and the
        # decision derived from it stay consistent.
        with self.lock:
            self.completed_count += 1
            may_resubmit = (
                not self._stopping
                and self.completed_count < self.max_completions
            )

        if status != usb1.TRANSFER_COMPLETED:
            print(f"Transfer {transfer_id} status: {status}")
            return

        length = transfer.getActualLength()
        print(f"Transfer {transfer_id} completed: {length} bytes")

        if may_resubmit:
            try:
                transfer.submit()
            except usb1.USBError as e:
                print(f"Resubmit failed: {e}")

    def start_transfers(self):
        """Configure every transfer as a bulk IN read and submit it."""
        for transfer in self.transfers:
            transfer.setBulk(
                endpoint=0x82,
                buffer_or_len=1024,
                callback=self.transfer_callback,
                timeout=2000,
            )
            transfer.submit()
        print(f"Started {len(self.transfers)} concurrent transfers")

    def wait_completion(self, context):
        """
        Handle events until no transfer is submitted any more.

        Args:
            context: USB context whose events drive the transfer callbacks
        """
        while any(t.isSubmitted() for t in self.transfers):
            context.handleEventsTimeout(1.0)
        print("All transfers completed")

    def cleanup(self):
        """
        Release all transfers without closing one that is still in flight.

        Cancellation is asynchronous and ``close()`` raises ValueError on a
        submitted transfer, so in-flight transfers are doomed and cancelled
        instead: a doomed transfer cannot be resubmitted and is closed
        automatically once its completion has been delivered. Idle transfers
        are closed right away.
        """
        with self.lock:
            self._stopping = True

        for transfer in self.transfers:
            if transfer.isSubmitted():
                # Doom first so the transfer is released even if the
                # cancellation request itself fails.
                transfer.doom()
                transfer.cancel()
            else:
                transfer.close()
def concurrent_transfers_example():
    """Run a TransferManager against device 0x1234:0x5678, if present.

    The manager is always cleaned up, even when starting or waiting fails.
    """
    with usb1.USBContext() as context:
        device = context.getByVendorIDAndProductID(0x1234, 0x5678)
        if not device:
            return

        with device.open() as handle, handle.claimInterface(0):
            pool = TransferManager(handle)
            try:
                pool.start_transfers()
                pool.wait_completion(context)
            finally:
                pool.cleanup()
concurrent_transfers_example()

import usb1
def setup_transfer_helper_example():
    """Show per-status callbacks dispatched through USBTransferHelper.

    An interrupt IN transfer runs for ten seconds. Each handler's boolean
    return value tells the helper whether to resubmit the transfer.
    """
    import time

    def on_completed(transfer):
        """Print the received data and ask for resubmission."""
        received = transfer.getActualLength()
        payload = bytes(transfer.getBuffer()[:received])
        print(f"Completed transfer: {received} bytes, data: {payload[:10].hex()}")
        return True  # Resubmit transfer

    def on_error(transfer):
        """Report the error and stop."""
        print("Transfer error occurred")
        return False  # Don't resubmit

    def on_timeout(transfer):
        """Report the timeout and retry."""
        print("Transfer timed out")
        return True  # Retry

    def on_cancelled(transfer):
        """Report the cancellation and stop."""
        print("Transfer was cancelled")
        return False  # Don't resubmit

    with usb1.USBContext() as context:
        device = context.getByVendorIDAndProductID(0x1234, 0x5678)
        if not device:
            return

        with device.open() as handle, handle.claimInterface(0):
            transfer = handle.getTransfer()
            helper = usb1.USBTransferHelper()

            # Register one handler per transfer status.
            for event, handler in (
                (usb1.TRANSFER_COMPLETED, on_completed),
                (usb1.TRANSFER_ERROR, on_error),
                (usb1.TRANSFER_TIMED_OUT, on_timeout),
                (usb1.TRANSFER_CANCELLED, on_cancelled),
            ):
                helper.setEventCallback(event, handler)

            # The helper object itself is the transfer's callback.
            transfer.setInterrupt(
                endpoint=0x81,
                buffer_or_len=8,
                callback=helper,
                timeout=1000,
            )

            transfer.submit()
            print("Transfer with helper submitted")

            # Run for 10 seconds
            began = time.time()
            while time.time() - began < 10:
                context.handleEventsTimeout(1.0)

            # Cancel if still running, then wait for the cancellation to land.
            if transfer.isSubmitted():
                transfer.cancel()
                while transfer.isSubmitted():
                    context.handleEventsTimeout(0.1)

            transfer.close()
setup_transfer_helper_example()

import usb1
import time
def isochronous_streaming_example():
    """Stream from an isochronous IN endpoint for five seconds and print totals."""

    class IsoStreamProcessor:
        """Accumulates packet, error and byte counters across ISO transfers."""

        def __init__(self):
            self.packet_count = 0
            self.error_count = 0
            self.total_bytes = 0

        def iso_callback(self, transfer):
            """Tally every packet of a completed transfer, then resubmit it."""
            if transfer.getStatus() != usb1.TRANSFER_COMPLETED:
                print(f"ISO transfer status: {transfer.getStatus()}")
                return

            for packet_status, packet_data in transfer.iterISO():
                self.packet_count += 1

                if packet_status != usb1.TRANSFER_COMPLETED:
                    self.error_count += 1
                    print(f"Packet {self.packet_count} error: {packet_status}")
                    continue

                self.total_bytes += len(packet_data)
                # Process packet data here
                if len(packet_data) > 0:
                    print(f"Packet {self.packet_count}: {len(packet_data)} bytes")

            # Resubmit for continuous streaming
            try:
                transfer.submit()
            except usb1.USBError as e:
                print(f"Resubmit failed: {e}")

    with usb1.USBContext() as context:
        # Look for device with isochronous endpoint
        device = context.getByVendorIDAndProductID(0x1234, 0x5678)
        if not device:
            return

        with device.open() as handle, handle.claimInterface(0):
            processor = IsoStreamProcessor()

            # 10 ISO packets per transfer sharing one 1000-byte buffer.
            transfer = handle.getTransfer(iso_packets=10)
            transfer.setIsochronous(
                endpoint=0x83,
                buffer_or_len=1000,
                callback=processor.iso_callback,
                timeout=1000,
            )

            transfer.submit()
            print("Started isochronous streaming")

            # Stream for 5 seconds
            began = time.time()
            while time.time() - began < 5.0:
                context.handleEventsTimeout(0.1)

            # Stop streaming and wait until the cancellation has landed.
            if transfer.isSubmitted():
                transfer.cancel()
                while transfer.isSubmitted():
                    context.handleEventsTimeout(0.1)

            print("Streaming complete:")
            print(f" Packets: {processor.packet_count}")
            print(f" Errors: {processor.error_count}")
            print(f" Total bytes: {processor.total_bytes}")

            transfer.close()
isochronous_streaming_example()

import usb1
import select
import socket
import threading
def event_loop_integration_example():
    """Demonstrate USBPoller integration with a select-based event loop.

    A non-blocking TCP listener and an interrupt IN transfer are serviced from
    one loop: USBPoller handles the USB events internally and hands back only
    the socket events. The loop runs until interrupted with Ctrl+C. The server
    socket is closed on every exit path, including "device not found".
    """

    class SelectPoller:
        """Adapter exposing register/unregister/poll on top of select.select()."""

        def __init__(self):
            self.read_fds = set()
            self.write_fds = set()

        def register(self, fd, events):
            if events & select.POLLIN:
                self.read_fds.add(fd)
            if events & select.POLLOUT:
                self.write_fds.add(fd)

        def unregister(self, fd):
            self.read_fds.discard(fd)
            self.write_fds.discard(fd)

        def poll(self, timeout):
            # select() takes None for "block forever"; map negative values to it.
            if timeout is None or timeout < 0:
                timeout = None
            ready_read, ready_write, _ = select.select(
                list(self.read_fds),
                list(self.write_fds),
                [],
                timeout,
            )
            result = [(fd, select.POLLIN) for fd in ready_read]
            result.extend((fd, select.POLLOUT) for fd in ready_write)
            return result

    def usb_callback(transfer):
        """Return True to have the helper resubmit, False to stop."""
        if transfer.getStatus() == usb1.TRANSFER_COMPLETED:
            length = transfer.getActualLength()
            print(f"USB: Received {length} bytes")
            return True  # Resubmit
        return False

    # Create a simple TCP server socket for demonstration
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_socket.bind(('localhost', 0))
        server_socket.listen(1)
        server_socket.setblocking(False)
        port = server_socket.getsockname()[1]
        print(f"Server listening on port {port}")

        with usb1.USBContext() as context:
            device = context.getByVendorIDAndProductID(0x1234, 0x5678)
            if not device:
                return

            with device.open() as handle, handle.claimInterface(0):
                # A plain transfer callback's return value is not used for
                # resubmission; USBTransferHelper is what resubmits when the
                # callback returns True, so usb_callback is routed through it.
                helper = usb1.USBTransferHelper()
                helper.setDefaultCallback(usb_callback)

                transfer = handle.getTransfer()
                transfer.setInterrupt(
                    endpoint=0x81,
                    buffer_or_len=64,
                    callback=helper,
                    timeout=1000,
                )
                transfer.submit()

                try:
                    poller = usb1.USBPoller(context, SelectPoller())
                    listen_fd = server_socket.fileno()
                    poller.register(listen_fd, select.POLLIN)

                    print("Event loop running (Ctrl+C to stop)")
                    while True:
                        # USB events are consumed inside poll(); only the
                        # non-USB (socket) events come back.
                        for fd, _event in poller.poll(1.0):
                            if fd != listen_fd:
                                continue
                            try:
                                client_socket, addr = server_socket.accept()
                            except BlockingIOError:
                                # Readiness was stale; nothing to accept.
                                continue
                            except OSError as exc:
                                print(f"TCP: accept failed: {exc}")
                                continue
                            with client_socket:
                                print(f"TCP: Connection from {addr}")
                                try:
                                    client_socket.sendall(
                                        b"Hello from USB event loop!\n"
                                    )
                                except OSError as exc:
                                    print(f"TCP: send failed: {exc}")
                except KeyboardInterrupt:
                    print("\nStopping event loop")
                finally:
                    # close() is not allowed on a submitted transfer, so
                    # cancel and wait for the cancellation to be delivered.
                    if transfer.isSubmitted():
                        transfer.cancel()
                        while transfer.isSubmitted():
                            context.handleEventsTimeout(0.1)
                    transfer.close()
    finally:
        server_socket.close()
# Note: This example requires a USB device to be meaningful
# event_loop_integration_example()

Install with Tessl CLI
npx tessl i tessl/pypi-libusb1