# Asynchronous USB Transfers
1
2
Asynchronous transfers enable non-blocking USB operations using event-driven callbacks, allowing concurrent transfers and integration with event loops. This approach is essential for high-performance applications, streaming data, and responsive user interfaces.
3
4
## Capabilities
5
6
### Transfer Object Management
7
8
Create and manage transfer objects that encapsulate USB transfer operations and their associated data buffers.
9
10
```python { .api }
11
class USBTransfer:
    def close(self):
        """
        Close transfer and break reference cycles.

        Raises:
            ValueError: If called on a submitted transfer
        """

    def doom(self):
        """
        Mark transfer as doomed to prevent resubmission.

        Doomed transfers are automatically closed after completion.
        """

    def isSubmitted(self):
        """
        Check if transfer is currently submitted.

        Returns:
            bool: True if transfer is submitted and pending
        """

    def setCallback(self, callback):
        """
        Set callback function for transfer completion.

        Args:
            callback: Function called on completion with the transfer as
                its only argument
        """

    def getCallback(self):
        """
        Get current callback function.

        Returns:
            callable or None: Current callback function
        """

    def setUserData(self, user_data):
        """
        Set user data associated with transfer.

        Args:
            user_data: Any Python object to associate with transfer
        """

    def getUserData(self):
        """
        Get user data associated with transfer.

        Returns:
            Any: User data object
        """
65
```
66
67
### Control Transfer Setup
68
69
Configure transfers for control endpoint operations with setup packet and optional data phase.
70
71
```python { .api }
72
class USBTransfer:
    def setControl(self, request_type, request, value, index, buffer_or_len,
                   callback=None, user_data=None, timeout=0):
        """
        Configure transfer for control operation.

        Args:
            request_type (int): Request type bitmask (TYPE_* | RECIPIENT_* | ENDPOINT_*)
            request (int): Request ID
            value (int): Request-specific value parameter
            index (int): Request-specific index parameter
            buffer_or_len: bytes to send or int for receive buffer size
            callback: Function called on completion (transfer)
            user_data: User data for callback
            timeout (int): Timeout in milliseconds (0 = no timeout)

        Raises:
            ValueError: If transfer is already submitted
            DoomedTransferError: If transfer is doomed
        """
92
```
93
94
### Bulk Transfer Setup
95
96
Configure transfers for bulk endpoint operations with large data payloads and reliable delivery.
97
98
```python { .api }
99
class USBTransfer:
    def setBulk(self, endpoint, buffer_or_len, callback=None, user_data=None, timeout=0):
        """
        Configure transfer for bulk operation.

        Args:
            endpoint (int): Bulk endpoint address (includes direction bit)
            buffer_or_len: bytes to send or int for receive buffer size
            callback: Function called on completion (transfer)
            user_data: User data for callback
            timeout (int): Timeout in milliseconds (0 = no timeout)

        Raises:
            ValueError: If transfer is already submitted
            DoomedTransferError: If transfer is doomed

        Note:
            Use writable buffer (bytearray) to avoid memory copies.
        """
118
```
119
120
### Interrupt Transfer Setup
121
122
Configure transfers for interrupt endpoint operations with guaranteed latency bounds.
123
124
```python { .api }
125
class USBTransfer:
    def setInterrupt(self, endpoint, buffer_or_len, callback=None, user_data=None, timeout=0):
        """
        Configure transfer for interrupt operation.

        Args:
            endpoint (int): Interrupt endpoint address (includes direction bit)
            buffer_or_len: bytes to send or int for receive buffer size
            callback: Function called on completion (transfer)
            user_data: User data for callback
            timeout (int): Timeout in milliseconds (0 = no timeout)

        Raises:
            ValueError: If transfer is already submitted
            DoomedTransferError: If transfer is doomed

        Note:
            Use writable buffer (bytearray) to avoid memory copies.
        """
144
```
145
146
### Isochronous Transfer Setup
147
148
Configure transfers for isochronous endpoint operations with time-sensitive streaming data.
149
150
```python { .api }
151
class USBTransfer:
    def setIsochronous(self, endpoint, buffer_or_len, callback=None, user_data=None,
                       timeout=0, iso_transfer_length_list=None):
        """
        Configure transfer for isochronous operation.

        Args:
            endpoint (int): Isochronous endpoint address (includes direction bit)
            buffer_or_len: bytes to send or int for receive buffer size
            callback: Function called on completion (transfer)
            user_data: User data for callback
            timeout (int): Timeout in milliseconds (0 = no timeout)
            iso_transfer_length_list (list[int], optional): Individual packet sizes

        Raises:
            ValueError: If transfer is already submitted or buffer size issues
            DoomedTransferError: If transfer is doomed
            TypeError: If transfer wasn't created with iso_packets > 0

        Note:
            If iso_transfer_length_list is not provided, buffer is divided evenly.
        """
173
```
174
175
### Transfer Execution
176
177
Submit transfers for execution and manage their lifecycle.
178
179
```python { .api }
180
class USBTransfer:
    def submit(self):
        """
        Submit transfer for asynchronous execution.

        Raises:
            ValueError: If transfer already submitted or not initialized
            DoomedTransferError: If transfer is doomed
            USBError: If submission fails
        """

    def cancel(self):
        """
        Cancel submitted transfer.

        Raises:
            USBErrorNotFound: If transfer not submitted

        Note:
            Cancellation is asynchronous - wait for TRANSFER_CANCELLED status.
        """
201
```
202
203
### Transfer Status and Results
204
205
Query transfer status and retrieve results after completion.
206
207
```python { .api }
208
class USBTransfer:
    def getType(self):
        """
        Get transfer type.

        Returns:
            int: Transfer type constant (TRANSFER_TYPE_CONTROL, etc.)
        """

    def getEndpoint(self):
        """
        Get endpoint address.

        Returns:
            int: Endpoint address with direction bit
        """

    def getStatus(self):
        """
        Get transfer completion status.

        Returns:
            int: Status constant (TRANSFER_COMPLETED, TRANSFER_ERROR, etc.)

        Note:
            Should not be called on submitted transfers.
        """

    def getActualLength(self):
        """
        Get number of bytes actually transferred.

        Returns:
            int: Actual transfer length

        Note:
            Should not be called on submitted transfers.
        """

    def getBuffer(self):
        """
        Get transfer data buffer.

        Returns:
            memoryview: Buffer containing transfer data

        Note:
            Should not be called on submitted transfers.
        """

    def setBuffer(self, buffer_or_len):
        """
        Replace transfer buffer.

        Args:
            buffer_or_len: New buffer data or size

        Raises:
            ValueError: If transfer submitted or invalid for transfer type

        Note:
            Not allowed for control transfers (use setControl) or for
            resizing isochronous transfers (use setIsochronous).
        """
272
```
273
274
### Transfer Options
275
276
Configure transfer behavior flags for error handling and packet management.
277
278
```python { .api }
279
class USBTransfer:
    def isShortAnError(self):
        """
        Check if short transfers are treated as errors.

        Returns:
            bool: True if short transfers cause TRANSFER_ERROR status
        """

    def setShortIsError(self, state):
        """
        Set whether short transfers are treated as errors.

        Args:
            state (bool): True to treat short transfers as errors
        """

    def isZeroPacketAdded(self):
        """
        Check if zero-length packet is added for transfers that are
        multiples of endpoint packet size.

        Returns:
            bool: True if zero packet will be added
        """

    def setAddZeroPacket(self, state):
        """
        Set whether to add zero-length packet for transfers that are
        multiples of endpoint packet size.

        Args:
            state (bool): True to add zero packet
        """
313
```
314
315
### Isochronous Transfer Analysis
316
317
Access individual packet results for isochronous transfers.
318
319
```python { .api }
320
class USBTransfer:
    def getISOBufferList(self):
        """
        Get list of individual isochronous packet buffers.

        Returns:
            list[bytes]: Buffer for each ISO packet

        Raises:
            TypeError: If not an isochronous transfer

        Note:
            Should not be called on submitted transfers.
        """

    def getISOSetupList(self):
        """
        Get list of individual isochronous packet descriptors.

        Returns:
            list[dict]: List of dicts with 'length', 'actual_length', 'status'

        Raises:
            TypeError: If not an isochronous transfer

        Note:
            Should not be called on submitted transfers (except 'length').
        """

    def iterISO(self):
        """
        Iterator over isochronous packets yielding (status, buffer) tuples.

        Yields:
            tuple[int, bytes]: (status, buffer) for each packet

        Raises:
            TypeError: If not an isochronous transfer

        Note:
            Buffer is truncated to actual_length. More efficient than
            getISOBufferList + getISOSetupList for receive operations.
        """
363
```
364
365
### Transfer Helper for Event Management
366
367
Simplified callback management for common transfer patterns with automatic resubmission.
368
369
```python { .api }
370
class USBTransferHelper:
    def __init__(self, transfer=None):
        """
        Create transfer callback dispatcher.

        Args:
            transfer (USBTransfer, optional): Deprecated - transfer to manage
        """

    def setEventCallback(self, event, callback):
        """
        Set callback for specific transfer event.

        Args:
            event (int): Event constant (TRANSFER_COMPLETED, TRANSFER_ERROR, etc.)
            callback: Function accepting (transfer) and returning bool:
                True to resubmit the transfer, False to stop

        Raises:
            ValueError: If event is not valid
        """

    def setDefaultCallback(self, callback):
        """
        Set default callback for events without specific callbacks.

        Args:
            callback: Function accepting (transfer) and returning bool:
                True to resubmit the transfer, False to stop
        """

    def getEventCallback(self, event, default=None):
        """
        Get callback for specific event.

        Args:
            event (int): Event constant
            default: Default value if no callback set

        Returns:
            callable or default: Callback function or default
        """

    def __call__(self, transfer):
        """
        Main callback function to set on transfers.

        Args:
            transfer (USBTransfer): Completed transfer

        Note:
            Automatically calls appropriate event callback and resubmits
            if callback returns True.
        """
425
```
426
427
### Event Loop Integration
428
429
Integrate USB event handling with file descriptor polling mechanisms.
430
431
```python { .api }
432
class USBPoller:
    def __init__(self, context, poller):
        """
        Create USB poller for event loop integration.

        Args:
            context (USBContext): USB context to poll
            poller: Poller object with register/unregister/poll methods

        Note:
            Poller must implement register(fd, events), unregister(fd),
            and poll(timeout) returning [(fd, events), ...].
        """

    def poll(self, timeout=None):
        """
        Poll for events including USB events.

        Args:
            timeout (float, optional): Timeout in seconds

        Returns:
            list[tuple]: List of (fd, events) for non-USB events

        Note:
            USB events are handled internally, only non-USB events returned.
        """

    def register(self, fd, events):
        """
        Register non-USB file descriptor.

        Args:
            fd (int): File descriptor
            events (int): Event mask (POLLIN, POLLOUT)

        Raises:
            ValueError: If fd is a USB event fd
        """

    def unregister(self, fd):
        """
        Unregister non-USB file descriptor.

        Args:
            fd (int): File descriptor

        Raises:
            ValueError: If fd is a USB event fd
        """
482
```
483
484
## Usage Examples
485
486
### Basic Asynchronous Transfer
487
488
```python
489
import usb1
490
import threading
491
import time
492
493
def transfer_callback(transfer):
    """Report the outcome of a finished transfer.

    Args:
        transfer: The transfer that just completed; its status, actual
            length and buffer are read through its accessor methods.
    """
    status = transfer.getStatus()
    if status == usb1.TRANSFER_COMPLETED:
        actual_length = transfer.getActualLength()
        print(f"Transfer completed: {actual_length} bytes")

        # Only the first actual_length bytes of the buffer hold received data.
        data = bytes(transfer.getBuffer()[:actual_length])
        # Show an ellipsis only when the preview really is truncated.
        suffix = "..." if len(data) > 20 else ""
        print(f"Received: {data[:20].hex()}{suffix}")
    elif status == usb1.TRANSFER_TIMED_OUT:
        print("Transfer timed out")
    elif status == usb1.TRANSFER_ERROR:
        print("Transfer error")
    elif status == usb1.TRANSFER_CANCELLED:
        print("Transfer cancelled")
    else:
        print(f"Transfer status: {status}")
513
514
def async_transfer_example():
    """Submit a single bulk IN transfer and pump events until it finishes."""
    with usb1.USBContext() as context:
        device = context.getByVendorIDAndProductID(0x1234, 0x5678)
        if not device:
            print("Device not found")
            return

        with device.open() as handle:
            with handle.claimInterface(0):
                # Create transfer
                transfer = handle.getTransfer()
                try:
                    # Configure for bulk read
                    transfer.setBulk(
                        endpoint=0x82,        # Bulk IN endpoint
                        buffer_or_len=1024,   # Buffer size
                        callback=transfer_callback,
                        timeout=5000,
                    )

                    # Submit transfer
                    transfer.submit()
                    print("Transfer submitted")

                    # Handle events until transfer completes
                    while transfer.isSubmitted():
                        context.handleEventsTimeout(1.0)
                finally:
                    if transfer.isSubmitted():
                        # Event handling was interrupted. close() raises
                        # ValueError on a submitted transfer, so doom it
                        # (auto-closed after completion) and request
                        # cancellation instead.
                        transfer.doom()
                        try:
                            transfer.cancel()
                        except usb1.USBError as exc:
                            print(f"Cancel failed: {exc}")
                    else:
                        transfer.close()
540
541
async_transfer_example()
542
```
543
544
### Multiple Concurrent Transfers
545
546
```python
547
import usb1
548
import threading
549
550
class TransferManager:
    """Keep several bulk IN transfers in flight and count their completions.

    Args:
        handle: Open device handle used to allocate the transfers.
        num_transfers (int): Number of transfers to allocate.
        max_completions (int): Resubmission stops once this many completion
            callbacks have run.
    """

    def __init__(self, handle, num_transfers=4, max_completions=20):
        self.handle = handle
        self.transfers = []
        self.completed_count = 0
        self.max_completions = max_completions
        self.lock = threading.Lock()
        # Set by cleanup() so late callbacks do not resubmit.
        self._stopping = False

        # Create multiple transfers
        for transfer_id in range(num_transfers):
            transfer = handle.getTransfer()
            transfer.setUserData(transfer_id)  # Store transfer ID
            self.transfers.append(transfer)

    def transfer_callback(self, transfer):
        """Callback for transfer completion; resubmits while under the limit."""
        transfer_id = transfer.getUserData()
        status = transfer.getStatus()

        # Update and read the shared counter under the same lock so the
        # resubmit decision is based on a consistent value.
        with self.lock:
            self.completed_count += 1
            keep_going = (
                not self._stopping
                and self.completed_count < self.max_completions
            )

        if status != usb1.TRANSFER_COMPLETED:
            print(f"Transfer {transfer_id} status: {status}")
            return

        length = transfer.getActualLength()
        print(f"Transfer {transfer_id} completed: {length} bytes")

        # Resubmit for continuous operation
        if keep_going:
            try:
                transfer.submit()
            except usb1.USBError as e:
                print(f"Resubmit failed: {e}")

    def start_transfers(self):
        """Configure and submit all transfers."""
        for transfer in self.transfers:
            transfer.setBulk(
                endpoint=0x82,
                buffer_or_len=1024,
                callback=self.transfer_callback,
                timeout=2000,
            )
            transfer.submit()
        print(f"Started {len(self.transfers)} concurrent transfers")

    def wait_completion(self, context):
        """Handle events on context until no transfer is submitted."""
        while any(t.isSubmitted() for t in self.transfers):
            context.handleEventsTimeout(1.0)
        print("All transfers completed")

    def cleanup(self):
        """Release all transfers.

        Idle transfers are closed immediately. Transfers still in flight are
        doomed and cancelled instead: cancellation is asynchronous and
        close() raises ValueError on a submitted transfer, whereas a doomed
        transfer is closed automatically once it completes.
        """
        with self.lock:
            self._stopping = True

        for transfer in self.transfers:
            if transfer.isSubmitted():
                transfer.doom()
                try:
                    transfer.cancel()
                except usb1.USBError as exc:
                    print(f"Cancel failed: {exc}")
            else:
                transfer.close()
610
611
def concurrent_transfers_example():
    """Run a TransferManager against the example device until it goes idle."""
    with usb1.USBContext() as context:
        device = context.getByVendorIDAndProductID(0x1234, 0x5678)
        if not device:
            print("Device not found")
            return

        with device.open() as handle:
            with handle.claimInterface(0):
                manager = TransferManager(handle)
                try:
                    manager.start_transfers()
                    manager.wait_completion(context)
                finally:
                    # Always release the transfers, even if event handling
                    # was interrupted.
                    manager.cleanup()
623
624
concurrent_transfers_example()
625
```
626
627
### Transfer Helper with Event Callbacks
628
629
```python
630
import usb1
631
632
def setup_transfer_helper_example(duration=10.0):
    """Demonstrate USBTransferHelper for simplified callback management.

    Args:
        duration (float): Maximum number of seconds to keep polling.
    """
    import time  # local import: the surrounding snippet only imports usb1

    # Cleared during shutdown so no callback asks the helper to resubmit
    # while the transfer is being cancelled.
    running = True

    def on_completed(transfer):
        """Handle successful transfer completion."""
        length = transfer.getActualLength()
        data = bytes(transfer.getBuffer()[:length])
        print(f"Completed transfer: {length} bytes, data: {data[:10].hex()}")
        return running  # Resubmit transfer unless shutting down

    def on_error(transfer):
        """Handle transfer errors."""
        print("Transfer error occurred")
        return False  # Don't resubmit

    def on_timeout(transfer):
        """Handle transfer timeouts."""
        print("Transfer timed out")
        return running  # Retry unless shutting down

    def on_cancelled(transfer):
        """Handle transfer cancellation."""
        print("Transfer was cancelled")
        return False  # Don't resubmit

    with usb1.USBContext() as context:
        device = context.getByVendorIDAndProductID(0x1234, 0x5678)
        if not device:
            print("Device not found")
            return

        with device.open() as handle:
            with handle.claimInterface(0):
                # Create transfer and helper
                transfer = handle.getTransfer()
                helper = usb1.USBTransferHelper()

                # Set up event callbacks
                helper.setEventCallback(usb1.TRANSFER_COMPLETED, on_completed)
                helper.setEventCallback(usb1.TRANSFER_ERROR, on_error)
                helper.setEventCallback(usb1.TRANSFER_TIMED_OUT, on_timeout)
                helper.setEventCallback(usb1.TRANSFER_CANCELLED, on_cancelled)

                # Configure transfer
                transfer.setInterrupt(
                    endpoint=0x81,
                    buffer_or_len=8,
                    callback=helper,  # Use helper as callback
                    timeout=1000,
                )

                try:
                    # Submit and handle events
                    transfer.submit()
                    print("Transfer with helper submitted")

                    # monotonic() is immune to wall-clock adjustments. Stop
                    # early once the transfer is no longer resubmitted.
                    deadline = time.monotonic() + duration
                    while transfer.isSubmitted() and time.monotonic() < deadline:
                        context.handleEventsTimeout(1.0)
                finally:
                    running = False
                    # Cancel if still running
                    if transfer.isSubmitted():
                        try:
                            transfer.cancel()
                        except usb1.USBError as exc:
                            print(f"Cancel failed: {exc}")
                        # Cancellation is asynchronous: wait for the callback.
                        while transfer.isSubmitted():
                            context.handleEventsTimeout(0.1)
                    transfer.close()
698
699
setup_transfer_helper_example()
700
```
701
702
### Isochronous Transfer for Streaming
703
704
```python
705
import usb1
706
import time
707
708
def isochronous_streaming_example(duration=5.0):
    """Demonstrate isochronous transfers for streaming data.

    Args:
        duration (float): Number of seconds to stream before stopping.
    """

    class IsoStreamProcessor:
        """Accumulate per-packet statistics and keep the stream running."""

        def __init__(self):
            self.packet_count = 0
            self.error_count = 0
            self.total_bytes = 0
            # Cleared at shutdown so the callback stops resubmitting.
            self.streaming = True

        def iso_callback(self, transfer):
            """Process isochronous transfer completion."""
            status = transfer.getStatus()
            if status != usb1.TRANSFER_COMPLETED:
                print(f"ISO transfer status: {status}")
                return

            # Process individual packets
            for packet_status, packet_data in transfer.iterISO():
                self.packet_count += 1

                if packet_status == usb1.TRANSFER_COMPLETED:
                    self.total_bytes += len(packet_data)
                    # Process packet data here
                    if len(packet_data) > 0:
                        print(f"Packet {self.packet_count}: {len(packet_data)} bytes")
                else:
                    self.error_count += 1
                    print(f"Packet {self.packet_count} error: {packet_status}")

            # Resubmit for continuous streaming
            if self.streaming:
                try:
                    transfer.submit()
                except usb1.USBError as e:
                    print(f"Resubmit failed: {e}")

    with usb1.USBContext() as context:
        # Look for device with isochronous endpoint
        device = context.getByVendorIDAndProductID(0x1234, 0x5678)
        if not device:
            print("Device not found")
            return

        with device.open() as handle:
            with handle.claimInterface(0):
                processor = IsoStreamProcessor()

                # Create transfer with ISO packets
                transfer = handle.getTransfer(iso_packets=10)  # 10 packets per transfer

                # Configure for isochronous streaming
                transfer.setIsochronous(
                    endpoint=0x83,        # ISO IN endpoint
                    buffer_or_len=1000,   # Total buffer size
                    callback=processor.iso_callback,
                    timeout=1000,
                )

                try:
                    # Start streaming
                    transfer.submit()
                    print("Started isochronous streaming")

                    # monotonic() is immune to wall-clock adjustments.
                    deadline = time.monotonic() + duration
                    while time.monotonic() < deadline:
                        context.handleEventsTimeout(0.1)
                finally:
                    # Stop streaming
                    processor.streaming = False
                    if transfer.isSubmitted():
                        try:
                            transfer.cancel()
                        except usb1.USBError as exc:
                            print(f"Cancel failed: {exc}")
                        # Cancellation is asynchronous: wait for the callback.
                        while transfer.isSubmitted():
                            context.handleEventsTimeout(0.1)

                    print("Streaming complete:")
                    print(f"  Packets: {processor.packet_count}")
                    print(f"  Errors: {processor.error_count}")
                    print(f"  Total bytes: {processor.total_bytes}")

                    transfer.close()
781
782
isochronous_streaming_example()
783
```
784
785
### Event Loop Integration with USBPoller
786
787
```python
788
import usb1
789
import select
790
import socket
791
import threading
792
793
def event_loop_integration_example():
    """Demonstrate USBPoller integration with select-based event loop."""

    class SelectPoller:
        """Adapter exposing register/unregister/poll on top of select.select()."""

        def __init__(self):
            self.read_fds = set()
            self.write_fds = set()

        def register(self, fd, events):
            if events & select.POLLIN:
                self.read_fds.add(fd)
            if events & select.POLLOUT:
                self.write_fds.add(fd)

        def unregister(self, fd):
            self.read_fds.discard(fd)
            self.write_fds.discard(fd)

        def poll(self, timeout):
            # select() blocks indefinitely on None; map negative values to it.
            if timeout is None or timeout < 0:
                timeout = None

            ready_read, ready_write, _ = select.select(
                list(self.read_fds),
                list(self.write_fds),
                [],
                timeout,
            )

            result = [(fd, select.POLLIN) for fd in ready_read]
            result.extend((fd, select.POLLOUT) for fd in ready_write)
            return result

    # Cleared at shutdown so the callback stops requesting resubmission.
    running = True

    # USB transfer callback
    def usb_callback(transfer):
        if transfer.getStatus() == usb1.TRANSFER_COMPLETED:
            length = transfer.getActualLength()
            print(f"USB: Received {length} bytes")
            return running  # Resubmit unless shutting down
        return False

    # Create a simple TCP server socket for demonstration. The with-block
    # closes it on every exit path, including "device not found".
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_socket.bind(('localhost', 0))
        server_socket.listen(1)
        server_socket.setblocking(False)

        port = server_socket.getsockname()[1]
        print(f"Server listening on port {port}")

        with usb1.USBContext() as context:
            device = context.getByVendorIDAndProductID(0x1234, 0x5678)
            if not device:
                print("Device not found")
                return

            with device.open() as handle:
                with handle.claimInterface(0):
                    # A plain transfer callback's return value is ignored;
                    # USBTransferHelper is what resubmits when the callback
                    # returns True.
                    helper = usb1.USBTransferHelper()
                    helper.setDefaultCallback(usb_callback)

                    # Set up USB transfer
                    transfer = handle.getTransfer()
                    transfer.setInterrupt(
                        endpoint=0x81,
                        buffer_or_len=64,
                        callback=helper,
                        timeout=1000,
                    )

                    try:
                        transfer.submit()

                        # Create poller for integration
                        poller = usb1.USBPoller(context, SelectPoller())

                        # Register server socket
                        server_fd = server_socket.fileno()
                        poller.register(server_fd, select.POLLIN)

                        print("Event loop running (Ctrl+C to stop)")
                        while True:
                            # Poll for events (including USB); only non-USB
                            # events are returned here.
                            for fd, _event in poller.poll(1.0):
                                if fd != server_fd:
                                    continue
                                # Handle socket events
                                try:
                                    client_socket, addr = server_socket.accept()
                                    with client_socket:
                                        print(f"TCP: Connection from {addr}")
                                        client_socket.sendall(b"Hello from USB event loop!\n")
                                except OSError as exc:
                                    # Best effort: report and keep serving.
                                    print(f"TCP: connection error: {exc}")

                    except KeyboardInterrupt:
                        print("\nStopping event loop")

                    finally:
                        running = False
                        if transfer.isSubmitted():
                            try:
                                transfer.cancel()
                            except usb1.USBError as exc:
                                print(f"Cancel failed: {exc}")
                            # Cancellation is asynchronous: wait for the callback.
                            while transfer.isSubmitted():
                                context.handleEventsTimeout(0.1)
                        transfer.close()
895
896
# Note: This example requires a USB device to be meaningful
897
# event_loop_integration_example()
898
```