CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-rpyc

Remote Python Call (RPyC) is a transparent and symmetric distributed computing library

Pending
Overview
Eval results
Files

authentication-and-security.mddocs/

Authentication and Security

Authentication mechanisms and security features for securing RPyC connections and services. Includes SSL/TLS authentication, custom authenticators, and security configuration options for protecting distributed applications.

Capabilities

SSL/TLS Authentication

SSL/TLS-based authentication providing encrypted communication and client certificate verification.

class SSLAuthenticator:
    """
    SSL/TLS authenticator providing certificate-based authentication.
    Supports client certificate verification and encrypted communication.
    """
    
    def __init__(self, keyfile, certfile, ca_certs=None, cert_reqs=None, 
                 ssl_version=None, ciphers=None):
        """
        Initialize SSL authenticator.
        
        Parameters:
        - keyfile (str): Path to private key file
        - certfile (str): Path to certificate file
        - ca_certs (str): Path to CA certificates file (optional)
        - cert_reqs: Certificate requirements (ssl.CERT_NONE, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED)
        - ssl_version: SSL/TLS version to use
        - ciphers (str): Cipher suites to allow
        """
    
    def __call__(self, sock):
        """
        Authenticate connection using SSL/TLS.
        
        Parameters:
        - sock: Socket to authenticate
        
        Returns:
        tuple: (wrapped_socket, credentials) or raises AuthenticationError
        """

Custom Authentication

Base classes and utilities for implementing custom authentication mechanisms.

class Authenticator:
    """
    Base class for custom authenticators.
    Subclass to implement custom authentication logic.
    """
    
    def __call__(self, sock):
        """
        Authenticate connection.
        
        Parameters:
        - sock: Socket to authenticate
        
        Returns:
        tuple: (socket, credentials) or raises AuthenticationError
        
        Note: Must be implemented by subclasses
        """
        raise NotImplementedError()

def create_authenticator(auth_func):
    """
    Create authenticator from function.
    
    Parameters:
    - auth_func (callable): Function taking socket, returning (socket, credentials)
    
    Returns:
    Authenticator: Callable authenticator object
    """

Security Configuration

Security-related configuration options and utilities.

# Security configuration constants
SECURE_DEFAULT_CONFIG = {
    'allow_all_attrs': False,        # Restrict attribute access
    'allow_pickle': False,           # Disable pickle serialization
    'allow_setattr': False,          # Disable attribute setting
    'allow_delattr': False,          # Disable attribute deletion
    'allow_public_attrs': True,      # Allow public attributes only
    'exposed_prefix': 'exposed_',    # Required prefix for exposed methods
    'safe_attrs': frozenset([        # Safe attributes whitelist
        '__abs__', '__add__', '__and__', '__bool__', '__cmp__',
        '__contains__', '__div__', '__divmod__', '__doc__', '__eq__',
        '__float__', '__floordiv__', '__ge__', '__getitem__',
        '__gt__', '__hash__', '__hex__', '__iadd__', '__iand__',
        '__idiv__', '__ifloordiv__', '__ilshift__', '__imod__',
        '__imul__', '__int__', '__invert__', '__ior__', '__ipow__',
        '__irshift__', '__isub__', '__iter__', '__itruediv__',
        '__ixor__', '__le__', '__len__', '__long__', '__lshift__',
        '__lt__', '__mod__', '__mul__', '__ne__', '__neg__',
        '__next__', '__oct__', '__or__', '__pos__', '__pow__',
        '__radd__', '__rand__', '__rdiv__', '__rdivmod__',
        '__repr__', '__rfloordiv__', '__rlshift__', '__rmod__',
        '__rmul__', '__ror__', '__rpow__', '__rrshift__',
        '__rshift__', '__rsub__', '__rtruediv__', '__rxor__',
        '__setitem__', '__str__', '__sub__', '__truediv__',
        '__xor__', 'next'
    ]),
    'instantiate_custom_exceptions': False,
    'propagate_SystemExit_locally': False,
    'propagate_KeyboardInterrupt_locally': False,
}

def create_secure_config(additional_options=None):
    """
    Create secure configuration for RPyC connections.
    
    Parameters:
    - additional_options (dict): Additional configuration options
    
    Returns:
    dict: Secure configuration dictionary
    """

Access Control Utilities

Utilities for implementing fine-grained access control.

def restricted_service(service_class, allowed_methods=None, denied_methods=None):
    """
    Create restricted version of service class.
    
    Parameters:
    - service_class: Service class to restrict
    - allowed_methods (set): Set of allowed method names
    - denied_methods (set): Set of denied method names
    
    Returns:
    class: Restricted service class
    """

class SecurityPolicy:
    """
    Security policy for controlling access to service methods and attributes.
    """
    
    def __init__(self, allowed_methods=None, denied_methods=None,
                 allowed_attrs=None, denied_attrs=None):
        """
        Initialize security policy.
        
        Parameters:
        - allowed_methods (set): Allowed method names
        - denied_methods (set): Denied method names  
        - allowed_attrs (set): Allowed attribute names
        - denied_attrs (set): Denied attribute names
        """
    
    def check_method_access(self, method_name):
        """
        Check if method access is allowed.
        
        Parameters:
        - method_name (str): Method name to check
        
        Returns:
        bool: True if access allowed
        """
    
    def check_attr_access(self, attr_name, is_write=False):
        """
        Check if attribute access is allowed.
        
        Parameters:
        - attr_name (str): Attribute name to check
        - is_write (bool): True for write access, False for read
        
        Returns:
        bool: True if access allowed
        """

Examples

SSL Server with Client Authentication

import rpyc
from rpyc.utils.server import ThreadedServer
from rpyc.utils.authenticators import SSLAuthenticator
import ssl

class SecureService(rpyc.Service):
    @rpyc.exposed
    def get_secret_data(self):
        return "This is confidential information"
    
    @rpyc.exposed
    def process_secure_request(self, data):
        return f"Securely processed: {data}"

# Create SSL authenticator with client certificate verification
authenticator = SSLAuthenticator(
    keyfile='server.key',
    certfile='server.crt',
    ca_certs='ca.crt',
    cert_reqs=ssl.CERT_REQUIRED  # Require client certificates
)

# Create secure server
server = ThreadedServer(
    SecureService,
    port=18821,  # Standard SSL port
    authenticator=authenticator
)

print("Secure SSL server with client authentication started")
server.start()

SSL Client Connection

import rpyc

# Connect with SSL client certificate
conn = rpyc.ssl_connect(
    'secure-server.com', 18821,
    keyfile='client.key',
    certfile='client.crt',
    ca_certs='ca.crt'
)

try:
    # Use secure connection
    secret = conn.root.get_secret_data()
    result = conn.root.process_secure_request("sensitive data")
    print(f"Secret: {secret}")
    print(f"Result: {result}")
finally:
    conn.close()

Custom Authentication

import rpyc
from rpyc.utils.server import ThreadedServer
from rpyc.utils.authenticators import AuthenticationError
import hashlib
import time

class TokenAuthenticator:
    """Custom token-based authenticator"""
    
    def __init__(self, valid_tokens):
        self.valid_tokens = set(valid_tokens)
    
    def __call__(self, sock):
        """Authenticate using token exchange"""
        try:
            # Receive token from client
            token_data = sock.recv(1024).decode('utf-8')
            token_parts = token_data.split(':')
            
            if len(token_parts) != 2:
                raise AuthenticationError("Invalid token format")
            
            username, token = token_parts
            
            # Validate token
            if token not in self.valid_tokens:
                raise AuthenticationError("Invalid token")
            
            # Send confirmation
            sock.send(b"AUTH_OK")
            
            # Return socket and credentials
            credentials = {'username': username, 'authenticated': True}
            return sock, credentials
            
        except Exception as e:
            raise AuthenticationError(f"Authentication failed: {e}")

class AuthenticatedService(rpyc.Service):
    def on_connect(self, conn):
        print(f"User {conn._config.get('credentials', {}).get('username')} connected")
    
    @rpyc.exposed
    def get_user_data(self):
        # Access credentials from connection
        username = self.exposed_get_username()
        return f"Data for user: {username}"
    
    @rpyc.exposed
    def get_username(self):
        # Would access from connection context in real implementation
        return "authenticated_user"

# Create server with custom authenticator
valid_tokens = ['token123', 'secret456', 'auth789']
authenticator = TokenAuthenticator(valid_tokens)

server = ThreadedServer(
    AuthenticatedService,
    port=12345,
    authenticator=authenticator
)

print("Server with custom token authentication started")
server.start()

Secure Configuration

import rpyc
from rpyc.utils.authenticators import SSLAuthenticator
import ssl

# Create highly secure configuration
secure_config = {
    'allow_all_attrs': False,           # No attribute access
    'allow_pickle': False,              # No pickle serialization
    'allow_setattr': False,             # No attribute setting
    'allow_delattr': False,             # No attribute deletion
    'allow_public_attrs': False,        # No public attributes
    'exposed_prefix': 'exposed_',       # Require exposed prefix
    'instantiate_custom_exceptions': False,
    'propagate_SystemExit_locally': True,
    'propagate_KeyboardInterrupt_locally': True,
    'sync_request_timeout': 10,         # Short timeout
}

class HighSecurityService(rpyc.Service):
    """Service with minimal attack surface"""
    
    @rpyc.exposed
    def safe_operation(self, data):
        # Only basic operations on validated data
        if isinstance(data, (int, float, str)):
            return f"Processed: {data}"
        else:
            raise ValueError("Only basic types allowed")
    
    @rpyc.exposed
    def get_public_info(self):
        return "This is public information"

# SSL authenticator with strict settings
authenticator = SSLAuthenticator(
    keyfile='server.key',
    certfile='server.crt',
    ca_certs='ca.crt',
    cert_reqs=ssl.CERT_REQUIRED,
    ssl_version=ssl.PROTOCOL_TLS,
    ciphers='ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS'
)

# Connect with secure configuration
conn = rpyc.ssl_connect(
    'secure-server.com', 18821,
    keyfile='client.key',
    certfile='client.crt',
    ca_certs='ca.crt',
    config=secure_config
)

try:
    # Only allowed operations work
    result = conn.root.safe_operation("test data")
    info = conn.root.get_public_info()
    print(f"Result: {result}")
    print(f"Info: {info}")
    
    # These would fail due to security restrictions:
    # conn.root.some_attr  # Would raise AttributeError
    # conn.root.unsafe_method()  # Would not be accessible
    
finally:
    conn.close()

Role-Based Access Control

import rpyc
from rpyc.utils.server import ThreadedServer
from rpyc.utils.authenticators import AuthenticationError

class RoleBasedAuthenticator:
    """Authenticator that assigns roles based on credentials"""
    
    def __init__(self, user_roles):
        self.user_roles = user_roles  # {'username': ['role1', 'role2']}
    
    def __call__(self, sock):
        # Simple username/password authentication
        credentials_data = sock.recv(1024).decode('utf-8')
        username, password = credentials_data.split(':')
        
        # In real implementation, verify password against database
        if username in self.user_roles:
            sock.send(b"AUTH_OK")
            roles = self.user_roles[username]
            return sock, {'username': username, 'roles': roles}
        else:
            raise AuthenticationError("Invalid credentials")

class RoleProtectedService(rpyc.Service):
    """Service with role-based method protection"""
    
    def __init__(self):
        self._connection = None
    
    def on_connect(self, conn):
        self._connection = conn
        print(f"User connected: {self.get_username()} with roles: {self.get_roles()}")
    
    def get_username(self):
        return self._connection._config.get('credentials', {}).get('username', 'unknown')
    
    def get_roles(self):
        return self._connection._config.get('credentials', {}).get('roles', [])
    
    def require_role(self, required_role):
        """Check if user has required role"""
        user_roles = self.get_roles()
        if required_role not in user_roles:
            raise PermissionError(f"Role '{required_role}' required")
    
    @rpyc.exposed
    def public_method(self):
        return "This method is available to all authenticated users"
    
    @rpyc.exposed
    def admin_method(self):
        self.require_role('admin')
        return "This method requires admin role"
    
    @rpyc.exposed
    def user_method(self):
        self.require_role('user')
        return f"Hello {self.get_username()}, you have user access"
    
    @rpyc.exposed
    def manager_method(self):
        self.require_role('manager')
        return "This method requires manager role"

# Setup users with roles
user_roles = {
    'alice': ['user', 'admin'],
    'bob': ['user'],
    'charlie': ['manager', 'user']
}

authenticator = RoleBasedAuthenticator(user_roles)
server = ThreadedServer(
    RoleProtectedService,
    port=12345,
    authenticator=authenticator
)

print("Role-based access control server started")
server.start()

Connection Security Monitoring

import rpyc
from rpyc.utils.server import ThreadedServer
import logging
import time

class SecurityMonitoringService(rpyc.Service):
    """Service that logs security events"""
    
    def __init__(self):
        self.logger = logging.getLogger('rpyc.security')
        self.connection_log = {}
    
    def on_connect(self, conn):
        client_info = {
            'connect_time': time.time(),
            'remote_addr': getattr(conn._channel.stream.sock, 'getpeername', lambda: 'unknown')(),
            'requests': 0
        }
        self.connection_log[id(conn)] = client_info
        self.logger.info(f"Client connected: {client_info['remote_addr']}")
    
    def on_disconnect(self, conn):
        conn_id = id(conn)
        if conn_id in self.connection_log:
            info = self.connection_log[conn_id]
            duration = time.time() - info['connect_time']
            self.logger.info(f"Client disconnected after {duration:.2f}s, "
                           f"{info['requests']} requests")
            del self.connection_log[conn_id]
    
    def log_request(self, method_name):
        """Log method invocation"""
        # In real implementation, get connection from context
        self.logger.info(f"Method called: {method_name}")
    
    @rpyc.exposed
    def secure_operation(self, data):
        self.log_request('secure_operation')
        # Validate input
        if not isinstance(data, str) or len(data) > 1000:
            self.logger.warning("Invalid input rejected")  
            raise ValueError("Invalid input")
        return f"Processed: {data}"
    
    @rpyc.exposed
    def get_stats(self):
        self.log_request('get_stats')
        return {
            'active_connections': len(self.connection_log),
            'server_uptime': time.time()
        }

# Setup security logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('rpyc_security.log'),
        logging.StreamHandler()
    ]
)

server = ThreadedServer(SecurityMonitoringService, port=12345)
print("Security monitoring server started")
server.start()

Constants

DEFAULT_SSL_PORT = 18821                 # Default SSL server port
SSL_CIPHER_SUITES = 'ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS'
AUTH_TIMEOUT = 30                        # Authentication timeout (seconds)
MAX_AUTH_ATTEMPTS = 3                    # Maximum authentication attempts

Exceptions

class AuthenticationError(Exception):
    """Raised when authentication fails"""

class SecurityError(Exception):
    """Base exception for security-related errors"""

class PermissionDeniedError(SecurityError):
    """Raised when access is denied due to insufficient permissions"""

class InvalidCredentialsError(AuthenticationError):
    """Raised when provided credentials are invalid"""

Install with Tessl CLI

npx tessl i tessl/pypi-rpyc

docs

authentication-and-security.md

classic-mode.md

cli-tools.md

connection-factory.md

index.md

registry-and-discovery.md

servers.md

services-protocols.md

streams-and-channels.md

utilities.md

tile.json