CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-aiortc

Python implementation of WebRTC and ORTC for real-time peer-to-peer communication

87

1.02x
Overview
Eval results
Files

docs/network-transport.md

Network Transport

Low-level transport layer including ICE connectivity establishment, DTLS security, and SCTP association management for reliable data delivery.

Capabilities

RTCIceTransport Class

Manages ICE (Interactive Connectivity Establishment) connectivity between peers.

class RTCIceTransport:
    """
    ICE transport for peer connectivity establishment.

    Exposes the gatherer it is bound to, its role and current state, and
    the operations for registering remote candidates and for starting or
    stopping connectivity checks.
    """

    @property
    def iceGatherer(self) -> RTCIceGatherer:
        """The RTCIceGatherer associated with this transport."""

    @property
    def role(self) -> str:
        """The ICE role of this transport: "controlling" or "controlled"."""

    @property
    def state(self) -> str:
        """
        The current transport state.

        One of "new", "checking", "connected", "completed", "failed",
        "disconnected" or "closed".
        """

    # NOTE(review): upstream aiortc (1.0 and later) appears to declare this
    # method as a coroutine that also accepts None to signal the end of
    # candidates -- confirm against the installed version.
    def addRemoteCandidate(self, candidate: RTCIceCandidate) -> None:
        """
        Add a remote ICE candidate.

        Parameters:
        - candidate (RTCIceCandidate): Remote candidate to add
        """

    def getRemoteCandidates(self) -> list:
        """
        Get the remote ICE candidates.

        Returns:
        list: List of RTCIceCandidate objects
        """

    async def start(self, remoteParameters: RTCIceParameters) -> None:
        """
        Start ICE connectivity checks.

        Parameters:
        - remoteParameters (RTCIceParameters): Remote ICE parameters
        """

    async def stop(self) -> None:
        """Stop the ICE transport."""

RTCIceGatherer Class

Gathers local ICE candidates for connectivity establishment.

class RTCIceGatherer:
    """
    ICE candidate gatherer.

    Collects the local candidates and local ICE parameters that are later
    handed to the remote peer.
    """

    @property
    def state(self) -> str:
        """The gathering state: "new", "gathering" or "complete"."""

    # NOTE(review): upstream aiortc appears to accept the server list in the
    # RTCIceGatherer constructor and to define gather() without arguments --
    # confirm against the installed version.
    async def gather(self, iceServers=None) -> None:
        """
        Gather ICE candidates.

        Parameters:
        - iceServers (list, optional): List of RTCIceServer objects
        """

    # NOTE(review): upstream aiortc appears to declare this as a classmethod;
    # calling it on an instance works either way -- confirm.
    def getDefaultIceServers(self) -> list:
        """
        Get the default STUN servers.

        Returns:
        list: List of default RTCIceServer objects
        """

    def getLocalCandidates(self) -> list:
        """
        Get the local candidates gathered so far.

        Returns:
        list: List of RTCIceCandidate objects
        """

    def getLocalParameters(self) -> RTCIceParameters:
        """
        Get the local ICE parameters.

        Returns:
        RTCIceParameters: Local ICE username fragment and password
        """

RTCDtlsTransport Class

Provides DTLS (Datagram Transport Layer Security) encryption over ICE transport.

class RTCDtlsTransport:
    """
    DTLS transport for secure communication.

    Runs on top of an RTCIceTransport, which is exposed through the
    `transport` property.
    """
    # NOTE(review): the constructor is not documented in this listing;
    # upstream aiortc appears to require (transport, certificates) --
    # confirm against the installed version.

    @property
    def state(self) -> str:
        """
        The current transport state.

        One of "new", "connecting", "connected", "closed" or "failed".
        """

    @property
    def transport(self) -> RTCIceTransport:
        """The RTCIceTransport this DTLS transport is associated with."""

    def getLocalParameters(self) -> RTCDtlsParameters:
        """
        Get the local DTLS parameters.

        Returns:
        RTCDtlsParameters: Local DTLS fingerprints and role
        """

    async def start(self, remoteParameters: RTCDtlsParameters) -> None:
        """
        Start the DTLS handshake.

        Parameters:
        - remoteParameters (RTCDtlsParameters): Remote DTLS parameters
        """

    async def stop(self) -> None:
        """Stop the DTLS transport."""

RTCSctpTransport Class

SCTP (Stream Control Transmission Protocol) transport for data channels.

class RTCSctpTransport:
    """
    SCTP transport for data channels.

    Runs on top of an RTCDtlsTransport, which is exposed through the
    `transport` property.
    """

    @property
    def is_server(self) -> bool:
        """Whether this transport acts as the SCTP server."""

    @property
    def maxChannels(self) -> int:
        """The maximum number of simultaneous data channels."""

    @property
    def port(self) -> int:
        """The local SCTP port."""

    @property
    def state(self) -> str:
        """The current transport state: "connecting", "connected" or "closed"."""

    @property
    def transport(self) -> RTCDtlsTransport:
        """The RTCDtlsTransport this SCTP transport is associated with."""

    @classmethod
    def getCapabilities(cls) -> RTCSctpCapabilities:
        """
        Get the SCTP capabilities.

        Returns:
        RTCSctpCapabilities: Maximum message size
        """

    async def start(self, remoteCaps: RTCSctpCapabilities, remotePort: int = None) -> None:
        """
        Start the SCTP association.

        Parameters:
        - remoteCaps (RTCSctpCapabilities): Remote SCTP capabilities
        - remotePort (int, optional): Remote SCTP port; defaults to None
          even though the annotation is a plain int
        """

    async def stop(self) -> None:
        """Stop the SCTP transport."""

Transport Parameters and Data Types

Configuration objects for transport establishment.

class RTCIceCandidate:
    """ICE candidate information."""
    # NOTE(review): upstream aiortc appears to model a candidate as parsed
    # fields (component, foundation, ip, port, priority, protocol, type, ...)
    # rather than a raw candidate string -- confirm against the installed
    # version.

    def __init__(self, candidate: str, sdpMid: str = None, sdpMLineIndex: int = None):
        """
        Create an ICE candidate.

        Parameters:
        - candidate (str): Candidate string
        - sdpMid (str, optional): Media identifier; defaults to None
        - sdpMLineIndex (int, optional): Media line index; defaults to None
        """

    @property
    def candidate(self) -> str:
        """The candidate string."""

    @property
    def sdpMid(self) -> str:
        """The media identifier."""

    @property
    def sdpMLineIndex(self) -> int:
        """The media line index."""

class RTCIceParameters:
    """ICE connection parameters: a username fragment and a password."""

    @property
    def usernameFragment(self) -> str:
        """The ICE username fragment."""

    @property
    def password(self) -> str:
        """The ICE password."""

class RTCDtlsParameters:
    """DTLS connection parameters: certificate fingerprints and a role."""

    @property
    def fingerprints(self) -> list:
        """List of RTCDtlsFingerprint objects."""

    # NOTE(review): upstream aiortc appears to also allow the value "auto"
    # for the role -- confirm against the installed version.
    @property
    def role(self) -> str:
        """The DTLS role: "client" or "server"."""

class RTCDtlsFingerprint:
    """DTLS certificate fingerprint: a hash algorithm and its value."""

    @property
    def algorithm(self) -> str:
        """The hash algorithm (e.g., "sha-256")."""

    @property
    def value(self) -> str:
        """The fingerprint value."""

class RTCSctpCapabilities:
    """SCTP transport capabilities."""

    @property
    def maxMessageSize(self) -> int:
        """The maximum message size, in bytes."""

Usage Examples

ICE Transport Setup

import aiortc
import asyncio

async def ice_transport_setup():
    """
    Gather local ICE candidates and create an ICE transport for them.

    Prints the default STUN servers, the local username fragment, each
    gathered candidate and the initial state of the new transport.
    """
    # getDefaultIceServers() is a classmethod in aiortc, so the defaults can
    # be inspected without constructing a gatherer first.
    default_servers = aiortc.RTCIceGatherer.getDefaultIceServers()
    print(f"Default STUN servers: {[server.urls for server in default_servers]}")

    # Custom STUN/TURN servers (placeholder host and credentials)
    ice_servers = [
        aiortc.RTCIceServer("stun:stun.l.google.com:19302"),
        aiortc.RTCIceServer(
            "turn:turnserver.example.com:3478",
            username="user",
            credential="pass",
        ),
    ]

    # The server list is supplied when the gatherer is constructed; in aiortc
    # gather() itself takes no arguments.
    gatherer = aiortc.RTCIceGatherer(iceServers=ice_servers)
    await gatherer.gather()

    # Get local parameters
    local_params = gatherer.getLocalParameters()
    print(f"Local ICE parameters: {local_params.usernameFragment}")

    # Get gathered candidates
    local_candidates = gatherer.getLocalCandidates()
    print(f"Gathered {len(local_candidates)} candidates")

    for candidate in local_candidates:
        # Print the candidate object itself: aiortc candidates carry parsed
        # fields and have no raw `candidate` string attribute.
        print(f"  {candidate}")

    # Create ICE transport
    ice_transport = aiortc.RTCIceTransport(gatherer)
    print(f"ICE transport state: {ice_transport.state}")

DTLS Transport Security

async def dtls_transport_setup():
    """
    Create a DTLS transport over a freshly gathered ICE transport.

    Prints the local DTLS role and fingerprints, registers a "statechange"
    listener and prints the initial DTLS state.
    """
    # Create ICE transport first
    gatherer = aiortc.RTCIceGatherer()
    await gatherer.gather()
    ice_transport = aiortc.RTCIceTransport(gatherer)

    # aiortc's RTCDtlsTransport requires the list of local certificates as
    # its second argument; without it construction raises TypeError.
    certificate = aiortc.RTCCertificate.generateCertificate()
    dtls_transport = aiortc.RTCDtlsTransport(ice_transport, [certificate])

    # Get local DTLS parameters
    local_dtls_params = dtls_transport.getLocalParameters()
    print(f"DTLS role: {local_dtls_params.role}")
    print("DTLS fingerprints:")
    for fp in local_dtls_params.fingerprints:
        print(f"  {fp.algorithm}: {fp.value}")

    # Monitor transport state
    @dtls_transport.on("statechange")
    def on_state_change():
        print(f"DTLS state changed to: {dtls_transport.state}")

    print(f"Initial DTLS state: {dtls_transport.state}")

SCTP Data Transport

async def sctp_transport_setup():
    """
    Create an SCTP transport over a DTLS transport and print its properties.

    Prints the SCTP capabilities, the local port, channel limit, server flag
    and initial state, then registers a "statechange" listener.
    """
    # Create DTLS transport (simplified setup)
    gatherer = aiortc.RTCIceGatherer()
    await gatherer.gather()
    ice_transport = aiortc.RTCIceTransport(gatherer)

    # aiortc's RTCDtlsTransport requires the list of local certificates as
    # its second argument; without it construction raises TypeError.
    certificate = aiortc.RTCCertificate.generateCertificate()
    dtls_transport = aiortc.RTCDtlsTransport(ice_transport, [certificate])

    # Create SCTP transport over DTLS
    sctp_transport = aiortc.RTCSctpTransport(dtls_transport)

    # Get SCTP capabilities
    capabilities = aiortc.RTCSctpTransport.getCapabilities()
    print(f"Max SCTP message size: {capabilities.maxMessageSize} bytes")

    # Monitor SCTP transport
    print(f"SCTP port: {sctp_transport.port}")
    print(f"Max channels: {sctp_transport.maxChannels}")
    print(f"Is server: {sctp_transport.is_server}")
    print(f"Initial state: {sctp_transport.state}")

    # NOTE(review): confirm that RTCSctpTransport emits "statechange" in the
    # installed aiortc version; registering the listener is harmless if not.
    @sctp_transport.on("statechange")
    def on_state_change():
        print(f"SCTP state changed to: {sctp_transport.state}")

Complete Transport Stack

async def complete_transport_stack():
    """
    Demonstrate the complete transport stack: ICE -> DTLS -> SCTP.

    Returns:
    dict: The created transports under the keys "ice", "dtls" and "sctp"
    """

    # 1. ICE Layer - Connectivity
    # The STUN/TURN servers are supplied when the gatherer is constructed;
    # in aiortc gather() itself takes no arguments.
    ice_servers = [aiortc.RTCIceServer("stun:stun.l.google.com:19302")]
    gatherer = aiortc.RTCIceGatherer(iceServers=ice_servers)
    await gatherer.gather()

    ice_transport = aiortc.RTCIceTransport(gatherer)

    # 2. DTLS Layer - Security
    # aiortc's RTCDtlsTransport requires the list of local certificates as
    # its second argument; without it construction raises TypeError.
    certificate = aiortc.RTCCertificate.generateCertificate()
    dtls_transport = aiortc.RTCDtlsTransport(ice_transport, [certificate])

    # 3. SCTP Layer - Data delivery
    sctp_transport = aiortc.RTCSctpTransport(dtls_transport)

    # Monitor all transport states
    @ice_transport.on("statechange")
    def on_ice_state():
        print(f"ICE: {ice_transport.state}")

    @dtls_transport.on("statechange")
    def on_dtls_state():
        print(f"DTLS: {dtls_transport.state}")

    @sctp_transport.on("statechange")
    def on_sctp_state():
        print(f"SCTP: {sctp_transport.state}")

    print("Transport stack created:")
    print(f"  ICE: {ice_transport.state}")
    print(f"  DTLS: {dtls_transport.state}")
    print(f"  SCTP: {sctp_transport.state}")

    return {
        "ice": ice_transport,
        "dtls": dtls_transport,
        "sctp": sctp_transport,
    }

ICE Candidate Exchange

async def ice_candidate_exchange():
    """
    Simulate ICE candidate exchange between two peers in one process.

    Each peer gathers candidates, receives the other peer's candidates and
    parameters, and then both run their connectivity checks concurrently.
    """

    # Create two peers
    gatherer1 = aiortc.RTCIceGatherer()
    gatherer2 = aiortc.RTCIceGatherer()

    # Gather candidates for both peers
    await gatherer1.gather()
    await gatherer2.gather()

    # Create ICE transports
    ice1 = aiortc.RTCIceTransport(gatherer1)
    ice2 = aiortc.RTCIceTransport(gatherer2)

    # Exchange parameters
    params1 = gatherer1.getLocalParameters()
    params2 = gatherer2.getLocalParameters()

    print(f"Peer 1 ICE params: {params1.usernameFragment}")
    print(f"Peer 2 ICE params: {params2.usernameFragment}")

    # Exchange candidates
    candidates1 = gatherer1.getLocalCandidates()
    candidates2 = gatherer2.getLocalCandidates()

    print(f"Peer 1 has {len(candidates1)} candidates")
    print(f"Peer 2 has {len(candidates2)} candidates")

    # Add remote candidates. addRemoteCandidate() is a coroutine in
    # aiortc 1.0 and later, so it must be awaited; a bare call would create
    # a coroutine object and never register the candidate.
    for candidate in candidates1:
        await ice2.addRemoteCandidate(candidate)

    for candidate in candidates2:
        await ice1.addRemoteCandidate(candidate)

    # Start connectivity checks on both peers at the same time. start() only
    # returns once its checks have finished, so awaiting the two calls one
    # after the other would leave the first peer probing a peer that has not
    # started yet.
    # NOTE(review): confirm how the controlling/controlled roles are assigned
    # when the transports are driven directly rather than through
    # RTCPeerConnection.
    await asyncio.gather(ice1.start(params2), ice2.start(params1))

    print("ICE connectivity checks started")

Transport Error Handling

async def transport_error_handling():
    """
    Demonstrate transport error handling.

    Builds an ICE/DTLS stack, registers "error" listeners, attempts to start
    ICE with empty parameters, reports any exception, and always stops
    whichever transports were created.
    """
    # Pre-bind the names so the cleanup below can test for None instead of
    # probing locals() for names that may never have been assigned.
    ice_transport = None
    dtls_transport = None

    try:
        # Create transport stack
        gatherer = aiortc.RTCIceGatherer()
        ice_transport = aiortc.RTCIceTransport(gatherer)

        # aiortc's RTCDtlsTransport requires the list of local certificates
        # as its second argument.
        certificate = aiortc.RTCCertificate.generateCertificate()
        dtls_transport = aiortc.RTCDtlsTransport(ice_transport, [certificate])

        # Set up error handlers
        @ice_transport.on("error")
        def on_ice_error(error):
            print(f"ICE error: {error}")

        @dtls_transport.on("error")
        def on_dtls_error(error):
            print(f"DTLS error: {error}")

        # Attempt to start without proper setup (will cause errors)
        invalid_params = aiortc.RTCIceParameters()
        await ice_transport.start(invalid_params)

    except Exception as e:
        # Broad catch is deliberate: this demo reports any setup failure.
        print(f"Transport setup failed: {e}")

    finally:
        # Clean up transports, upper layer first
        if dtls_transport is not None:
            await dtls_transport.stop()
        if ice_transport is not None:
            await ice_transport.stop()

        print("Transports cleaned up")

Transport State Monitoring

async def monitor_transport_states(duration: float = 10):
    """
    Monitor transport state transitions.

    Creates an ICE -> DTLS -> SCTP stack, prints the initial states, prints
    the combined states again on every "statechange" event, and prints the
    final states after the observation window.

    Parameters:
    - duration (float, optional): Seconds to keep observing; defaults to 10
    """

    # Create transports
    gatherer = aiortc.RTCIceGatherer()
    await gatherer.gather()

    ice_transport = aiortc.RTCIceTransport(gatherer)
    # aiortc's RTCDtlsTransport requires the list of local certificates as
    # its second argument; without it construction raises TypeError.
    certificate = aiortc.RTCCertificate.generateCertificate()
    dtls_transport = aiortc.RTCDtlsTransport(ice_transport, [certificate])
    sctp_transport = aiortc.RTCSctpTransport(dtls_transport)

    # Last state seen for each layer
    states = {
        "ice": ice_transport.state,
        "dtls": dtls_transport.state,
        "sctp": sctp_transport.state,
    }

    def print_states():
        print(f"States - ICE: {states['ice']}, DTLS: {states['dtls']}, SCTP: {states['sctp']}")

    # Set up state change handlers
    @ice_transport.on("statechange")
    def on_ice_change():
        states["ice"] = ice_transport.state
        print_states()

    @dtls_transport.on("statechange")
    def on_dtls_change():
        states["dtls"] = dtls_transport.state
        print_states()

    @sctp_transport.on("statechange")
    def on_sctp_change():
        states["sctp"] = sctp_transport.state
        print_states()

    print("Initial states:")
    print_states()

    # Yield to the event loop so state change events can be delivered
    await asyncio.sleep(duration)

    print("Final states:")
    print_states()

Install with Tessl CLI

npx tessl i tessl/pypi-aiortc

docs

configuration.md

data-channels.md

index.md

media-streaming.md

network-transport.md

peer-connection.md

rtp-transport.md

statistics.md

tile.json