CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-rpyc

Remote Python Call (RPyC) is a transparent and symmetric distributed computing library

Pending
Overview
Eval results
Files

docs/utilities.md

Utilities and Helpers

Additional utilities including asynchronous operations, timeouts, background threads, decorators, and low-level helpers for advanced RPyC usage. These utilities extend RPyC's core functionality with convenience features and advanced patterns.

Capabilities

Asynchronous Utilities

Tools for non-blocking remote operations and asynchronous programming patterns.

def async_(proxy):
    """Wrap a remote proxy so that invoking it no longer blocks.

    Parameters:
    - proxy: The RPyC proxy (netref) whose calls should become asynchronous

    Returns:
    _Async: Callable wrapper around ``proxy``

    Usage:
    slow = async_(conn.root.slow_function)
    pending = slow(args)  # an AsyncResult, not the final value
    """

class _Async:
    """
    Asynchronous wrapper for proxy objects.
    """
    
    def __call__(self, *args, **kwargs):
        """
        Call wrapped function asynchronously.
        
        Returns:
        AsyncResult: Async result container
        """

Timeout and Timing Utilities

Decorators and utilities for adding timeouts and timing to operations.

def timed(timeout, func):
    """Wrap a callable so that its calls are subject to a time limit.

    NOTE(review): this page also lists a ``timed`` class with a different
    constructor, and upstream RPyC appears to document ``timed(proxy,
    timeout)``; confirm the form and argument order before relying on it.

    Parameters:
    - timeout (float): Time limit in seconds
    - func (callable): Callable to guard with the time limit

    Returns:
    callable: Wrapper around ``func`` with the timeout applied
    """

class timed:
    """Decorator object that applies a time limit to the operation it wraps.

    NOTE(review): this page also lists a ``timed(timeout, func)`` function,
    and upstream RPyC appears to document ``timed(proxy, timeout)``; confirm
    the constructor arguments before relying on this form.
    """

    def __init__(self, timeout):
        """Set up the decorator with its time limit.

        Parameters:
        - timeout (float): Time limit in seconds
        """

    def __call__(self, func):
        """Apply the configured timeout to ``func``."""

Background Service Threads

Utilities for serving an RPyC connection from a background thread.

class BgServingThread:
    """Serve an RPyC connection from a background thread.

    The main thread stays free to do other work while requests are handled.

    NOTE(review): upstream RPyC appears to start the thread as soon as the
    object is created; confirm that ``start`` and ``active`` exist in the
    installed version.
    """

    def __init__(self, conn, callback=None):
        """Set up background serving for a connection.

        Parameters:
        - conn: The RPyC connection to serve
        - callback (callable): Optional hook for exceptions
        """

    def start(self):
        """Begin serving in the background thread."""

    def stop(self):
        """Halt the background serving thread."""

    @property
    def active(self) -> bool:
        """Whether the thread is currently active."""

Iterator and Collection Utilities

Utilities for working with remote iterators and collections.

def buffiter(obj, chunk=10, max_chunk=1000, factor=2):
    """Iterate over a remote sequence, fetching its items in batches.

    Retrieving several items per request cuts down on network round-trips.

    Parameters:
    - obj: Remote iterable to read from
    - chunk (int): Size of the first batch
    - max_chunk (int): Upper bound on the batch size
    - factor (int): Multiplier by which the batch size grows

    Yields:
    Items of the remote iterable, retrieved in buffered batches
    """

Security and Access Control

Utilities for restricting and controlling access to objects.

def restricted(obj, attrs, wattrs=None):
    """Build a proxy that exposes only a chosen subset of an object's attributes.

    Parameters:
    - obj: The object whose access should be limited
    - attrs (set): Names of attributes that may be read
    - wattrs (set): Names of attributes that may be written (optional)

    Returns:
    Restricted proxy object
    """

Service Decorators

Decorators for defining and configuring RPyC services.

def exposed(func):
    """Mark a method as callable by remote peers.

    Parameters:
    - func (callable): The function to make available remotely

    Returns:
    callable: The exposed function

    Usage:
    @exposed
    def my_method(self):
        return "Available remotely"
    """

def service(cls):
    """Process an RPyC service class (class decorator).

    Methods marked as exposed are renamed automatically so that they carry
    the proper prefix.

    Parameters:
    - cls: The service class to process

    Returns:
    class: The processed service class

    Usage:
    @service
    class MyService(rpyc.Service):
        @exposed
        def my_method(self):
            return "Hello"
    """

Low-level Utilities

Low-level helper functions for advanced RPyC usage and debugging.

def setup_logger(quiet=False, logfile=None, namespace=None):
    """Configure logging for RPyC.

    Parameters:
    - quiet (bool): When True, log less verbosely
    - logfile (str): Path of a file to log to, if given
    - namespace (str): Namespace of the logger

    Returns:
    Logger: The configured logger instance
    """

def safe_import(name):
    """Import a module, falling back to a placeholder when it is unavailable.

    Parameters:
    - name (str): Name of the module to import

    Returns:
    Module or MissingModule: The imported module, or a placeholder for it
    """

def spawn(*args, **kwargs):
    """Run a function in a newly started daemon thread.

    Parameters:
    - args[0] (callable): The function the thread should run
    - args[1:]: Positional arguments passed to that function
    - kwargs: Keyword arguments passed to that function

    Returns:
    Thread: The daemon thread, already started
    """

def get_id_pack(obj):
    """Return the ID pack of an object (used internally for netrefs).

    Parameters:
    - obj: The object to identify

    Returns:
    tuple: Tuple identifying the object
    """

def get_methods(obj_attrs, obj):
    """Collect every method of an object.

    Parameters:
    - obj_attrs (dict): Attributes of the object
    - obj: The object being introspected

    Returns:
    list: (method_name, docstring) tuples, one per method
    """

Threading Utilities

Advanced threading utilities for RPyC applications.

def spawn_waitready(init, main):
    """Start a thread that runs an initialization phase before its main work.

    Parameters:
    - init (callable): Function performing the initialization
    - main (callable): Function to run once initialization is done

    Returns:
    tuple: (thread, init_result)
    """

class hybridmethod:
    """Decorator for methods usable both on an instance and on the class."""

    def __init__(self, func):
        """Set up the hybrid method decorator.

        Parameters:
        - func (callable): The function being wrapped
        """

Compatibility and Platform Utilities

Utilities for cross-platform and version compatibility.

def hasattr_static(obj, attr):
    """Report whether an object has an attribute, using static introspection.

    Parameters:
    - obj: The object to inspect
    - attr (str): Name of the attribute to look for

    Returns:
    bool: True when the object has the attribute
    """

class MissingModule:
    """Stand-in object for a module that is missing or unavailable."""

    def __init__(self, name):
        """Set up the placeholder for a missing module.

        Parameters:
        - name (str): Name of the module
        """

    def __getattr__(self, name):
        """Raise ImportError whenever any attribute is accessed."""

    def __bool__(self):
        """Return False in every case."""

Examples

Asynchronous Operations

import rpyc
from rpyc.utils.helpers import async_

# Sample inputs for the remote computation; replace with real payloads
data1 = 10
data2 = 20

# Connect to server
conn = rpyc.connect('localhost', 12345)

try:
    # Make async calls: each one returns a result container right away
    async_func = async_(conn.root.slow_computation)
    result1 = async_func(data1)
    result2 = async_func(data2)

    # Do other work while computations run
    print("Computations started...")

    # Wait for results
    print("Result 1:", result1.value)
    print("Result 2:", result2.value)
finally:
    # Close the connection even if a remote call fails
    conn.close()

Background Serving

import time

import rpyc
from rpyc.utils.helpers import BgServingThread

# Connect to server
conn = rpyc.connect('localhost', 12345)

# Start background serving. Upstream RPyC documents that the thread begins
# serving as soon as the object is created, so no separate start() call is
# made here.
bg_thread = BgServingThread(conn)

try:
    # Main thread can do other work
    # while background thread serves requests
    for i in range(10):
        result = conn.root.get_data(i)
        print(f"Got result: {result}")
        time.sleep(1)

finally:
    # Clean up: stop serving first, then close the connection it was using
    bg_thread.stop()
    conn.close()

Buffered Iteration

import rpyc
from rpyc.utils.helpers import buffiter


def process(item):
    """Placeholder handler; replace with the real per-item work."""
    print(item)


conn = rpyc.connect('localhost', 12345)

try:
    # Get remote large list
    remote_list = conn.root.get_large_dataset()

    # Iterate efficiently with buffering
    for item in buffiter(remote_list, chunk=100):
        process(item)
finally:
    # Close the connection even if iteration fails part-way through
    conn.close()

Restricted Access

import rpyc
from rpyc.utils.helpers import restricted

conn = rpyc.connect('localhost', 12345)

try:
    # Get remote object
    remote_obj = conn.root.get_object()

    # Create restricted version
    safe_obj = restricted(
        remote_obj,
        attrs={'read_data', 'get_info'},  # Only these methods allowed
        wattrs={'status'},                # Only this attribute writable
    )

    # safe_obj only allows specified operations
    data = safe_obj.read_data()     # OK
    info = safe_obj.get_info()      # OK
    safe_obj.status = 'active'      # OK
    # safe_obj.delete_all()         # Would raise AttributeError
finally:
    # Close the connection even if one of the calls above fails
    conn.close()

Service Definition with Decorators

import rpyc
from rpyc.utils import exposed, service
from rpyc.utils.server import ThreadedServer


@service
class CalculatorService(rpyc.Service):
    """Calculator service that records every addition it performs."""

    def __init__(self):
        super().__init__()
        self.history = []

    @exposed
    def add(self, a, b):
        """Return a + b and record the operation in the history."""
        result = a + b
        self.history.append(f"{a} + {b} = {result}")
        return result

    @exposed
    def get_history(self):
        """Return a copy of the history so callers cannot alter it."""
        return self.history.copy()

    def _internal_method(self):
        # Not exposed - private method
        pass


# Service automatically processes exposed methods
if __name__ == "__main__":
    # Guarded so that importing this module does not block on the server loop
    server = ThreadedServer(CalculatorService, port=12345)
    server.start()

Timeout Operations

import rpyc
from rpyc.utils.helpers import timed

# Sample argument for the remote call; replace with a real value
argument = 42

conn = rpyc.connect('localhost', 12345)

try:
    # Add timeout to remote function. Upstream RPyC documents the order as
    # timed(proxy, timeout): the proxy comes first, then the limit in seconds.
    timed_func = timed(conn.root.slow_function, 5.0)

    try:
        # Per upstream, the call returns an async result immediately; reading
        # .value waits for the reply and raises once the limit is exceeded
        result = timed_func(argument).value
        print("Completed within timeout:", result)
    except Exception as e:
        print("Function timed out or failed:", e)
finally:
    # Close the connection whether or not the call succeeded
    conn.close()

Custom Logging

import rpyc
from rpyc.lib import setup_logger

# Setup detailed logging
logger = setup_logger(quiet=False, logfile='rpyc.log')

# RPyC operations will now be logged
conn = rpyc.connect('localhost', 12345)
logger.info("Connected to server")

try:
    result = conn.root.some_function()
    # Lazy %-style arguments: the message is only formatted if it is emitted
    logger.info("Function result: %s", result)
finally:
    # Close the connection even if the remote call fails
    conn.close()
    logger.info("Connection closed")

Safe Module Import

from rpyc.lib import safe_import

# Optional dependency: a MissingModule placeholder comes back when it is absent
ssl = safe_import('ssl')

# The placeholder is falsy, so a truth test tells the two cases apart
print("SSL support available" if ssl else "SSL not available")

# Using the imported module: attribute access on the placeholder raises
# ImportError, so the failure can be handled at the point of use
try:
    context = ssl.create_default_context()
except ImportError:
    print("SSL operations not supported")

Constants

# Name prefix applied to spawned threads
SPAWN_THREAD_PREFIX = "RpycSpawnThread"

Install with Tessl CLI

npx tessl i tessl/pypi-rpyc

docs

authentication-and-security.md

classic-mode.md

cli-tools.md

connection-factory.md

index.md

registry-and-discovery.md

servers.md

services-protocols.md

streams-and-channels.md

utilities.md

tile.json