Remote Python Call (RPyC) is a transparent and symmetric distributed computing library
—
Low-level transport mechanisms and communication channels for RPyC connections. These classes provide the underlying transport layer for different communication methods including TCP sockets, SSL, pipes, and custom channels.
Low-level stream implementations for different transport mechanisms.
class SocketStream:
"""
TCP socket-based stream for network communication.
"""
def __init__(self, sock):
"""
Initialize socket stream.
Parameters:
- sock: TCP socket object
"""
@classmethod
def connect(cls, host, port, ipv6=False, keepalive=False):
"""
Create connection to remote host.
Parameters:
- host (str): Remote hostname or IP
- port (int): Remote port
- ipv6 (bool): Use IPv6 if True
- keepalive (bool): Enable TCP keepalive
Returns:
SocketStream: Connected socket stream
"""
def close(self):
"""Close the socket stream"""
def read(self, count):
"""
Read data from stream.
Parameters:
- count (int): Number of bytes to read
Returns:
bytes: Data read from stream
"""
def write(self, data):
"""
Write data to stream.
Parameters:
- data (bytes): Data to write
"""
@property
def closed(self) -> bool:
"""True if stream is closed"""
class TunneledSocketStream(SocketStream):
"""
Socket stream that operates through a tunnel (SSH, proxy, etc.).
"""
def __init__(self, sock):
"""
Initialize tunneled socket stream.
Parameters:
- sock: Tunneled socket object
"""
class PipeStream:
"""
Stream implementation using pipes for inter-process communication.
"""
def __init__(self, input, output):
"""
Initialize pipe stream.
Parameters:
- input: Input pipe/file object
- output: Output pipe/file object
"""
@classmethod
def create_pair(cls):
"""
Create pair of connected pipe streams.
Returns:
tuple: (stream1, stream2) connected pipe streams
"""
def close(self):
"""Close pipe stream"""
def read(self, count):
"""
Read data from input pipe.
Parameters:
- count (int): Number of bytes to read
Returns:
bytes: Data read from pipe
"""
def write(self, data):
"""
Write data to output pipe.
Parameters:
- data (bytes): Data to write
"""
@property
def closed(self) -> bool:
"""True if stream is closed"""Higher-level channel abstractions built on top of streams.
class Channel:
"""
Bidirectional communication channel with buffering and framing.
"""
def __init__(self, stream):
"""
Initialize channel.
Parameters:
- stream: Underlying stream object
"""
def close(self):
"""Close the channel"""
def send(self, data):
"""
Send data through channel.
Parameters:
- data (bytes): Data to send
"""
def recv(self):
"""
Receive data from channel.
Returns:
bytes: Received data
"""
def poll(self, timeout=0):
"""
Poll for available data.
Parameters:
- timeout (float): Timeout in seconds
Returns:
bool: True if data is available
"""
@property
def closed(self) -> bool:
"""True if channel is closed"""
class CompressedChannel(Channel):
"""
Channel with data compression for reduced bandwidth usage.
"""
def __init__(self, stream, compression_level=6):
"""
Initialize compressed channel.
Parameters:
- stream: Underlying stream object
- compression_level (int): Compression level (0-9)
"""
class EncryptedChannel(Channel):
"""
Channel with encryption for secure communication.
"""
def __init__(self, stream, key):
"""
Initialize encrypted channel.
Parameters:
- stream: Underlying stream object
- key (bytes): Encryption key
"""Factory functions for creating different types of streams and channels.
def create_socket_stream(host, port, ipv6=False, keepalive=False):
"""
Create TCP socket stream.
Parameters:
- host (str): Remote hostname
- port (int): Remote port
- ipv6 (bool): Use IPv6
- keepalive (bool): Enable keepalive
Returns:
SocketStream: Connected socket stream
"""
def create_ssl_stream(host, port, keyfile=None, certfile=None, ca_certs=None, **kwargs):
"""
Create SSL-encrypted socket stream.
Parameters:
- host (str): Remote hostname
- port (int): Remote port
- keyfile (str): Private key file path
- certfile (str): Certificate file path
- ca_certs (str): CA certificates file path
- kwargs: Additional SSL parameters
Returns:
SocketStream: SSL-encrypted socket stream
"""
def create_pipe_stream(cmd_args):
"""
Create pipe stream to subprocess.
Parameters:
- cmd_args (list): Command and arguments for subprocess
Returns:
PipeStream: Pipe stream to subprocess
"""
def create_unix_stream(path):
"""
Create Unix domain socket stream.
Parameters:
- path (str): Unix socket file path
Returns:
SocketStream: Unix socket stream
"""Classes for optimizing stream performance through buffering and batching.
class BufferedStream:
"""
Stream wrapper with read/write buffering for performance optimization.
"""
def __init__(self, stream, buffer_size=8192):
"""
Initialize buffered stream.
Parameters:
- stream: Underlying stream object
- buffer_size (int): Buffer size in bytes
"""
def flush(self):
"""Flush write buffer"""
def read(self, count):
"""Read with buffering"""
def write(self, data):
"""Write with buffering"""
class BatchingChannel(Channel):
"""
Channel that batches small messages for improved throughput.
"""
def __init__(self, stream, batch_size=10, batch_timeout=0.1):
"""
Initialize batching channel.
Parameters:
- stream: Underlying stream object
- batch_size (int): Maximum messages per batch
- batch_timeout (float): Maximum batch wait time
"""import rpyc
from rpyc.core import SocketStream, Channel
# Create socket stream
stream = SocketStream.connect('localhost', 12345)
# Create channel on top of stream
channel = Channel(stream)
# Send and receive data
channel.send(b'Hello RPyC')
response = channel.recv()
print("Response:", response)
# Cleanup
channel.close()from rpyc.core import create_ssl_stream, Channel
# Create SSL stream with client certificates
ssl_stream = create_ssl_stream(
'secure-server.com', 18821,
keyfile='client.key',
certfile='client.crt',
ca_certs='ca.crt'
)
# Use with RPyC connection
channel = Channel(ssl_stream)
conn = rpyc.Connection(rpyc.VoidService, channel)
# Use connection
result = conn.root.some_function()
conn.close()from rpyc.core import PipeStream
import rpyc
# Create pipe stream to subprocess running RPyC server
pipe_stream = PipeStream.create_pair()
# In real usage, you'd spawn subprocess with other end of pipe
# For demo, create connection directly
channel = rpyc.Channel(pipe_stream)
conn = rpyc.Connection(rpyc.ClassicService, channel)
# Use connection
conn.execute('print("Hello from subprocess")')
conn.close()from rpyc.core import Channel
import socket
import ssl
class CustomSecureStream:
"""Custom stream with additional security features"""
def __init__(self, host, port, auth_token):
# Create SSL socket
context = ssl.create_default_context()
sock = socket.create_connection((host, port))
self.ssl_sock = context.wrap_socket(sock, server_hostname=host)
# Send authentication token
self.ssl_sock.send(auth_token.encode())
response = self.ssl_sock.recv(100)
if response != b'AUTH_OK':
raise Exception("Authentication failed")
def read(self, count):
return self.ssl_sock.recv(count)
def write(self, data):
self.ssl_sock.send(data)
def close(self):
self.ssl_sock.close()
@property
def closed(self):
return self.ssl_sock._closed
# Use custom stream
custom_stream = CustomSecureStream('secure-host.com', 12345, 'secret_token')
channel = Channel(custom_stream)
conn = rpyc.Connection(rpyc.VoidService, channel)
# Use connection normally
result = conn.root.some_method()
conn.close()from rpyc.core import SocketStream, CompressedChannel
import rpyc
# Create compressed channel for bandwidth efficiency
stream = SocketStream.connect('remote-host', 12345)
compressed_channel = CompressedChannel(stream, compression_level=9)
# Create connection with compression
conn = rpyc.Connection(rpyc.VoidService, compressed_channel)
# Large data transfers will be compressed automatically
large_data = list(range(10000))
result = conn.root.process_large_data(large_data)
conn.close()from rpyc.core import SocketStream, BufferedStream, BatchingChannel
import rpyc
# Create performance-optimized connection
base_stream = SocketStream.connect('high-throughput-server', 12345)
buffered_stream = BufferedStream(base_stream, buffer_size=32768) # 32KB buffer
batching_channel = BatchingChannel(buffered_stream, batch_size=20, batch_timeout=0.05)
# Create connection
conn = rpyc.Connection(rpyc.VoidService, batching_channel)
# High-frequency operations benefit from batching and buffering
for i in range(1000):
result = conn.root.quick_operation(i)
conn.close()from rpyc.core import create_unix_stream, Channel
import rpyc
# Create Unix socket stream for local IPC
unix_stream = create_unix_stream('/tmp/rpyc.sock')
channel = Channel(unix_stream)
# Create connection
conn = rpyc.Connection(rpyc.ClassicService, channel)
# Use for local inter-process communication
local_files = conn.modules.os.listdir('/tmp')
print("Local temp files:", local_files)
conn.close()from rpyc.core import SocketStream, Channel
import rpyc
import threading
import queue
class StreamPool:
"""Pool of reusable streams for connection management"""
def __init__(self, host, port, pool_size=10):
self.host = host
self.port = port
self.pool = queue.Queue()
# Create initial pool
for _ in range(pool_size):
stream = SocketStream.connect(host, port)
self.pool.put(stream)
def get_connection(self):
"""Get connection from pool"""
stream = self.pool.get()
channel = Channel(stream)
return rpyc.Connection(rpyc.VoidService, channel)
def return_connection(self, conn):
"""Return connection to pool"""
self.pool.put(conn._channel.stream)
conn.close()
# Use stream pool for efficient connection reuse
pool = StreamPool('server.example.com', 12345, pool_size=20)
def worker_thread(thread_id):
"""Worker thread using pooled connections"""
for i in range(100):
conn = pool.get_connection()
try:
result = conn.root.work_function(thread_id, i)
print(f"Thread {thread_id}, task {i}: {result}")
finally:
pool.return_connection(conn)
# Start multiple worker threads
threads = []
for tid in range(10):
t = threading.Thread(target=worker_thread, args=(tid,))
t.start()
threads.append(t)
# Wait for completion
for t in threads:
t.join()from rpyc.core import SocketStream, Channel
import rpyc
import time
class MonitoredStream:
"""Stream wrapper that monitors traffic for debugging"""
def __init__(self, stream):
self.stream = stream
self.bytes_sent = 0
self.bytes_received = 0
self.start_time = time.time()
def read(self, count):
data = self.stream.read(count)
self.bytes_received += len(data)
print(f"READ: {len(data)} bytes (total: {self.bytes_received})")
return data
def write(self, data):
self.stream.write(data)
self.bytes_sent += len(data)
print(f"WRITE: {len(data)} bytes (total: {self.bytes_sent})")
def close(self):
duration = time.time() - self.start_time
print(f"Stream closed after {duration:.2f}s")
print(f"Total sent: {self.bytes_sent} bytes")
print(f"Total received: {self.bytes_received} bytes")
self.stream.close()
@property
def closed(self):
return self.stream.closed
# Use monitored stream for debugging
base_stream = SocketStream.connect('localhost', 12345)
monitored_stream = MonitoredStream(base_stream)
channel = Channel(monitored_stream)
conn = rpyc.Connection(rpyc.VoidService, channel)
# Operations will show traffic monitoring
result = conn.root.some_operation()
conn.close() # Shows traffic summaryDEFAULT_BUFFER_SIZE = 8192 # Default stream buffer size
MAX_BUFFER_SIZE = 1048576 # Maximum buffer size (1MB)
DEFAULT_BATCH_SIZE = 10 # Default message batch size
DEFAULT_BATCH_TIMEOUT = 0.1 # Default batch timeout (seconds)
DEFAULT_COMPRESSION_LEVEL = 6 # Default compression level
STREAM_CHUNK_SIZE = 64000 # Default chunk size for large transfersclass StreamError(Exception):
"""Base exception for stream operations"""
class ConnectionLostError(StreamError):
"""Raised when stream connection is lost"""
class BufferOverflowError(StreamError):
"""Raised when buffer size limits are exceeded"""
class CompressionError(StreamError):
"""Raised when compression/decompression fails"""
class EncryptionError(StreamError):
"""Raised when encryption/decryption fails"""Install with Tessl CLI
npx tessl i tessl/pypi-rpyc