CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-twisted

An asynchronous networking framework written in Python

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/async-io.md

Asynchronous I/O and Reactors

Core event loop and asynchronous programming primitives that form the foundation of Twisted's networking capabilities. The reactor provides the central event loop, while Deferreds manage asynchronous operations and callbacks.

Capabilities

Deferred Objects

Deferred objects are Twisted's implementation of promises/futures for managing asynchronous operations. They allow chaining callbacks and error handlers for non-blocking operations.

class defer.Deferred:
    """
    A Deferred object represents a value that may not be available yet.
    
    Methods:
    - addCallback(callback): Add success callback
    - addErrback(errback): Add error callback  
    - addBoth(callback): Add callback for both success and error
    - callback(result): Fire with success result
    - errback(failure): Fire with error
    - cancel(): Cancel the deferred
    """
    def addCallback(self, callback, *args, **kwargs): ...
    def addErrback(self, errback, *args, **kwargs): ...
    def addBoth(self, callback, *args, **kwargs): ...
    def callback(self, result): ...
    def errback(self, failure): ...
    def cancel(self): ...

def defer.succeed(result):
    """
    Create a Deferred that has already been fired with a success result.
    
    Args:
        result: The result value
        
    Returns:
        Deferred: Already-fired deferred
    """

def defer.fail(failure):
    """
    Create a Deferred that has already been fired with a failure.
    
    Args:
        failure: Failure object or exception
        
    Returns:
        Deferred: Already-failed deferred
    """

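def defer.gatherResults(deferredList, consumeErrors=False):
    """
    Collect results from multiple deferreds into a list.
    
    Args:
        deferredList (list): List of Deferred objects
        consumeErrors (bool): Whether to consume errors from the input Deferreds
        
    Returns:
        Deferred: Fires with list of results when all complete; errbacks
            with defer.FirstError as soon as any input Deferred fails
    """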

def defer.maybeDeferred(f, *args, **kwargs):
    """
    Ensure a function call returns a Deferred.
    
    Args:
        f: Function to call
        *args, **kwargs: Arguments to pass to function
        
    Returns:
        Deferred: Either the returned Deferred or wrapped result
    """

def defer.inlineCallbacks(func):
    """
    Decorator allowing generator-based async/await-like syntax.
    
    Args:
        func: Generator function using yield for async operations
        
    Returns:
        Function that returns a Deferred
    """

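def defer.returnValue(value):
    """
    Return a value from an inlineCallbacks generator.
    
    Deprecated since Twisted 24.7.0: on Python 3 a plain `return value`
    statement inside the generator has the same effect.
    
    Args:
        value: Value to return
    """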

Usage Example:

from twisted.internet import defer

@defer.inlineCallbacks
def processData(data):
    # Async operations using yield
    result1 = yield asyncOperation1(data)
    result2 = yield asyncOperation2(result1)
    return result2  # plain return works in inlineCallbacks on Python 3; defer.returnValue is deprecated

# Using callbacks
d = processData("input")
d.addCallback(lambda result: print(f"Final result: {result}"))
d.addErrback(lambda failure: print(f"Error: {failure.value}"))

Deferred Collections and Coordination

Utilities for managing multiple Deferred objects and coordinating asynchronous operations.

class defer.DeferredList(defer.Deferred):
    """
    Collect results from multiple Deferreds into a single result: a list of
    (success, result) tuples in the same order as deferredList.
    
    Attributes:
    - fireOnOneCallback: Fire when first Deferred succeeds
    - fireOnOneErrback: Fire when first Deferred fails
    """
    def __init__(self, deferredList, fireOnOneCallback=False, fireOnOneErrback=False, consumeErrors=False):
        """
        Args:
            deferredList (list): List of Deferred objects to track
            fireOnOneCallback (bool): Fire on first success
            fireOnOneErrback (bool): Fire on first failure  
            consumeErrors (bool): Consume errors from component Deferreds
        """

def defer.gatherResults(deferredList, consumeErrors=False):
    """
    Collect results from multiple deferreds into a list (equivalent to DeferredList with fireOnOneErrback=True, with the (success, result) pairs unwrapped to plain results).
    
    Args:
        deferredList (list): List of Deferred objects
        consumeErrors (bool): Whether to consume errors
        
    Returns:
        Deferred: Fires with list of results when all complete
    """

Concurrency Primitives

Synchronization primitives for coordinating asynchronous operations.

class defer.DeferredLock:
    """
    An event-driven lock for coordinating access to shared resources.
    
    Attributes:
    - locked: True when acquired, False otherwise
    """
    locked = False
    
    def acquire(self):
        """
        Acquire the lock.
        
        Returns:
            Deferred[DeferredLock]: Fires with lock when acquired
        """
    
    def release(self):
        """Release the lock."""

class defer.DeferredSemaphore:
    """
    An event-driven semaphore for limiting concurrent operations.
    
    Attributes:
    - tokens: Available tokens
    - limit: Maximum tokens
    """
    def __init__(self, tokens):
        """
        Args:
            tokens (int): Number of tokens (must be >= 1)
        """
    
    def acquire(self):
        """
        Acquire a token.
        
        Returns:
            Deferred[DeferredSemaphore]: Fires when token acquired
        """
    
    def release(self):
        """Release a token."""

class defer.DeferredQueue:
    """
    An event-driven queue for producer-consumer scenarios.
    
    Attributes:
    - size: Maximum queue size (None for unlimited)
    - backlog: Maximum waiting gets (None for unlimited)
    """
    def __init__(self, size=None, backlog=None):
        """
        Args:
            size (int): Maximum queue size
            backlog (int): Maximum pending gets
        """
    
    def put(self, obj):
        """
        Add an object to the queue.
        
        Args:
            obj: Object to add
            
        Raises:
            QueueOverflow: Queue is full
        """
    
    def get(self):
        """
        Get an object from the queue.
        
        Returns:
            Deferred: Fires with object when available
            
        Raises:
            QueueUnderflow: Too many gets are already pending (backlog exceeded)
        """

Usage Example:

from twisted.internet import defer

# Using DeferredList
d1 = someAsyncOperation()
d2 = anotherAsyncOperation()
dl = defer.DeferredList([d1, d2])
dl.addCallback(lambda results: print(f"All done: {results}"))

# Using DeferredLock for resource coordination
lock = defer.DeferredLock()

@defer.inlineCallbacks
def useResource():
    yield lock.acquire()
    try:
        # Use shared resource
        pass
    finally:
        lock.release()

# Using DeferredQueue for producer-consumer
queue = defer.DeferredQueue()
queue.put("item1")
d = queue.get()
d.addCallback(lambda item: print(f"Got: {item}"))

Protocol and Factory

Network protocols and factories for managing connections and implementing network services.

class protocol.Protocol:
    """
    Base class for network protocols.
    
    Key Methods:
    - connectionMade(): Called when connection established
    - dataReceived(data): Called when data arrives
    - connectionLost(reason): Called when connection ends
    """
    def connectionMade(self): ...
    def dataReceived(self, data: bytes): ...
    def connectionLost(self, reason): ...
    
    transport = None  # Set by framework to ITransport

class protocol.Factory:
    """
    Base class for protocol factories.
    """
    def buildProtocol(self, addr): ...

class protocol.ClientFactory(protocol.Factory):
    """
    Factory for client connections.
    
    The connection-state hooks below are defined on ClientFactory, not on the
    base Factory.
    """
    def startedConnecting(self, connector): ...
    def clientConnectionFailed(self, connector, reason): ...
    def clientConnectionLost(self, connector, reason): ...

class protocol.ServerFactory(protocol.Factory):
    """
    Factory for server connections.
    """

class protocol.ReconnectingClientFactory(protocol.ClientFactory):
    """
    Client factory that automatically reconnects on connection loss.
    
    Attributes:
    - initialDelay: Initial reconnection delay in seconds
    - maxDelay: Maximum reconnection delay
    - factor: Backoff multiplier
    """
    initialDelay = 1.0
    maxDelay = 3600.0
    factor = 2.7182818284590451  # math.e

Usage Example:

from twisted.internet import protocol, reactor, endpoints

class EchoProtocol(protocol.Protocol):
    def connectionMade(self):
        print(f"Connection from {self.transport.getPeer()}")
    
    def dataReceived(self, data):
        print(f"Received: {data.decode()}")
        self.transport.write(data)  # Echo back
    
    def connectionLost(self, reason):
        print(f"Connection lost: {reason.value}")

class EchoFactory(protocol.ServerFactory):
    def buildProtocol(self, addr):
        return EchoProtocol()

# Start server
endpoint = endpoints.TCP4ServerEndpoint(reactor, 8080)
endpoint.listen(EchoFactory())
reactor.run()  # Run the event loop; no connections are accepted until it is running

Endpoints

High-level abstractions for creating network connections without dealing directly with reactor APIs.

def endpoints.clientFromString(reactor, description):
    """
    Create a client endpoint from string description.
    
    Args:
        reactor: The reactor to use
        description (str): Endpoint description (e.g., "tcp:host=example.com:port=80")
        
    Returns:
        IStreamClientEndpoint: Client endpoint
    """

def endpoints.serverFromString(reactor, description):
    """
    Create a server endpoint from string description.
    
    Args:
        reactor: The reactor to use  
        description (str): Endpoint description (e.g., "tcp:port=8080")
        
    Returns:
        IStreamServerEndpoint: Server endpoint
    """

def endpoints.connectProtocol(endpoint, protocol):
    """
    Connect a protocol instance to an endpoint.
    
    Args:
        endpoint: Client endpoint
        protocol: Protocol instance
        
    Returns:
        Deferred: Fires with protocol when connected
    """

class endpoints.TCP4ServerEndpoint:
    """
    IPv4 TCP server endpoint.
    """
    def __init__(self, reactor, port, backlog=50, interface=''):
        """
        Args:
            reactor: The reactor
            port (int): Port number to bind
            backlog (int): Listen backlog size
            interface (str): Interface to bind ('127.0.0.1', '', etc.)
        """
    
    def listen(self, factory):
        """
        Start listening with the given factory.
        
        Args:
            factory: Protocol factory
            
        Returns:
            Deferred: Fires with IListeningPort when listening
        """

class endpoints.TCP4ClientEndpoint:
    """
    IPv4 TCP client endpoint.
    """
    def __init__(self, reactor, host, port, timeout=30, bindAddress=None):
        """
        Args:
            reactor: The reactor
            host (str): Hostname to connect to
            port (int): Port number
            timeout (int): Connection timeout in seconds
            bindAddress: Local address to bind
        """
    
    def connect(self, factory):
        """
        Connect using the given factory.
        
        Args:
            factory: Protocol factory
            
        Returns:
            Deferred: Fires with protocol when connected
        """

class endpoints.UNIXServerEndpoint:
    """
    Unix domain socket server endpoint.
    """
    def __init__(self, reactor, address, backlog=50, mode=0o666, wantPID=0):
        """
        Args:
            reactor: The reactor
            address (str): Socket file path
            backlog (int): Listen backlog
            mode (int): Socket file permissions
            wantPID (bool): If true, create a PID lock file for the socket
        """

class endpoints.SSL4ServerEndpoint:
    """
    SSL server endpoint.
    """
    def __init__(self, reactor, port, sslContextFactory, backlog=50, interface=''):
        """
        Args:
            reactor: The reactor
            port (int): Port to bind
            sslContextFactory: SSL context factory
            backlog (int): Listen backlog
            interface (str): Interface to bind
        """

Usage Example:

from twisted.internet import reactor, endpoints, protocol

class SimpleProtocol(protocol.Protocol):
    def connectionMade(self):
        self.transport.write(b"Hello, world!")
        self.transport.loseConnection()

# Using string descriptions
server_endpoint = endpoints.serverFromString(reactor, "tcp:port=8080")
client_endpoint = endpoints.clientFromString(reactor, "tcp:host=localhost:port=8080")

# Direct endpoint creation
server_endpoint = endpoints.TCP4ServerEndpoint(reactor, 8080)
client_endpoint = endpoints.TCP4ClientEndpoint(reactor, "localhost", 8080)

# Connect (a bare ClientFactory has no protocol class, so build one for SimpleProtocol)
d = client_endpoint.connect(protocol.Factory.forProtocol(SimpleProtocol))
d.addCallback(lambda proto: proto.transport.write(b"data"))

Task Scheduling

Utilities for scheduling and managing recurring tasks and delayed operations.

class task.LoopingCall:
    """
    Repeatedly call a function at regular intervals.
    """
    def __init__(self, f, *args, **kwargs):
        """
        Args:
            f: Function to call
            *args, **kwargs: Arguments for function
        """
    
    def start(self, interval, now=True):
        """
        Start the looping call.
        
        Args:
            interval (float): Seconds between calls
            now (bool): Whether to call immediately
            
        Returns:
            Deferred: Fires when stopped
        """
    
    def stop(self):
        """Stop the looping call."""

def task.deferLater(reactor, delay, callable, *args, **kwargs):
    """
    Schedule a function call after a delay.
    
    Args:
        reactor: The reactor
        delay (float): Delay in seconds
        callable: Function to call
        *args, **kwargs: Arguments for function
        
    Returns:
        Deferred: Fires with function result after delay
    """

def task.react(main, argv=(), _reactor=None):
    """
    Run a main function with the reactor, handling startup and shutdown.
    
    Args:
        main: Main function that takes (reactor, *argv) and returns a Deferred
        argv: Sequence of extra positional arguments passed to main
        _reactor: Reactor to use (default: global reactor)
    """

class task.Clock:
    """
    Deterministic clock for testing time-based operations.
    """
    def advance(self, amount):
        """
        Advance the clock by the given amount.
        
        Args:
            amount (float): Seconds to advance
        """
    
    def callLater(self, delay, callable, *args, **kwargs):
        """
        Schedule a delayed call.
        
        Args:
            delay (float): Delay in seconds
            callable: Function to call
            *args, **kwargs: Arguments
            
        Returns:
            IDelayedCall: Scheduled call object
        """

Usage Example:

from twisted.internet import task, reactor

# Looping call
def heartbeat():
    print("Heartbeat")

lc = task.LoopingCall(heartbeat)
lc.start(1.0)  # Call every second

# Delayed call
def delayed_action():
    print("Delayed action executed")

d = task.deferLater(reactor, 5.0, delayed_action)
d.addCallback(lambda _: print("Delay completed"))

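# Using react for main function
def main(reactor, name):
    print(f"Hello, {name}!")
    # react() stops the reactor itself once the returned Deferred fires,
    # so main must not call reactor.stop().
    return task.deferLater(reactor, 2.0, lambda: None)

task.react(main, ["World"])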

Reactor Operations

Direct reactor interface for low-level event loop control and network operations.

# The reactor is typically imported as a singleton
from twisted.internet import reactor

# Key reactor methods (available on reactor instance):
def reactor.run():
    """Start the event loop (blocks until stopped)."""

def reactor.stop():
    """Stop the event loop."""

def reactor.callLater(delay, callable, *args, **kwargs):
    """
    Schedule a function call after a delay.
    
    Args:
        delay (float): Delay in seconds
        callable: Function to call
        *args, **kwargs: Arguments
        
    Returns:
        IDelayedCall: Scheduled call object
    """

def reactor.callWhenRunning(callable, *args, **kwargs):
    """
    Schedule a function call when reactor starts running.
    
    Args:
        callable: Function to call
        *args, **kwargs: Arguments
    """

def reactor.listenTCP(port, factory, backlog=50, interface=''):
    """
    Listen for TCP connections.
    
    Args:
        port (int): Port number
        factory: Protocol factory
        backlog (int): Listen backlog
        interface (str): Interface to bind
        
    Returns:
        IListeningPort: Listening port object
    """

def reactor.connectTCP(host, port, factory, timeout=30, bindAddress=None):
    """
    Make a TCP connection.
    
    Args:
        host (str): Hostname
        port (int): Port number
        factory: Client factory
        timeout (int): Connection timeout
        bindAddress: Local bind address
        
    Returns:
        IConnector: Connection object
    """

Error Handling

Common exception types and error handling patterns in Twisted's asynchronous operations.

# Common exceptions from twisted.internet.error
class error.ConnectionDone(Exception):
    """Connection was closed cleanly."""

class error.ConnectionLost(Exception):
    """Connection was lost unexpectedly."""

class error.ConnectionRefusedError(Exception):
    """Connection was refused by the remote host."""

class error.TimeoutError(Exception):
    """Operation timed out."""

class error.DNSLookupError(Exception):
    """DNS lookup failed."""

# From twisted.internet.defer
class defer.CancelledError(Exception):
    """Deferred was cancelled."""

class defer.AlreadyCalledError(Exception):
    """Deferred has already been fired."""

# From twisted.python.failure
class failure.Failure:
    """
    Exception wrapper that preserves tracebacks across async boundaries.
    
    Attributes:
    - value: The wrapped exception
    - type: Exception type
    - tb: Traceback object
    """
    def __init__(self, exc_value=None, exc_type=None, exc_tb=None): ...
    def trap(self, *errorTypes): ...
    def check(self, *errorTypes): ...
    def getErrorMessage(self): ...
    def printTraceback(self, file=None): ...

Error Handling Example:

from twisted.internet import defer, error
from twisted.python import failure

def handleError(f):
    """Handle different types of failures."""
    if f.check(error.ConnectionRefusedError):
        print("Connection refused")
    elif f.check(error.TimeoutError):
        print("Operation timed out")
    else:
        print(f"Unexpected error: {f.value}")
        f.printTraceback()

def riskyOperation():
    # This might fail
    d = defer.Deferred()
    # ... async operation ...
    return d

d = riskyOperation()
d.addErrback(handleError)

Interface Types

Key interfaces used throughout Twisted's asynchronous I/O system.

# Common interfaces from twisted.internet.interfaces
class IDelayedCall:
    """
    Interface for scheduled calls that can be cancelled.
    
    Methods:
    - cancel(): Cancel the scheduled call
    - active(): Check if call is still scheduled
    - getTime(): Get scheduled time
    """
    def cancel(): ...
    def active(): ...
    def getTime(): ...

class IListeningPort:
    """
    Interface for listening network ports.
    
    Methods:
    - getHost(): Get local address
    - startListening(): Begin listening
    - stopListening(): Stop listening and return Deferred
    """
    def getHost(): ...
    def startListening(): ...
    def stopListening(): ...

class ITransport:
    """
    Interface for network transports.
    
    Methods:
    - write(data): Write data to transport
    - writeSequence(data): Write sequence of data
    - loseConnection(): Close connection cleanly
    - abortConnection(): Close connection immediately (declared by ITCPTransport, not the base ITransport)
    - getPeer(): Get remote address
    - getHost(): Get local address
    """
    def write(data): ...
    def writeSequence(data): ...
    def loseConnection(): ...
    def abortConnection(): ...
    def getPeer(): ...
    def getHost(): ...

# From twisted.python.failure
class Failure:
    """
    Exception wrapper preserving tracebacks across async boundaries.
    
    Attributes:
    - value: The wrapped exception
    - type: Exception type  
    - tb: Traceback object
    """
    def __init__(self, exc_value=None, exc_type=None, exc_tb=None): ...
    def trap(self, *errorTypes): ...
    def check(self, *errorTypes): ...
    def getErrorMessage(self): ...
    def printTraceback(self, file=None): ...

Required Imports

Complete import statements for using the asynchronous I/O capabilities:

# Core deferred and reactor functionality
from twisted.internet import defer, reactor, protocol, endpoints, task, error
from twisted.internet.interfaces import IDelayedCall, IListeningPort, ITransport
from twisted.python.failure import Failure

# Common import patterns
from twisted.internet.defer import (
    Deferred, DeferredList, DeferredLock, DeferredSemaphore, DeferredQueue,
    succeed, fail, gatherResults, maybeDeferred, inlineCallbacks, returnValue
)

Install with Tessl CLI

npx tessl i tessl/pypi-twisted

docs

application.md

async-io.md

auth.md

distributed.md

dns.md

email.md

http.md

index.md

logging.md

protocols.md

ssh.md

testing.md

tile.json