CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-rpyc

Remote Python Call (RPyC) is a transparent and symmetric distributed computing library

Pending
Overview
Eval results
Files

docs/streams-and-channels.md

Streams and Channels

Low-level transport mechanisms and communication channels for RPyC connections. These classes provide the underlying transport layer for different communication methods including TCP sockets, SSL, pipes, and custom channels.

Capabilities

Stream Classes

Low-level stream implementations for different transport mechanisms.

class SocketStream:
    """
    Stream that carries bytes over a TCP socket.
    """

    def __init__(self, sock):
        """
        Wrap an existing socket in a stream.

        Parameters:
        - sock: TCP socket object to wrap
        """

    @classmethod
    def connect(cls, host, port, ipv6=False, keepalive=False):
        """
        Open a connection to a remote host and return it as a stream.

        Parameters:
        - host (str): Hostname or IP address of the remote side
        - port (int): Port number on the remote side
        - ipv6 (bool): When True, use IPv6
        - keepalive (bool): When True, enable TCP keepalive

        Returns:
        SocketStream: Stream connected to the remote host
        """

    def close(self):
        """Shut the socket stream down"""

    def read(self, count):
        """
        Fetch data from the stream.

        Parameters:
        - count (int): How many bytes to read

        Returns:
        bytes: The data that was read
        """

    def write(self, data):
        """
        Push data into the stream.

        Parameters:
        - data (bytes): Payload to write
        """

    @property
    def closed(self) -> bool:
        """Whether the stream has been closed (True when closed)"""

class TunneledSocketStream(SocketStream):
    """
    Socket stream whose traffic travels through a tunnel (SSH, proxy, etc.).
    """

    def __init__(self, sock):
        """
        Wrap a tunneled socket in a stream.

        Parameters:
        - sock: Socket object that is already tunneled
        """

class PipeStream:
    """
    Pipe-backed stream intended for inter-process communication.
    """

    def __init__(self, input, output):
        """
        Build a stream from a pair of pipe ends.

        Parameters:
        - input: Pipe/file object that is read from
        - output: Pipe/file object that is written to
        """

    @classmethod
    def create_pair(cls):
        """
        Build two pipe streams that are connected to each other.

        Returns:
        tuple: (stream1, stream2), the two connected pipe streams
        """

    def close(self):
        """Shut the pipe stream down"""

    def read(self, count):
        """
        Fetch data from the input pipe.

        Parameters:
        - count (int): How many bytes to read

        Returns:
        bytes: The data that was read from the pipe
        """

    def write(self, data):
        """
        Push data into the output pipe.

        Parameters:
        - data (bytes): Payload to write
        """

    @property
    def closed(self) -> bool:
        """Whether the stream has been closed (True when closed)"""

Channel Classes

Higher-level channel abstractions built on top of streams.

class Channel:
    """
    Two-way communication channel that adds buffering and framing to a stream.
    """

    def __init__(self, stream):
        """
        Build a channel over a stream.

        Parameters:
        - stream: Stream object the channel sits on
        """

    def close(self):
        """Shut the channel down"""

    def send(self, data):
        """
        Transmit data over the channel.

        Parameters:
        - data (bytes): Payload to send
        """

    def recv(self):
        """
        Take data off the channel.

        Returns:
        bytes: The data that was received
        """

    def poll(self, timeout=0):
        """
        Check whether data is waiting on the channel.

        Parameters:
        - timeout (float): How long to wait, in seconds

        Returns:
        bool: True when data is available
        """

    @property
    def closed(self) -> bool:
        """Whether the channel has been closed (True when closed)"""

# NOTE(review): confirm this class is exported by the installed rpyc release
# before relying on it.
class CompressedChannel(Channel):
    """
    Channel that compresses data to cut bandwidth usage.
    """

    def __init__(self, stream, compression_level=6):
        """
        Build a compressed channel over a stream.

        Parameters:
        - stream: Stream object the channel sits on
        - compression_level (int): Level of compression to apply (0-9)
        """

# NOTE(review): confirm this class is exported by the installed rpyc release
# before relying on it.
class EncryptedChannel(Channel):
    """
    Channel that encrypts traffic for secure communication.
    """

    def __init__(self, stream, key):
        """
        Build an encrypted channel over a stream.

        Parameters:
        - stream: Stream object the channel sits on
        - key (bytes): Key used for encryption
        """

Stream Factories

Factory functions for creating different types of streams and channels.

# NOTE(review): confirm this factory is exported by the installed rpyc release
# before relying on it.
def create_socket_stream(host, port, ipv6=False, keepalive=False):
    """
    Build a stream over a TCP socket.

    Parameters:
    - host (str): Hostname of the remote side
    - port (int): Port number on the remote side
    - ipv6 (bool): Whether to use IPv6
    - keepalive (bool): Whether to enable keepalive

    Returns:
    SocketStream: Stream connected to the remote host
    """

# NOTE(review): confirm this factory is exported by the installed rpyc release
# before relying on it.
def create_ssl_stream(host, port, keyfile=None, certfile=None, ca_certs=None, **kwargs):
    """
    Build a socket stream that is encrypted with SSL.

    Parameters:
    - host (str): Hostname of the remote side
    - port (int): Port number on the remote side
    - keyfile (str): Path to the private key file
    - certfile (str): Path to the certificate file
    - ca_certs (str): Path to the CA certificates file
    - kwargs: Any further SSL parameters

    Returns:
    SocketStream: Socket stream encrypted with SSL
    """

# NOTE(review): confirm this factory is exported by the installed rpyc release
# before relying on it.
def create_pipe_stream(cmd_args):
    """
    Build a pipe stream attached to a subprocess.

    Parameters:
    - cmd_args (list): The subprocess command followed by its arguments

    Returns:
    PipeStream: Pipe stream attached to the subprocess
    """

# NOTE(review): confirm this factory is exported by the installed rpyc release
# before relying on it.
def create_unix_stream(path):
    """
    Build a stream over a Unix domain socket.

    Parameters:
    - path (str): Filesystem path of the Unix socket

    Returns:
    SocketStream: Stream over the Unix socket
    """

Buffering and Performance

Classes for optimizing stream performance through buffering and batching.

# NOTE(review): confirm this class is exported by the installed rpyc release
# before relying on it.
class BufferedStream:
    """
    Wrapper that adds read/write buffering to a stream to improve performance.
    """

    def __init__(self, stream, buffer_size=8192):
        """
        Build a buffered wrapper around a stream.

        Parameters:
        - stream: Stream object being wrapped
        - buffer_size (int): Size of the buffer, in bytes
        """

    def flush(self):
        """Flush the write buffer"""

    def read(self, count):
        """Buffered read"""

    def write(self, data):
        """Buffered write"""

# NOTE(review): confirm this class is exported by the installed rpyc release
# before relying on it.
class BatchingChannel(Channel):
    """
    Channel that groups small messages into batches to raise throughput.
    """

    def __init__(self, stream, batch_size=10, batch_timeout=0.1):
        """
        Build a batching channel over a stream.

        Parameters:
        - stream: Stream object the channel sits on
        - batch_size (int): Upper bound on messages per batch
        - batch_timeout (float): Upper bound on how long a batch waits
        """

Examples

Basic Stream Usage

import rpyc
from rpyc.core import SocketStream, Channel

# Create socket stream
stream = SocketStream.connect('localhost', 12345)

# Create channel on top of stream
channel = Channel(stream)

try:
    # Send and receive data
    channel.send(b'Hello RPyC')
    response = channel.recv()
    print("Response:", response)
finally:
    # Cleanup runs even when send/recv raises, so the channel is not leaked
    channel.close()

SSL Stream Connection

import rpyc  # required: rpyc.Connection and rpyc.VoidService are used below
from rpyc.core import create_ssl_stream, Channel

# Create SSL stream with client certificates
ssl_stream = create_ssl_stream(
    'secure-server.com', 18821,
    keyfile='client.key',
    certfile='client.crt',
    ca_certs='ca.crt'
)

# Use with RPyC connection
channel = Channel(ssl_stream)
conn = rpyc.Connection(rpyc.VoidService, channel)

try:
    # Use connection
    result = conn.root.some_function()
finally:
    # Close the connection even when the remote call raises
    conn.close()

Pipe Stream to Subprocess

from rpyc.core import PipeStream
import rpyc

# create_pair() returns a tuple of two connected pipe streams, so unpack it:
# one end stays in this process, the other end belongs to the peer.
pipe_stream, peer_stream = PipeStream.create_pair()

# In real usage, you'd spawn subprocess with other end of pipe (peer_stream)
# For demo, create connection directly
# NOTE(review): nothing in this demo serves peer_stream, so the request below
# presumably gets no reply until a peer is attached to it; confirm before
# running this as-is.
channel = rpyc.Channel(pipe_stream)
conn = rpyc.Connection(rpyc.ClassicService, channel)

try:
    # Use connection
    conn.execute('print("Hello from subprocess")')
finally:
    # Close the connection even when the remote call raises
    conn.close()

Custom Stream Implementation

from rpyc.core import Channel
import socket
import ssl

class CustomSecureStream:
    """Custom stream with additional security features.

    Opens a TLS connection to ``host:port``, sends ``auth_token`` and expects
    the peer to answer with ``b'AUTH_OK'`` before the stream is usable.
    """

    def __init__(self, host, port, auth_token):
        """
        Connect, wrap the socket in TLS and authenticate.

        Parameters:
        - host (str): Remote hostname (also used for TLS hostname checking)
        - port (int): Remote port
        - auth_token (str): Token sent to the peer right after the handshake

        Raises:
        ConnectionError: If the peer does not answer with b'AUTH_OK'
        """
        # Create SSL socket
        context = ssl.create_default_context()
        sock = socket.create_connection((host, port))
        try:
            self.ssl_sock = context.wrap_socket(sock, server_hostname=host)
        except Exception:
            # Do not leak the plain socket when the TLS wrap fails
            sock.close()
            raise

        try:
            # Send authentication token; sendall() retries until every byte
            # is written, whereas send() may stop after a partial write.
            self.ssl_sock.sendall(auth_token.encode())
            response = self.ssl_sock.recv(100)
            if response != b'AUTH_OK':
                raise ConnectionError("Authentication failed")
        except Exception:
            # Release the socket on any failure during authentication
            self.ssl_sock.close()
            raise

    def read(self, count):
        """
        Read exactly ``count`` bytes.

        recv() may return fewer bytes than requested, so keep reading until
        the full amount has arrived.

        Parameters:
        - count (int): Number of bytes to read

        Returns:
        bytes: Exactly ``count`` bytes

        Raises:
        EOFError: If the peer closes the connection before ``count`` bytes
        have been received
        """
        chunks = []
        remaining = count
        while remaining > 0:
            chunk = self.ssl_sock.recv(remaining)
            if not chunk:
                # An empty result means the peer closed the connection
                raise EOFError("connection closed by peer")
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def write(self, data):
        """
        Write all of ``data`` to the socket.

        Parameters:
        - data (bytes): Data to write
        """
        self.ssl_sock.sendall(data)

    def close(self):
        """Close the underlying TLS socket"""
        self.ssl_sock.close()

    @property
    def closed(self):
        """True if the underlying socket is closed"""
        # fileno() reports -1 for a closed socket; this avoids reading the
        # private _closed attribute.
        return self.ssl_sock.fileno() == -1

import rpyc  # required: rpyc.Connection and rpyc.VoidService are used below

# Use custom stream
custom_stream = CustomSecureStream('secure-host.com', 12345, 'secret_token')
channel = Channel(custom_stream)
conn = rpyc.Connection(rpyc.VoidService, channel)

try:
    # Use connection normally
    result = conn.root.some_method()
finally:
    # Close the connection even when the remote call raises
    conn.close()

Compressed Communication

from rpyc.core import SocketStream, CompressedChannel
import rpyc

# Create compressed channel for bandwidth efficiency
stream = SocketStream.connect('remote-host', 12345)
compressed_channel = CompressedChannel(stream, compression_level=9)

# Create connection with compression
conn = rpyc.Connection(rpyc.VoidService, compressed_channel)

try:
    # Large data transfers will be compressed automatically
    large_data = list(range(10000))
    result = conn.root.process_large_data(large_data)
finally:
    # Close the connection even when the remote call raises
    conn.close()

Performance Optimized Streaming

from rpyc.core import SocketStream, BufferedStream, BatchingChannel
import rpyc

# Create performance-optimized connection
base_stream = SocketStream.connect('high-throughput-server', 12345)
buffered_stream = BufferedStream(base_stream, buffer_size=32768)  # 32KB buffer
batching_channel = BatchingChannel(buffered_stream, batch_size=20, batch_timeout=0.05)

# Create connection
conn = rpyc.Connection(rpyc.VoidService, batching_channel)

try:
    # High-frequency operations benefit from batching and buffering
    for i in range(1000):
        result = conn.root.quick_operation(i)
finally:
    # Close the connection even when one of the calls raises mid-loop
    conn.close()

Unix Domain Socket Connection

from rpyc.core import create_unix_stream, Channel
import rpyc

# Create Unix socket stream for local IPC
unix_stream = create_unix_stream('/tmp/rpyc.sock')
channel = Channel(unix_stream)

# Create connection
conn = rpyc.Connection(rpyc.ClassicService, channel)

try:
    # Use for local inter-process communication
    local_files = conn.modules.os.listdir('/tmp')
    print("Local temp files:", local_files)
finally:
    # Close the connection even when the remote call raises
    conn.close()

Multi-threaded Stream Pool

from rpyc.core import SocketStream, Channel
import rpyc
import threading
import queue

class StreamPool:
    """Pool of reusable connections for connection management.

    The pool holds live connections rather than bare streams. Returning a
    connection puts it back unclosed so the next caller can reuse it. The
    previous version closed the connection right after re-queuing its stream,
    which left the pool holding streams whose owning connection was closed.
    """

    def __init__(self, host, port, pool_size=10):
        """
        Open ``pool_size`` connections up front.

        Parameters:
        - host (str): Remote hostname
        - port (int): Remote port
        - pool_size (int): Number of connections to keep in the pool
        """
        self.host = host
        self.port = port
        self.pool = queue.Queue()

        # Create initial pool; release whatever was opened if one connect fails
        try:
            for _ in range(pool_size):
                self.pool.put(self._new_connection())
        except Exception:
            self.close()
            raise

    def _new_connection(self):
        """Open a fresh stream to the server and wrap it in a connection"""
        stream = SocketStream.connect(self.host, self.port)
        return rpyc.Connection(rpyc.VoidService, Channel(stream))

    def get_connection(self):
        """
        Get connection from pool.

        Blocks until a connection is available.
        """
        return self.pool.get()

    def return_connection(self, conn):
        """
        Return connection to pool.

        The connection is kept open for reuse. A connection that is already
        closed is replaced by a new one so the pool does not hand out dead
        connections.

        Parameters:
        - conn: Connection previously obtained from get_connection()
        """
        if conn.closed:
            conn = self._new_connection()
        self.pool.put(conn)

    def close(self):
        """Close every connection currently sitting idle in the pool"""
        while True:
            try:
                conn = self.pool.get_nowait()
            except queue.Empty:
                return
            conn.close()

# Shared pool that the worker threads below draw their connections from
pool = StreamPool(host='server.example.com', port=12345, pool_size=20)

def worker_thread(thread_id):
    """Run 100 tasks, borrowing a pooled connection for each one"""
    for i in range(100):
        pooled = pool.get_connection()
        try:
            result = pooled.root.work_function(thread_id, i)
            print(f"Thread {thread_id}, task {i}: {result}")
        finally:
            # Hand the connection back whether or not the task succeeded
            pool.return_connection(pooled)

# Build one thread per worker id, then launch them all
threads = [
    threading.Thread(target=worker_thread, args=(tid,))
    for tid in range(10)
]
for t in threads:
    t.start()

# Block until every worker has finished
for t in threads:
    t.join()

Stream Monitoring and Debugging

from rpyc.core import SocketStream, Channel
import rpyc
import time

class MonitoredStream:
    """Debugging wrapper that counts and prints the traffic of a stream"""

    def __init__(self, stream):
        # Both counters start at zero; the clock starts when the wrapper is built
        self.stream = stream
        self.bytes_sent = self.bytes_received = 0
        self.start_time = time.time()

    def read(self, count):
        """Read from the wrapped stream, then record and print the byte count"""
        data = self.stream.read(count)
        self.bytes_received = self.bytes_received + len(data)
        print(f"READ: {len(data)} bytes (total: {self.bytes_received})")
        return data

    def write(self, data):
        """Write to the wrapped stream, then record and print the byte count"""
        self.stream.write(data)
        self.bytes_sent = self.bytes_sent + len(data)
        print(f"WRITE: {len(data)} bytes (total: {self.bytes_sent})")

    def close(self):
        """Print a traffic summary, then close the wrapped stream"""
        duration = time.time() - self.start_time
        print(
            f"Stream closed after {duration:.2f}s",
            f"Total sent: {self.bytes_sent} bytes",
            f"Total received: {self.bytes_received} bytes",
            sep="\n",
        )
        self.stream.close()

    @property
    def closed(self):
        """Closed state as reported by the wrapped stream"""
        return self.stream.closed

# Use monitored stream for debugging
base_stream = SocketStream.connect('localhost', 12345)
monitored_stream = MonitoredStream(base_stream)
channel = Channel(monitored_stream)

conn = rpyc.Connection(rpyc.VoidService, channel)

try:
    # Operations will show traffic monitoring
    result = conn.root.some_operation()
finally:
    # Always close, even when the call raises
    conn.close()  # Shows traffic summary

Constants

# Stream buffering
DEFAULT_BUFFER_SIZE = 8 * 1024        # Default stream buffer size (8192 bytes)
MAX_BUFFER_SIZE = 1024 * 1024         # Maximum buffer size (1MB)

# Message batching
DEFAULT_BATCH_SIZE = 10               # Default message batch size
DEFAULT_BATCH_TIMEOUT = 0.1           # Default batch timeout (seconds)

# Compression and chunking
DEFAULT_COMPRESSION_LEVEL = 6         # Default compression level
STREAM_CHUNK_SIZE = 64_000            # Default chunk size for large transfers

Exceptions

class StreamError(Exception):
    """Root of the exception hierarchy for stream operations"""


class ConnectionLostError(StreamError):
    """Signals that the stream connection was lost"""


class BufferOverflowError(StreamError):
    """Signals that a buffer size limit was exceeded"""


class CompressionError(StreamError):
    """Signals a failure during compression or decompression"""


class EncryptionError(StreamError):
    """Signals a failure during encryption or decryption"""

Install with Tessl CLI

npx tessl i tessl/pypi-rpyc

docs

authentication-and-security.md

classic-mode.md

cli-tools.md

connection-factory.md

index.md

registry-and-discovery.md

servers.md

services-protocols.md

streams-and-channels.md

utilities.md

tile.json