
tessl/pypi-libusb1

Pure-python wrapper for libusb-1.0 providing comprehensive USB device access with support for all transfer types


Asynchronous USB Transfers

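Asynchronous transfers perform USB operations without blocking: a transfer is configured, submitted, and its callback runs on completion while the application handles events on the USB context. Several transfers can be in flight at once, and event handling can be integrated with an existing event loop. This suits streaming, high-throughput I/O, and applications that must stay responsive, such as user interfaces.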

Capabilities

Transfer Object Management

Create and manage transfer objects that encapsulate USB transfer operations and their associated data buffers.

class USBTransfer:
    def close(self):
        """
        Close transfer and break reference cycles.
        
        Raises:
            ValueError: If called on submitted transfer
        """

    def doom(self):
        """
        Mark transfer as doomed to prevent resubmission.
        Doomed transfers are automatically closed after completion.
        """

    def isSubmitted(self):
        """
        Check if transfer is currently submitted.
        
        Returns:
            bool: True if transfer is submitted and pending
        """

    def setCallback(self, callback):
        """
        Set callback function for transfer completion.
        
        Args:
            callback: Function accepting (transfer) called on completion
        """

    def getCallback(self):
        """
        Get current callback function.
        
        Returns:
            callable or None: Current callback function
        """

    def setUserData(self, user_data):
        """
        Set user data associated with transfer.
        
        Args:
            user_data: Any Python object to associate with transfer
        """

    def getUserData(self):
        """
        Get user data associated with transfer.
        
        Returns:
            Any: User data object
        """

Control Transfer Setup

Configure transfers for control endpoint operations with setup packet and optional data phase.

class USBTransfer:
    def setControl(self, request_type, request, value, index, buffer_or_len, 
                   callback=None, user_data=None, timeout=0):
        """
        Configure transfer for control operation.
        
        Args:
            request_type (int): Request type bitmask (TYPE_* | RECIPIENT_* | ENDPOINT_*)
            request (int): Request ID
            value (int): Request-specific value parameter
            index (int): Request-specific index parameter
            buffer_or_len: bytes to send or int for receive buffer size
            callback: Function called on completion (transfer)
            user_data: User data for callback
            timeout (int): Timeout in milliseconds (0 = no timeout)
            
        Raises:
            ValueError: If transfer is already submitted
            DoomedTransferError: If transfer is doomed
        """
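
The `request_type` argument is a bitmask combining a direction, a request type, and a recipient. As a minimal sketch, the constants below are local copies of the `bmRequestType` bit fields from the USB specification, and `make_request_type` is a hypothetical helper, not part of the library.

```python
# Local copies of the bmRequestType bit fields from the USB specification.
# Direction is bit 7, type is bits 6..5, recipient is bits 4..0.
DIRECTION_OUT = 0x00
DIRECTION_IN = 0x80
TYPE_STANDARD = 0x00
TYPE_CLASS = 0x20
TYPE_VENDOR = 0x40
RECIPIENT_DEVICE = 0x00
RECIPIENT_INTERFACE = 0x01
RECIPIENT_ENDPOINT = 0x02


def make_request_type(direction, request_type, recipient):
    """Combine the three bmRequestType fields into one byte (hypothetical helper)."""
    return (direction | request_type | recipient) & 0xFF


# Vendor-specific request, device-to-host, addressed to the device itself.
vendor_in_device = make_request_type(DIRECTION_IN, TYPE_VENDOR, RECIPIENT_DEVICE)
print(hex(vendor_in_device))  # 0xc0
```

The resulting value would be passed as `request_type` to `setControl`.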

Bulk Transfer Setup

Configure transfers for bulk endpoint operations with large data payloads and reliable delivery.

class USBTransfer:
    def setBulk(self, endpoint, buffer_or_len, callback=None, user_data=None, timeout=0):
        """
        Configure transfer for bulk operation.
        
        Args:
            endpoint (int): Bulk endpoint address (includes direction bit)
            buffer_or_len: bytes to send or int for receive buffer size
            callback: Function called on completion (transfer)
            user_data: User data for callback
            timeout (int): Timeout in milliseconds (0 = no timeout)
            
        Raises:
            ValueError: If transfer is already submitted
            DoomedTransferError: If transfer is doomed
            
        Note:
            Use a writable buffer (bytearray) to avoid memory copies.
        """
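
The endpoint address carries the transfer direction in its top bit, which is why `0x82` denotes an IN endpoint. The following sketch decodes an address using the bit layout from the USB specification; `describe_endpoint` and the mask names are hypothetical and defined locally.

```python
ENDPOINT_DIR_MASK = 0x80      # bit 7: set = IN (device to host), clear = OUT
ENDPOINT_NUMBER_MASK = 0x0F   # bits 3..0: endpoint number


def describe_endpoint(address):
    """Split an endpoint address into (direction, number) (hypothetical helper)."""
    direction = "IN" if address & ENDPOINT_DIR_MASK else "OUT"
    return direction, address & ENDPOINT_NUMBER_MASK


print(describe_endpoint(0x82))  # ('IN', 2)
print(describe_endpoint(0x01))  # ('OUT', 1)
```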

Interrupt Transfer Setup

Configure transfers for interrupt endpoint operations with guaranteed latency bounds.

class USBTransfer:
    def setInterrupt(self, endpoint, buffer_or_len, callback=None, user_data=None, timeout=0):
        """
        Configure transfer for interrupt operation.
        
        Args:
            endpoint (int): Interrupt endpoint address (includes direction bit)
            buffer_or_len: bytes to send or int for receive buffer size
            callback: Function called on completion (transfer)
            user_data: User data for callback
            timeout (int): Timeout in milliseconds (0 = no timeout)
            
        Raises:
            ValueError: If transfer is already submitted
            DoomedTransferError: If transfer is doomed
            
        Note:
            Use a writable buffer (bytearray) to avoid memory copies.
        """

Isochronous Transfer Setup

Configure transfers for isochronous endpoint operations with time-sensitive streaming data.

class USBTransfer:
    def setIsochronous(self, endpoint, buffer_or_len, callback=None, user_data=None, 
                       timeout=0, iso_transfer_length_list=None):
        """
        Configure transfer for isochronous operation.
        
        Args:
            endpoint (int): Isochronous endpoint address (includes direction bit)
            buffer_or_len: bytes to send or int for receive buffer size
            callback: Function called on completion (transfer)
            user_data: User data for callback
            timeout (int): Timeout in milliseconds (0 = no timeout)
            iso_transfer_length_list (list[int], optional): Individual packet sizes
            
        Raises:
            ValueError: If transfer is already submitted or the buffer size is invalid
            DoomedTransferError: If transfer is doomed
            TypeError: If transfer wasn't created with iso_packets > 0
            
        Note:
            If iso_transfer_length_list is not provided, the buffer is divided
            evenly among the packets.
        """
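
To make the packet layout explicit rather than relying on the even split, a length list can be computed up front. This is a rough sketch only: `even_packet_lengths` is a hypothetical helper, and it assumes that "divided evenly" means the buffer size must be an exact multiple of the packet count. How the library treats a remainder is not stated here.

```python
def even_packet_lengths(buffer_length, iso_packets):
    """Return one length per ISO packet, splitting the buffer exactly.

    Hypothetical helper. Raises ValueError when the buffer cannot be
    split into equal parts (an assumption made by this sketch).
    """
    if iso_packets <= 0:
        raise ValueError("iso_packets must be positive")
    length, remainder = divmod(buffer_length, iso_packets)
    if remainder:
        raise ValueError(
            f"{buffer_length} bytes cannot be split evenly into {iso_packets} packets"
        )
    return [length] * iso_packets


lengths = even_packet_lengths(1000, 10)
print(lengths)  # ten entries of 100
```

A list like this could be passed as `iso_transfer_length_list` for a transfer created with `iso_packets=10`.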

Transfer Execution

Submit transfers for execution and manage their lifecycle.

class USBTransfer:
    def submit(self):
        """
        Submit transfer for asynchronous execution.
        
        Raises:
            ValueError: If transfer already submitted or not initialized
            DoomedTransferError: If transfer is doomed
            USBError: If submission fails
        """

    def cancel(self):
        """
        Cancel submitted transfer.
        
        Raises:
            USBErrorNotFound: If transfer not submitted
            
        Note:
            Cancellation is asynchronous - wait for TRANSFER_CANCELLED status.
        """
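
Because cancellation is asynchronous, a transfer cannot be closed immediately after `cancel()`. The sketch below shows one way to wait, using only `isSubmitted()`, `cancel()`, and the context's `handleEventsTimeout()`. `cancel_and_wait` is a hypothetical helper, and the two `Fake*` classes are stand-ins so the sketch runs without hardware.

```python
def cancel_and_wait(transfer, context, poll_interval=0.1):
    """Cancel a pending transfer and handle events until it is no longer submitted.

    Hypothetical helper; assumes events are handled on the calling thread only.
    Returns True if a cancellation was requested, False if nothing was pending.
    """
    if not transfer.isSubmitted():
        return False
    transfer.cancel()
    while transfer.isSubmitted():
        context.handleEventsTimeout(poll_interval)
    return True


class FakeTransfer:
    """Stand-in for USBTransfer exposing only the methods used above."""

    def __init__(self):
        self.submitted = True
        self.cancel_requested = False

    def isSubmitted(self):
        return self.submitted

    def cancel(self):
        self.cancel_requested = True


class FakeContext:
    """Stand-in for USBContext: finishes the cancellation on the next event pass."""

    def __init__(self, transfer):
        self.transfer = transfer

    def handleEventsTimeout(self, timeout):
        if self.transfer.cancel_requested:
            self.transfer.submitted = False


fake_transfer = FakeTransfer()
fake_context = FakeContext(fake_transfer)
first = cancel_and_wait(fake_transfer, fake_context)
second = cancel_and_wait(fake_transfer, fake_context)
print(first, second)  # True False
```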

Transfer Status and Results

Query transfer status and retrieve results after completion.

class USBTransfer:
    def getType(self):
        """
        Get transfer type.
        
        Returns:
            int: Transfer type constant (TRANSFER_TYPE_CONTROL, etc.)
        """

    def getEndpoint(self):
        """
        Get endpoint address.
        
        Returns:
            int: Endpoint address with direction bit
        """

    def getStatus(self):
        """
        Get transfer completion status.
        
        Returns:
            int: Status constant (TRANSFER_COMPLETED, TRANSFER_ERROR, etc.)
            
        Note:
            Should not be called on submitted transfers.
        """

    def getActualLength(self):
        """
        Get number of bytes actually transferred.
        
        Returns:
            int: Actual transfer length
            
        Note:
            Should not be called on submitted transfers.
        """

    def getBuffer(self):
        """
        Get transfer data buffer.
        
        Returns:
            memoryview: Buffer containing transfer data
            
        Note:
            Should not be called on submitted transfers.
        """

    def setBuffer(self, buffer_or_len):
        """
        Replace transfer buffer.
        
        Args:
            buffer_or_len: New buffer data or size
            
        Raises:
            ValueError: If transfer submitted or invalid for transfer type
            
        Note:
            Not allowed for control transfers (use setControl) or for
            resizing isochronous transfers (use setIsochronous).
        """

Transfer Options

Configure transfer behavior flags for error handling and packet management.

class USBTransfer:
    def isShortAnError(self):
        """
        Check if short transfers are treated as errors.
        
        Returns:
            bool: True if short transfers cause TRANSFER_ERROR status
        """

    def setShortIsError(self, state):
        """
        Set whether short transfers are treated as errors.
        
        Args:
            state (bool): True to treat short transfers as errors
        """

    def isZeroPacketAdded(self):
        """
        Check if zero-length packet is added for transfers that are
        multiples of endpoint packet size.
        
        Returns:
            bool: True if zero packet will be added
        """

    def setAddZeroPacket(self, state):
        """
        Set whether to add zero-length packet for transfers that are
        multiples of endpoint packet size.
        
        Args:
            state (bool): True to add zero packet
        """
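
A transfer whose length is an exact multiple of the endpoint's maximum packet size ends on a full packet, so the receiver cannot tell from packet size alone that the transfer has finished; a trailing zero-length packet marks the end. The check below is a sketch of that condition. `needs_zero_packet` is a hypothetical helper, and `max_packet_size` is assumed to come from the endpoint descriptor.

```python
def needs_zero_packet(transfer_length, max_packet_size):
    """Return True when the transfer ends exactly on a packet boundary.

    Hypothetical helper: a non-empty transfer that fills its last packet
    completely is the case the zero-length packet option is meant for.
    """
    if max_packet_size <= 0:
        raise ValueError("max_packet_size must be positive")
    return transfer_length > 0 and transfer_length % max_packet_size == 0


print(needs_zero_packet(1024, 512))  # True
print(needs_zero_packet(1000, 512))  # False
```

The result could be passed to `setAddZeroPacket` before submitting an OUT transfer.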

Isochronous Transfer Analysis

Access individual packet results for isochronous transfers.

class USBTransfer:
    def getISOBufferList(self):
        """
        Get list of individual isochronous packet buffers.
        
        Returns:
            list[bytes]: Buffer for each ISO packet
            
        Raises:
            TypeError: If not an isochronous transfer
            
        Note:
            Should not be called on submitted transfers.
        """

    def getISOSetupList(self):
        """
        Get list of individual isochronous packet descriptors.
        
        Returns:
            list[dict]: List of dicts with 'length', 'actual_length', 'status'
            
        Raises:
            TypeError: If not an isochronous transfer
            
        Note:
            Should not be called on submitted transfers (except 'length').
        """

    def iterISO(self):
        """
        Iterator over isochronous packets yielding (status, buffer) tuples.
        
        Yields:
            tuple[int, bytes]: (status, buffer) for each packet
            
        Raises:
            TypeError: If not an isochronous transfer
            
        Note:
            Buffer is truncated to actual_length. More efficient than
            getISOBufferList + getISOSetupList for receive operations.
        """
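
As a sketch of what `iterISO` yields, the generator below pairs the results of `getISOBufferList` and `getISOSetupList` and truncates each buffer to its `actual_length`. It only illustrates the described behavior with plain Python data; `iter_iso_from_lists` is a hypothetical name, and the sample values are made up for the demonstration.

```python
def iter_iso_from_lists(buffer_list, setup_list):
    """Yield (status, data) per packet, with data cut to actual_length.

    Hypothetical equivalent of combining getISOBufferList() and
    getISOSetupList(); not the library's implementation.
    """
    for buffer, setup in zip(buffer_list, setup_list):
        yield setup["status"], bytes(buffer[: setup["actual_length"]])


# Sample data shaped like the two list results described above.
buffers = [b"abcd", b"wxyz"]
setups = [
    {"length": 4, "actual_length": 2, "status": 0},
    {"length": 4, "actual_length": 0, "status": 1},
]

packets = list(iter_iso_from_lists(buffers, setups))
print(packets)  # [(0, b'ab'), (1, b'')]
```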

Transfer Helper for Event Management

Simplified callback management for common transfer patterns with automatic resubmission.

class USBTransferHelper:
    def __init__(self, transfer=None):
        """
        Create transfer callback dispatcher.
        
        Args:
            transfer (USBTransfer, optional): Deprecated - transfer to manage
        """

    def setEventCallback(self, event, callback):
        """
        Set callback for specific transfer event.
        
        Args:
            event (int): Event constant (TRANSFER_COMPLETED, TRANSFER_ERROR, etc.)
            callback: Function accepting (transfer) and returning bool:
                True to resubmit the transfer, False to stop

        Raises:
            ValueError: If event is not valid
        """

    def setDefaultCallback(self, callback):
        """
        Set default callback for events without specific callbacks.
        
        Args:
            callback: Function accepting (transfer) returning bool
        """

    def getEventCallback(self, event, default=None):
        """
        Get callback for specific event.
        
        Args:
            event (int): Event constant
            default: Default value if no callback set
            
        Returns:
            callable or default: Callback function or default
        """

    def __call__(self, transfer):
        """
        Main callback function to set on transfers.
        
        Args:
            transfer (USBTransfer): Completed transfer
            
        Note:
            Automatically calls appropriate event callback and resubmits
            if callback returns True.
        """
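
The dispatch-and-resubmit behavior described above can be pictured with a small stand-in. This is an illustrative sketch, not the library's source: `HelperSketch` and `FakeTransfer` are hypothetical classes, and the two status constants are local placeholders rather than values imported from `usb1`.

```python
TRANSFER_COMPLETED = 0  # local placeholders for the usb1 status constants
TRANSFER_ERROR = 1


class HelperSketch:
    """Dispatch on transfer status and resubmit when the callback returns True."""

    def __init__(self):
        self._callbacks = {}
        self._default = None

    def setEventCallback(self, event, callback):
        self._callbacks[event] = callback

    def setDefaultCallback(self, callback):
        self._default = callback

    def __call__(self, transfer):
        # Pick the event-specific callback, falling back to the default one.
        callback = self._callbacks.get(transfer.getStatus(), self._default)
        if callback is not None and callback(transfer):
            transfer.submit()


class FakeTransfer:
    """Stand-in transfer that records how often it was submitted."""

    def __init__(self, status):
        self.status = status
        self.submit_count = 0

    def getStatus(self):
        return self.status

    def submit(self):
        self.submit_count += 1


helper = HelperSketch()
helper.setEventCallback(TRANSFER_COMPLETED, lambda transfer: True)  # keep going
helper.setDefaultCallback(lambda transfer: False)                   # stop otherwise

done = FakeTransfer(TRANSFER_COMPLETED)
failed = FakeTransfer(TRANSFER_ERROR)
helper(done)
helper(failed)
print(done.submit_count, failed.submit_count)  # 1 0
```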

Event Loop Integration

Integrate USB event handling with file descriptor polling mechanisms.

class USBPoller:
    def __init__(self, context, poller):
        """
        Create USB poller for event loop integration.
        
        Args:
            context (USBContext): USB context to poll
            poller: Poller object with register/unregister/poll methods
            
        Note:
            Poller must implement register(fd, events), unregister(fd), 
            and poll(timeout) returning [(fd, events), ...].
        """

    def poll(self, timeout=None):
        """
        Poll for events including USB events.
        
        Args:
            timeout (float, optional): Timeout in seconds
            
        Returns:
            list[tuple]: List of (fd, events) for non-USB events
            
        Note:
            USB events are handled internally, only non-USB events returned.
        """

    def register(self, fd, events):
        """
        Register non-USB file descriptor.
        
        Args:
            fd (int): File descriptor
            events (int): Event mask (POLLIN, POLLOUT)
            
        Raises:
            ValueError: If fd is a USB event fd
        """

    def unregister(self, fd):
        """
        Unregister non-USB file descriptor.
        
        Args:
            fd (int): File descriptor
            
        Raises:
            ValueError: If fd is a USB event fd
        """
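
The standard library's `select.poll` already offers `register`, `unregister`, and `poll`, but its timeout is in milliseconds. The adapter below is a sketch that assumes the wrapped poller receives its timeout in seconds, matching `USBPoller.poll` above. `SecondsPoller` is a hypothetical class, `select.poll` is unavailable on Windows, and the check uses a pipe in place of a USB device.

```python
import os
import select


class SecondsPoller:
    """Adapt select.poll (millisecond timeouts) to a seconds-based poll()."""

    def __init__(self):
        self._poll = select.poll()

    def register(self, fd, events):
        self._poll.register(fd, events)

    def unregister(self, fd):
        self._poll.unregister(fd)

    def poll(self, timeout=None):
        if timeout is None or timeout < 0:
            timeout_ms = None  # block until an event arrives
        else:
            timeout_ms = int(timeout * 1000)
        return self._poll.poll(timeout_ms)


# Quick check: a readable pipe should be reported as (fd, POLLIN).
read_fd, write_fd = os.pipe()
poller = SecondsPoller()
poller.register(read_fd, select.POLLIN)
os.write(write_fd, b"x")
events = poller.poll(0.5)
print(events == [(read_fd, select.POLLIN)])  # True
poller.unregister(read_fd)
os.close(read_fd)
os.close(write_fd)
```

An instance of such an adapter could be passed as the `poller` argument of `USBPoller`.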

Usage Examples

Basic Asynchronous Transfer

import usb1

def transfer_callback(transfer):
    """Callback function for transfer completion."""
    status = transfer.getStatus()
    if status == usb1.TRANSFER_COMPLETED:
        actual_length = transfer.getActualLength()
        print(f"Transfer completed: {actual_length} bytes")
        
        # Access received data
        buffer = transfer.getBuffer()
        data = bytes(buffer[:actual_length])
        print(f"Received: {data[:20].hex()}...")
        
    elif status == usb1.TRANSFER_TIMED_OUT:
        print("Transfer timed out")
    elif status == usb1.TRANSFER_ERROR:
        print("Transfer error")
    elif status == usb1.TRANSFER_CANCELLED:
        print("Transfer cancelled")
    else:
        print(f"Transfer status: {status}")

def async_transfer_example():
    with usb1.USBContext() as context:
        device = context.getByVendorIDAndProductID(0x1234, 0x5678)
        if device:
            with device.open() as handle:
                with handle.claimInterface(0):
                    # Create transfer
                    transfer = handle.getTransfer()
                    
                    # Configure for bulk read
                    transfer.setBulk(
                        endpoint=0x82,      # Bulk IN endpoint
                        buffer_or_len=1024, # Buffer size
                        callback=transfer_callback,
                        timeout=5000
                    )
                    
                    # Submit transfer
                    transfer.submit()
                    print("Transfer submitted")
                    
                    # Handle events until transfer completes
                    while transfer.isSubmitted():
                        context.handleEventsTimeout(1.0)
                    
                    transfer.close()

async_transfer_example()

Multiple Concurrent Transfers

import usb1
import threading

class TransferManager:
    def __init__(self, handle, num_transfers=4):
        self.handle = handle
        self.transfers = []
        self.completed_count = 0
        self.lock = threading.Lock()
        
        # Create multiple transfers
        for i in range(num_transfers):
            transfer = handle.getTransfer()
            transfer.setUserData(i)  # Store transfer ID
            self.transfers.append(transfer)
    
    def transfer_callback(self, transfer):
        """Callback for transfer completion."""
        transfer_id = transfer.getUserData()
        status = transfer.getStatus()
        
        with self.lock:
            self.completed_count += 1
            
        if status == usb1.TRANSFER_COMPLETED:
            length = transfer.getActualLength()
            print(f"Transfer {transfer_id} completed: {length} bytes")
            
            # Resubmit for continuous operation
            if self.completed_count < 20:  # Stop after 20 transfers
                try:
                    transfer.submit()
                    return
                except usb1.USBError as e:
                    print(f"Resubmit failed: {e}")
        else:
            print(f"Transfer {transfer_id} status: {status}")
    
    def start_transfers(self):
        """Start all transfers."""
        for transfer in self.transfers:
            transfer.setBulk(
                endpoint=0x82,
                buffer_or_len=1024,
                callback=self.transfer_callback,
                timeout=2000
            )
            transfer.submit()
        print(f"Started {len(self.transfers)} concurrent transfers")
    
    def wait_completion(self, context):
        """Wait for all transfers to complete."""
        while any(t.isSubmitted() for t in self.transfers):
            context.handleEventsTimeout(1.0)
        print("All transfers completed")
    
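    def cleanup(self, context):
        """Cancel pending transfers, wait for them to finish, then close."""
        for transfer in self.transfers:
            if transfer.isSubmitted():
                transfer.cancel()
        # Cancellation is asynchronous and close() raises ValueError on a
        # submitted transfer, so handle events until nothing is pending.
        while any(t.isSubmitted() for t in self.transfers):
            context.handleEventsTimeout(0.1)
        for transfer in self.transfers:
            transfer.close()

def concurrent_transfers_example():
    with usb1.USBContext() as context:
        device = context.getByVendorIDAndProductID(0x1234, 0x5678)
        if device:
            with device.open() as handle:
                with handle.claimInterface(0):
                    manager = TransferManager(handle)
                    try:
                        manager.start_transfers()
                        manager.wait_completion(context)
                    finally:
                        manager.cleanup(context)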

concurrent_transfers_example()

Transfer Helper with Event Callbacks

import usb1

def setup_transfer_helper_example():
    """Demonstrate USBTransferHelper for simplified callback management."""
    
    def on_completed(transfer):
        """Handle successful transfer completion."""
        length = transfer.getActualLength()
        data = bytes(transfer.getBuffer()[:length])
        print(f"Completed transfer: {length} bytes, data: {data[:10].hex()}")
        return True  # Resubmit transfer
    
    def on_error(transfer):
        """Handle transfer errors."""
        print("Transfer error occurred")
        return False  # Don't resubmit
    
    def on_timeout(transfer):
        """Handle transfer timeouts."""
        print("Transfer timed out")
        return True  # Retry
    
    def on_cancelled(transfer):
        """Handle transfer cancellation."""
        print("Transfer was cancelled")
        return False  # Don't resubmit
    
    with usb1.USBContext() as context:
        device = context.getByVendorIDAndProductID(0x1234, 0x5678)
        if device:
            with device.open() as handle:
                with handle.claimInterface(0):
                    # Create transfer and helper
                    transfer = handle.getTransfer()
                    helper = usb1.USBTransferHelper()
                    
                    # Set up event callbacks
                    helper.setEventCallback(usb1.TRANSFER_COMPLETED, on_completed)
                    helper.setEventCallback(usb1.TRANSFER_ERROR, on_error)
                    helper.setEventCallback(usb1.TRANSFER_TIMED_OUT, on_timeout)
                    helper.setEventCallback(usb1.TRANSFER_CANCELLED, on_cancelled)
                    
                    # Configure transfer
                    transfer.setInterrupt(
                        endpoint=0x81,
                        buffer_or_len=8,
                        callback=helper,  # Use helper as callback
                        timeout=1000
                    )
                    
                    # Submit and handle events
                    transfer.submit()
                    print("Transfer with helper submitted")
                    
                    # Run for 10 seconds
                    import time
                    start_time = time.time()
                    while time.time() - start_time < 10:
                        context.handleEventsTimeout(1.0)
                    
                    # Cancel if still running
                    if transfer.isSubmitted():
                        transfer.cancel()
                        # Wait for cancellation
                        while transfer.isSubmitted():
                            context.handleEventsTimeout(0.1)
                    
                    transfer.close()

setup_transfer_helper_example()

Isochronous Transfer for Streaming

import usb1
import time

def isochronous_streaming_example():
    """Demonstrate isochronous transfers for streaming data."""
    
    class IsoStreamProcessor:
        def __init__(self):
            self.packet_count = 0
            self.error_count = 0
            self.total_bytes = 0
        
        def iso_callback(self, transfer):
            """Process isochronous transfer completion."""
            if transfer.getStatus() == usb1.TRANSFER_COMPLETED:
                # Process individual packets
                for packet_status, packet_data in transfer.iterISO():
                    self.packet_count += 1
                    
                    if packet_status == usb1.TRANSFER_COMPLETED:
                        self.total_bytes += len(packet_data)
                        # Process packet data here
                        if len(packet_data) > 0:
                            print(f"Packet {self.packet_count}: {len(packet_data)} bytes")
                    else:
                        self.error_count += 1
                        print(f"Packet {self.packet_count} error: {packet_status}")
                
                # Resubmit for continuous streaming
                try:
                    transfer.submit()
                except usb1.USBError as e:
                    print(f"Resubmit failed: {e}")
            else:
                print(f"ISO transfer status: {transfer.getStatus()}")
    
    with usb1.USBContext() as context:
        # Look for device with isochronous endpoint
        device = context.getByVendorIDAndProductID(0x1234, 0x5678)
        if device:
            with device.open() as handle:
                with handle.claimInterface(0):
                    processor = IsoStreamProcessor()
                    
                    # Create transfer with ISO packets
                    transfer = handle.getTransfer(iso_packets=10)  # 10 packets per transfer
                    
                    # Configure for isochronous streaming
                    transfer.setIsochronous(
                        endpoint=0x83,  # ISO IN endpoint
                        buffer_or_len=1000,  # Total buffer size
                        callback=processor.iso_callback,
                        timeout=1000
                    )
                    
                    # Start streaming
                    transfer.submit()
                    print("Started isochronous streaming")
                    
                    # Stream for 5 seconds
                    start_time = time.time()
                    while time.time() - start_time < 5.0:
                        context.handleEventsTimeout(0.1)
                    
                    # Stop streaming
                    if transfer.isSubmitted():
                        transfer.cancel()
                        while transfer.isSubmitted():
                            context.handleEventsTimeout(0.1)
                    
                    print("Streaming complete:")
                    print(f"  Packets: {processor.packet_count}")
                    print(f"  Errors: {processor.error_count}")
                    print(f"  Total bytes: {processor.total_bytes}")
                    
                    transfer.close()

isochronous_streaming_example()

Event Loop Integration with USBPoller

import usb1
import select
import socket

def event_loop_integration_example():
    """Demonstrate USBPoller integration with select-based event loop."""
    
    # Create a simple TCP server socket for demonstration
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind(('localhost', 0))
    server_socket.listen(1)
    server_socket.setblocking(False)
    
    port = server_socket.getsockname()[1]
    print(f"Server listening on port {port}")
    
    # USB transfer callback
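    def usb_callback(transfer):
        if transfer.getStatus() == usb1.TRANSFER_COMPLETED:
            length = transfer.getActualLength()
            print(f"USB: Received {length} bytes")
            # A plain callback's return value is not used for resubmission
            # (that is a USBTransferHelper feature), so resubmit explicitly.
            try:
                transfer.submit()
            except usb1.USBError as e:
                print(f"USB: Resubmit failed: {e}")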
    
    with usb1.USBContext() as context:
        device = context.getByVendorIDAndProductID(0x1234, 0x5678)
        if device:
            with device.open() as handle:
                with handle.claimInterface(0):
                    # Set up USB transfer
                    transfer = handle.getTransfer()
                    transfer.setInterrupt(
                        endpoint=0x81,
                        buffer_or_len=64,
                        callback=usb_callback,
                        timeout=1000
                    )
                    transfer.submit()
                    
                    # Create poller for integration
                    class SelectPoller:
                        def __init__(self):
                            self.read_fds = set()
                            self.write_fds = set()
                        
                        def register(self, fd, events):
                            if events & select.POLLIN:
                                self.read_fds.add(fd)
                            if events & select.POLLOUT:
                                self.write_fds.add(fd)
                        
                        def unregister(self, fd):
                            self.read_fds.discard(fd)
                            self.write_fds.discard(fd)
                        
                        def poll(self, timeout):
                            if timeout is None or timeout < 0:
                                timeout = None
                            
                            ready_read, ready_write, _ = select.select(
                                list(self.read_fds), 
                                list(self.write_fds), 
                                [], 
                                timeout
                            )
                            
                            result = []
                            for fd in ready_read:
                                result.append((fd, select.POLLIN))
                            for fd in ready_write:
                                result.append((fd, select.POLLOUT))
                            return result
                    
                    try:
                        poller = usb1.USBPoller(context, SelectPoller())
                        
                        # Register server socket
                        poller.register(server_socket.fileno(), select.POLLIN)
                        
                        print("Event loop running (Ctrl+C to stop)")
                        while True:
                            # Poll for events (including USB)
                            events = poller.poll(1.0)
                            
                            # Handle socket events
                            for fd, event in events:
                                if fd == server_socket.fileno():
                                    try:
                                        client_socket, addr = server_socket.accept()
                                        print(f"TCP: Connection from {addr}")
                                        client_socket.send(b"Hello from USB event loop!\n")
                                        client_socket.close()
                                    except socket.error:
                                        pass
                    
                    except KeyboardInterrupt:
                        print("\nStopping event loop")
                    
                    finally:
                        if transfer.isSubmitted():
                            transfer.cancel()
                            while transfer.isSubmitted():
                                context.handleEventsTimeout(0.1)
                        transfer.close()
                        server_socket.close()

# Note: This example requires a USB device to be meaningful
# event_loop_integration_example()

Install with Tessl CLI

npx tessl i tessl/pypi-libusb1
