0
# Asynchronous I/O and Reactors
1
2
Core event loop and asynchronous programming primitives that form the foundation of Twisted's networking capabilities. The reactor provides the central event loop, while Deferreds manage asynchronous operations and callbacks.
3
4
## Capabilities
5
6
### Deferred Objects
7
8
Deferred objects are Twisted's implementation of promises/futures for managing asynchronous operations. They allow chaining callbacks and error handlers for non-blocking operations.
9
10
```python { .api }
11
class defer.Deferred:
12
"""
13
A Deferred object represents a value that may not be available yet.
14
15
Methods:
16
- addCallback(callback): Add success callback
17
- addErrback(errback): Add error callback
18
- addBoth(callback): Add callback for both success and error
19
- callback(result): Fire with success result
20
- errback(failure): Fire with error
21
- cancel(): Cancel the deferred
22
"""
23
def addCallback(self, callback, *args, **kwargs): ...
24
def addErrback(self, errback, *args, **kwargs): ...
25
def addBoth(self, callback, *args, **kwargs): ...
26
def callback(self, result): ...
27
def errback(self, failure): ...
28
def cancel(self): ...
29
30
def defer.succeed(result):
31
"""
32
Create a Deferred that has already been fired with a success result.
33
34
Args:
35
result: The result value
36
37
Returns:
38
Deferred: Already-fired deferred
39
"""
40
41
def defer.fail(failure):
42
"""
43
Create a Deferred that has already been fired with a failure.
44
45
Args:
46
failure: Failure object or exception
47
48
Returns:
49
Deferred: Already-failed deferred
50
"""
51
52
def defer.gatherResults(deferredList):
53
"""
54
Collect results from multiple deferreds into a list.
55
56
Args:
57
deferredList (list): List of Deferred objects
58
59
Returns:
60
Deferred: Fires with list of results when all complete
61
"""
62
63
def defer.maybeDeferred(f, *args, **kwargs):
64
"""
65
Ensure a function call returns a Deferred.
66
67
Args:
68
f: Function to call
69
*args, **kwargs: Arguments to pass to function
70
71
Returns:
72
Deferred: Either the returned Deferred or wrapped result
73
"""
74
75
def defer.inlineCallbacks(func):
76
"""
77
Decorator allowing generator-based async/await-like syntax.
78
79
Args:
80
func: Generator function using yield for async operations
81
82
Returns:
83
Function that returns a Deferred
84
"""
85
86
def defer.returnValue(value):
87
"""
88
Return a value from an inlineCallbacks generator.
89
90
Args:
91
value: Value to return
92
"""
93
```
94
95
**Usage Example**:
96
97
```python
98
from twisted.internet import defer
99
100
@defer.inlineCallbacks
101
def processData(data):
102
# Async operations using yield
103
result1 = yield asyncOperation1(data)
104
result2 = yield asyncOperation2(result1)
105
defer.returnValue(result2)
106
107
# Using callbacks
108
d = processData("input")
109
d.addCallback(lambda result: print(f"Final result: {result}"))
110
d.addErrback(lambda failure: print(f"Error: {failure.value}"))
111
```
112
113
### Deferred Collections and Coordination
114
115
Utilities for managing multiple Deferred objects and coordinating asynchronous operations.
116
117
```python { .api }
118
class defer.DeferredList(defer.Deferred):
119
"""
120
Collect results from multiple Deferreds into a single result.
121
122
Attributes:
123
- fireOnOneCallback: Fire when first Deferred succeeds
124
- fireOnOneErrback: Fire when first Deferred fails
125
"""
126
def __init__(self, deferredList, fireOnOneCallback=False, fireOnOneErrback=False, consumeErrors=False):
127
"""
128
Args:
129
deferredList (list): List of Deferred objects to track
130
fireOnOneCallback (bool): Fire on first success
131
fireOnOneErrback (bool): Fire on first failure
132
consumeErrors (bool): Consume errors from component Deferreds
133
"""
134
135
def defer.gatherResults(deferredList, consumeErrors=False):
136
"""
137
Collect results from multiple deferreds into a list (equivalent to DeferredList with consumeErrors=True).
138
139
Args:
140
deferredList (list): List of Deferred objects
141
consumeErrors (bool): Whether to consume errors
142
143
Returns:
144
Deferred: Fires with list of results when all complete
145
"""
146
```
147
148
### Concurrency Primitives
149
150
Synchronization primitives for coordinating asynchronous operations.
151
152
```python { .api }
153
class defer.DeferredLock:
154
"""
155
An event-driven lock for coordinating access to shared resources.
156
157
Attributes:
158
- locked: True when acquired, False otherwise
159
"""
160
locked = False
161
162
def acquire(self):
163
"""
164
Acquire the lock.
165
166
Returns:
167
Deferred[DeferredLock]: Fires with lock when acquired
168
"""
169
170
def release(self):
171
"""Release the lock."""
172
173
class defer.DeferredSemaphore:
174
"""
175
An event-driven semaphore for limiting concurrent operations.
176
177
Attributes:
178
- tokens: Available tokens
179
- limit: Maximum tokens
180
"""
181
def __init__(self, tokens):
182
"""
183
Args:
184
tokens (int): Number of tokens (must be >= 1)
185
"""
186
187
def acquire(self):
188
"""
189
Acquire a token.
190
191
Returns:
192
Deferred[DeferredSemaphore]: Fires when token acquired
193
"""
194
195
def release(self):
196
"""Release a token."""
197
198
class defer.DeferredQueue:
199
"""
200
An event-driven queue for producer-consumer scenarios.
201
202
Attributes:
203
- size: Maximum queue size (None for unlimited)
204
- backlog: Maximum waiting gets (None for unlimited)
205
"""
206
def __init__(self, size=None, backlog=None):
207
"""
208
Args:
209
size (int): Maximum queue size
210
backlog (int): Maximum pending gets
211
"""
212
213
def put(self, obj):
214
"""
215
Add an object to the queue.
216
217
Args:
218
obj: Object to add
219
220
Raises:
221
QueueOverflow: Queue is full
222
"""
223
224
def get(self):
225
"""
226
Get an object from the queue.
227
228
Returns:
229
Deferred: Fires with object when available
230
"""
231
```
232
233
**Usage Example**:
234
235
```python
236
from twisted.internet import defer
237
238
# Using DeferredList
239
d1 = someAsyncOperation()
240
d2 = anotherAsyncOperation()
241
dl = defer.DeferredList([d1, d2])
242
dl.addCallback(lambda results: print(f"All done: {results}"))
243
244
# Using DeferredLock for resource coordination
245
lock = defer.DeferredLock()
246
247
@defer.inlineCallbacks
248
def useResource():
249
yield lock.acquire()
250
try:
251
# Use shared resource
252
pass
253
finally:
254
lock.release()
255
256
# Using DeferredQueue for producer-consumer
257
queue = defer.DeferredQueue()
258
queue.put("item1")
259
d = queue.get()
260
d.addCallback(lambda item: print(f"Got: {item}"))
261
```
262
263
### Protocol and Factory
264
265
Network protocols and factories for managing connections and implementing network services.
266
267
```python { .api }
268
class protocol.Protocol:
269
"""
270
Base class for network protocols.
271
272
Key Methods:
273
- connectionMade(): Called when connection established
274
- dataReceived(data): Called when data arrives
275
- connectionLost(reason): Called when connection ends
276
"""
277
def connectionMade(self): ...
278
def dataReceived(self, data: bytes): ...
279
def connectionLost(self, reason): ...
280
281
transport = None # Set by framework to ITransport
282
283
class protocol.Factory:
284
"""
285
Base class for protocol factories.
286
"""
287
def buildProtocol(self, addr): ...
288
def startedConnecting(self, connector): ...
289
def clientConnectionFailed(self, connector, reason): ...
290
def clientConnectionLost(self, connector, reason): ...
291
292
class protocol.ClientFactory(protocol.Factory):
293
"""
294
Factory for client connections.
295
"""
296
297
class protocol.ServerFactory(protocol.Factory):
298
"""
299
Factory for server connections.
300
"""
301
302
class protocol.ReconnectingClientFactory(protocol.ClientFactory):
303
"""
304
Client factory that automatically reconnects on connection loss.
305
306
Attributes:
307
- initialDelay: Initial reconnection delay in seconds
308
- maxDelay: Maximum reconnection delay
309
- factor: Backoff multiplier
310
"""
311
initialDelay = 1.0
312
maxDelay = 3600.0
313
factor = 2.0
314
```
315
316
**Usage Example**:
317
318
```python
319
from twisted.internet import protocol, reactor, endpoints
320
321
class EchoProtocol(protocol.Protocol):
322
def connectionMade(self):
323
print(f"Connection from {self.transport.getPeer()}")
324
325
def dataReceived(self, data):
326
print(f"Received: {data.decode()}")
327
self.transport.write(data) # Echo back
328
329
def connectionLost(self, reason):
330
print(f"Connection lost: {reason.value}")
331
332
class EchoFactory(protocol.ServerFactory):
333
def buildProtocol(self, addr):
334
return EchoProtocol()
335
336
# Start server
337
endpoint = endpoints.TCP4ServerEndpoint(reactor, 8080)
338
endpoint.listen(EchoFactory())
339
```
340
341
### Endpoints
342
343
High-level abstractions for creating network connections without dealing directly with reactor APIs.
344
345
```python { .api }
346
def endpoints.clientFromString(reactor, description):
347
"""
348
Create a client endpoint from string description.
349
350
Args:
351
reactor: The reactor to use
352
description (str): Endpoint description (e.g., "tcp:host=example.com:port=80")
353
354
Returns:
355
IStreamClientEndpoint: Client endpoint
356
"""
357
358
def endpoints.serverFromString(reactor, description):
359
"""
360
Create a server endpoint from string description.
361
362
Args:
363
reactor: The reactor to use
364
description (str): Endpoint description (e.g., "tcp:port=8080")
365
366
Returns:
367
IStreamServerEndpoint: Server endpoint
368
"""
369
370
def endpoints.connectProtocol(endpoint, protocol):
371
"""
372
Connect a protocol instance to an endpoint.
373
374
Args:
375
endpoint: Client endpoint
376
protocol: Protocol instance
377
378
Returns:
379
Deferred: Fires with protocol when connected
380
"""
381
382
class endpoints.TCP4ServerEndpoint:
383
"""
384
IPv4 TCP server endpoint.
385
"""
386
def __init__(self, reactor, port, backlog=50, interface=''):
387
"""
388
Args:
389
reactor: The reactor
390
port (int): Port number to bind
391
backlog (int): Listen backlog size
392
interface (str): Interface to bind ('127.0.0.1', '', etc.)
393
"""
394
395
def listen(self, factory):
396
"""
397
Start listening with the given factory.
398
399
Args:
400
factory: Protocol factory
401
402
Returns:
403
Deferred: Fires with IListeningPort when listening
404
"""
405
406
class endpoints.TCP4ClientEndpoint:
407
"""
408
IPv4 TCP client endpoint.
409
"""
410
def __init__(self, reactor, host, port, timeout=30, bindAddress=None):
411
"""
412
Args:
413
reactor: The reactor
414
host (str): Hostname to connect to
415
port (int): Port number
416
timeout (int): Connection timeout in seconds
417
bindAddress: Local address to bind
418
"""
419
420
def connect(self, factory):
421
"""
422
Connect using the given factory.
423
424
Args:
425
factory: Protocol factory
426
427
Returns:
428
Deferred: Fires with protocol when connected
429
"""
430
431
class endpoints.UNIXServerEndpoint:
432
"""
433
Unix domain socket server endpoint.
434
"""
435
def __init__(self, reactor, address, backlog=50, mode=0o666, wantPID=0):
436
"""
437
Args:
438
reactor: The reactor
439
address (str): Socket file path
440
backlog (int): Listen backlog
441
mode (int): Socket file permissions
442
wantPID (bool): Include PID in address
443
"""
444
445
class endpoints.SSL4ServerEndpoint:
446
"""
447
SSL server endpoint.
448
"""
449
def __init__(self, reactor, port, sslContextFactory, backlog=50, interface=''):
450
"""
451
Args:
452
reactor: The reactor
453
port (int): Port to bind
454
sslContextFactory: SSL context factory
455
backlog (int): Listen backlog
456
interface (str): Interface to bind
457
"""
458
```
459
460
**Usage Example**:
461
462
```python
463
from twisted.internet import reactor, endpoints, protocol
464
465
class SimpleProtocol(protocol.Protocol):
466
def connectionMade(self):
467
self.transport.write(b"Hello, world!")
468
self.transport.loseConnection()
469
470
# Using string descriptions
471
server_endpoint = endpoints.serverFromString(reactor, "tcp:port=8080")
472
client_endpoint = endpoints.clientFromString(reactor, "tcp:host=localhost:port=8080")
473
474
# Direct endpoint creation
475
server_endpoint = endpoints.TCP4ServerEndpoint(reactor, 8080)
476
client_endpoint = endpoints.TCP4ClientEndpoint(reactor, "localhost", 8080)
477
478
# Connect
479
d = client_endpoint.connect(protocol.ClientFactory())
480
d.addCallback(lambda proto: proto.transport.write(b"data"))
481
```
482
483
### Task Scheduling
484
485
Utilities for scheduling and managing recurring tasks and delayed operations.
486
487
```python { .api }
488
class task.LoopingCall:
489
"""
490
Repeatedly call a function at regular intervals.
491
"""
492
def __init__(self, f, *args, **kwargs):
493
"""
494
Args:
495
f: Function to call
496
*args, **kwargs: Arguments for function
497
"""
498
499
def start(self, interval, now=True):
500
"""
501
Start the looping call.
502
503
Args:
504
interval (float): Seconds between calls
505
now (bool): Whether to call immediately
506
507
Returns:
508
Deferred: Fires when stopped
509
"""
510
511
def stop(self):
512
"""Stop the looping call."""
513
514
def task.deferLater(reactor, delay, callable, *args, **kwargs):
515
"""
516
Schedule a function call after a delay.
517
518
Args:
519
reactor: The reactor
520
delay (float): Delay in seconds
521
callable: Function to call
522
*args, **kwargs: Arguments for function
523
524
Returns:
525
Deferred: Fires with function result after delay
526
"""
527
528
def task.react(main, argv=None, reactor=None):
529
"""
530
Run a main function with the reactor, handling startup and shutdown.
531
532
Args:
533
main: Main function that takes (reactor, *argv)
534
argv: Command line arguments
535
reactor: Reactor to use (default: global reactor)
536
"""
537
538
class task.Clock:
539
"""
540
Deterministic clock for testing time-based operations.
541
"""
542
def advance(self, amount):
543
"""
544
Advance the clock by the given amount.
545
546
Args:
547
amount (float): Seconds to advance
548
"""
549
550
def callLater(self, delay, callable, *args, **kwargs):
551
"""
552
Schedule a delayed call.
553
554
Args:
555
delay (float): Delay in seconds
556
callable: Function to call
557
*args, **kwargs: Arguments
558
559
Returns:
560
IDelayedCall: Scheduled call object
561
"""
562
```
563
564
**Usage Example**:
565
566
```python
567
from twisted.internet import task, reactor
568
569
# Looping call
570
def heartbeat():
571
print("Heartbeat")
572
573
lc = task.LoopingCall(heartbeat)
574
lc.start(1.0) # Call every second
575
576
# Delayed call
577
def delayed_action():
578
print("Delayed action executed")
579
580
d = task.deferLater(reactor, 5.0, delayed_action)
581
d.addCallback(lambda _: print("Delay completed"))
582
583
# Using react for main function
584
def main(reactor, name):
585
print(f"Hello, {name}!")
586
return task.deferLater(reactor, 2.0, reactor.stop)
587
588
task.react(main, ["World"])
589
```
590
591
### Reactor Operations
592
593
Direct reactor interface for low-level event loop control and network operations.
594
595
```python { .api }
596
# The reactor is typically imported as a singleton
597
from twisted.internet import reactor
598
599
# Key reactor methods (available on reactor instance):
600
def reactor.run():
601
"""Start the event loop (blocks until stopped)."""
602
603
def reactor.stop():
604
"""Stop the event loop."""
605
606
def reactor.callLater(delay, callable, *args, **kwargs):
607
"""
608
Schedule a function call after a delay.
609
610
Args:
611
delay (float): Delay in seconds
612
callable: Function to call
613
*args, **kwargs: Arguments
614
615
Returns:
616
IDelayedCall: Scheduled call object
617
"""
618
619
def reactor.callWhenRunning(callable, *args, **kwargs):
620
"""
621
Schedule a function call when reactor starts running.
622
623
Args:
624
callable: Function to call
625
*args, **kwargs: Arguments
626
"""
627
628
def reactor.listenTCP(port, factory, backlog=50, interface=''):
629
"""
630
Listen for TCP connections.
631
632
Args:
633
port (int): Port number
634
factory: Protocol factory
635
backlog (int): Listen backlog
636
interface (str): Interface to bind
637
638
Returns:
639
IListeningPort: Listening port object
640
"""
641
642
def reactor.connectTCP(host, port, factory, timeout=30, bindAddress=None):
643
"""
644
Make a TCP connection.
645
646
Args:
647
host (str): Hostname
648
port (int): Port number
649
factory: Client factory
650
timeout (int): Connection timeout
651
bindAddress: Local bind address
652
653
Returns:
654
IConnector: Connection object
655
"""
656
```
657
658
### Error Handling
659
660
Common exception types and error handling patterns in Twisted's asynchronous operations.
661
662
```python { .api }
663
# Common exceptions from twisted.internet.error
664
class error.ConnectionDone(Exception):
665
"""Connection was closed cleanly."""
666
667
class error.ConnectionLost(Exception):
668
"""Connection was lost unexpectedly."""
669
670
class error.ConnectionRefusedError(Exception):
671
"""Connection was refused by the remote host."""
672
673
class error.TimeoutError(Exception):
674
"""Operation timed out."""
675
676
class error.DNSLookupError(Exception):
677
"""DNS lookup failed."""
678
679
# From twisted.internet.defer
680
class defer.CancelledError(Exception):
681
"""Deferred was cancelled."""
682
683
class defer.AlreadyCalledError(Exception):
684
"""Deferred has already been fired."""
685
686
# From twisted.python.failure
687
class failure.Failure:
688
"""
689
Exception wrapper that preserves tracebacks across async boundaries.
690
691
Attributes:
692
- value: The wrapped exception
693
- type: Exception type
694
- tb: Traceback object
695
"""
696
def __init__(self, exc_value=None, exc_type=None, exc_tb=None): ...
697
def trap(self, *errorTypes): ...
698
def check(self, *errorTypes): ...
699
def getErrorMessage(self): ...
700
def printTraceback(self, file=None): ...
701
```
702
703
**Error Handling Example**:
704
705
```python
706
from twisted.internet import defer, error
707
from twisted.python import failure
708
709
def handleError(f):
710
"""Handle different types of failures."""
711
if f.check(error.ConnectionRefusedError):
712
print("Connection refused")
713
elif f.check(error.TimeoutError):
714
print("Operation timed out")
715
else:
716
print(f"Unexpected error: {f.value}")
717
f.printTraceback()
718
719
def riskyOperation():
720
# This might fail
721
d = defer.Deferred()
722
# ... async operation ...
723
return d
724
725
d = riskyOperation()
726
d.addErrback(handleError)
727
```
728
729
### Interface Types
730
731
Key interfaces used throughout Twisted's asynchronous I/O system.
732
733
```python { .api }
734
# Common interfaces from twisted.internet.interfaces
735
class IDelayedCall:
736
"""
737
Interface for scheduled calls that can be cancelled.
738
739
Methods:
740
- cancel(): Cancel the scheduled call
741
- active(): Check if call is still scheduled
742
- getTime(): Get scheduled time
743
"""
744
def cancel(): ...
745
def active(): ...
746
def getTime(): ...
747
748
class IListeningPort:
749
"""
750
Interface for listening network ports.
751
752
Methods:
753
- getHost(): Get local address
754
- startListening(): Begin listening
755
- stopListening(): Stop listening and return Deferred
756
"""
757
def getHost(): ...
758
def startListening(): ...
759
def stopListening(): ...
760
761
class ITransport:
762
"""
763
Interface for network transports.
764
765
Methods:
766
- write(data): Write data to transport
767
- writeSequence(data): Write sequence of data
768
- loseConnection(): Close connection cleanly
769
- abortConnection(): Close connection immediately
770
- getPeer(): Get remote address
771
- getHost(): Get local address
772
"""
773
def write(data): ...
774
def writeSequence(data): ...
775
def loseConnection(): ...
776
def abortConnection(): ...
777
def getPeer(): ...
778
def getHost(): ...
779
780
# From twisted.python.failure
781
class Failure:
782
"""
783
Exception wrapper preserving tracebacks across async boundaries.
784
785
Attributes:
786
- value: The wrapped exception
787
- type: Exception type
788
- tb: Traceback object
789
"""
790
def __init__(self, exc_value=None, exc_type=None, exc_tb=None): ...
791
def trap(self, *errorTypes): ...
792
def check(self, *errorTypes): ...
793
def getErrorMessage(self): ...
794
def printTraceback(self, file=None): ...
795
```
796
797
### Required Imports
798
799
Complete import statements for using the asynchronous I/O capabilities:
800
801
```python
802
# Core deferred and reactor functionality
803
from twisted.internet import defer, reactor, protocol, endpoints, task, error
804
from twisted.internet.interfaces import IDelayedCall, IListeningPort, ITransport
805
from twisted.python.failure import Failure
806
807
# Common import patterns
808
from twisted.internet.defer import (
809
Deferred, DeferredList, DeferredLock, DeferredSemaphore, DeferredQueue,
810
succeed, fail, gatherResults, maybeDeferred, inlineCallbacks, returnValue
811
)
812
```