CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-twisted

An asynchronous networking framework written in Python

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

distributed.mddocs/

Distributed Objects

Perspective Broker (PB) system for distributed object communication and remote method calls across network boundaries. PB provides a way to transparently call methods on objects located on remote machines.

Capabilities

Perspective Broker Core

Core classes for creating remotely accessible objects and managing distributed communication.

class pb.Referenceable:
    """
    Base class for objects that can be referenced remotely.
    
    Methods prefixed with 'remote_' are exposed for remote calls.
    """
    def __init__(self):
        """Initialize referenceable object."""
    
    def remote_methodName(self, *args, **kwargs):
        """
        Example remote method.
        
        Methods starting with 'remote_' are callable from remote clients.
        Can return values directly or Deferreds for async operations.
        """
        pass
    
    def callRemote(self, method, *args, **kwargs):
        """
        Call method on remote object.
        
        Args:
            method (str): Method name (without 'remote_' prefix)
            *args, **kwargs: Method arguments
            
        Returns:
            Deferred: Method result
        """

class pb.Root:
    """
    Root object for PB hierarchy - the initial object clients connect to.
    
    The root object is what clients receive when they first connect.
    """
    def __init__(self):
        """Initialize root object."""
    
    def rootObject(self, broker):
        """
        Get the root object for a broker connection.
        
        Args:
            broker: PB broker instance
            
        Returns:
            Referenceable: Root object for this connection
        """
        return self

class pb.Avatar:
    """
    Authenticated user representation in PB system.
    
    Created by realm after successful authentication.
    """
    def __init__(self):
        """Initialize avatar."""
    
    def logout(self):
        """Called when user logs out."""

class pb.Perspective(pb.Avatar):
    """
    User's view of the PB system after authentication.
    
    Perspectives represent what an authenticated user can see and do.
    """
    def __init__(self, avatarId):
        """
        Args:
            avatarId: Unique identifier for this user
        """
        self.avatarId = avatarId
    
    def perspective_methodName(self, *args, **kwargs):
        """
        Perspective method callable by authenticated clients.
        
        Methods prefixed with 'perspective_' are exposed to authenticated users.
        """
        pass

class pb.Broker:
    """
    Message broker managing PB protocol communication.
    
    Handles serialization, transport, and method dispatch for PB.
    """
    def __init__(self, isClient=True, security=None):
        """
        Args:
            isClient (bool): Whether this is a client broker
            security: Security policy object
        """
    
    def registerReference(self, object):
        """
        Register object for remote references.
        
        Args:
            object: Object to register
            
        Returns:
            int: Reference ID
        """
    
    def unregisterReference(self, refnum):
        """
        Unregister object reference.
        
        Args:
            refnum (int): Reference ID to unregister
        """

Basic PB Usage Example:

from twisted.spread import pb
from twisted.internet import reactor, endpoints, defer

# Server-side objects
class MathService(pb.Referenceable):
    def remote_add(self, a, b):
        return a + b
    
    def remote_multiply(self, a, b):
        return a * b
    
    def remote_divide(self, a, b):
        if b == 0:
            raise ValueError("Division by zero")
        return a / b

class Calculator(pb.Root):
    def rootObject(self, broker):
        return MathService()

# Start PB server
factory = pb.PBServerFactory(Calculator())
endpoint = endpoints.TCP4ServerEndpoint(reactor, 8800)
endpoint.listen(factory)

# Client code
@defer.inlineCallbacks
def client_example():
    factory = pb.PBClientFactory()
    endpoint = endpoints.TCP4ClientEndpoint(reactor, "localhost", 8800)
    
    yield endpoint.connect(factory)
    root = yield factory.getRootObject()
    
    # Call remote methods
    result1 = yield root.callRemote("add", 5, 3)
    print(f"5 + 3 = {result1}")
    
    result2 = yield root.callRemote("multiply", 4, 7)
    print(f"4 * 7 = {result2}")
    
    try:
        result3 = yield root.callRemote("divide", 10, 0)
    except Exception as e:
        print(f"Division error: {e}")

print("PB server running on port 8800")
reactor.run()

PB Factories

Factory classes for creating PB client and server connections.

class pb.PBClientFactory:
    """
    Factory for PB client connections.
    """
    def __init__(self):
        """Initialize client factory."""
        self.deferred = defer.Deferred()
    
    def getRootObject(self):
        """
        Get root object from server.
        
        Returns:
            Deferred[RemoteReference]: Server's root object
        """
    
    def login(self, credentials, client=None):
        """
        Login with credentials.
        
        Args:
            credentials: User credentials
            client: Client object for authenticated session
            
        Returns:
            Deferred[Avatar]: User avatar after authentication
        """
    
    def buildProtocol(self, addr):
        """
        Build client protocol.
        
        Args:
            addr: Connection address
            
        Returns:
            Broker: Client broker
        """

class pb.PBServerFactory:
    """
    Factory for PB server connections.
    """
    def __init__(self, root, security=None, portal=None):
        """
        Args:
            root: Root object or realm
            security: Security policy
            portal: Authentication portal
        """
        self.root = root
        self.security = security
        self.portal = portal
    
    def buildProtocol(self, addr):
        """
        Build server protocol.
        
        Args:
            addr: Connection address
            
        Returns:
            Broker: Server broker
        """

def pb.connect(host, port, factory, contextFactory=None, bindAddress=None):
    """
    Connect to PB server.
    
    Args:
        host (str): Server hostname
        port (int): Server port
        factory: Client factory
        contextFactory: SSL context factory
        bindAddress: Local bind address
        
    Returns:
        Deferred: Connection result
    """

def pb.getObjectAt(host, port, timeout=30):
    """
    Get root object from PB server.
    
    Args:
        host (str): Server hostname
        port (int): Server port
        timeout (int): Connection timeout
        
    Returns:
        Deferred[RemoteReference]: Root object
    """

Remote References

Proxy objects representing remote objects.

class pb.RemoteReference:
    """
    Proxy for remote object.
    
    All method calls are transparently forwarded to the remote object.
    """
    def callRemote(self, method, *args, **kwargs):
        """
        Call method on remote object.
        
        Args:
            method (str): Method name
            *args, **kwargs: Method arguments
            
        Returns:
            Deferred: Method result
        """
    
    def remoteMethod(self, method):
        """
        Get callable for remote method.
        
        Args:
            method (str): Method name
            
        Returns:
            callable: Function that calls remote method
        """
    
    def notifyOnDisconnect(self, callback):
        """
        Register callback for disconnection.
        
        Args:
            callback: Function to call on disconnect
        """

class pb.ViewPoint:
    """
    Client's view of a remote perspective.
    """
    def __init__(self, perspective, broker):
        """
        Args:
            perspective: Remote perspective object
            broker: PB broker
        """

Authentication Integration

Integration with Twisted's authentication system.

class pb.IPerspective:
    """
    Interface for PB perspectives.
    """
    def perspectiveMessageReceived(broker, message, args, kw):
        """
        Handle message from client.
        
        Args:
            broker: PB broker
            message (str): Message name
            args (tuple): Message arguments
            kw (dict): Message keyword arguments
            
        Returns:
            Result of message handling
        """

class pb.Avatar(pb.IPerspective):
    """
    Authenticated user avatar for PB.
    """
    def __init__(self, avatarId):
        self.avatarId = avatarId
    
    def logout(self):
        """Called when user logs out."""
    
    def perspectiveMessageReceived(self, broker, message, args, kw):
        """Handle perspective method calls."""
        method = getattr(self, f'perspective_{message}', None)
        if method:
            return method(*args, **kw)
        raise AttributeError(f"No such method: perspective_{message}")

class pb.Realm:
    """
    PB realm for creating avatars.
    """
    def requestAvatar(self, avatarId, mind, *interfaces):
        """
        Create avatar for authenticated user.
        
        Args:
            avatarId: User identifier
            mind: Client perspective
            *interfaces: Requested interfaces
            
        Returns:
            tuple: (interface, avatar, logout_callable)
        """
        if pb.IPerspective in interfaces:
            avatar = pb.Avatar(avatarId)
            return (pb.IPerspective, avatar, avatar.logout)
        raise NotImplementedError()

Authenticated PB Example:

from twisted.spread import pb
from twisted.cred import portal, checkers, credentials
from twisted.internet import reactor, endpoints, defer

# Server with authentication
class UserAvatar(pb.Avatar):
    def __init__(self, username):
        self.username = username
    
    def perspective_getMessage(self):
        return f"Hello, {self.username}!"
    
    def perspective_getTime(self):
        import time
        return time.ctime()

class SimpleRealm:
    def requestAvatar(self, avatarId, mind, *interfaces):
        if pb.IPerspective in interfaces:
            avatar = UserAvatar(avatarId.decode())
            return (pb.IPerspective, avatar, avatar.logout)
        raise NotImplementedError()

# Set up authentication
realm = SimpleRealm()
checker = checkers.InMemoryUsernamePasswordDatabaseDontUse()
checker.addUser(b"alice", b"password")
checker.addUser(b"bob", b"secret")

portal_obj = portal.Portal(realm, [checker])

# Start authenticated server
factory = pb.PBServerFactory(portal_obj)
endpoint = endpoints.TCP4ServerEndpoint(reactor, 8800)
endpoint.listen(factory)

# Authenticated client
@defer.inlineCallbacks
def authenticated_client():
    factory = pb.PBClientFactory()
    endpoint = endpoints.TCP4ClientEndpoint(reactor, "localhost", 8800)
    
    yield endpoint.connect(factory)
    
    # Login with credentials
    creds = credentials.UsernamePassword(b"alice", b"password")
    avatar = yield factory.login(creds)
    
    # Call perspective methods
    message = yield avatar.callRemote("getMessage")
    print(f"Message: {message}")
    
    time_str = yield avatar.callRemote("getTime")
    print(f"Server time: {time_str}")

authenticated_client()
reactor.run()

Copyable Objects

Objects that can be copied across the network.

class pb.Copyable:
    """
    Base class for objects that can be copied to remote side.
    
    The object is serialized and recreated on the remote side.
    """
    def __init__(self):
        """Initialize copyable object."""
    
    def getStateToCopy(self):
        """
        Get state to copy to remote side.
        
        Returns:
            dict: Object state to copy
        """
        return self.__dict__.copy()

class pb.RemoteCopy:
    """
    Base class for receiving copied objects.
    
    Paired with Copyable objects on the sending side.
    """
    def setCopyableState(self, state):
        """
        Restore object state from copy.
        
        Args:
            state (dict): Object state from remote side
        """
        self.__dict__.update(state)

def pb.setUnjellyableForClass(classname, unjellyable):
    """
    Register class for unjellying (deserializing).
    
    Args:
        classname (str): Class name
        unjellyable: Class to create when unjellying
    """

class pb.Cacheable:
    """
    Base class for objects that are cached on remote side.
    
    Only one copy exists on remote side, updated when changed.
    """
    def getStateToCacheAndObserveFor(self, perspective, observer):
        """
        Get state to cache and set up observation.
        
        Args:
            perspective: Remote perspective
            observer: Observer for changes
            
        Returns:
            dict: State to cache
        """

class pb.RemoteCache:
    """
    Base class for cached remote objects.
    """
    def setCacheableState(self, state):
        """
        Set cached object state.
        
        Args:
            state (dict): Object state
        """
    
    def observe_update(self, newState):
        """
        Handle state update from remote side.
        
        Args:
            newState (dict): Updated state
        """

Error Handling

PB-specific exceptions and error handling.

class pb.Error(Exception):
    """Base class for PB errors."""

class pb.DeadReferenceError(pb.Error):
    """Reference to dead remote object."""

class pb.PBConnectionLost(pb.Error):
    """PB connection was lost."""

class pb.RemoteError(pb.Error):
    """Error from remote side."""

class pb.CopyableFailure:
    """
    Serializable failure that can cross network boundaries.
    """
    def __init__(self, failure):
        """
        Args:
            failure: Failure object to wrap
        """
        self.type = qual(failure.type)
        self.value = failure.getErrorMessage()
        self.traceback = failure.getTraceback()

Error Handling Example:

from twisted.spread import pb
from twisted.internet import defer

class ErrorService(pb.Referenceable):
    def remote_causeError(self):
        raise ValueError("Something went wrong!")
    
    def remote_safeDivide(self, a, b):
        try:
            return a / b
        except ZeroDivisionError:
            # PB will serialize this exception
            raise ValueError("Cannot divide by zero")

# Client error handling
@defer.inlineCallbacks
def handle_errors():
    root = yield factory.getRootObject()
    
    try:
        yield root.callRemote("causeError")
    except Exception as e:
        print(f"Remote error: {e}")
    
    try:
        result = yield root.callRemote("safeDivide", 10, 0)
    except ValueError as e:
        print(f"Division error: {e}")

Jelly Serialization

Low-level serialization system used by PB.

def jelly.jelly(object):
    """
    Serialize object to jelly format.
    
    Args:
        object: Python object to serialize
        
    Returns:
        Serialized representation
    """

def jelly.unjelly(jellyData):
    """
    Deserialize object from jelly format.
    
    Args:
        jellyData: Serialized data
        
    Returns:
        Python object
    """

class jelly.Jellyable:
    """
    Base class for objects that can control their serialization.
    """
    def jellyFor(self, jellier):
        """
        Custom serialization method.
        
        Args:
            jellier: Jellier instance
            
        Returns:
            Serialized representation
        """

class jelly.Unjellyable:
    """
    Base class for objects that can control their deserialization.
    """
    def unjellyFor(self, unjellier, jellyList):
        """
        Custom deserialization method.
        
        Args:
            unjellier: Unjellier instance
            jellyList: Serialized data
            
        Returns:
            Deserialized object
        """

Complete PB Application Example:

from twisted.spread import pb
from twisted.internet import reactor, endpoints, defer
from twisted.cred import portal, checkers, credentials
from twisted.application import service

# Shared data objects
class SharedData(pb.Copyable, pb.RemoteCopy):
    """Data that can be copied across network."""
    def __init__(self, name, value):
        self.name = name
        self.value = value

# Register for deserialization
pb.setUnjellyableForClass(SharedData, SharedData)

# Service implementation
class DataService(pb.Avatar):
    def __init__(self, username):
        self.username = username
        self.data = {}
    
    def perspective_store(self, key, data):
        """Store data object."""
        self.data[key] = data
        return f"Stored {key} for {self.username}"
    
    def perspective_retrieve(self, key):
        """Retrieve data object."""
        if key in self.data:
            return self.data[key]
        raise KeyError(f"No data for key: {key}")
    
    def perspective_list_keys(self):
        """List all stored keys."""
        return list(self.data.keys())

# Realm
class DataRealm:
    def requestAvatar(self, avatarId, mind, *interfaces):
        if pb.IPerspective in interfaces:
            avatar = DataService(avatarId.decode())
            return (pb.IPerspective, avatar, lambda: None)
        raise NotImplementedError()

# Application setup
def setup_server():
    realm = DataRealm()
    checker = checkers.InMemoryUsernamePasswordDatabaseDontUse()
    checker.addUser(b"user1", b"pass1")
    checker.addUser(b"user2", b"pass2")
    
    portal_obj = portal.Portal(realm, [checker])
    factory = pb.PBServerFactory(portal_obj)
    
    endpoint = endpoints.TCP4ServerEndpoint(reactor, 8800)
    endpoint.listen(factory)
    print("PB data service running on port 8800")

# Client example
@defer.inlineCallbacks
def client_session():
    factory = pb.PBClientFactory()
    endpoint = endpoints.TCP4ClientEndpoint(reactor, "localhost", 8800)
    
    yield endpoint.connect(factory)
    
    # Login
    creds = credentials.UsernamePassword(b"user1", b"pass1")
    avatar = yield factory.login(creds)
    
    # Store data
    data = SharedData("test_data", {"count": 42, "active": True})
    result = yield avatar.callRemote("store", "item1", data)
    print(result)
    
    # Retrieve data
    retrieved = yield avatar.callRemote("retrieve", "item1")
    print(f"Retrieved: {retrieved.name} = {retrieved.value}")
    
    # List keys
    keys = yield avatar.callRemote("list_keys")
    print(f"Keys: {keys}")

setup_server()
client_session()
reactor.run()

Install with Tessl CLI

npx tessl i tessl/pypi-twisted

docs

application.md

async-io.md

auth.md

distributed.md

dns.md

email.md

http.md

index.md

logging.md

protocols.md

ssh.md

testing.md

tile.json