An asynchronous networking framework written in Python
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Core event loop and asynchronous programming primitives that form the foundation of Twisted's networking capabilities. The reactor provides the central event loop, while Deferreds manage asynchronous operations and callbacks.
Deferred objects are Twisted's implementation of promises/futures for managing asynchronous operations. They allow chaining callbacks and error handlers for non-blocking operations.
class defer.Deferred:
"""
A Deferred object represents a value that may not be available yet.
Methods:
- addCallback(callback): Add success callback
- addErrback(errback): Add error callback
- addBoth(callback): Add callback for both success and error
- callback(result): Fire with success result
- errback(failure): Fire with error
- cancel(): Cancel the deferred
"""
def addCallback(self, callback, *args, **kwargs): ...
def addErrback(self, errback, *args, **kwargs): ...
def addBoth(self, callback, *args, **kwargs): ...
def callback(self, result): ...
def errback(self, failure): ...
def cancel(self): ...
def defer.succeed(result):
"""
Create a Deferred that has already been fired with a success result.
Args:
result: The result value
Returns:
Deferred: Already-fired deferred
"""
def defer.fail(failure):
"""
Create a Deferred that has already been fired with a failure.
Args:
failure: Failure object or exception
Returns:
Deferred: Already-failed deferred
"""
def defer.gatherResults(deferredList):
"""
Collect results from multiple deferreds into a list.
Args:
deferredList (list): List of Deferred objects
Returns:
Deferred: Fires with list of results when all complete
"""
def defer.maybeDeferred(f, *args, **kwargs):
"""
Ensure a function call returns a Deferred.
Args:
f: Function to call
*args, **kwargs: Arguments to pass to function
Returns:
Deferred: Either the returned Deferred or wrapped result
"""
def defer.inlineCallbacks(func):
"""
Decorator allowing generator-based async/await-like syntax.
Args:
func: Generator function using yield for async operations
Returns:
Function that returns a Deferred
"""
def defer.returnValue(value):
"""
Return a value from an inlineCallbacks generator.
Args:
value: Value to return
"""

Usage Example:
from twisted.internet import defer
@defer.inlineCallbacks
def processData(data):
# Async operations using yield
result1 = yield asyncOperation1(data)
result2 = yield asyncOperation2(result1)
defer.returnValue(result2)
# Using callbacks
d = processData("input")
d.addCallback(lambda result: print(f"Final result: {result}"))
d.addErrback(lambda failure: print(f"Error: {failure.value}"))

Utilities for managing multiple Deferred objects and coordinating asynchronous operations.
class defer.DeferredList(defer.Deferred):
"""
Collect results from multiple Deferreds into a single result.
Attributes:
- fireOnOneCallback: Fire when first Deferred succeeds
- fireOnOneErrback: Fire when first Deferred fails
"""
def __init__(self, deferredList, fireOnOneCallback=False, fireOnOneErrback=False, consumeErrors=False):
"""
Args:
deferredList (list): List of Deferred objects to track
fireOnOneCallback (bool): Fire on first success
fireOnOneErrback (bool): Fire on first failure
consumeErrors (bool): Consume errors from component Deferreds
"""
def defer.gatherResults(deferredList, consumeErrors=False):
"""
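Collect results from multiple deferreds into a plain list of values (equivalent to DeferredList with fireOnOneErrback=True, with the (success, value) pairs unwrapped; consumeErrors is passed through unchanged).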
Args:
deferredList (list): List of Deferred objects
consumeErrors (bool): Whether to consume errors
Returns:
Deferred: Fires with list of results when all complete
"""

Synchronization primitives for coordinating asynchronous operations.
class defer.DeferredLock:
"""
An event-driven lock for coordinating access to shared resources.
Attributes:
- locked: True when acquired, False otherwise
"""
locked = False
def acquire(self):
"""
Acquire the lock.
Returns:
Deferred[DeferredLock]: Fires with lock when acquired
"""
def release(self):
"""Release the lock."""
class defer.DeferredSemaphore:
"""
An event-driven semaphore for limiting concurrent operations.
Attributes:
- tokens: Available tokens
- limit: Maximum tokens
"""
def __init__(self, tokens):
"""
Args:
tokens (int): Number of tokens (must be >= 1)
"""
def acquire(self):
"""
Acquire a token.
Returns:
Deferred[DeferredSemaphore]: Fires when token acquired
"""
def release(self):
"""Release a token."""
class defer.DeferredQueue:
"""
An event-driven queue for producer-consumer scenarios.
Attributes:
- size: Maximum queue size (None for unlimited)
- backlog: Maximum waiting gets (None for unlimited)
"""
def __init__(self, size=None, backlog=None):
"""
Args:
size (int): Maximum queue size
backlog (int): Maximum pending gets
"""
def put(self, obj):
"""
Add an object to the queue.
Args:
obj: Object to add
Raises:
QueueOverflow: Queue is full
"""
def get(self):
"""
Get an object from the queue.
Returns:
Deferred: Fires with object when available
"""

Usage Example:
from twisted.internet import defer
# Using DeferredList
d1 = someAsyncOperation()
d2 = anotherAsyncOperation()
dl = defer.DeferredList([d1, d2])
dl.addCallback(lambda results: print(f"All done: {results}"))
# Using DeferredLock for resource coordination
lock = defer.DeferredLock()
@defer.inlineCallbacks
def useResource():
yield lock.acquire()
try:
# Use shared resource
pass
finally:
lock.release()
# Using DeferredQueue for producer-consumer
queue = defer.DeferredQueue()
queue.put("item1")
d = queue.get()
d.addCallback(lambda item: print(f"Got: {item}"))

Network protocols and factories for managing connections and implementing network services.
class protocol.Protocol:
"""
Base class for network protocols.
Key Methods:
- connectionMade(): Called when connection established
- dataReceived(data): Called when data arrives
- connectionLost(reason): Called when connection ends
"""
def connectionMade(self): ...
def dataReceived(self, data: bytes): ...
def connectionLost(self, reason): ...
transport = None # Set by framework to ITransport
class protocol.Factory:
"""
Base class for protocol factories.
"""
def buildProtocol(self, addr): ...
def startedConnecting(self, connector): ...
def clientConnectionFailed(self, connector, reason): ...
def clientConnectionLost(self, connector, reason): ...
class protocol.ClientFactory(protocol.Factory):
"""
Factory for client connections.
"""
class protocol.ServerFactory(protocol.Factory):
"""
Factory for server connections.
"""
class protocol.ReconnectingClientFactory(protocol.ClientFactory):
"""
Client factory that automatically reconnects on connection loss.
Attributes:
- initialDelay: Initial reconnection delay in seconds
- maxDelay: Maximum reconnection delay
- factor: Backoff multiplier
"""
initialDelay = 1.0
maxDelay = 3600.0
factor = 2.0

Usage Example:
from twisted.internet import protocol, reactor, endpoints
class EchoProtocol(protocol.Protocol):
def connectionMade(self):
print(f"Connection from {self.transport.getPeer()}")
def dataReceived(self, data):
print(f"Received: {data.decode()}")
self.transport.write(data) # Echo back
def connectionLost(self, reason):
print(f"Connection lost: {reason.value}")
class EchoFactory(protocol.ServerFactory):
def buildProtocol(self, addr):
return EchoProtocol()
# Start server
endpoint = endpoints.TCP4ServerEndpoint(reactor, 8080)
endpoint.listen(EchoFactory())

High-level abstractions for creating network connections without dealing directly with reactor APIs.
def endpoints.clientFromString(reactor, description):
"""
Create a client endpoint from string description.
Args:
reactor: The reactor to use
description (str): Endpoint description (e.g., "tcp:host=example.com:port=80")
Returns:
IStreamClientEndpoint: Client endpoint
"""
def endpoints.serverFromString(reactor, description):
"""
Create a server endpoint from string description.
Args:
reactor: The reactor to use
description (str): Endpoint description (e.g., "tcp:port=8080")
Returns:
IStreamServerEndpoint: Server endpoint
"""
def endpoints.connectProtocol(endpoint, protocol):
"""
Connect a protocol instance to an endpoint.
Args:
endpoint: Client endpoint
protocol: Protocol instance
Returns:
Deferred: Fires with protocol when connected
"""
class endpoints.TCP4ServerEndpoint:
"""
IPv4 TCP server endpoint.
"""
def __init__(self, reactor, port, backlog=50, interface=''):
"""
Args:
reactor: The reactor
port (int): Port number to bind
backlog (int): Listen backlog size
interface (str): Interface to bind ('127.0.0.1', '', etc.)
"""
def listen(self, factory):
"""
Start listening with the given factory.
Args:
factory: Protocol factory
Returns:
Deferred: Fires with IListeningPort when listening
"""
class endpoints.TCP4ClientEndpoint:
"""
IPv4 TCP client endpoint.
"""
def __init__(self, reactor, host, port, timeout=30, bindAddress=None):
"""
Args:
reactor: The reactor
host (str): Hostname to connect to
port (int): Port number
timeout (int): Connection timeout in seconds
bindAddress: Local address to bind
"""
def connect(self, factory):
"""
Connect using the given factory.
Args:
factory: Protocol factory
Returns:
Deferred: Fires with protocol when connected
"""
class endpoints.UNIXServerEndpoint:
"""
Unix domain socket server endpoint.
"""
def __init__(self, reactor, address, backlog=50, mode=0o666, wantPID=0):
"""
Args:
reactor: The reactor
address (str): Socket file path
backlog (int): Listen backlog
mode (int): Socket file permissions
wantPID (bool): Include PID in address
"""
class endpoints.SSL4ServerEndpoint:
"""
SSL server endpoint.
"""
def __init__(self, reactor, port, sslContextFactory, backlog=50, interface=''):
"""
Args:
reactor: The reactor
port (int): Port to bind
sslContextFactory: SSL context factory
backlog (int): Listen backlog
interface (str): Interface to bind
"""

Usage Example:
from twisted.internet import reactor, endpoints, protocol
class SimpleProtocol(protocol.Protocol):
def connectionMade(self):
self.transport.write(b"Hello, world!")
self.transport.loseConnection()
# Using string descriptions
server_endpoint = endpoints.serverFromString(reactor, "tcp:port=8080")
client_endpoint = endpoints.clientFromString(reactor, "tcp:host=localhost:port=8080")
# Direct endpoint creation
server_endpoint = endpoints.TCP4ServerEndpoint(reactor, 8080)
client_endpoint = endpoints.TCP4ClientEndpoint(reactor, "localhost", 8080)
# Connect
d = client_endpoint.connect(protocol.ClientFactory())
d.addCallback(lambda proto: proto.transport.write(b"data"))

Utilities for scheduling and managing recurring tasks and delayed operations.
class task.LoopingCall:
"""
Repeatedly call a function at regular intervals.
"""
def __init__(self, f, *args, **kwargs):
"""
Args:
f: Function to call
*args, **kwargs: Arguments for function
"""
def start(self, interval, now=True):
"""
Start the looping call.
Args:
interval (float): Seconds between calls
now (bool): Whether to call immediately
Returns:
Deferred: Fires when stopped
"""
def stop(self):
"""Stop the looping call."""
def task.deferLater(reactor, delay, callable, *args, **kwargs):
"""
Schedule a function call after a delay.
Args:
reactor: The reactor
delay (float): Delay in seconds
callable: Function to call
*args, **kwargs: Arguments for function
Returns:
Deferred: Fires with function result after delay
"""
def task.react(main, argv=None, reactor=None):
"""
Run a main function with the reactor, handling startup and shutdown.
Args:
main: Main function that takes (reactor, *argv)
argv: Command line arguments
reactor: Reactor to use (default: global reactor)
"""
class task.Clock:
"""
Deterministic clock for testing time-based operations.
"""
def advance(self, amount):
"""
Advance the clock by the given amount.
Args:
amount (float): Seconds to advance
"""
def callLater(self, delay, callable, *args, **kwargs):
"""
Schedule a delayed call.
Args:
delay (float): Delay in seconds
callable: Function to call
*args, **kwargs: Arguments
Returns:
IDelayedCall: Scheduled call object
"""

Usage Example:
from twisted.internet import task, reactor
# Looping call
def heartbeat():
print("Heartbeat")
lc = task.LoopingCall(heartbeat)
lc.start(1.0) # Call every second
# Delayed call
def delayed_action():
print("Delayed action executed")
d = task.deferLater(reactor, 5.0, delayed_action)
d.addCallback(lambda _: print("Delay completed"))
# Using react for main function
def main(reactor, name):
print(f"Hello, {name}!")
return task.deferLater(reactor, 2.0, reactor.stop)
task.react(main, ["World"])

Direct reactor interface for low-level event loop control and network operations.
# The reactor is typically imported as a singleton
from twisted.internet import reactor
# Key reactor methods (available on reactor instance):
def reactor.run():
"""Start the event loop (blocks until stopped)."""
def reactor.stop():
"""Stop the event loop."""
def reactor.callLater(delay, callable, *args, **kwargs):
"""
Schedule a function call after a delay.
Args:
delay (float): Delay in seconds
callable: Function to call
*args, **kwargs: Arguments
Returns:
IDelayedCall: Scheduled call object
"""
def reactor.callWhenRunning(callable, *args, **kwargs):
"""
Schedule a function call when reactor starts running.
Args:
callable: Function to call
*args, **kwargs: Arguments
"""
def reactor.listenTCP(port, factory, backlog=50, interface=''):
"""
Listen for TCP connections.
Args:
port (int): Port number
factory: Protocol factory
backlog (int): Listen backlog
interface (str): Interface to bind
Returns:
IListeningPort: Listening port object
"""
def reactor.connectTCP(host, port, factory, timeout=30, bindAddress=None):
"""
Make a TCP connection.
Args:
host (str): Hostname
port (int): Port number
factory: Client factory
timeout (int): Connection timeout
bindAddress: Local bind address
Returns:
IConnector: Connection object
"""

Common exception types and error handling patterns in Twisted's asynchronous operations.
# Common exceptions from twisted.internet.error
class error.ConnectionDone(Exception):
"""Connection was closed cleanly."""
class error.ConnectionLost(Exception):
"""Connection was lost unexpectedly."""
class error.ConnectionRefusedError(Exception):
"""Connection was refused by the remote host."""
class error.TimeoutError(Exception):
"""Operation timed out."""
class error.DNSLookupError(Exception):
"""DNS lookup failed."""
# From twisted.internet.defer
class defer.CancelledError(Exception):
"""Deferred was cancelled."""
class defer.AlreadyCalledError(Exception):
"""Deferred has already been fired."""
# From twisted.python.failure
class failure.Failure:
"""
Exception wrapper that preserves tracebacks across async boundaries.
Attributes:
- value: The wrapped exception
- type: Exception type
- tb: Traceback object
"""
def __init__(self, exc_value=None, exc_type=None, exc_tb=None): ...
def trap(self, *errorTypes): ...
def check(self, *errorTypes): ...
def getErrorMessage(self): ...
def printTraceback(self, file=None): ...

Error Handling Example:
from twisted.internet import defer, error
from twisted.python import failure
def handleError(f):
"""Handle different types of failures."""
if f.check(error.ConnectionRefusedError):
print("Connection refused")
elif f.check(error.TimeoutError):
print("Operation timed out")
else:
print(f"Unexpected error: {f.value}")
f.printTraceback()
def riskyOperation():
# This might fail
d = defer.Deferred()
# ... async operation ...
return d
d = riskyOperation()
d.addErrback(handleError)

Key interfaces used throughout Twisted's asynchronous I/O system.
# Common interfaces from twisted.internet.interfaces
class IDelayedCall:
"""
Interface for scheduled calls that can be cancelled.
Methods:
- cancel(): Cancel the scheduled call
- active(): Check if call is still scheduled
- getTime(): Get scheduled time
"""
def cancel(): ...
def active(): ...
def getTime(): ...
class IListeningPort:
"""
Interface for listening network ports.
Methods:
- getHost(): Get local address
- startListening(): Begin listening
- stopListening(): Stop listening and return Deferred
"""
def getHost(): ...
def startListening(): ...
def stopListening(): ...
class ITransport:
"""
Interface for network transports.
Methods:
- write(data): Write data to transport
- writeSequence(data): Write sequence of data
- loseConnection(): Close connection cleanly
- abortConnection(): Close connection immediately
- getPeer(): Get remote address
- getHost(): Get local address
"""
def write(data): ...
def writeSequence(data): ...
def loseConnection(): ...
def abortConnection(): ...
def getPeer(): ...
def getHost(): ...
# From twisted.python.failure
class Failure:
"""
Exception wrapper preserving tracebacks across async boundaries.
Attributes:
- value: The wrapped exception
- type: Exception type
- tb: Traceback object
"""
def __init__(self, exc_value=None, exc_type=None, exc_tb=None): ...
def trap(self, *errorTypes): ...
def check(self, *errorTypes): ...
def getErrorMessage(self): ...
def printTraceback(self, file=None): ...

Complete import statements for using the asynchronous I/O capabilities:
# Core deferred and reactor functionality
from twisted.internet import defer, reactor, protocol, endpoints, task, error
from twisted.internet.interfaces import IDelayedCall, IListeningPort, ITransport
from twisted.python.failure import Failure
# Common import patterns
from twisted.internet.defer import (
Deferred, DeferredList, DeferredLock, DeferredSemaphore, DeferredQueue,
succeed, fail, gatherResults, maybeDeferred, inlineCallbacks, returnValue
)

Install with Tessl CLI
npx tessl i tessl/pypi-twisted