CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-rpyc

Remote Python Call (RPyC) is a transparent and symmetric distributed computing library

Pending
Overview
Eval results
Files

docs/registry-and-discovery.md

Registry and Service Discovery

Service registry and discovery system for automatic service location and management. The registry enables services to advertise their availability and clients to automatically discover and connect to services by name, supporting both UDP and TCP protocols.

Capabilities

Registry Servers

Server implementations for hosting service registries that track available RPyC services.

class UDPRegistryServer:
    """
    Registry server that handles service discovery over UDP.

    A lightweight, broadcast-enabled registry intended for LAN environments.
    """

    def __init__(self, host='0.0.0.0', port=18811, pruning_timeout=3, allow_listing=True):
        """
        Set up a UDP registry server.

        Args:
            host (str): Address the server binds to.
            port (int): UDP port the server binds to; defaults to 18811.
            pruning_timeout (float): Timeout used when pruning stale services.
            allow_listing (bool): Whether service listing requests are allowed.
        """

    def start(self):
        """Begin running the registry server."""

    def close(self):
        """Shut the registry server down."""

class TCPRegistryServer:
    """
    Registry server that handles service discovery over TCP.

    Described as more reliable than the UDP variant and suited to WAN
    environments.
    """

    def __init__(self, port=18811, pruning_timeout=3, allow_listing=True):
        """
        Set up a TCP registry server.

        Args:
            port (int): TCP port the server binds to; defaults to 18811.
            pruning_timeout (float): Timeout used when pruning stale services.
            allow_listing (bool): Whether service listing requests are allowed.
        """

    def start(self):
        """Begin running the registry server."""

    def close(self):
        """Shut the registry server down."""

Registry Clients

Client implementations for registering services and querying service registries.

class UDPRegistryClient:
    """
    Client that registers and discovers services through a UDP registry.
    """

    def __init__(self, ip='255.255.255.255', port=18811):
        """
        Set up a UDP registry client.

        Args:
            ip (str): IP address of the registry server; the default is the
                broadcast address.
            port (int): Port of the registry server.
        """

    def register(self, alias, port, interface=''):
        """
        Announce a service to the registry.

        Args:
            alias (str): Name (alias) under which the service is registered.
            port (int): Port the service listens on.
            interface (str): Network interface; optional.
        """

    def unregister(self, port):
        """
        Remove a service from the registry.

        Args:
            port (int): Port of the service being unregistered.
        """

    def discover(self, name):
        """
        Look up the services registered under a name.

        Args:
            name (str): Service name to look up.

        Returns:
            list: (host, port) tuples for the matching services.
        """

    def list_services(self):
        """
        Enumerate every available service.

        Returns:
            dict: Service names mapped to (host, port) tuples.
        """

class TCPRegistryClient:
    """
    Client that registers and discovers services through a TCP registry.
    """

    def __init__(self, ip='127.0.0.1', port=18811):
        """
        Set up a TCP registry client.

        Args:
            ip (str): IP address of the registry server.
            port (int): Port of the registry server.
        """

    def register(self, alias, port, interface=''):
        """
        Announce a service to the registry.

        Args:
            alias (str): Name (alias) under which the service is registered.
            port (int): Port the service listens on.
            interface (str): Network interface; optional.
        """

    def unregister(self, port):
        """
        Remove a service from the registry.

        Args:
            port (int): Port of the service being unregistered.
        """

    def discover(self, name):
        """
        Look up the services registered under a name.

        Args:
            name (str): Service name to look up.

        Returns:
            list: (host, port) tuples for the matching services.
        """

    def list_services(self):
        """
        Enumerate every available service.

        Returns:
            dict: Service names mapped to (host, port) tuples.
        """

Service Registration Decorators

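Helper utilities for automatic service registration. Note that `register_service` is called as a regular function with the service class as its first argument, rather than being applied with `@` decorator syntax.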

def register_service(service_class, port, registrar=None, auto_register=True):
    """
    Hook a service class up to a registry so it can be discovered automatically.

    Args:
        service_class: The service class being registered.
        port (int): Port the service will run on.
        registrar: Registry client; optional, a default is used when omitted.
        auto_register (bool): Whether to register automatically when the
            server starts.

    Returns:
        A configured server with registry integration.
    """

Examples

Setting Up UDP Registry

from rpyc.utils.registry import UDPRegistryServer
import threading

# Start UDP registry server
# Bind the registry to every interface ('0.0.0.0') on port 18811.
registry = UDPRegistryServer(host='0.0.0.0', port=18811)
# Run registry.start on its own thread so the script can carry on to the
# print below.
# NOTE(review): unlike the later examples, this thread is not marked as a
# daemon, so it presumably keeps the process alive — confirm that is intended.
registry_thread = threading.Thread(target=registry.start)
registry_thread.start()

print("UDP Registry running on port 18811")

Registering Services

import rpyc
from rpyc.utils.server import ThreadedServer
from rpyc.utils.registry import UDPRegistryClient

class CalculatorService(rpyc.Service):
    """Example service offering two exposed arithmetic operations."""

    SERVICE_NAME = "CALCULATOR"

    @rpyc.exposed
    def add(self, a, b):
        """Return the result of ``a + b``."""
        total = a + b
        return total

    @rpyc.exposed
    def multiply(self, a, b):
        """Return the result of ``a * b``."""
        product = a * b
        return product

# Create registry client
# No arguments are passed, so the client's default ip and port apply.
registry = UDPRegistryClient()

# Create server with registry integration
# The registry client is handed to the server as its registrar, and
# auto_register=True asks for registration to happen automatically.
server = ThreadedServer(
    CalculatorService,
    port=12345,
    registrar=registry,
    auto_register=True
)

print("Calculator service starting with registry integration")
# NOTE(review): start() presumably blocks while the server is running — confirm.
server.start()

Service Discovery and Connection

import rpyc
from rpyc.utils.registry import UDPRegistryClient

# Create registry client
registry = UDPRegistryClient()

# Discover available services
services = registry.list_services()
print("Available services:", services)

# Discover specific service
calculator_endpoints = registry.discover("CALCULATOR")
if calculator_endpoints:
    # Use the first endpoint the registry returned.
    host, port = calculator_endpoints[0]
    print(f"Found CALCULATOR service at {host}:{port}")

    # Connect to discovered service
    conn = rpyc.connect(host, port)
    try:
        result = conn.root.add(5, 3)
        print(f"5 + 3 = {result}")
    finally:
        # Always release the connection, even when the remote call raises.
        conn.close()
else:
    print("CALCULATOR service not found")

TCP Registry with Multiple Services

from rpyc.utils.registry import TCPRegistryServer, TCPRegistryClient
import rpyc
from rpyc.utils.server import ThreadedServer
import threading

# Start TCP registry server
def run_registry():
    """Create a TCP registry server on port 18811 and start it."""
    TCPRegistryServer(port=18811).start()


# Daemon thread, so the registry does not keep the interpreter alive on exit.
registry_thread = threading.Thread(target=run_registry, daemon=True)
registry_thread.start()

# Service definitions
class DataService(rpyc.Service):
    """Example service that doubles every element it is given."""

    SERVICE_NAME = "DATA_PROCESSOR"

    @rpyc.exposed
    def process_data(self, data):
        """Return a new list holding each item of ``data`` multiplied by 2."""
        doubled = []
        for item in data:
            doubled.append(item * 2)
        return doubled

class AuthService(rpyc.Service):
    """Example service that checks a token against a fixed value."""

    SERVICE_NAME = "AUTHENTICATOR"

    @rpyc.exposed
    def validate_token(self, token):
        """Return whether ``token`` equals the string "valid_token"."""
        is_valid = token == "valid_token"
        return is_valid

# Create registry client
registry_client = TCPRegistryClient()

# Start multiple services with registry
services = [(DataService, 12001), (AuthService, 12002)]

servers = []
for service_class, port in services:
    # Every server shares the same registry client as its registrar.
    server = ThreadedServer(service_class, port=port, registrar=registry_client, auto_register=True)
    servers.append(server)

    # Start server in background
    server_thread = threading.Thread(target=server.start, daemon=True)
    server_thread.start()

    print(f"{service_class.SERVICE_NAME} started on port {port}")

# Client discovery and usage
import time
time.sleep(1)  # Wait for services to register

# List all services
all_services = registry_client.list_services()
print("All registered services:", all_services)

# Use discovered services
data_endpoints = registry_client.discover("DATA_PROCESSOR")
if data_endpoints:
    # Use the first endpoint the registry returned.
    host, port = data_endpoints[0]
    conn = rpyc.connect(host, port)
    try:
        result = conn.root.process_data([1, 2, 3, 4])
        print("Processed data:", result)
    finally:
        # Close the connection even if the remote call raises.
        conn.close()

auth_endpoints = registry_client.discover("AUTHENTICATOR")
if auth_endpoints:
    host, port = auth_endpoints[0]
    conn = rpyc.connect(host, port)
    try:
        is_valid = conn.root.validate_token("valid_token")
        print("Token validation:", is_valid)
    finally:
        # Close the connection even if the remote call raises.
        conn.close()

Registry with Service Health Monitoring

import threading
import time

import rpyc
from rpyc.utils.registry import UDPRegistryClient
from rpyc.utils.server import ThreadedServer

class MonitoredService(rpyc.Service):
    """Example service that reports its uptime and a count of handled requests."""

    SERVICE_NAME = "MONITORED_SERVICE"

    def __init__(self):
        """Record the creation time and zero the request counter."""
        self.start_time = time.time()
        self.request_count = 0

    @rpyc.exposed
    def get_status(self):
        """Return a dict with 'uptime', 'requests_served' and 'status' entries."""
        elapsed = time.time() - self.start_time
        report = {}
        report['uptime'] = elapsed
        report['requests_served'] = self.request_count
        report['status'] = 'healthy'
        return report

    @rpyc.exposed
    def process_request(self, data):
        """Count the request and return ``data`` prefixed with "Processed: "."""
        self.request_count = self.request_count + 1
        return f"Processed: {data}"

# Service with periodic re-registration (health check)
def maintain_registration(service_port, registry_client, interval=30,
                          retry_delay=5, stop_event=None):
    """
    Keep re-registering "MONITORED_SERVICE" with the registry until stopped.

    Parameters:
    - service_port (int): Port passed to ``registry_client.register``
    - registry_client: Object providing ``register(alias, port)``
    - interval (float): Seconds to wait after a successful registration
      (default 30)
    - retry_delay (float): Seconds to wait after a failed registration
      (default 5)
    - stop_event (threading.Event): Optional event; once it is set the loop
      exits. When omitted the loop runs forever, as before.

    NOTE(review): the Constants section lists a 3-second pruning timeout,
    which is shorter than the 30-second default interval here — confirm the
    refresh interval the registry actually expects.
    """
    if stop_event is None:
        # An event nobody else holds is never set, so the loop never ends.
        stop_event = threading.Event()

    while not stop_event.is_set():
        try:
            # Only the registration call is guarded; the wait happens outside.
            registry_client.register("MONITORED_SERVICE", service_port)
        except Exception as e:
            # Best effort: report the failure and retry sooner than usual.
            print(f"Registration failed: {e}")
            delay = retry_delay
        else:
            delay = interval
        # Event.wait sleeps like time.sleep but wakes early once stop is requested.
        stop_event.wait(delay)

# Start service
registry = UDPRegistryClient()
server = ThreadedServer(MonitoredService, port=12345)

# Start registration maintenance
registration_thread = threading.Thread(target=maintain_registration, args=(12345, registry), daemon=True)
registration_thread.start()

# Start server
server_thread = threading.Thread(target=server.start, daemon=True)
server_thread.start()

print("Monitored service with health check registration started")

# Monitor service health
while True:
    try:
        services = registry.discover("MONITORED_SERVICE")
        if services:
            # Probe the first endpoint the registry returned.
            host, port = services[0]
            conn = rpyc.connect(host, port)
            try:
                status = conn.root.get_status()
                print(f"Service health: {status}")
            finally:
                # Close the connection even when the status call fails, so
                # repeated failed checks do not pile up open connections.
                conn.close()
        else:
            print("Service not available")

        time.sleep(10)
    except KeyboardInterrupt:
        # Ctrl-C ends the monitoring loop.
        break
    except Exception as e:
        # Any other failure is reported, then the check is retried after 5 s.
        print(f"Health check failed: {e}")
        time.sleep(5)

Constants

REGISTRY_PORT = 18811                # Default registry port
# NOTE(review): the health-monitoring example re-registers every 30 seconds,
# which is longer than this 3-second timeout — confirm the value and its
# units against the rpyc registry module.
DEFAULT_PRUNING_TIMEOUT = 3          # Default service timeout (seconds)
UDP_BROADCAST_ADDRESS = '255.255.255.255'  # Default UDP broadcast address

Exceptions

class RegistryError(Exception):
    """Root of the exception hierarchy for registry operations."""

class ServiceNotFoundError(RegistryError):
    """Signals that the requested service could not be found."""

class RegistrationError(RegistryError):
    """Signals that registering a service failed."""

Install with Tessl CLI

npx tessl i tessl/pypi-rpyc

docs

authentication-and-security.md

classic-mode.md

cli-tools.md

connection-factory.md

index.md

registry-and-discovery.md

servers.md

services-protocols.md

streams-and-channels.md

utilities.md

tile.json