Remote Python Call (RPyC) is a transparent and symmetric distributed computing library
—
Additional utilities including asynchronous operations, timeouts, background threads, decorators, and low-level helpers for advanced RPyC usage. These utilities extend RPyC's core functionality with convenience features and advanced patterns.
Tools for non-blocking remote operations and asynchronous programming patterns.
def async_(proxy):
"""
Convert synchronous proxy calls to asynchronous operations.
Parameters:
- proxy: RPyC proxy object (netref)
Returns:
_Async: Async wrapper for proxy
Usage:
async_func = async_(conn.root.slow_function)
result = async_func(args) # Returns AsyncResult
"""
class _Async:
"""
Asynchronous wrapper for proxy objects.
"""
def __call__(self, *args, **kwargs):
"""
Call wrapped function asynchronously.
Returns:
AsyncResult: Async result container
"""

Decorators and utilities for adding timeouts and timing to operations.
def timed(timeout, func):
"""
Add timeout to function calls.
Parameters:
- timeout (float): Timeout in seconds
- func (callable): Function to wrap
Returns:
callable: Function with timeout applied
"""
class timed:
"""
Timeout decorator class for adding timeouts to operations.
"""
def __init__(self, timeout):
"""
Initialize timeout decorator.
Parameters:
- timeout (float): Timeout in seconds
"""
def __call__(self, func):
"""Apply timeout to function"""

Utilities for running RPyC services in background threads.
class BgServingThread:
"""
Background thread for serving RPyC connections.
Allows main thread to continue while serving requests.
"""
def __init__(self, conn, callback=None):
"""
Initialize background serving thread.
Parameters:
- conn: RPyC connection to serve
- callback (callable): Optional callback for exceptions
"""
def start(self):
"""Start the background serving thread"""
def stop(self):
"""Stop the background serving thread"""
@property
def active(self) -> bool:
"""True if thread is active"""

Utilities for working with remote iterators and collections.
def buffiter(obj, chunk=10, max_chunk=1000, factor=2):
"""
Buffered iterator for remote sequences.
Reduces network round-trips by fetching chunks.
Parameters:
- obj: Remote iterable object
- chunk (int): Initial chunk size
- max_chunk (int): Maximum chunk size
- factor (int): Chunk size growth factor
Yields:
Items from remote iterable in buffered chunks
"""

Utilities for restricting and controlling access to objects.
def restricted(obj, attrs, wattrs=None):
"""
Create restricted proxy with limited attribute access.
Parameters:
- obj: Object to restrict
- attrs (set): Allowed read attributes
- wattrs (set): Allowed write attributes (optional)
Returns:
Restricted proxy object
"""

Decorators for defining and configuring RPyC services.
def exposed(func):
"""
Decorator marking methods as exposed to remote calls.
Parameters:
- func (callable): Function to expose
Returns:
callable: Exposed function
Usage:
@exposed
def my_method(self):
return "Available remotely"
"""
def service(cls):
"""
Class decorator for processing RPyC service classes.
Automatically renames exposed methods with proper prefix.
Parameters:
- cls: Service class to process
Returns:
class: Processed service class
Usage:
@service
class MyService(rpyc.Service):
@exposed
def my_method(self):
return "Hello"
"""

Low-level helper functions for advanced RPyC usage and debugging.
def setup_logger(quiet=False, logfile=None, namespace=None):
"""
Setup RPyC logging configuration.
Parameters:
- quiet (bool): Reduce log verbosity if True
- logfile (str): Log to file if specified
- namespace (str): Logger namespace
Returns:
Logger: Configured logger instance
"""
def safe_import(name):
"""
Safely import module with fallback.
Parameters:
- name (str): Module name to import
Returns:
Module or MissingModule: Imported module or placeholder
"""
def spawn(*args, **kwargs):
"""
Start daemon thread with function and arguments.
Parameters:
- args[0] (callable): Function to run in thread
- args[1:]: Arguments for function
- kwargs: Keyword arguments for function
Returns:
Thread: Started daemon thread
"""
def get_id_pack(obj):
"""
Get ID pack for object (used internally for netrefs).
Parameters:
- obj: Object to get ID pack for
Returns:
tuple: Object identification tuple
"""
def get_methods(obj_attrs, obj):
"""
Get all methods of an object.
Parameters:
- obj_attrs (dict): Object attributes
- obj: Object to introspect
Returns:
list: List of (method_name, docstring) tuples
"""

Advanced threading utilities for RPyC applications.
def spawn_waitready(init, main):
"""
Start thread with initialization phase.
Parameters:
- init (callable): Initialization function
- main (callable): Main function to run after init
Returns:
tuple: (thread, init_result)
"""
class hybridmethod:
"""
Decorator for methods that work as both instance and class methods.
"""
def __init__(self, func):
"""
Initialize hybrid method decorator.
Parameters:
- func (callable): Function to wrap
"""

Utilities for cross-platform and version compatibility.
def hasattr_static(obj, attr):
"""
Check if object has attribute using static introspection.
Parameters:
- obj: Object to check
- attr (str): Attribute name
Returns:
bool: True if object has attribute
"""
class MissingModule:
"""
Placeholder for missing/unavailable modules.
"""
def __init__(self, name):
"""
Initialize missing module placeholder.
Parameters:
- name (str): Module name
"""
def __getattr__(self, name):
"""Raise ImportError for any attribute access"""
def __bool__(self):
"""Always returns False"""

import rpyc
from rpyc.utils.helpers import async_
# Connect to server
conn = rpyc.connect('localhost', 12345)
# Make async calls
async_func = async_(conn.root.slow_computation)
result1 = async_func(data1)
result2 = async_func(data2)
# Do other work while computations run
print("Computations started...")
# Wait for results
print("Result 1:", result1.value)
print("Result 2:", result2.value)
conn.close()

import rpyc
from rpyc.utils.helpers import BgServingThread
import time
# Connect to server
conn = rpyc.connect('localhost', 12345)
# Start background serving
bg_thread = BgServingThread(conn)
bg_thread.start()
try:
# Main thread can do other work
# while background thread serves requests
for i in range(10):
result = conn.root.get_data(i)
print(f"Got result: {result}")
time.sleep(1)
finally:
# Clean up
bg_thread.stop()
conn.close()

import rpyc
from rpyc.utils.helpers import buffiter
conn = rpyc.connect('localhost', 12345)
# Get remote large list
remote_list = conn.root.get_large_dataset()
# Iterate efficiently with buffering
for item in buffiter(remote_list, chunk=100):
process(item)
conn.close()

import rpyc
from rpyc.utils.helpers import restricted
conn = rpyc.connect('localhost', 12345)
# Get remote object
remote_obj = conn.root.get_object()
# Create restricted version
safe_obj = restricted(
remote_obj,
attrs={'read_data', 'get_info'}, # Only these methods allowed
wattrs={'status'} # Only this attribute writable
)
# safe_obj only allows specified operations
data = safe_obj.read_data() # OK
info = safe_obj.get_info() # OK
safe_obj.status = 'active' # OK
# safe_obj.delete_all() # Would raise AttributeError
conn.close()

import rpyc
from rpyc.utils import exposed, service
@service
class CalculatorService(rpyc.Service):
def __init__(self):
self.history = []
@exposed
def add(self, a, b):
result = a + b
self.history.append(f"{a} + {b} = {result}")
return result
@exposed
def get_history(self):
return self.history.copy()
def _internal_method(self):
# Not exposed - private method
pass
# Service automatically processes exposed methods
from rpyc.utils.server import ThreadedServer
server = ThreadedServer(CalculatorService, port=12345)
server.start()

import rpyc
from rpyc.utils.helpers import timed
conn = rpyc.connect('localhost', 12345)
# Add timeout to remote function
timed_func = timed(5.0, conn.root.slow_function)
try:
result = timed_func(args)
print("Completed within timeout:", result)
except Exception as e:
print("Function timed out or failed:", e)
conn.close()

import rpyc
from rpyc.lib import setup_logger
# Setup detailed logging
logger = setup_logger(quiet=False, logfile='rpyc.log')
# RPyC operations will now be logged
conn = rpyc.connect('localhost', 12345)
logger.info("Connected to server")
result = conn.root.some_function()
logger.info(f"Function result: {result}")
conn.close()
logger.info("Connection closed")

from rpyc.lib import safe_import
# Safely import optional modules
ssl = safe_import('ssl')
if ssl:
# SSL is available
print("SSL support available")
else:
# SSL not available, ssl is MissingModule
print("SSL not available")
# Using the imported module
try:
context = ssl.create_default_context()
except ImportError:
print("SSL operations not supported")

SPAWN_THREAD_PREFIX = 'RpycSpawnThread' # Prefix for spawned thread names

Install with Tessl CLI
npx tessl i tessl/pypi-rpyc