An asynchronous networking framework written in Python
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Perspective Broker (PB) system for distributed object communication and remote method calls across network boundaries. PB provides a way to transparently call methods on objects located on remote machines.
Core classes for creating remotely accessible objects and managing distributed communication.
class pb.Referenceable:
"""
Base class for objects that can be referenced remotely.
Methods prefixed with 'remote_' are exposed for remote calls.
"""
def __init__(self):
"""Initialize referenceable object."""
def remote_methodName(self, *args, **kwargs):
"""
Example remote method.
Methods starting with 'remote_' are callable from remote clients.
Can return values directly or Deferreds for async operations.
"""
pass
def callRemote(self, method, *args, **kwargs):
"""
Call method on remote object.
Args:
method (str): Method name (without 'remote_' prefix)
*args, **kwargs: Method arguments
Returns:
Deferred: Method result
"""
class pb.Root:
"""
Root object for PB hierarchy - the initial object clients connect to.
The root object is what clients receive when they first connect.
"""
def __init__(self):
"""Initialize root object."""
def rootObject(self, broker):
"""
Get the root object for a broker connection.
Args:
broker: PB broker instance
Returns:
Referenceable: Root object for this connection
"""
return self
class pb.Avatar:
"""
Authenticated user representation in PB system.
Created by realm after successful authentication.
"""
def __init__(self):
"""Initialize avatar."""
def logout(self):
"""Called when user logs out."""
class pb.Perspective(pb.Avatar):
"""
User's view of the PB system after authentication.
Perspectives represent what an authenticated user can see and do.
"""
def __init__(self, avatarId):
"""
Args:
avatarId: Unique identifier for this user
"""
self.avatarId = avatarId
def perspective_methodName(self, *args, **kwargs):
"""
Perspective method callable by authenticated clients.
Methods prefixed with 'perspective_' are exposed to authenticated users.
"""
pass
class pb.Broker:
"""
Message broker managing PB protocol communication.
Handles serialization, transport, and method dispatch for PB.
"""
def __init__(self, isClient=True, security=None):
"""
Args:
isClient (bool): Whether this is a client broker
security: Security policy object
"""
def registerReference(self, object):
"""
Register object for remote references.
Args:
object: Object to register
Returns:
int: Reference ID
"""
def unregisterReference(self, refnum):
"""
Unregister object reference.
Args:
refnum (int): Reference ID to unregister
"""Basic PB Usage Example:
from twisted.spread import pb
from twisted.internet import reactor, endpoints, defer
# Server-side objects
class MathService(pb.Referenceable):
def remote_add(self, a, b):
return a + b
def remote_multiply(self, a, b):
return a * b
def remote_divide(self, a, b):
if b == 0:
raise ValueError("Division by zero")
return a / b
class Calculator(pb.Root):
def rootObject(self, broker):
return MathService()
# Start PB server
factory = pb.PBServerFactory(Calculator())
endpoint = endpoints.TCP4ServerEndpoint(reactor, 8800)
endpoint.listen(factory)
# Client code
@defer.inlineCallbacks
def client_example():
factory = pb.PBClientFactory()
endpoint = endpoints.TCP4ClientEndpoint(reactor, "localhost", 8800)
yield endpoint.connect(factory)
root = yield factory.getRootObject()
# Call remote methods
result1 = yield root.callRemote("add", 5, 3)
print(f"5 + 3 = {result1}")
result2 = yield root.callRemote("multiply", 4, 7)
print(f"4 * 7 = {result2}")
try:
result3 = yield root.callRemote("divide", 10, 0)
except Exception as e:
print(f"Division error: {e}")
print("PB server running on port 8800")
reactor.run()Factory classes for creating PB client and server connections.
class pb.PBClientFactory:
"""
Factory for PB client connections.
"""
def __init__(self):
"""Initialize client factory."""
self.deferred = defer.Deferred()
def getRootObject(self):
"""
Get root object from server.
Returns:
Deferred[RemoteReference]: Server's root object
"""
def login(self, credentials, client=None):
"""
Login with credentials.
Args:
credentials: User credentials
client: Client object for authenticated session
Returns:
Deferred[Avatar]: User avatar after authentication
"""
def buildProtocol(self, addr):
"""
Build client protocol.
Args:
addr: Connection address
Returns:
Broker: Client broker
"""
class pb.PBServerFactory:
"""
Factory for PB server connections.
"""
def __init__(self, root, security=None, portal=None):
"""
Args:
root: Root object or realm
security: Security policy
portal: Authentication portal
"""
self.root = root
self.security = security
self.portal = portal
def buildProtocol(self, addr):
"""
Build server protocol.
Args:
addr: Connection address
Returns:
Broker: Server broker
"""
def pb.connect(host, port, factory, contextFactory=None, bindAddress=None):
"""
Connect to PB server.
Args:
host (str): Server hostname
port (int): Server port
factory: Client factory
contextFactory: SSL context factory
bindAddress: Local bind address
Returns:
Deferred: Connection result
"""
def pb.getObjectAt(host, port, timeout=30):
"""
Get root object from PB server.
Args:
host (str): Server hostname
port (int): Server port
timeout (int): Connection timeout
Returns:
Deferred[RemoteReference]: Root object
"""Proxy objects representing remote objects.
class pb.RemoteReference:
"""
Proxy for remote object.
All method calls are transparently forwarded to the remote object.
"""
def callRemote(self, method, *args, **kwargs):
"""
Call method on remote object.
Args:
method (str): Method name
*args, **kwargs: Method arguments
Returns:
Deferred: Method result
"""
def remoteMethod(self, method):
"""
Get callable for remote method.
Args:
method (str): Method name
Returns:
callable: Function that calls remote method
"""
def notifyOnDisconnect(self, callback):
"""
Register callback for disconnection.
Args:
callback: Function to call on disconnect
"""
class pb.ViewPoint:
"""
Client's view of a remote perspective.
"""
def __init__(self, perspective, broker):
"""
Args:
perspective: Remote perspective object
broker: PB broker
"""Integration with Twisted's authentication system.
class pb.IPerspective:
"""
Interface for PB perspectives.
"""
def perspectiveMessageReceived(broker, message, args, kw):
"""
Handle message from client.
Args:
broker: PB broker
message (str): Message name
args (tuple): Message arguments
kw (dict): Message keyword arguments
Returns:
Result of message handling
"""
class pb.Avatar(pb.IPerspective):
"""
Authenticated user avatar for PB.
"""
def __init__(self, avatarId):
self.avatarId = avatarId
def logout(self):
"""Called when user logs out."""
def perspectiveMessageReceived(self, broker, message, args, kw):
"""Handle perspective method calls."""
method = getattr(self, f'perspective_{message}', None)
if method:
return method(*args, **kw)
raise AttributeError(f"No such method: perspective_{message}")
class pb.Realm:
"""
PB realm for creating avatars.
"""
def requestAvatar(self, avatarId, mind, *interfaces):
"""
Create avatar for authenticated user.
Args:
avatarId: User identifier
mind: Client perspective
*interfaces: Requested interfaces
Returns:
tuple: (interface, avatar, logout_callable)
"""
if pb.IPerspective in interfaces:
avatar = pb.Avatar(avatarId)
return (pb.IPerspective, avatar, avatar.logout)
raise NotImplementedError()Authenticated PB Example:
from twisted.spread import pb
from twisted.cred import portal, checkers, credentials
from twisted.internet import reactor, endpoints, defer
# Server with authentication
class UserAvatar(pb.Avatar):
def __init__(self, username):
self.username = username
def perspective_getMessage(self):
return f"Hello, {self.username}!"
def perspective_getTime(self):
import time
return time.ctime()
class SimpleRealm:
def requestAvatar(self, avatarId, mind, *interfaces):
if pb.IPerspective in interfaces:
avatar = UserAvatar(avatarId.decode())
return (pb.IPerspective, avatar, avatar.logout)
raise NotImplementedError()
# Set up authentication
realm = SimpleRealm()
checker = checkers.InMemoryUsernamePasswordDatabaseDontUse()
checker.addUser(b"alice", b"password")
checker.addUser(b"bob", b"secret")
portal_obj = portal.Portal(realm, [checker])
# Start authenticated server
factory = pb.PBServerFactory(portal_obj)
endpoint = endpoints.TCP4ServerEndpoint(reactor, 8800)
endpoint.listen(factory)
# Authenticated client
@defer.inlineCallbacks
def authenticated_client():
factory = pb.PBClientFactory()
endpoint = endpoints.TCP4ClientEndpoint(reactor, "localhost", 8800)
yield endpoint.connect(factory)
# Login with credentials
creds = credentials.UsernamePassword(b"alice", b"password")
avatar = yield factory.login(creds)
# Call perspective methods
message = yield avatar.callRemote("getMessage")
print(f"Message: {message}")
time_str = yield avatar.callRemote("getTime")
print(f"Server time: {time_str}")
authenticated_client()
reactor.run()Objects that can be copied across the network.
class pb.Copyable:
"""
Base class for objects that can be copied to remote side.
The object is serialized and recreated on the remote side.
"""
def __init__(self):
"""Initialize copyable object."""
def getStateToCopy(self):
"""
Get state to copy to remote side.
Returns:
dict: Object state to copy
"""
return self.__dict__.copy()
class pb.RemoteCopy:
"""
Base class for receiving copied objects.
Paired with Copyable objects on the sending side.
"""
def setCopyableState(self, state):
"""
Restore object state from copy.
Args:
state (dict): Object state from remote side
"""
self.__dict__.update(state)
def pb.setUnjellyableForClass(classname, unjellyable):
"""
Register class for unjellying (deserializing).
Args:
classname (str): Class name
unjellyable: Class to create when unjellying
"""
class pb.Cacheable:
"""
Base class for objects that are cached on remote side.
Only one copy exists on remote side, updated when changed.
"""
def getStateToCacheAndObserveFor(self, perspective, observer):
"""
Get state to cache and set up observation.
Args:
perspective: Remote perspective
observer: Observer for changes
Returns:
dict: State to cache
"""
class pb.RemoteCache:
"""
Base class for cached remote objects.
"""
def setCacheableState(self, state):
"""
Set cached object state.
Args:
state (dict): Object state
"""
def observe_update(self, newState):
"""
Handle state update from remote side.
Args:
newState (dict): Updated state
"""PB-specific exceptions and error handling.
class pb.Error(Exception):
"""Base class for PB errors."""
class pb.DeadReferenceError(pb.Error):
"""Reference to dead remote object."""
class pb.PBConnectionLost(pb.Error):
"""PB connection was lost."""
class pb.RemoteError(pb.Error):
"""Error from remote side."""
class pb.CopyableFailure:
"""
Serializable failure that can cross network boundaries.
"""
def __init__(self, failure):
"""
Args:
failure: Failure object to wrap
"""
self.type = qual(failure.type)
self.value = failure.getErrorMessage()
self.traceback = failure.getTraceback()Error Handling Example:
from twisted.spread import pb
from twisted.internet import defer
class ErrorService(pb.Referenceable):
def remote_causeError(self):
raise ValueError("Something went wrong!")
def remote_safeDivide(self, a, b):
try:
return a / b
except ZeroDivisionError:
# PB will serialize this exception
raise ValueError("Cannot divide by zero")
# Client error handling
@defer.inlineCallbacks
def handle_errors():
root = yield factory.getRootObject()
try:
yield root.callRemote("causeError")
except Exception as e:
print(f"Remote error: {e}")
try:
result = yield root.callRemote("safeDivide", 10, 0)
except ValueError as e:
print(f"Division error: {e}")Low-level serialization system used by PB.
def jelly.jelly(object):
"""
Serialize object to jelly format.
Args:
object: Python object to serialize
Returns:
Serialized representation
"""
def jelly.unjelly(jellyData):
"""
Deserialize object from jelly format.
Args:
jellyData: Serialized data
Returns:
Python object
"""
class jelly.Jellyable:
"""
Base class for objects that can control their serialization.
"""
def jellyFor(self, jellier):
"""
Custom serialization method.
Args:
jellier: Jellier instance
Returns:
Serialized representation
"""
class jelly.Unjellyable:
"""
Base class for objects that can control their deserialization.
"""
def unjellyFor(self, unjellier, jellyList):
"""
Custom deserialization method.
Args:
unjellier: Unjellier instance
jellyList: Serialized data
Returns:
Deserialized object
"""Complete PB Application Example:
from twisted.spread import pb
from twisted.internet import reactor, endpoints, defer
from twisted.cred import portal, checkers, credentials
from twisted.application import service
# Shared data objects
class SharedData(pb.Copyable, pb.RemoteCopy):
"""Data that can be copied across network."""
def __init__(self, name, value):
self.name = name
self.value = value
# Register for deserialization
pb.setUnjellyableForClass(SharedData, SharedData)
# Service implementation
class DataService(pb.Avatar):
def __init__(self, username):
self.username = username
self.data = {}
def perspective_store(self, key, data):
"""Store data object."""
self.data[key] = data
return f"Stored {key} for {self.username}"
def perspective_retrieve(self, key):
"""Retrieve data object."""
if key in self.data:
return self.data[key]
raise KeyError(f"No data for key: {key}")
def perspective_list_keys(self):
"""List all stored keys."""
return list(self.data.keys())
# Realm
class DataRealm:
def requestAvatar(self, avatarId, mind, *interfaces):
if pb.IPerspective in interfaces:
avatar = DataService(avatarId.decode())
return (pb.IPerspective, avatar, lambda: None)
raise NotImplementedError()
# Application setup
def setup_server():
realm = DataRealm()
checker = checkers.InMemoryUsernamePasswordDatabaseDontUse()
checker.addUser(b"user1", b"pass1")
checker.addUser(b"user2", b"pass2")
portal_obj = portal.Portal(realm, [checker])
factory = pb.PBServerFactory(portal_obj)
endpoint = endpoints.TCP4ServerEndpoint(reactor, 8800)
endpoint.listen(factory)
print("PB data service running on port 8800")
# Client example
@defer.inlineCallbacks
def client_session():
factory = pb.PBClientFactory()
endpoint = endpoints.TCP4ClientEndpoint(reactor, "localhost", 8800)
yield endpoint.connect(factory)
# Login
creds = credentials.UsernamePassword(b"user1", b"pass1")
avatar = yield factory.login(creds)
# Store data
data = SharedData("test_data", {"count": 42, "active": True})
result = yield avatar.callRemote("store", "item1", data)
print(result)
# Retrieve data
retrieved = yield avatar.callRemote("retrieve", "item1")
print(f"Retrieved: {retrieved.name} = {retrieved.value}")
# List keys
keys = yield avatar.callRemote("list_keys")
print(f"Keys: {keys}")
setup_server()
client_session()
reactor.run()Install with Tessl CLI
npx tessl i tessl/pypi-twisted