Remote Python Call (RPyC) is a transparent and symmetric distributed computing library.
—
Service registry and discovery system for automatic service location and management. The registry enables services to advertise their availability and clients to automatically discover and connect to services by name, supporting both UDP and TCP protocols.
Server implementations for hosting service registries that track available RPyC services.
class UDPRegistryServer:
    """Registry server that handles service discovery over UDP.

    A lightweight, broadcast-enabled registry suited to LAN environments.
    """

    def __init__(self, host='0.0.0.0', port=18811, pruning_timeout=3, allow_listing=True):
        """Set up the UDP registry server.

        Args:
            host (str): Host address to bind to.
            port (int): UDP port to bind to (18811 by default).
            pruning_timeout (float): Timeout used for pruning stale services.
            allow_listing (bool): Whether service listing requests are allowed.
        """

    def start(self):
        """Start the registry server."""

    def close(self):
        """Close the registry server."""
class TCPRegistryServer:
    """Registry server that handles service discovery over TCP.

    More reliable than the UDP variant and suited to WAN environments.
    """

    def __init__(self, port=18811, pruning_timeout=3, allow_listing=True):
        """Set up the TCP registry server.

        Args:
            port (int): TCP port to bind to (18811 by default).
            pruning_timeout (float): Timeout used for pruning stale services.
            allow_listing (bool): Whether service listing requests are allowed.
        """

    def start(self):
        """Start the registry server."""

    def close(self):
        """Close the registry server."""


# Client implementations for registering services and querying service registries.
class UDPRegistryClient:
    """Client that registers and discovers services through a UDP registry."""

    def __init__(self, ip='255.255.255.255', port=18811):
        """Set up the UDP registry client.

        Args:
            ip (str): Registry server IP; defaults to the broadcast address.
            port (int): Registry server port.
        """

    def register(self, alias, port, interface=''):
        """Register a service with the registry.

        Args:
            alias (str): Service name/alias.
            port (int): Port the service listens on.
            interface (str): Network interface (optional).
        """

    def unregister(self, port):
        """Remove a service from the registry.

        Args:
            port (int): Port of the service to unregister.
        """

    def discover(self, name):
        """Look up services by name.

        Args:
            name (str): Service name to discover.

        Returns:
            list: List of (host, port) tuples.
        """

    def list_services(self):
        """List every available service.

        NOTE(review): upstream rpyc registry clients appear to expose this
        operation as ``list(filter_host=None)`` -- confirm the method name
        against the installed rpyc version.

        Returns:
            dict: Dictionary mapping service names to (host, port) tuples.
        """
class TCPRegistryClient:
    """Client that registers and discovers services through a TCP registry."""

    def __init__(self, ip='127.0.0.1', port=18811):
        """Set up the TCP registry client.

        NOTE(review): upstream rpyc appears to require ``ip`` explicitly for
        the TCP client -- confirm that this default exists.

        Args:
            ip (str): Registry server IP.
            port (int): Registry server port.
        """

    def register(self, alias, port, interface=''):
        """Register a service with the registry.

        Args:
            alias (str): Service name/alias.
            port (int): Port the service listens on.
            interface (str): Network interface (optional).
        """

    def unregister(self, port):
        """Remove a service from the registry.

        Args:
            port (int): Port of the service to unregister.
        """

    def discover(self, name):
        """Look up services by name.

        Args:
            name (str): Service name to discover.

        Returns:
            list: List of (host, port) tuples.
        """

    def list_services(self):
        """List every available service.

        NOTE(review): upstream rpyc registry clients appear to expose this
        operation as ``list(filter_host=None)`` -- confirm the method name
        against the installed rpyc version.

        Returns:
            dict: Dictionary mapping service names to (host, port) tuples.
        """


# Decorators and utilities for automatic service registration.
def register_service(service_class, port, registrar=None, auto_register=True):
    """Register a service class with a registry for automatic discovery.

    Args:
        service_class: Service class to register.
        port (int): Port the service will run on.
        registrar: Registry client; optional, a default is used when omitted.
        auto_register (bool): Automatically register on server start.

    Returns:
        Configured server with registry integration.
    """


from rpyc.utils.registry import UDPRegistryServer
import threading

# Start UDP registry server on every interface; start() is handed to a
# thread so the registry runs alongside the rest of the script.
registry = UDPRegistryServer(host='0.0.0.0', port=18811)
registry_thread = threading.Thread(target=registry.start)
registry_thread.start()

print("UDP Registry running on port 18811")


import rpyc
from rpyc.utils.server import ThreadedServer
from rpyc.utils.registry import UDPRegistryClient


class CalculatorService(rpyc.Service):
    """Example service exposing two arithmetic operations."""

    SERVICE_NAME = "CALCULATOR"

    @rpyc.exposed
    def add(self, a, b):
        """Return the sum of ``a`` and ``b``."""
        return a + b

    @rpyc.exposed
    def multiply(self, a, b):
        """Return the product of ``a`` and ``b``."""
        return a * b


# Create registry client
registry = UDPRegistryClient()

# Create server with registry integration
server = ThreadedServer(
    CalculatorService,
    port=12345,
    registrar=registry,
    auto_register=True,
)

print("Calculator service starting with registry integration")
server.start()


import rpyc
from rpyc.utils.registry import UDPRegistryClient

# Create registry client
registry = UDPRegistryClient()

# Discover available services
services = registry.list_services()
print("Available services:", services)

# Discover specific service
calculator_endpoints = registry.discover("CALCULATOR")
if calculator_endpoints:
    # Use the first endpoint the registry reported.
    host, port = calculator_endpoints[0]
    print(f"Found CALCULATOR service at {host}:{port}")

    # Connect to discovered service
    conn = rpyc.connect(host, port)
    try:
        result = conn.root.add(5, 3)
        print(f"5 + 3 = {result}")
    finally:
        # Close the connection even when the remote call raises.
        conn.close()
else:
    print("CALCULATOR service not found")


from rpyc.utils.registry import TCPRegistryServer, TCPRegistryClient
import rpyc
from rpyc.utils.server import ThreadedServer
import threading
import time


# Start TCP registry server
def run_registry():
    """Create the TCP registry and serve requests (runs in a background thread)."""
    registry = TCPRegistryServer(port=18811)
    registry.start()


registry_thread = threading.Thread(target=run_registry)
registry_thread.daemon = True
registry_thread.start()


# Service definitions
class DataService(rpyc.Service):
    """Example service that doubles every element of a sequence."""

    SERVICE_NAME = "DATA_PROCESSOR"
    # rpyc publishes a service under ALIASES (falling back to the class name),
    # so expose SERVICE_NAME there to make discover("DATA_PROCESSOR") match.
    ALIASES = [SERVICE_NAME]

    @rpyc.exposed
    def process_data(self, data):
        return [x * 2 for x in data]


class AuthService(rpyc.Service):
    """Example service that validates a token against a fixed value."""

    SERVICE_NAME = "AUTHENTICATOR"
    # See DataService: the registered name comes from ALIASES.
    ALIASES = [SERVICE_NAME]

    @rpyc.exposed
    def validate_token(self, token):
        return token == "valid_token"


# Create registry client
registry_client = TCPRegistryClient()

# Start multiple services with registry
services = [
    (DataService, 12001),
    (AuthService, 12002),
]

servers = []
for service_class, port in services:
    server = ThreadedServer(
        service_class,
        port=port,
        registrar=registry_client,
        auto_register=True,
    )
    servers.append(server)

    # Start server in background
    server_thread = threading.Thread(target=server.start)
    server_thread.daemon = True
    server_thread.start()
    print(f"{service_class.SERVICE_NAME} started on port {port}")

# Client discovery and usage
time.sleep(1)  # Wait for services to register

# List all services
all_services = registry_client.list_services()
print("All registered services:", all_services)

# Use discovered services
data_endpoints = registry_client.discover("DATA_PROCESSOR")
if data_endpoints:
    host, port = data_endpoints[0]
    conn = rpyc.connect(host, port)
    try:
        result = conn.root.process_data([1, 2, 3, 4])
        print("Processed data:", result)
    finally:
        # Close the connection even when the remote call raises.
        conn.close()

auth_endpoints = registry_client.discover("AUTHENTICATOR")
if auth_endpoints:
    host, port = auth_endpoints[0]
    conn = rpyc.connect(host, port)
    try:
        is_valid = conn.root.validate_token("valid_token")
        print("Token validation:", is_valid)
    finally:
        conn.close()


from rpyc.utils.registry import UDPRegistryClient
import rpyc
import time
import threading
from rpyc.utils.server import ThreadedServer  # used below; the import was missing


class MonitoredService(rpyc.Service):
    """Example service that reports its uptime and number of handled requests."""

    SERVICE_NAME = "MONITORED_SERVICE"

    def __init__(self):
        self.start_time = time.time()
        self.request_count = 0
        # One instance is shared by all connection threads (see server setup
        # below), so the counter update is guarded by a lock.
        self._count_lock = threading.Lock()

    @rpyc.exposed
    def get_status(self):
        """Return a dict with uptime in seconds, requests served and a status string."""
        uptime = time.time() - self.start_time
        return {
            'uptime': uptime,
            'requests_served': self.request_count,
            'status': 'healthy'
        }

    @rpyc.exposed
    def process_request(self, data):
        """Count the request and return an acknowledgement string."""
        with self._count_lock:
            self.request_count += 1
        return f"Processed: {data}"


# Service with periodic re-registration (health check)
def maintain_registration(service_port, registry_client):
    """Re-register the service forever so its registry entry stays alive.

    Parameters:
    - service_port (int): Port the monitored service listens on
    - registry_client: Registry client used to send the registration

    NOTE(review): the 30 s refresh interval exceeds the 3 s pruning timeout
    this document lists as the registry default -- confirm the real default,
    since an entry that is refreshed less often than it is pruned disappears
    between refreshes.
    """
    while True:
        try:
            # Re-register every 30 seconds to show service is healthy
            registry_client.register("MONITORED_SERVICE", service_port)
        except Exception as e:
            # Best effort: report the failure and retry sooner.
            print(f"Registration failed: {e}")
            time.sleep(5)
        else:
            time.sleep(30)


# Start service
registry = UDPRegistryClient()
# Pass an instance, not the class: given a class, rpyc builds a fresh service
# object for every connection, which would reset start_time and request_count
# on each health check.
server = ThreadedServer(MonitoredService(), port=12345)

# Start registration maintenance
registration_thread = threading.Thread(
    target=maintain_registration,
    args=(12345, registry)
)
registration_thread.daemon = True
registration_thread.start()

# Start server
server_thread = threading.Thread(target=server.start)
server_thread.daemon = True
server_thread.start()

print("Monitored service with health check registration started")

# Monitor service health
while True:
    try:
        services = registry.discover("MONITORED_SERVICE")
        if services:
            host, port = services[0]
            conn = rpyc.connect(host, port)
            try:
                status = conn.root.get_status()
                print(f"Service health: {status}")
            finally:
                # Close the connection even when the status call raises.
                conn.close()
        else:
            print("Service not available")
        time.sleep(10)
    except KeyboardInterrupt:
        break
    except Exception as e:
        print(f"Health check failed: {e}")
        time.sleep(5)


REGISTRY_PORT = 18811  # Default registry port
# NOTE(review): upstream rpyc appears to define the pruning timeout as
# 4 * 60 seconds -- confirm this value against the installed version.
DEFAULT_PRUNING_TIMEOUT = 3  # Default service timeout (seconds)
UDP_BROADCAST_ADDRESS = '255.255.255.255'  # Default UDP broadcast address


class RegistryError(Exception):
    """Base exception for registry operations"""


class ServiceNotFoundError(RegistryError):
    """Raised when requested service is not found"""


class RegistrationError(RegistryError):
    """Raised when service registration fails"""


# Install with Tessl CLI
npx tessl i tessl/pypi-rpyc