0
# Asynchronous I/O
1
2
Core asynchronous I/O primitives including event loops, streams, locks, queues, and futures. These components enable non-blocking operations and concurrent programming patterns.
3
4
## Capabilities
5
6
### Event Loop
7
8
Core event loop managing all asynchronous operations, timers, callbacks, and I/O events in Tornado applications.
9
10
```python { .api }
11
class IOLoop:
12
"""Asynchronous I/O event loop."""
13
14
@classmethod
15
def configure(cls, impl, **kwargs):
16
"""Configure IOLoop implementation."""
17
18
@classmethod
19
def instance(cls):
20
"""Get global IOLoop instance."""
21
22
@classmethod
23
def current(cls, instance: bool = True):
24
"""
25
Get current thread's IOLoop.
26
27
Args:
28
instance: Whether to create instance if none exists
29
30
Returns:
31
Current IOLoop instance
32
"""
33
34
@classmethod
35
def install(cls):
36
"""Install this IOLoop as global instance."""
37
38
def start(self):
39
"""Start the event loop."""
40
41
def stop(self):
42
"""Stop the event loop."""
43
44
def run_sync(self, func, timeout: float = None):
45
"""
46
Run coroutine synchronously with timeout.
47
48
Args:
49
func: Coroutine function to run
50
timeout: Optional timeout in seconds
51
52
Returns:
53
Result of coroutine
54
"""
55
56
def add_handler(self, fd, handler, events):
57
"""
58
Add I/O event handler for file descriptor.
59
60
Args:
61
fd: File descriptor
62
handler: Handler function
63
events: Event mask (IOLoop.READ, IOLoop.WRITE, IOLoop.ERROR)
64
"""
65
66
def update_handler(self, fd, events):
67
"""Update events for file descriptor."""
68
69
def remove_handler(self, fd):
70
"""Remove handler for file descriptor."""
71
72
def add_timeout(self, deadline, callback, *args, **kwargs):
73
"""
74
Add timeout callback.
75
76
Args:
77
deadline: Absolute time or timedelta
78
callback: Callback function
79
*args: Callback arguments
80
**kwargs: Callback keyword arguments
81
82
Returns:
83
Timeout handle
84
"""
85
86
def call_later(self, delay: float, callback, *args, **kwargs):
87
"""
88
Call function after delay.
89
90
Args:
91
delay: Delay in seconds
92
callback: Callback function
93
*args: Callback arguments
94
**kwargs: Callback keyword arguments
95
96
Returns:
97
Timeout handle
98
"""
99
100
def call_at(self, when: float, callback, *args, **kwargs):
101
"""
102
Call function at specific time.
103
104
Args:
105
when: Absolute time
106
callback: Callback function
107
*args: Callback arguments
108
**kwargs: Callback keyword arguments
109
110
Returns:
111
Timeout handle
112
"""
113
114
def remove_timeout(self, timeout):
115
"""Remove timeout callback."""
116
117
def add_callback(self, callback, *args, **kwargs):
118
"""
119
Add callback to be executed on next iteration.
120
121
Args:
122
callback: Callback function
123
*args: Callback arguments
124
**kwargs: Callback keyword arguments
125
"""
126
127
def add_callback_from_signal(self, callback, *args, **kwargs):
128
"""Add callback from signal handler context."""
129
130
def spawn_callback(self, callback, *args, **kwargs):
131
"""
132
Spawn callback as separate task.
133
134
Args:
135
callback: Callback function
136
*args: Callback arguments
137
**kwargs: Callback keyword arguments
138
"""
139
140
def add_future(self, future, callback):
141
"""
142
Add callback to be called when future completes.
143
144
Args:
145
future: Future object
146
callback: Callback function
147
"""
148
149
def run_in_executor(self, executor, func, *args):
150
"""
151
Run function in executor.
152
153
Args:
154
executor: Executor instance
155
func: Function to execute
156
*args: Function arguments
157
158
Returns:
159
Future resolving to function result
160
"""
161
162
def set_default_executor(self, executor):
163
"""Set default executor for run_in_executor."""
164
165
def time(self) -> float:
166
"""Get current time."""
167
168
class PeriodicCallback:
169
"""Periodic callback scheduler."""
170
171
def __init__(self, callback, callback_time: float, io_loop=None):
172
"""
173
Initialize periodic callback.
174
175
Args:
176
callback: Function to call periodically
177
callback_time: Interval in milliseconds
178
io_loop: IOLoop instance (uses current if None)
179
"""
180
181
def start(self):
182
"""Start periodic callbacks."""
183
184
def stop(self):
185
"""Stop periodic callbacks."""
186
187
def is_running(self) -> bool:
188
"""Check if callback is running."""
189
```
190
191
### Asynchronous Streams
192
193
Stream abstractions for non-blocking I/O operations on sockets, pipes, and other file descriptors.
194
195
```python { .api }
196
class BaseIOStream:
197
"""Base class for asynchronous streams."""
198
199
def __init__(self, io_loop=None, max_buffer_size: int = None, read_chunk_size: int = None, max_write_buffer_size: int = None):
200
"""
201
Initialize stream.
202
203
Args:
204
io_loop: IOLoop instance
205
max_buffer_size: Maximum read buffer size
206
read_chunk_size: Read chunk size
207
max_write_buffer_size: Maximum write buffer size
208
"""
209
210
def read_bytes(self, num_bytes: int, callback=None, streaming_callback=None, partial: bool = False) -> Future:
211
"""
212
Read specified number of bytes.
213
214
Args:
215
num_bytes: Number of bytes to read
216
callback: Callback function (if not using async/await)
217
streaming_callback: Callback for data chunks
218
partial: Whether to allow partial reads
219
220
Returns:
221
Future resolving to bytes
222
"""
223
224
def read_until_regex(self, regex, callback=None, max_bytes: int = None) -> Future:
225
"""
226
Read until regex pattern matches.
227
228
Args:
229
regex: Regular expression pattern
230
callback: Callback function (if not using async/await)
231
max_bytes: Maximum bytes to read
232
233
Returns:
234
Future resolving to bytes
235
"""
236
237
def read_until(self, delimiter: bytes, callback=None, max_bytes: int = None) -> Future:
238
"""
239
Read until delimiter found.
240
241
Args:
242
delimiter: Delimiter bytes
243
callback: Callback function (if not using async/await)
244
max_bytes: Maximum bytes to read
245
246
Returns:
247
Future resolving to bytes
248
"""
249
250
def read_into(self, buf, callback=None, partial: bool = False) -> Future:
251
"""
252
Read data into existing buffer.
253
254
Args:
255
buf: Buffer to read into
256
callback: Callback function (if not using async/await)
257
partial: Whether to allow partial reads
258
259
Returns:
260
Future resolving to number of bytes read
261
"""
262
263
def read_until_close(self, callback=None, streaming_callback=None) -> Future:
264
"""
265
Read all data until stream closes.
266
267
Args:
268
callback: Callback function (if not using async/await)
269
streaming_callback: Callback for data chunks
270
271
Returns:
272
Future resolving to all bytes
273
"""
274
275
def write(self, data: bytes, callback=None) -> Future:
276
"""
277
Write data to stream.
278
279
Args:
280
data: Data to write
281
callback: Callback function (if not using async/await)
282
283
Returns:
284
Future resolving when write completes
285
"""
286
287
def close(self, exc_info: bool = False):
288
"""
289
Close stream.
290
291
Args:
292
exc_info: Whether to log exception info
293
"""
294
295
def set_close_callback(self, callback):
296
"""
297
Set callback to be called when stream closes.
298
299
Args:
300
callback: Close callback function
301
"""
302
303
def closed(self) -> bool:
304
"""Check if stream is closed."""
305
306
def reading(self) -> bool:
307
"""Check if stream is reading."""
308
309
def writing(self) -> bool:
310
"""Check if stream is writing."""
311
312
def set_nodelay(self, value: bool):
313
"""Enable/disable Nagle's algorithm."""
314
315
class IOStream(BaseIOStream):
316
"""Socket-based stream implementation."""
317
318
def __init__(self, socket, io_loop=None, **kwargs):
319
"""
320
Initialize socket stream.
321
322
Args:
323
socket: Socket object
324
io_loop: IOLoop instance
325
**kwargs: Additional stream options
326
"""
327
328
async def connect(self, address, callback=None, server_hostname: str = None):
329
"""
330
Connect to remote address.
331
332
Args:
333
address: Remote address tuple (host, port)
334
callback: Callback function (if not using async/await)
335
server_hostname: Server hostname for SNI
336
"""
337
338
def start_tls(self, server_side: bool, ssl_options=None, server_hostname: str = None) -> Future:
339
"""
340
Start TLS/SSL on connection.
341
342
Args:
343
server_side: Whether this is server side
344
ssl_options: SSL configuration options
345
server_hostname: Server hostname for SNI
346
347
Returns:
348
Future resolving when TLS handshake completes
349
"""
350
351
class SSLIOStream(IOStream):
352
"""SSL/TLS socket stream."""
353
354
def __init__(self, *args, **kwargs):
355
"""Initialize SSL stream."""
356
357
def wait_for_handshake(self, callback=None) -> Future:
358
"""
359
Wait for SSL handshake to complete.
360
361
Args:
362
callback: Callback function (if not using async/await)
363
364
Returns:
365
Future resolving when handshake completes
366
"""
367
368
class PipeIOStream(BaseIOStream):
369
"""Pipe-based stream for subprocess communication."""
370
371
def __init__(self, fd, io_loop=None, **kwargs):
372
"""
373
Initialize pipe stream.
374
375
Args:
376
fd: File descriptor
377
io_loop: IOLoop instance
378
**kwargs: Additional stream options
379
"""
380
```
381
382
### Synchronization Primitives
383
384
Asynchronous versions of threading primitives like locks, conditions, events, and semaphores for coordinating coroutines.
385
386
```python { .api }
387
class Lock:
388
"""Asynchronous lock (mutex)."""
389
390
def __init__(self):
391
"""Initialize lock."""
392
393
async def __aenter__(self):
394
"""Async context manager entry."""
395
await self.acquire()
396
return self
397
398
async def __aexit__(self, exc_type, exc_val, exc_tb):
399
"""Async context manager exit."""
400
self.release()
401
402
async def acquire(self):
403
"""
404
Acquire lock.
405
406
Blocks until lock is available.
407
"""
408
409
def release(self):
410
"""
411
Release lock.
412
413
Raises:
414
RuntimeError: If lock is not currently held
415
"""
416
417
class Condition:
418
"""Asynchronous condition variable."""
419
420
def __init__(self, lock: Lock = None):
421
"""
422
Initialize condition.
423
424
Args:
425
lock: Optional lock to use (creates new if None)
426
"""
427
428
async def __aenter__(self):
429
"""Async context manager entry."""
430
await self.acquire()
431
return self
432
433
async def __aexit__(self, exc_type, exc_val, exc_tb):
434
"""Async context manager exit."""
435
self.release()
436
437
async def acquire(self):
438
"""Acquire underlying lock."""
439
440
def release(self):
441
"""Release underlying lock."""
442
443
async def wait(self, timeout: float = None) -> bool:
444
"""
445
Wait for condition to be notified.
446
447
Args:
448
timeout: Optional timeout in seconds
449
450
Returns:
451
True if notified, False if timeout
452
"""
453
454
def notify(self, n: int = 1):
455
"""
456
Notify waiting coroutines.
457
458
Args:
459
n: Number of coroutines to notify
460
"""
461
462
def notify_all(self):
463
"""Notify all waiting coroutines."""
464
465
class Event:
466
"""Asynchronous event flag."""
467
468
def __init__(self):
469
"""Initialize event (starts unset)."""
470
471
def is_set(self) -> bool:
472
"""Check if event is set."""
473
474
def set(self):
475
"""Set event flag and notify waiters."""
476
477
def clear(self):
478
"""Clear event flag."""
479
480
async def wait(self, timeout: float = None) -> bool:
481
"""
482
Wait for event to be set.
483
484
Args:
485
timeout: Optional timeout in seconds
486
487
Returns:
488
True if event was set, False if timeout
489
"""
490
491
class Semaphore:
492
"""Asynchronous semaphore."""
493
494
def __init__(self, value: int = 1):
495
"""
496
Initialize semaphore.
497
498
Args:
499
value: Initial semaphore value
500
"""
501
502
async def __aenter__(self):
503
"""Async context manager entry."""
504
await self.acquire()
505
return self
506
507
async def __aexit__(self, exc_type, exc_val, exc_tb):
508
"""Async context manager exit."""
509
self.release()
510
511
async def acquire(self):
512
"""Acquire semaphore (decrements counter)."""
513
514
def release(self):
515
"""Release semaphore (increments counter)."""
516
517
class BoundedSemaphore(Semaphore):
518
"""Semaphore with bounded release operation."""
519
520
def release(self):
521
"""
522
Release semaphore with bounds checking.
523
524
Raises:
525
ValueError: If releasing would exceed initial value
526
"""
527
```
528
529
### Asynchronous Queues
530
531
Queue implementations for passing data between coroutines with different ordering strategies and flow control.
532
533
```python { .api }
534
class Queue:
535
"""Asynchronous FIFO queue."""
536
537
def __init__(self, maxsize: int = 0):
538
"""
539
Initialize queue.
540
541
Args:
542
maxsize: Maximum queue size (0 for unlimited)
543
"""
544
545
def qsize(self) -> int:
546
"""Get current queue size."""
547
548
def empty(self) -> bool:
549
"""Check if queue is empty."""
550
551
def full(self) -> bool:
552
"""Check if queue is full."""
553
554
async def put(self, item):
555
"""
556
Put item in queue.
557
558
Args:
559
item: Item to add to queue
560
561
Blocks if queue is full.
562
"""
563
564
def put_nowait(self, item):
565
"""
566
Put item in queue without blocking.
567
568
Args:
569
item: Item to add to queue
570
571
Raises:
572
QueueFull: If queue is full
573
"""
574
575
async def get(self):
576
"""
577
Get item from queue.
578
579
Returns:
580
Item from queue
581
582
Blocks if queue is empty.
583
"""
584
585
def get_nowait(self):
586
"""
587
Get item from queue without blocking.
588
589
Returns:
590
Item from queue
591
592
Raises:
593
QueueEmpty: If queue is empty
594
"""
595
596
def task_done(self):
597
"""Indicate that queued task is complete."""
598
599
async def join(self):
600
"""Wait until all tasks are done."""
601
602
class PriorityQueue(Queue):
603
"""Asynchronous priority queue (lowest priority first)."""
604
605
def __init__(self, maxsize: int = 0):
606
"""Initialize priority queue."""
607
608
class LifoQueue(Queue):
609
"""Asynchronous LIFO queue (stack)."""
610
611
def __init__(self, maxsize: int = 0):
612
"""Initialize LIFO queue."""
613
```
614
615
### Future Utilities
616
617
Utilities for working with Future objects, including conversion, timeouts, and execution control.
618
619
```python { .api }
620
Future = asyncio.Future
621
622
def is_future(obj) -> bool:
623
"""
624
Check if object is a Future.
625
626
Args:
627
obj: Object to check
628
629
Returns:
630
True if object is a Future
631
"""
632
633
def run_on_executor(executor=None, io_loop=None):
634
"""
635
Decorator to run function on executor.
636
637
Args:
638
executor: Executor to use
639
io_loop: IOLoop instance
640
641
Returns:
642
Decorator function
643
"""
644
645
def chain_future(a: Future, b: Future):
646
"""
647
Chain two futures together.
648
649
Args:
650
a: Source future
651
b: Target future
652
"""
653
654
def future_set_result_unless_cancelled(future: Future, value):
655
"""
656
Set future result unless cancelled.
657
658
Args:
659
future: Future to set
660
value: Result value
661
"""
662
663
def future_set_exception_unless_cancelled(future: Future, exc):
664
"""
665
Set future exception unless cancelled.
666
667
Args:
668
future: Future to set
669
exc: Exception to set
670
"""
671
672
async def with_timeout(timeout: float, future: Future):
673
"""
674
Wrap future with timeout.
675
676
Args:
677
timeout: Timeout in seconds
678
future: Future to wrap
679
680
Returns:
681
Future result
682
683
Raises:
684
asyncio.TimeoutError: If timeout expires
685
"""
686
687
async def sleep(duration: float):
688
"""
689
Sleep for specified duration.
690
691
Args:
692
duration: Sleep duration in seconds
693
"""
694
695
class DummyExecutor:
696
"""Executor that runs functions synchronously."""
697
698
def submit(self, fn, *args, **kwargs):
699
"""Submit function for execution."""
700
```
701
702
## Types
703
704
```python { .api }
705
# Event mask constants for IOLoop
706
READ = 0x001
707
WRITE = 0x004
708
ERROR = 0x008
709
710
# Timeout handle type
711
TimeoutHandle = object
712
713
# File descriptor type
714
FileDescriptor = Union[int, socket.socket]
715
716
# Event callback type
717
EventCallback = Callable[[int, int], None]
718
719
# Timeout callback type
720
TimeoutCallback = Callable[[], None]
721
722
# Stream callback types
723
StreamCallback = Callable[[bytes], None]
724
CloseCallback = Callable[[], None]
725
```
726
727
## Exceptions
728
729
```python { .api }
730
class StreamClosedError(Exception):
731
"""Exception when stream operation attempted on closed stream."""
732
733
def __init__(self, real_error=None):
734
"""
735
Initialize stream closed error.
736
737
Args:
738
real_error: Underlying error that caused closure
739
"""
740
741
class UnsatisfiableReadError(Exception):
742
"""Exception when read cannot be satisfied."""
743
744
class StreamBufferFullError(Exception):
745
"""Exception when stream buffer is full."""
746
747
class QueueEmpty(Exception):
748
"""Exception when queue is empty."""
749
750
class QueueFull(Exception):
751
"""Exception when queue is full."""
752
```