CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-rpyc

Remote Python Call (RPyC) is a transparent and symmetric distributed computing library

Pending
Overview
Eval results
Files

docs/servers.md

Server Implementations

Multi-threaded, forking, and specialized server implementations for hosting RPyC services. These servers provide different concurrency models and deployment options for various use cases, from development to high-performance production environments.

Capabilities

Base Server Class

Foundation server class providing common functionality for all server implementations.

class Server:
    """
    Base class for RPyC servers providing common server functionality.

    Subclasses decide how an accepted connection is served (one thread per
    client, a thread pool, a forked process, ...); the constructor arguments
    and the start/close lifecycle are shared by all of them.
    """

    def __init__(self, service, hostname='0.0.0.0', port=0, backlog=10,
                 reuse_addr=True, authenticator=None, registrar=None, auto_register=None,
                 protocol_config=None, logger=None, listener_timeout=0.5, socket_path=None):
        """
        Initialize server.

        Parameters:
        - service: Service class or instance to serve
        - hostname (str): Host address to bind to
        - port (int): Port to bind to (0 for automatic)
        - backlog (int): Socket listen backlog
        - reuse_addr (bool): Enable SO_REUSEADDR
        - authenticator: Authentication handler
        - registrar: Service registry client
        - auto_register (bool): Auto-register with registry; when left as
          None, registration is enabled only if a registrar was given
        - protocol_config (dict): Protocol options applied to every
          connection accepted by this server (e.g. 'sync_request_timeout',
          'allow_pickle'); None means the protocol defaults
        - logger: Logger to use; None lets the server create its own
        - listener_timeout (float): Timeout in seconds on the listening
          socket, i.e. how often the accept loop wakes up to notice close()
        - socket_path (str): Path of a Unix domain socket to listen on
          instead of a TCP hostname/port
        """

    @property
    def closed(self) -> bool:
        """True if server is closed"""

    def start(self):
        """Start the server (blocks until the server is closed)"""

    def close(self):
        """Close the server and cleanup resources"""

    def accept(self):
        """Accept a single client connection"""

    def _authenticate_and_serve_client(self, sock):
        """Authenticate and serve single client"""

Threaded Server

Multi-threaded server that creates a new thread for each client connection.

class ThreadedServer(Server):
    """
    Multi-threaded server creating new thread per client connection.
    Good for moderate concurrency with I/O-bound services.
    """

    def __init__(self, service, hostname='0.0.0.0', port=0, backlog=10,
                 reuse_addr=True, authenticator=None, registrar=None, auto_register=None):
        """
        Initialize threaded server.

        Parameters: Same as Server base class (service, hostname, port,
        backlog, reuse_addr, authenticator, registrar, auto_register), with
        the same defaults. This class adds no arguments of its own.
        """

    def _serve_client(self, sock, credentials):
        """
        Serve client in dedicated thread.

        Parameters:
        - sock: Socket of the client to serve
        - credentials: Presumably whatever the authenticator produced for
          this client (TODO confirm; not specified in this reference)
        """

Thread Pool Server

Thread pool server that uses a fixed pool of worker threads to serve clients.

class ThreadPoolServer(Server):
    """
    Thread pool server using fixed number of worker threads.
    Better resource management than ThreadedServer for high concurrency.
    """

    def __init__(self, service, hostname='0.0.0.0', port=0, backlog=10,
                 reuse_addr=True, authenticator=None, registrar=None, auto_register=None,
                 nbThreads=20, requestBatchSize=10):
        """
        Initialize thread pool server.

        Parameters:
        - nbThreads (int): Number of threads in pool (default 20). The
          keyword is camelCase; a lowercase ``nbthreads`` is not recognised
          and is rejected as an unexpected keyword argument.
        - requestBatchSize (int): Number of requests a worker processes on
          one connection before moving on to the next (default 10)
        - Other parameters: Same as Server base class
        """

    def _get_next_thread(self):
        """Get next available thread from pool"""

Forking Server

Process-forking server that creates a new process for each client connection (Unix only).

class ForkingServer(Server):
    """
    Forking server creating new process per client connection.
    Provides process isolation but higher resource overhead (Unix only).
    """

    def __init__(self, service, hostname='0.0.0.0', port=0, backlog=10,
                 reuse_addr=True, authenticator=None, registrar=None, auto_register=None):
        """
        Initialize forking server.

        Parameters: Same as Server base class (service, hostname, port,
        backlog, reuse_addr, authenticator, registrar, auto_register), with
        the same defaults.

        Note: Only available on Unix systems
        """

    def _serve_client(self, sock, credentials):
        """
        Serve client in dedicated process.

        Parameters:
        - sock: Socket of the client to serve
        - credentials: Presumably whatever the authenticator produced for
          this client (TODO confirm; not specified in this reference)
        """

One Shot Server

Single-connection server that handles one client and then terminates.

class OneShotServer(Server):
    """
    Single-connection server for one client only.
    Useful for testing or simple point-to-point communication.
    """

    def __init__(self, service, hostname='0.0.0.0', port=0, backlog=10,
                 reuse_addr=True, authenticator=None, registrar=None, auto_register=None):
        """
        Initialize one-shot server.

        Parameters: Same as Server base class (service, hostname, port,
        backlog, reuse_addr, authenticator, registrar, auto_register), with
        the same defaults.
        """

    def start(self):
        """
        Start server and handle single client connection.

        Overrides Server.start: instead of serving clients until closed,
        this server handles one client and then terminates.
        """

Gevent Server

Gevent-based server using cooperative multitasking for high concurrency (requires gevent).

class GeventServer(Server):
    """
    Gevent-based server using cooperative multitasking.
    Provides high concurrency with low resource overhead (requires gevent).
    """

    def __init__(self, service, hostname='0.0.0.0', port=0, backlog=10,
                 reuse_addr=True, authenticator=None, registrar=None, auto_register=None):
        """
        Initialize gevent server.

        Parameters: Same as Server base class (service, hostname, port,
        backlog, reuse_addr, authenticator, registrar, auto_register), with
        the same defaults.

        Note: Requires gevent to be installed
        """

Examples

Basic Threaded Server

import rpyc
from rpyc.utils.server import ThreadedServer

class MyService(rpyc.Service):
    """Toy service with a single remotely callable lookup."""

    @rpyc.exposed
    def get_data(self, key):
        """Return a placeholder string for ``key``."""
        return f"Data for {key}"

# Bind the listening socket first so the port can be reported,
# then hand control to the accept loop.
listen_port = 12345
server = ThreadedServer(MyService, port=listen_port)
print(f"Server listening on port {server.port}")
server.start()  # Blocks indefinitely

Thread Pool Server with Configuration

import time

import rpyc
from rpyc.utils.server import ThreadPoolServer

class ComputeService(rpyc.Service):
    """Service whose single method stands in for a slow request."""

    @rpyc.exposed
    def heavy_computation(self, data):
        """Return the sum of ``data`` after a one-second delay."""
        # Stand-in for slow work: the sleep only delays the reply,
        # it does not actually load the CPU.
        time.sleep(1)
        return sum(data)

# Thread pool server with 20 worker threads.
# The pool size keyword is camelCase (``nbThreads``); a lowercase
# ``nbthreads`` is rejected as an unexpected keyword argument.
server = ThreadPoolServer(
    ComputeService,
    hostname='0.0.0.0',
    port=12345,
    nbThreads=20,
)

print(f"Compute server with 20 threads on port {server.port}")
server.start()

Forking Server with Authentication

import rpyc
from rpyc.utils.server import ForkingServer
from rpyc.utils.authenticators import SSLAuthenticator

class SecureService(rpyc.Service):
    """Service that should only be reachable over an authenticated link."""

    @rpyc.exposed
    def sensitive_operation(self):
        """Return the protected payload."""
        return "Secret data"

# Collect the server options in one place: a fixed port plus an SSL
# authenticator built from the server's key and certificate files.
server_options = {
    'port': 12345,
    'authenticator': SSLAuthenticator('server.key', 'server.crt'),
}

# Forking server with SSL auth
server = ForkingServer(SecureService, **server_options)

print("Secure forking server started")
server.start()

One Shot Server for Testing

import rpyc
from rpyc.utils.server import OneShotServer

class TestService(rpyc.Service):
    """Service used to check that a round trip works."""

    @rpyc.exposed
    def test_function(self):
        """Return a fixed success marker."""
        return "Test successful"

# Port 0 asks for an automatically assigned port; the port actually
# bound is read back from the server afterwards.
auto_port = 0
server = OneShotServer(TestService, port=auto_port)
print(f"Test server on port {server.port}")

# Handles a single connection and then returns
server.start()

Server with Service Registry

import rpyc
from rpyc.utils.server import ThreadedServer
from rpyc.utils.registry import UDPRegistryClient

class CalculatorService(rpyc.Service):
    """Two-operation calculator published under a fixed service name."""

    SERVICE_NAME = "CALCULATOR"

    @rpyc.exposed
    def add(self, a, b):
        """Return ``a + b``."""
        return a + b

    @rpyc.exposed
    def multiply(self, a, b):
        """Return ``a * b``."""
        return a * b

# Server with auto-registration: the registry client is created inline
# and handed straight to the server.
server = ThreadedServer(
    CalculatorService,
    port=12345,
    registrar=UDPRegistryClient(),
    auto_register=True,
)

print("Calculator service registered and running")
server.start()

Gevent Server for High Concurrency

# GeventServer relies on gevent's cooperative sockets and threads, so the
# standard library must be monkey-patched before anything else uses it.
from gevent import monkey
monkey.patch_all()

import gevent
import rpyc
from rpyc.utils.server import GeventServer

class HighConcurrencyService(rpyc.Service):
    """Service whose single method stands in for an I/O-bound request."""

    @rpyc.exposed
    def io_operation(self, data):
        """Return ``data`` wrapped in a marker string after a short wait."""
        # Simulate I/O-bound work; gevent.sleep yields to other greenlets
        gevent.sleep(0.1)
        return f"Processed: {data}"

# Gevent server handles thousands of concurrent connections
server = GeventServer(HighConcurrencyService, port=12345)
print("High concurrency gevent server started")
server.start()

Server Context Manager

import threading
import time
from contextlib import closing

import rpyc
from rpyc.utils.server import ThreadedServer

class MyService(rpyc.Service):
    """Service reporting a fixed status string."""

    @rpyc.exposed
    def get_status(self):
        """Return the service status."""
        return "Running"

# closing() calls server.close() when the block exits, even on error.
# It only needs a close() method, so it does not depend on the server
# class implementing the context-manager protocol itself.
with closing(ThreadedServer(MyService, port=12345)) as server:
    # start() blocks, so the accept loop runs in a background thread;
    # without this the server would never serve a client.
    worker = threading.Thread(target=server.start, daemon=True)
    worker.start()
    print(f"Server running on port {server.port}")

    # Could run for specific time or until condition
    time.sleep(10)  # Run for 10 seconds

# close() has stopped the accept loop; give the thread a moment to exit
worker.join(timeout=5)

Custom Server Subclass

import rpyc
from rpyc.utils.server import ThreadedServer
import logging

class LoggingServer(ThreadedServer):
    """Custom server with detailed logging"""

    def _authenticate_and_serve_client(self, sock):
        """
        Log the client's connect and disconnect around the normal handling.

        Parameters:
        - sock: Socket of the client being served

        Exceptions from the base implementation are logged with their
        traceback and not re-raised.
        """
        client_addr = sock.getpeername()
        logging.info("Client connecting from %s", client_addr)

        try:
            super()._authenticate_and_serve_client(sock)
        except Exception as e:
            # logging.exception keeps the traceback; logging only str(e)
            # would hide where the failure came from
            logging.exception("Client %s error: %s", client_addr, e)
        else:
            # Only reached when serving finished without an exception
            logging.info("Client %s disconnected normally", client_addr)

class MyService(rpyc.Service):
    """Service with a single greeting method."""

    @rpyc.exposed
    def hello(self):
        """Return a fixed greeting."""
        return "Hello from logged server"

# Setup logging
logging.basicConfig(level=logging.INFO)

# Use custom server
server = LoggingServer(MyService, port=12345)
server.start()

Server Configuration Options

import rpyc
from rpyc.utils.server import ThreadedServer

class ConfiguredService(rpyc.Service):
    """Service reporting that it runs with custom configuration."""

    @rpyc.exposed
    def get_config(self):
        """Return a fixed description string."""
        return "Service running with custom config"

# Protocol options applied to every connection the server accepts
protocol_config = {
    'sync_request_timeout': 60,    # 60 second request timeout
    'allow_pickle': False,         # Disable pickle for security
    'allow_all_attrs': False,      # Restrict attribute access
}

# Server with various configuration options. The protocol options are
# passed through the constructor keyword rather than assigned to the
# instance afterwards, so they are in place before any client connects.
server = ThreadedServer(
    ConfiguredService,
    hostname='127.0.0.1',      # Bind to localhost only
    port=12345,                # Specific port
    backlog=50,                # Large connection queue
    reuse_addr=True,           # Allow address reuse
    protocol_config=protocol_config,
)

server.start()

Install with Tessl CLI

npx tessl i tessl/pypi-rpyc

docs

authentication-and-security.md

classic-mode.md

cli-tools.md

connection-factory.md

index.md

registry-and-discovery.md

servers.md

services-protocols.md

streams-and-channels.md

utilities.md

tile.json