Remote Python Call (RPyC) is a transparent and symmetric distributed computing library
—
Multi-threaded, forking, and specialized server implementations for hosting RPyC services. These servers provide different concurrency models and deployment options for various use cases, from development to high-performance production environments.
Foundation server class providing common functionality for all server implementations.
class Server:
"""
Base class for RPyC servers providing common server functionality.
"""
def __init__(self, service, hostname='0.0.0.0', port=0, backlog=10,
reuse_addr=True, authenticator=None, registrar=None, auto_register=None):
"""
Initialize server.
Parameters:
- service: Service class or instance to serve
- hostname (str): Host address to bind to
- port (int): Port to bind to (0 for automatic)
- backlog (int): Socket listen backlog
- reuse_addr (bool): Enable SO_REUSEADDR
- authenticator: Authentication handler
- registrar: Service registry client
- auto_register (bool): Auto-register with registry
"""
@property
def closed(self) -> bool:
"""True if server is closed"""
def start(self):
"""Start the server (blocks indefinitely)"""
def close(self):
"""Close the server and cleanup resources"""
def accept(self):
"""Accept a single client connection"""
def _authenticate_and_serve_client(self, sock):
"""Authenticate and serve single client"""Multi-threaded server that creates a new thread for each client connection.
class ThreadedServer(Server):
"""
Multi-threaded server creating new thread per client connection.
Good for moderate concurrency with I/O-bound services.
"""
def __init__(self, service, hostname='0.0.0.0', port=0, backlog=10,
reuse_addr=True, authenticator=None, registrar=None, auto_register=None):
"""
Initialize threaded server.
Parameters: Same as Server base class
"""
def _serve_client(self, sock, credentials):
"""Serve client in dedicated thread"""Thread pool server that uses a fixed pool of worker threads to serve clients.
class ThreadPoolServer(Server):
"""
Thread pool server using fixed number of worker threads.
Better resource management than ThreadedServer for high concurrency.
"""
def __init__(self, service, hostname='0.0.0.0', port=0, backlog=10,
reuse_addr=True, authenticator=None, registrar=None, auto_register=None,
nbthreads=10):
"""
Initialize thread pool server.
Parameters:
- nbthreads (int): Number of threads in pool
- Other parameters: Same as Server base class
"""
def _get_next_thread(self):
"""Get next available thread from pool"""Process-forking server that creates new process for each client connection (Unix only).
class ForkingServer(Server):
"""
Forking server creating new process per client connection.
Provides process isolation but higher resource overhead (Unix only).
"""
def __init__(self, service, hostname='0.0.0.0', port=0, backlog=10,
reuse_addr=True, authenticator=None, registrar=None, auto_register=None):
"""
Initialize forking server.
Parameters: Same as Server base class
Note: Only available on Unix systems
"""
def _serve_client(self, sock, credentials):
"""Serve client in dedicated process"""Single-connection server that handles one client and then terminates.
class OneShotServer(Server):
"""
Single-connection server for one client only.
Useful for testing or simple point-to-point communication.
"""
def __init__(self, service, hostname='0.0.0.0', port=0, backlog=10,
reuse_addr=True, authenticator=None, registrar=None, auto_register=None):
"""
Initialize one-shot server.
Parameters: Same as Server base class
"""
def start(self):
"""Start server and handle single client connection"""Gevent-based server using cooperative multitasking for high concurrency (requires gevent).
class GeventServer(Server):
"""
Gevent-based server using cooperative multitasking.
Provides high concurrency with low resource overhead (requires gevent).
"""
def __init__(self, service, hostname='0.0.0.0', port=0, backlog=10,
reuse_addr=True, authenticator=None, registrar=None, auto_register=None):
"""
Initialize gevent server.
Parameters: Same as Server base class
Note: Requires gevent to be installed
"""import rpyc
from rpyc.utils.server import ThreadedServer
class MyService(rpyc.Service):
@rpyc.exposed
def get_data(self, key):
return f"Data for {key}"
# Create and start threaded server
server = ThreadedServer(MyService, port=12345)
print(f"Server listening on port {server.port}")
server.start() # Blocks indefinitelyimport rpyc
from rpyc.utils.server import ThreadPoolServer
class ComputeService(rpyc.Service):
@rpyc.exposed
def heavy_computation(self, data):
# Simulate CPU-intensive work
import time
time.sleep(1)
return sum(data)
# Thread pool server with 20 worker threads
server = ThreadPoolServer(
ComputeService,
hostname='0.0.0.0',
port=12345,
nbthreads=20
)
print(f"Compute server with 20 threads on port {server.port}")
server.start()import rpyc
from rpyc.utils.server import ForkingServer
from rpyc.utils.authenticators import SSLAuthenticator
class SecureService(rpyc.Service):
@rpyc.exposed
def sensitive_operation(self):
return "Secret data"
# SSL authenticator
auth = SSLAuthenticator('server.key', 'server.crt')
# Forking server with SSL auth
server = ForkingServer(
SecureService,
port=12345,
authenticator=auth
)
print("Secure forking server started")
server.start()import rpyc
from rpyc.utils.server import OneShotServer
class TestService(rpyc.Service):
@rpyc.exposed
def test_function(self):
return "Test successful"
# One-shot server for single test connection
server = OneShotServer(TestService, port=0) # Auto-assign port
print(f"Test server on port {server.port}")
# In testing scenario, this would handle one connection and exit
server.start()import rpyc
from rpyc.utils.server import ThreadedServer
from rpyc.utils.registry import UDPRegistryClient
class CalculatorService(rpyc.Service):
SERVICE_NAME = "CALCULATOR"
@rpyc.exposed
def add(self, a, b):
return a + b
@rpyc.exposed
def multiply(self, a, b):
return a * b
# Connect to registry
registry = UDPRegistryClient()
# Server with auto-registration
server = ThreadedServer(
CalculatorService,
port=12345,
registrar=registry,
auto_register=True
)
print("Calculator service registered and running")
server.start()import rpyc
from rpyc.utils.server import GeventServer
class HighConcurrencyService(rpyc.Service):
@rpyc.exposed
def io_operation(self, data):
# Simulate I/O-bound work
import gevent
gevent.sleep(0.1)
return f"Processed: {data}"
# Gevent server handles thousands of concurrent connections
server = GeventServer(HighConcurrencyService, port=12345)
print("High concurrency gevent server started")
server.start()import rpyc
from rpyc.utils.server import ThreadedServer
class MyService(rpyc.Service):
@rpyc.exposed
def get_status(self):
return "Running"
# Use server as context manager
with ThreadedServer(MyService, port=12345) as server:
print(f"Server running on port {server.port}")
# Server automatically closes when leaving context
# Could run for specific time or until condition
import time
time.sleep(10) # Run for 10 secondsimport rpyc
from rpyc.utils.server import ThreadedServer
import logging
class LoggingServer(ThreadedServer):
"""Custom server with detailed logging"""
def _authenticate_and_serve_client(self, sock):
client_addr = sock.getpeername()
logging.info(f"Client connecting from {client_addr}")
try:
super()._authenticate_and_serve_client(sock)
logging.info(f"Client {client_addr} disconnected normally")
except Exception as e:
logging.error(f"Client {client_addr} error: {e}")
class MyService(rpyc.Service):
@rpyc.exposed
def hello(self):
return "Hello from logged server"
# Setup logging
logging.basicConfig(level=logging.INFO)
# Use custom server
server = LoggingServer(MyService, port=12345)
server.start()import rpyc
from rpyc.utils.server import ThreadedServer
class ConfiguredService(rpyc.Service):
@rpyc.exposed
def get_config(self):
return "Service running with custom config"
# Server with various configuration options
server = ThreadedServer(
ConfiguredService,
hostname='127.0.0.1', # Bind to localhost only
port=12345, # Specific port
backlog=50, # Large connection queue
reuse_addr=True # Allow address reuse
)
# Configure the service's protocol
server.protocol_config = {
'sync_request_timeout': 60, # 60 second request timeout
'allow_pickle': False, # Disable pickle for security
'allow_all_attrs': False # Restrict attribute access
}
server.start()Install with Tessl CLI
npx tessl i tessl/pypi-rpyc