Remote Python Call (RPyC) is a transparent and symmetric distributed computing library.
Service classes for defining remote interfaces and protocol management for handling RPyC connections. Services define what methods and objects are exposed across connections, while the protocol layer handles the low-level communication and object proxying.
Core service classes that define the interface between local and remote objects.
class Service:
    """
    Base class for RPyC services. Override methods to create custom services.

    ``on_connect`` and ``on_disconnect`` are connection lifecycle hooks.
    The remaining methods carry the ``exposed_`` prefix, the same prefix
    listed under ``exposed_prefix`` in the default configuration.
    """

    def on_connect(self, conn):
        """
        Called when client connects.

        Parameters:
        - conn: The connection the client connected on (presumably a
          Connection instance - TODO confirm against the implementation)
        """

    def on_disconnect(self, conn):
        """
        Called when client disconnects.

        Parameters:
        - conn: The connection that is going away (presumably a
          Connection instance - TODO confirm against the implementation)
        """

    def exposed_get_service_name(self):
        """
        Returns the service name.

        Returns:
        The name of this service
        """

    def exposed_get_service_aliases(self):
        """
        Returns list of service aliases.

        Returns:
        list: The aliases of this service
        """
class VoidService(Service):
    """
    Empty service that exposes no methods. Used as default service.

    Defines nothing beyond what it inherits from Service.
    """

Pre-built service classes for common distributed computing patterns.
class SlaveService(Service):
    """
    Service that provides access to the local namespace and modules.
    Used in classic mode for remote Python execution.

    NOTE(review): the methods below let the peer run code and read/write
    attributes on local objects, so this service is presumably meant for
    trusted peers only - confirm against the project's security guidance.
    """

    def exposed_execute(self, code):
        """
        Execute Python code remotely.

        Parameters:
        - code: The Python code to execute (presumably source text -
          TODO confirm)
        """

    def exposed_eval(self, expression):
        """
        Evaluate Python expression remotely.

        Parameters:
        - expression: The Python expression to evaluate (presumably source
          text - TODO confirm)
        """

    def exposed_getattr(self, obj, name):
        """
        Get attribute from remote object.

        Parameters:
        - obj: Object to read the attribute from
        - name: Name of the attribute to read
        """

    def exposed_setattr(self, obj, name, value):
        """
        Set attribute on remote object.

        Parameters:
        - obj: Object to set the attribute on
        - name: Name of the attribute to set
        - value: Value to assign to the attribute
        """
class MasterService(Service):
    """
    Service that provides master-side functionality for distributed computing.

    Declares no methods of its own in this listing; everything shown comes
    from the Service base class.
    """
class ClassicService(MasterService, SlaveService):
    """
    Combined service providing both master and slave functionality.
    Used for symmetric connections where both sides can execute code.

    Inherits from MasterService and SlaveService (in that base order) and
    declares no additional methods in this listing.
    """

The Connection class manages the protocol-level communication and object proxying.
class Connection:
    """
    Represents an RPyC connection, handling protocol communication and object proxying.

    A connection is built from a local service, a communication channel and
    an optional configuration dictionary. It offers access to the remote
    service (``root``), lifecycle control (``closed``, ``close``, ``ping``)
    and several ways of serving incoming requests.
    """

    def __init__(self, service, channel, config=None):
        """
        Initialize connection.

        Parameters:
        - service (Service): Local service instance
        - channel: Communication channel
        - config (dict): Configuration dictionary; optional, defaults to
          None
        """

    @property
    def root(self):
        """Access to remote service's root namespace"""

    @property
    def closed(self) -> bool:
        """
        True if connection is closed.

        Returns:
        bool: Whether the connection is closed
        """

    def close(self):
        """Close the connection"""

    def ping(self, data=None, timeout=3):
        """
        Ping remote side to test connectivity.

        Parameters:
        - data: Optional data to send with ping
        - timeout (float): Ping timeout in seconds (default 3)

        Returns:
        data: Echo of sent data
        """

    def serve(self, timeout=1):
        """
        Serve incoming requests for specified time.

        Parameters:
        - timeout (float): Time to serve requests (default 1; presumably
          seconds, as for ping - TODO confirm)
        """

    # NOTE(review): confirm whether this returns once the pending requests
    # are drained or keeps serving until the connection is closed.
    def serve_all(self):
        """Serve all pending requests"""

    def serve_forever(self):
        """Serve requests indefinitely"""

    def poll_all(self, timeout=0):
        """
        Poll and handle all pending requests.

        Parameters:
        - timeout (float): Polling timeout (default 0)

        Returns:
        bool: True if requests were handled
        """

Network references provide transparent proxying of remote objects.
class BaseNetref:
    """
    Base class for network references - transparent proxies to remote objects.

    The special methods listed here cover attribute access, calling and
    string conversion of the proxied remote object.
    """

    def __getattr__(self, name):
        """
        Get attribute from remote object.

        Parameters:
        - name: Name of the attribute to fetch
        """

    def __setattr__(self, name, value):
        """
        Set attribute on remote object.

        Parameters:
        - name: Name of the attribute to set
        - value: Value to assign to the attribute
        """

    def __call__(self, *args, **kwargs):
        """
        Call remote object if callable.

        Parameters:
        - *args: Positional arguments for the call
        - **kwargs: Keyword arguments for the call
        """

    def __str__(self):
        """String representation of remote object"""

    def __repr__(self):
        """Detailed representation of remote object"""

Support for non-blocking remote operations.
class AsyncResult:
    """
    Container for asynchronous operation results.

    ``ready`` and ``expired`` report the state of the result, ``wait``
    blocks until it is available, and ``value`` yields the result itself.
    """

    @property
    def ready(self) -> bool:
        """
        True if result is ready.

        Returns:
        bool: Whether the result is ready
        """

    @property
    def expired(self) -> bool:
        """
        True if result has expired.

        Returns:
        bool: Whether the result has expired
        """

    # NOTE(review): confirm that the installed rpyc version accepts a
    # ``timeout`` argument here; verify against the upstream AsyncResult API.
    def wait(self, timeout=None):
        """
        Wait for result to be ready.

        Parameters:
        - timeout (float): Maximum time to wait (default None)

        Raises:
        AsyncResultTimeout: If timeout exceeded
        """

    @property
    def value(self):
        """Get the result value (blocks if not ready)"""
class AsyncResultTimeout(Exception):
    """
    Raised when async operations timeout.

    Listed as the exception raised by AsyncResult.wait when its timeout is
    exceeded.
    """

Default configuration options for RPyC connections.
# Default values for the per-connection configuration dictionary. A custom
# dictionary passed as ``config=`` (see the connection example) supplies
# replacement values for individual keys.
# NOTE(review): values are reproduced as listed; verify them against the
# DEFAULT_CONFIG of the installed rpyc version before relying on them.
DEFAULT_CONFIG = {
    # Switches governing attribute access and pickling (semantics inferred
    # from the key names - TODO confirm against the protocol implementation).
    'allow_all_attrs': False,
    'allow_pickle': False,
    'allow_getattr': True,
    'allow_setattr': False,
    'allow_delattr': False,
    'allow_public_attrs': True,
    # Same prefix carried by the exposed_* methods of the service classes.
    'exposed_prefix': 'exposed_',
    'allow_safe_attrs': True,
    # Special-method names (plus 'next'); presumably the set consulted when
    # 'allow_safe_attrs' is enabled. Includes Python 2-era names such as
    # '__cmp__', '__div__' and '__long__'.
    'safe_attrs': frozenset(['__abs__', '__add__', '__and__', '__bool__', '__cmp__',
                             '__contains__', '__div__', '__divmod__', '__doc__', '__eq__',
                             '__float__', '__floordiv__', '__ge__', '__getitem__',
                             '__gt__', '__hash__', '__hex__', '__iadd__', '__iand__',
                             '__idiv__', '__ifloordiv__', '__ilshift__', '__imod__',
                             '__imul__', '__int__', '__invert__', '__ior__', '__ipow__',
                             '__irshift__', '__isub__', '__iter__', '__itruediv__',
                             '__ixor__', '__le__', '__len__', '__long__', '__lshift__',
                             '__lt__', '__mod__', '__mul__', '__ne__', '__neg__',
                             '__next__', '__oct__', '__or__', '__pos__', '__pow__',
                             '__radd__', '__rand__', '__rdiv__', '__rdivmod__',
                             '__repr__', '__rfloordiv__', '__rlshift__', '__rmod__',
                             '__rmul__', '__ror__', '__rpow__', '__rrshift__',
                             '__rshift__', '__rsub__', '__rtruediv__', '__rxor__',
                             '__setitem__', '__str__', '__sub__', '__truediv__',
                             '__xor__', 'next']),
    # Exception handling / propagation switches.
    'instantiate_custom_exceptions': False,
    'instantiate_oldstyle_exceptions': False,
    'propagate_SystemExit_locally': False,
    'propagate_KeyboardInterrupt_locally': False,
    # Miscellaneous per-connection settings, unset by default.
    'logger': None,
    'connid': None,
    'credentials': None,
    'endpoints_timeout': 3,  # unit not stated here; presumably seconds - TODO confirm
    'sync_request_timeout': 30,  # seconds; the custom-config example raises it to 60
}

import rpyc
from rpyc import exposed
# The class-level ``@rpyc.service`` decorator is what processes the
# ``@exposed`` markers below and publishes those methods under the
# ``exposed_`` prefix; ``@exposed`` on its own only tags the method.
@rpyc.service
class CalculatorService(rpyc.Service):
    """Custom service providing calculator functionality.

    Remote callers can use ``add``, ``multiply`` and ``get_history``.
    ``on_connect`` / ``on_disconnect`` log the connection lifecycle.
    """

    @exposed
    def add(self, a, b):
        """Return ``a + b``."""
        return a + b

    @exposed
    def multiply(self, a, b):
        """Return ``a * b``."""
        return a * b

    @exposed
    def get_history(self):
        """Return the calculation history (always empty in this example)."""
        # Could return calculation history
        return []

    def on_connect(self, conn):
        """Log the connection of a new client."""
        print(f"Client connected: {conn}")

    def on_disconnect(self, conn):
        """Log the disconnection of a client."""
        print(f"Client disconnected: {conn}")
# Use the service: publish CalculatorService on TCP port 12345.
from rpyc.utils.server import ThreadedServer

server = ThreadedServer(CalculatorService, port=12345)
server.start()

import rpyc
# Custom configuration: only the keys listed here are overridden.
config = {
    # Unpickling data received from a peer can execute arbitrary code;
    # enable this only when the remote side is trusted.
    'allow_pickle': True,  # Allow pickle serialization
    'sync_request_timeout': 60,  # 60 second timeout
}

# Connect with custom config
conn = rpyc.connect('localhost', 12345, config=config)
try:
    # Use connection
    result = conn.root.some_function()
finally:
    # Close the connection even if the remote call raises
    conn.close()

import rpyc
conn = rpyc.connect('localhost', 12345)
try:
    # Start async operation; the call returns immediately with a result
    # container instead of the function's return value.
    async_result = rpyc.async_(conn.root.long_running_function)()

    # Do other work while operation runs
    print("Operation started, doing other work...")

    # Wait for result; the outcome is read from ``value`` afterwards, so
    # nothing is captured from wait() itself.
    try:
        async_result.wait(timeout=30)
        print(f"Result: {async_result.value}")
    except rpyc.AsyncResultTimeout:
        print("Operation timed out")
finally:
    # Close the connection on every path, including unexpected errors
    conn.close()

import rpyc
# Connect and manage lifecycle
conn = rpyc.connect('localhost', 12345)
try:
    # Test connectivity
    conn.ping()

    # Use connection
    result = conn.root.get_data()

    # Serve any pending requests. poll_all() uses a default timeout of 0,
    # so it handles what is already queued and returns instead of
    # continuing to serve.
    conn.poll_all()
finally:
    # Always close connection
    conn.close()


class GenericException(Exception):
    """Wrapper for remote exceptions"""
class PingError(Exception):
    """
    Raised when connection ping fails.

    Presumably raised by Connection.ping - TODO confirm against the
    implementation.
    """

Install with Tessl CLI
npx tessl i tessl/pypi-rpyc