Python implementation of WebRTC and ORTC for real-time peer-to-peer communication
87
Low-level transport layer including ICE connectivity establishment, DTLS security, and SCTP association management for reliable data delivery.
Manages ICE (Interactive Connectivity Establishment) connectivity between peers.
class RTCIceTransport:
"""ICE transport for peer connectivity establishment."""
@property
def iceGatherer(self) -> RTCIceGatherer:
"""Associated ICE gatherer"""
@property
def role(self) -> str:
"""ICE role: "controlling" or "controlled" """
@property
def state(self) -> str:
"""Transport state: "new", "checking", "connected", "completed", "failed", "disconnected", "closed" """
def addRemoteCandidate(self, candidate: RTCIceCandidate) -> None:
"""
Add a remote ICE candidate.
Parameters:
- candidate (RTCIceCandidate): Remote candidate to add
"""
def getRemoteCandidates(self) -> list:
"""
Get list of remote ICE candidates.
Returns:
list: List of RTCIceCandidate objects
"""
async def start(self, remoteParameters: RTCIceParameters) -> None:
"""
Start ICE connectivity checks.
Parameters:
- remoteParameters (RTCIceParameters): Remote ICE parameters
"""
async def stop(self) -> None:
"""Stop ICE transport"""Gathers local ICE candidates for connectivity establishment.
class RTCIceGatherer:
"""ICE candidate gatherer."""
@property
def state(self) -> str:
"""Gathering state: "new", "gathering", "complete" """
async def gather(self, iceServers=None) -> None:
"""
Gather ICE candidates.
Parameters:
- iceServers (list, optional): List of RTCIceServer objects
"""
def getDefaultIceServers(self) -> list:
"""
Get default STUN servers.
Returns:
list: List of default RTCIceServer objects
"""
def getLocalCandidates(self) -> list:
"""
Get gathered local candidates.
Returns:
list: List of RTCIceCandidate objects
"""
def getLocalParameters(self) -> RTCIceParameters:
"""
Get local ICE parameters.
Returns:
RTCIceParameters: Local ICE username fragment and password
"""Provides DTLS (Datagram Transport Layer Security) encryption over ICE transport.
class RTCDtlsTransport:
"""DTLS transport for secure communication."""
@property
def state(self) -> str:
"""Transport state: "new", "connecting", "connected", "closed", "failed" """
@property
def transport(self) -> RTCIceTransport:
"""Associated ICE transport"""
def getLocalParameters(self) -> RTCDtlsParameters:
"""
Get local DTLS parameters.
Returns:
RTCDtlsParameters: Local DTLS fingerprints and role
"""
async def start(self, remoteParameters: RTCDtlsParameters) -> None:
"""
Start DTLS handshake.
Parameters:
- remoteParameters (RTCDtlsParameters): Remote DTLS parameters
"""
async def stop(self) -> None:
"""Stop DTLS transport"""SCTP (Stream Control Transmission Protocol) transport for data channels.
class RTCSctpTransport:
"""SCTP transport for data channels."""
@property
def is_server(self) -> bool:
"""Whether transport acts as SCTP server"""
@property
def maxChannels(self) -> int:
"""Maximum number of simultaneous data channels"""
@property
def port(self) -> int:
"""Local SCTP port"""
@property
def state(self) -> str:
"""Transport state: "connecting", "connected", "closed" """
@property
def transport(self) -> RTCDtlsTransport:
"""Associated DTLS transport"""
@classmethod
def getCapabilities(cls) -> RTCSctpCapabilities:
"""
Get SCTP capabilities.
Returns:
RTCSctpCapabilities: Maximum message size
"""
async def start(self, remoteCaps: RTCSctpCapabilities, remotePort: int = None) -> None:
"""
Start SCTP association.
Parameters:
- remoteCaps (RTCSctpCapabilities): Remote SCTP capabilities
- remotePort (int, optional): Remote SCTP port
"""
async def stop(self) -> None:
"""Stop SCTP transport"""Configuration objects for transport establishment.
class RTCIceCandidate:
"""ICE candidate information."""
def __init__(self, candidate: str, sdpMid: str = None, sdpMLineIndex: int = None):
"""
Create ICE candidate.
Parameters:
- candidate (str): Candidate string
- sdpMid (str, optional): Media identifier
- sdpMLineIndex (int, optional): Media line index
"""
@property
def candidate(self) -> str:
"""Candidate string"""
@property
def sdpMid(self) -> str:
"""Media identifier"""
@property
def sdpMLineIndex(self) -> int:
"""Media line index"""
class RTCIceParameters:
"""ICE connection parameters."""
@property
def usernameFragment(self) -> str:
"""ICE username fragment"""
@property
def password(self) -> str:
"""ICE password"""
class RTCDtlsParameters:
"""DTLS connection parameters."""
@property
def fingerprints(self) -> list:
"""List of RTCDtlsFingerprint objects"""
@property
def role(self) -> str:
"""DTLS role: "client" or "server" """
class RTCDtlsFingerprint:
"""DTLS certificate fingerprint."""
@property
def algorithm(self) -> str:
"""Hash algorithm (e.g., "sha-256")"""
@property
def value(self) -> str:
"""Fingerprint value"""
class RTCSctpCapabilities:
"""SCTP transport capabilities."""
@property
def maxMessageSize(self) -> int:
"""Maximum message size in bytes"""import aiortc
import asyncio
async def ice_transport_setup():
# Create ICE gatherer
gatherer = aiortc.RTCIceGatherer()
# Get default STUN servers
default_servers = gatherer.getDefaultIceServers()
print(f"Default STUN servers: {[server.urls for server in default_servers]}")
# Add custom STUN/TURN servers
ice_servers = [
aiortc.RTCIceServer("stun:stun.l.google.com:19302"),
aiortc.RTCIceServer(
"turn:turnserver.example.com:3478",
username="user",
credential="pass"
)
]
# Start gathering candidates
await gatherer.gather(ice_servers)
# Get local parameters
local_params = gatherer.getLocalParameters()
print(f"Local ICE parameters: {local_params.usernameFragment}")
# Get gathered candidates
local_candidates = gatherer.getLocalCandidates()
print(f"Gathered {len(local_candidates)} candidates")
for candidate in local_candidates:
print(f" {candidate.candidate}")
# Create ICE transport
ice_transport = aiortc.RTCIceTransport(gatherer)
print(f"ICE transport state: {ice_transport.state}")async def dtls_transport_setup():
# Create ICE transport first
gatherer = aiortc.RTCIceGatherer()
await gatherer.gather()
ice_transport = aiortc.RTCIceTransport(gatherer)
# Create DTLS transport over ICE
dtls_transport = aiortc.RTCDtlsTransport(ice_transport)
# Get local DTLS parameters
local_dtls_params = dtls_transport.getLocalParameters()
print(f"DTLS role: {local_dtls_params.role}")
print("DTLS fingerprints:")
for fp in local_dtls_params.fingerprints:
print(f" {fp.algorithm}: {fp.value}")
# Monitor transport state
@dtls_transport.on("statechange")
def on_state_change():
print(f"DTLS state changed to: {dtls_transport.state}")
print(f"Initial DTLS state: {dtls_transport.state}")async def sctp_transport_setup():
# Create DTLS transport (simplified setup)
gatherer = aiortc.RTCIceGatherer()
await gatherer.gather()
ice_transport = aiortc.RTCIceTransport(gatherer)
dtls_transport = aiortc.RTCDtlsTransport(ice_transport)
# Create SCTP transport over DTLS
sctp_transport = aiortc.RTCSctpTransport(dtls_transport)
# Get SCTP capabilities
capabilities = aiortc.RTCSctpTransport.getCapabilities()
print(f"Max SCTP message size: {capabilities.maxMessageSize} bytes")
# Monitor SCTP transport
print(f"SCTP port: {sctp_transport.port}")
print(f"Max channels: {sctp_transport.maxChannels}")
print(f"Is server: {sctp_transport.is_server}")
print(f"Initial state: {sctp_transport.state}")
@sctp_transport.on("statechange")
def on_state_change():
print(f"SCTP state changed to: {sctp_transport.state}")async def complete_transport_stack():
"""Demonstrate the complete transport stack: ICE -> DTLS -> SCTP."""
# 1. ICE Layer - Connectivity
gatherer = aiortc.RTCIceGatherer()
# Configure STUN/TURN servers
ice_servers = [aiortc.RTCIceServer("stun:stun.l.google.com:19302")]
await gatherer.gather(ice_servers)
ice_transport = aiortc.RTCIceTransport(gatherer)
# 2. DTLS Layer - Security
dtls_transport = aiortc.RTCDtlsTransport(ice_transport)
# 3. SCTP Layer - Data delivery
sctp_transport = aiortc.RTCSctpTransport(dtls_transport)
# Monitor all transport states
@ice_transport.on("statechange")
def on_ice_state():
print(f"ICE: {ice_transport.state}")
@dtls_transport.on("statechange")
def on_dtls_state():
print(f"DTLS: {dtls_transport.state}")
@sctp_transport.on("statechange")
def on_sctp_state():
print(f"SCTP: {sctp_transport.state}")
print("Transport stack created:")
print(f" ICE: {ice_transport.state}")
print(f" DTLS: {dtls_transport.state}")
print(f" SCTP: {sctp_transport.state}")
return {
"ice": ice_transport,
"dtls": dtls_transport,
"sctp": sctp_transport
}async def ice_candidate_exchange():
"""Simulate ICE candidate exchange between two peers."""
# Create two peers
gatherer1 = aiortc.RTCIceGatherer()
gatherer2 = aiortc.RTCIceGatherer()
# Gather candidates for both peers
await gatherer1.gather()
await gatherer2.gather()
# Create ICE transports
ice1 = aiortc.RTCIceTransport(gatherer1)
ice2 = aiortc.RTCIceTransport(gatherer2)
# Exchange parameters
params1 = gatherer1.getLocalParameters()
params2 = gatherer2.getLocalParameters()
print(f"Peer 1 ICE params: {params1.usernameFragment}")
print(f"Peer 2 ICE params: {params2.usernameFragment}")
# Exchange candidates
candidates1 = gatherer1.getLocalCandidates()
candidates2 = gatherer2.getLocalCandidates()
print(f"Peer 1 has {len(candidates1)} candidates")
print(f"Peer 2 has {len(candidates2)} candidates")
# Add remote candidates
for candidate in candidates1:
ice2.addRemoteCandidate(candidate)
for candidate in candidates2:
ice1.addRemoteCandidate(candidate)
# Start connectivity checks
await ice1.start(params2)
await ice2.start(params1)
print("ICE connectivity checks started")async def transport_error_handling():
"""Demonstrate transport error handling."""
try:
# Create transport stack
gatherer = aiortc.RTCIceGatherer()
ice_transport = aiortc.RTCIceTransport(gatherer)
dtls_transport = aiortc.RTCDtlsTransport(ice_transport)
# Set up error handlers
@ice_transport.on("error")
def on_ice_error(error):
print(f"ICE error: {error}")
@dtls_transport.on("error")
def on_dtls_error(error):
print(f"DTLS error: {error}")
# Attempt to start without proper setup (will cause errors)
invalid_params = aiortc.RTCIceParameters()
await ice_transport.start(invalid_params)
except Exception as e:
print(f"Transport setup failed: {e}")
finally:
# Clean up transports
if 'dtls_transport' in locals():
await dtls_transport.stop()
if 'ice_transport' in locals():
await ice_transport.stop()
print("Transports cleaned up")async def monitor_transport_states():
"""Monitor transport state transitions."""
# Create transports
gatherer = aiortc.RTCIceGatherer()
await gatherer.gather()
ice_transport = aiortc.RTCIceTransport(gatherer)
dtls_transport = aiortc.RTCDtlsTransport(ice_transport)
sctp_transport = aiortc.RTCSctpTransport(dtls_transport)
# State tracking
states = {
"ice": ice_transport.state,
"dtls": dtls_transport.state,
"sctp": sctp_transport.state
}
def print_states():
print(f"States - ICE: {states['ice']}, DTLS: {states['dtls']}, SCTP: {states['sctp']}")
# Set up state change handlers
@ice_transport.on("statechange")
def on_ice_change():
states["ice"] = ice_transport.state
print_states()
@dtls_transport.on("statechange")
def on_dtls_change():
states["dtls"] = dtls_transport.state
print_states()
@sctp_transport.on("statechange")
def on_sctp_change():
states["sctp"] = sctp_transport.state
print_states()
print("Initial states:")
print_states()
# Monitor for state changes
await asyncio.sleep(10)
print("Final states:")
print_states()Install with Tessl CLI
npx tessl i tessl/pypi-aiortcdocs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10