Remote Python Call (RPyC) is a transparent and symmetric distributed computing library
—
Authentication mechanisms and security features for securing RPyC connections and services. Includes SSL/TLS authentication, custom authenticators, and security configuration options for protecting distributed applications.
SSL/TLS-based authentication providing encrypted communication and client certificate verification.
class SSLAuthenticator:
"""
SSL/TLS authenticator providing certificate-based authentication.
Supports client certificate verification and encrypted communication.
"""
def __init__(self, keyfile, certfile, ca_certs=None, cert_reqs=None,
ssl_version=None, ciphers=None):
"""
Initialize SSL authenticator.
Parameters:
- keyfile (str): Path to private key file
- certfile (str): Path to certificate file
- ca_certs (str): Path to CA certificates file (optional)
- cert_reqs: Certificate requirements (ssl.CERT_NONE, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED)
- ssl_version: SSL/TLS version to use
- ciphers (str): Cipher suites to allow
"""
def __call__(self, sock):
"""
Authenticate connection using SSL/TLS.
Parameters:
- sock: Socket to authenticate
Returns:
tuple: (wrapped_socket, credentials) or raises AuthenticationError
"""Base classes and utilities for implementing custom authentication mechanisms.
class Authenticator:
"""
Base class for custom authenticators.
Subclass to implement custom authentication logic.
"""
def __call__(self, sock):
"""
Authenticate connection.
Parameters:
- sock: Socket to authenticate
Returns:
tuple: (socket, credentials) or raises AuthenticationError
Note: Must be implemented by subclasses
"""
raise NotImplementedError()
def create_authenticator(auth_func):
"""
Create authenticator from function.
Parameters:
- auth_func (callable): Function taking socket, returning (socket, credentials)
Returns:
Authenticator: Callable authenticator object
"""Security-related configuration options and utilities.
# Security configuration constants
SECURE_DEFAULT_CONFIG = {
'allow_all_attrs': False, # Restrict attribute access
'allow_pickle': False, # Disable pickle serialization
'allow_setattr': False, # Disable attribute setting
'allow_delattr': False, # Disable attribute deletion
'allow_public_attrs': True, # Allow public attributes only
'exposed_prefix': 'exposed_', # Required prefix for exposed methods
'safe_attrs': frozenset([ # Safe attributes whitelist
'__abs__', '__add__', '__and__', '__bool__', '__cmp__',
'__contains__', '__div__', '__divmod__', '__doc__', '__eq__',
'__float__', '__floordiv__', '__ge__', '__getitem__',
'__gt__', '__hash__', '__hex__', '__iadd__', '__iand__',
'__idiv__', '__ifloordiv__', '__ilshift__', '__imod__',
'__imul__', '__int__', '__invert__', '__ior__', '__ipow__',
'__irshift__', '__isub__', '__iter__', '__itruediv__',
'__ixor__', '__le__', '__len__', '__long__', '__lshift__',
'__lt__', '__mod__', '__mul__', '__ne__', '__neg__',
'__next__', '__oct__', '__or__', '__pos__', '__pow__',
'__radd__', '__rand__', '__rdiv__', '__rdivmod__',
'__repr__', '__rfloordiv__', '__rlshift__', '__rmod__',
'__rmul__', '__ror__', '__rpow__', '__rrshift__',
'__rshift__', '__rsub__', '__rtruediv__', '__rxor__',
'__setitem__', '__str__', '__sub__', '__truediv__',
'__xor__', 'next'
]),
'instantiate_custom_exceptions': False,
'propagate_SystemExit_locally': False,
'propagate_KeyboardInterrupt_locally': False,
}
def create_secure_config(additional_options=None):
"""
Create secure configuration for RPyC connections.
Parameters:
- additional_options (dict): Additional configuration options
Returns:
dict: Secure configuration dictionary
"""Utilities for implementing fine-grained access control.
def restricted_service(service_class, allowed_methods=None, denied_methods=None):
"""
Create restricted version of service class.
Parameters:
- service_class: Service class to restrict
- allowed_methods (set): Set of allowed method names
- denied_methods (set): Set of denied method names
Returns:
class: Restricted service class
"""
class SecurityPolicy:
"""
Security policy for controlling access to service methods and attributes.
"""
def __init__(self, allowed_methods=None, denied_methods=None,
allowed_attrs=None, denied_attrs=None):
"""
Initialize security policy.
Parameters:
- allowed_methods (set): Allowed method names
- denied_methods (set): Denied method names
- allowed_attrs (set): Allowed attribute names
- denied_attrs (set): Denied attribute names
"""
def check_method_access(self, method_name):
"""
Check if method access is allowed.
Parameters:
- method_name (str): Method name to check
Returns:
bool: True if access allowed
"""
def check_attr_access(self, attr_name, is_write=False):
"""
Check if attribute access is allowed.
Parameters:
- attr_name (str): Attribute name to check
- is_write (bool): True for write access, False for read
Returns:
bool: True if access allowed
"""import rpyc
from rpyc.utils.server import ThreadedServer
from rpyc.utils.authenticators import SSLAuthenticator
import ssl
class SecureService(rpyc.Service):
@rpyc.exposed
def get_secret_data(self):
return "This is confidential information"
@rpyc.exposed
def process_secure_request(self, data):
return f"Securely processed: {data}"
# Create SSL authenticator with client certificate verification
authenticator = SSLAuthenticator(
keyfile='server.key',
certfile='server.crt',
ca_certs='ca.crt',
cert_reqs=ssl.CERT_REQUIRED # Require client certificates
)
# Create secure server
server = ThreadedServer(
SecureService,
port=18821, # Standard SSL port
authenticator=authenticator
)
print("Secure SSL server with client authentication started")
server.start()import rpyc
# Connect with SSL client certificate
conn = rpyc.ssl_connect(
'secure-server.com', 18821,
keyfile='client.key',
certfile='client.crt',
ca_certs='ca.crt'
)
try:
# Use secure connection
secret = conn.root.get_secret_data()
result = conn.root.process_secure_request("sensitive data")
print(f"Secret: {secret}")
print(f"Result: {result}")
finally:
conn.close()import rpyc
from rpyc.utils.server import ThreadedServer
from rpyc.utils.authenticators import AuthenticationError
import hashlib
import time
class TokenAuthenticator:
"""Custom token-based authenticator"""
def __init__(self, valid_tokens):
self.valid_tokens = set(valid_tokens)
def __call__(self, sock):
"""Authenticate using token exchange"""
try:
# Receive token from client
token_data = sock.recv(1024).decode('utf-8')
token_parts = token_data.split(':')
if len(token_parts) != 2:
raise AuthenticationError("Invalid token format")
username, token = token_parts
# Validate token
if token not in self.valid_tokens:
raise AuthenticationError("Invalid token")
# Send confirmation
sock.send(b"AUTH_OK")
# Return socket and credentials
credentials = {'username': username, 'authenticated': True}
return sock, credentials
except Exception as e:
raise AuthenticationError(f"Authentication failed: {e}")
class AuthenticatedService(rpyc.Service):
def on_connect(self, conn):
print(f"User {conn._config.get('credentials', {}).get('username')} connected")
@rpyc.exposed
def get_user_data(self):
# Access credentials from connection
username = self.exposed_get_username()
return f"Data for user: {username}"
@rpyc.exposed
def get_username(self):
# Would access from connection context in real implementation
return "authenticated_user"
# Create server with custom authenticator
valid_tokens = ['token123', 'secret456', 'auth789']
authenticator = TokenAuthenticator(valid_tokens)
server = ThreadedServer(
AuthenticatedService,
port=12345,
authenticator=authenticator
)
print("Server with custom token authentication started")
server.start()import rpyc
from rpyc.utils.authenticators import SSLAuthenticator
import ssl
# Create highly secure configuration
secure_config = {
'allow_all_attrs': False, # No attribute access
'allow_pickle': False, # No pickle serialization
'allow_setattr': False, # No attribute setting
'allow_delattr': False, # No attribute deletion
'allow_public_attrs': False, # No public attributes
'exposed_prefix': 'exposed_', # Require exposed prefix
'instantiate_custom_exceptions': False,
'propagate_SystemExit_locally': True,
'propagate_KeyboardInterrupt_locally': True,
'sync_request_timeout': 10, # Short timeout
}
class HighSecurityService(rpyc.Service):
"""Service with minimal attack surface"""
@rpyc.exposed
def safe_operation(self, data):
# Only basic operations on validated data
if isinstance(data, (int, float, str)):
return f"Processed: {data}"
else:
raise ValueError("Only basic types allowed")
@rpyc.exposed
def get_public_info(self):
return "This is public information"
# SSL authenticator with strict settings
authenticator = SSLAuthenticator(
keyfile='server.key',
certfile='server.crt',
ca_certs='ca.crt',
cert_reqs=ssl.CERT_REQUIRED,
ssl_version=ssl.PROTOCOL_TLS,
ciphers='ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS'
)
# Connect with secure configuration
conn = rpyc.ssl_connect(
'secure-server.com', 18821,
keyfile='client.key',
certfile='client.crt',
ca_certs='ca.crt',
config=secure_config
)
try:
# Only allowed operations work
result = conn.root.safe_operation("test data")
info = conn.root.get_public_info()
print(f"Result: {result}")
print(f"Info: {info}")
# These would fail due to security restrictions:
# conn.root.some_attr # Would raise AttributeError
# conn.root.unsafe_method() # Would not be accessible
finally:
conn.close()import rpyc
from rpyc.utils.server import ThreadedServer
from rpyc.utils.authenticators import AuthenticationError
class RoleBasedAuthenticator:
"""Authenticator that assigns roles based on credentials"""
def __init__(self, user_roles):
self.user_roles = user_roles # {'username': ['role1', 'role2']}
def __call__(self, sock):
# Simple username/password authentication
credentials_data = sock.recv(1024).decode('utf-8')
username, password = credentials_data.split(':')
# In real implementation, verify password against database
if username in self.user_roles:
sock.send(b"AUTH_OK")
roles = self.user_roles[username]
return sock, {'username': username, 'roles': roles}
else:
raise AuthenticationError("Invalid credentials")
class RoleProtectedService(rpyc.Service):
"""Service with role-based method protection"""
def __init__(self):
self._connection = None
def on_connect(self, conn):
self._connection = conn
print(f"User connected: {self.get_username()} with roles: {self.get_roles()}")
def get_username(self):
return self._connection._config.get('credentials', {}).get('username', 'unknown')
def get_roles(self):
return self._connection._config.get('credentials', {}).get('roles', [])
def require_role(self, required_role):
"""Check if user has required role"""
user_roles = self.get_roles()
if required_role not in user_roles:
raise PermissionError(f"Role '{required_role}' required")
@rpyc.exposed
def public_method(self):
return "This method is available to all authenticated users"
@rpyc.exposed
def admin_method(self):
self.require_role('admin')
return "This method requires admin role"
@rpyc.exposed
def user_method(self):
self.require_role('user')
return f"Hello {self.get_username()}, you have user access"
@rpyc.exposed
def manager_method(self):
self.require_role('manager')
return "This method requires manager role"
# Setup users with roles
user_roles = {
'alice': ['user', 'admin'],
'bob': ['user'],
'charlie': ['manager', 'user']
}
authenticator = RoleBasedAuthenticator(user_roles)
server = ThreadedServer(
RoleProtectedService,
port=12345,
authenticator=authenticator
)
print("Role-based access control server started")
server.start()import rpyc
from rpyc.utils.server import ThreadedServer
import logging
import time
class SecurityMonitoringService(rpyc.Service):
"""Service that logs security events"""
def __init__(self):
self.logger = logging.getLogger('rpyc.security')
self.connection_log = {}
def on_connect(self, conn):
client_info = {
'connect_time': time.time(),
'remote_addr': getattr(conn._channel.stream.sock, 'getpeername', lambda: 'unknown')(),
'requests': 0
}
self.connection_log[id(conn)] = client_info
self.logger.info(f"Client connected: {client_info['remote_addr']}")
def on_disconnect(self, conn):
conn_id = id(conn)
if conn_id in self.connection_log:
info = self.connection_log[conn_id]
duration = time.time() - info['connect_time']
self.logger.info(f"Client disconnected after {duration:.2f}s, "
f"{info['requests']} requests")
del self.connection_log[conn_id]
def log_request(self, method_name):
"""Log method invocation"""
# In real implementation, get connection from context
self.logger.info(f"Method called: {method_name}")
@rpyc.exposed
def secure_operation(self, data):
self.log_request('secure_operation')
# Validate input
if not isinstance(data, str) or len(data) > 1000:
self.logger.warning("Invalid input rejected")
raise ValueError("Invalid input")
return f"Processed: {data}"
@rpyc.exposed
def get_stats(self):
self.log_request('get_stats')
return {
'active_connections': len(self.connection_log),
'server_uptime': time.time()
}
# Setup security logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('rpyc_security.log'),
logging.StreamHandler()
]
)
server = ThreadedServer(SecurityMonitoringService, port=12345)
print("Security monitoring server started")
server.start()DEFAULT_SSL_PORT = 18821 # Default SSL server port
SSL_CIPHER_SUITES = 'ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS'
AUTH_TIMEOUT = 30 # Authentication timeout (seconds)
MAX_AUTH_ATTEMPTS = 3 # Maximum authentication attemptsclass AuthenticationError(Exception):
"""Raised when authentication fails"""
class SecurityError(Exception):
"""Base exception for security-related errors"""
class PermissionDeniedError(SecurityError):
"""Raised when access is denied due to insufficient permissions"""
class InvalidCredentialsError(AuthenticationError):
"""Raised when provided credentials are invalid"""Install with Tessl CLI
npx tessl i tessl/pypi-rpyc