Python implementation of WebRTC and ORTC for real-time peer-to-peer communication
87
SCTP-based data channels for reliable application data transport with support for ordered/unordered delivery, partial reliability, and binary/text messages.
Bidirectional data channel for sending application data between peers over SCTP transport.
class RTCDataChannel:
"""Data channel for peer-to-peer application data transport."""
@property
def bufferedAmount(self) -> int:
"""Number of bytes queued for transmission"""
@property
def bufferedAmountLowThreshold(self) -> int:
"""Threshold for bufferedamountlow event (default: 0)"""
@bufferedAmountLowThreshold.setter
def bufferedAmountLowThreshold(self, value: int) -> None:
"""Set buffered amount low threshold"""
@property
def id(self) -> int:
"""Channel identifier (0-65534)"""
@property
def label(self) -> str:
"""Channel label/name"""
@property
def maxPacketLifeTime(self) -> int:
"""Maximum packet lifetime in milliseconds (None if not set)"""
@property
def maxRetransmits(self) -> int:
"""Maximum retransmission attempts (None if not set)"""
@property
def negotiated(self) -> bool:
"""Whether channel was pre-negotiated"""
@property
def ordered(self) -> bool:
"""Whether delivery is ordered"""
@property
def protocol(self) -> str:
"""Subprotocol name"""
@property
def readyState(self) -> str:
"""Channel state: "connecting", "open", "closing", "closed" """
def close(self) -> None:
"""Close the data channel"""
def send(self, data) -> None:
"""
Send data over the channel.
Parameters:
- data (str or bytes): Data to send
"""Configuration parameters for data channel creation.
class RTCDataChannelParameters:
"""Data channel configuration parameters."""
def __init__(self, label: str, **options):
"""
Create data channel parameters.
Parameters:
- label (str): Channel label/name
- maxPacketLifeTime (int, optional): Maximum packet lifetime in milliseconds
- maxRetransmits (int, optional): Maximum retransmission attempts
- ordered (bool, optional): Whether to guarantee ordered delivery (default: True)
- protocol (str, optional): Subprotocol name (default: "")
- negotiated (bool, optional): Whether channel is pre-negotiated (default: False)
- id (int, optional): Numeric channel identifier (0-65534)
"""
@property
def label(self) -> str:
"""Channel label"""
@property
def maxPacketLifeTime(self) -> int:
"""Maximum packet lifetime in milliseconds"""
@property
def maxRetransmits(self) -> int:
"""Maximum retransmission attempts"""
@property
def ordered(self) -> bool:
"""Whether delivery is ordered"""
@property
def protocol(self) -> str:
"""Subprotocol name"""
@property
def negotiated(self) -> bool:
"""Whether pre-negotiated"""
@property
def id(self) -> int:
"""Channel identifier"""Data channels emit events for state changes and message reception.
# Event types:
# "open" - Channel opened and ready for data
# "message" - Message received
# "error" - Error occurred
# "close" - Channel closed
# "bufferedamountlow" - Buffered amount below thresholdimport aiortc
import asyncio
async def basic_data_channel():
pc1 = aiortc.RTCPeerConnection()
pc2 = aiortc.RTCPeerConnection()
# PC1 creates data channel
channel1 = pc1.createDataChannel("chat")
# Set up event handlers for PC1 channel
@channel1.on("open")
def on_open():
print("Channel opened on PC1")
channel1.send("Hello from PC1!")
@channel1.on("message")
def on_message(message):
print(f"PC1 received: {message}")
# PC2 handles incoming data channel
@pc2.on("datachannel")
def on_datachannel(channel):
print(f"PC2 received data channel: {channel.label}")
@channel.on("open")
def on_open():
print("Channel opened on PC2")
channel.send("Hello from PC2!")
@channel.on("message")
def on_message(message):
print(f"PC2 received: {message}")
# Perform signaling (simplified)
offer = await pc1.createOffer()
await pc1.setLocalDescription(offer)
await pc2.setRemoteDescription(offer)
answer = await pc2.createAnswer()
await pc2.setLocalDescription(answer)
await pc1.setRemoteDescription(answer)
# Wait for connection
print(f"Channel state: {channel1.readyState}")async def custom_data_channel():
pc = aiortc.RTCPeerConnection()
# Create unreliable, unordered channel for game data
game_channel = pc.createDataChannel(
"game-state",
ordered=False, # Allow out-of-order delivery
maxPacketLifeTime=100, # Drop packets after 100ms
protocol="game-protocol" # Custom subprotocol
)
# Create reliable channel for chat
chat_channel = pc.createDataChannel(
"chat",
ordered=True, # Guarantee order
maxRetransmits=5 # Retry up to 5 times
)
print(f"Game channel: ordered={game_channel.ordered}, "
f"maxPacketLifeTime={game_channel.maxPacketLifeTime}")
print(f"Chat channel: ordered={chat_channel.ordered}, "
f"maxRetransmits={chat_channel.maxRetransmits}")async def binary_data_transfer():
pc1 = aiortc.RTCPeerConnection()
pc2 = aiortc.RTCPeerConnection()
# Create data channel for file transfer
file_channel = pc1.createDataChannel("file-transfer")
@file_channel.on("open")
def on_open():
# Send binary data
with open("example.jpg", "rb") as f:
file_data = f.read()
print(f"Sending {len(file_data)} bytes")
file_channel.send(file_data)
# Handle incoming data channel on PC2
@pc2.on("datachannel")
def on_datachannel(channel):
received_data = b""
@channel.on("message")
def on_message(message):
nonlocal received_data
if isinstance(message, bytes):
received_data += message
print(f"Received {len(message)} bytes, "
f"total: {len(received_data)} bytes")
# Save received file
with open("received_file.jpg", "wb") as f:
f.write(received_data)
print("File saved!")async def flow_control_example():
pc = aiortc.RTCPeerConnection()
channel = pc.createDataChannel("bulk-data")
# Set low threshold for buffered amount
channel.bufferedAmountLowThreshold = 1024 # 1KB threshold
@channel.on("open")
def on_open():
# Send large amount of data
large_data = b"x" * 10000 # 10KB
if channel.bufferedAmount < channel.bufferedAmountLowThreshold:
channel.send(large_data)
print(f"Sent data, buffered: {channel.bufferedAmount} bytes")
@channel.on("bufferedamountlow")
def on_buffered_amount_low():
print("Buffer emptied, can send more data")
# Send more data when buffer is lowasync def multiple_channels():
pc1 = aiortc.RTCPeerConnection()
pc2 = aiortc.RTCPeerConnection()
# Create multiple channels with different purposes
channels = {
"control": pc1.createDataChannel("control", ordered=True),
"video-metadata": pc1.createDataChannel("video-meta", ordered=False),
"bulk-data": pc1.createDataChannel("bulk", maxRetransmits=3)
}
# Set up handlers for each channel
for name, channel in channels.items():
@channel.on("open")
def on_open(channel_name=name):
print(f"Channel '{channel_name}' opened")
@channel.on("message")
def on_message(message, channel_name=name):
print(f"Channel '{channel_name}' received: {message}")
# Handle incoming channels on PC2
@pc2.on("datachannel")
def on_datachannel(channel):
print(f"PC2 received channel: {channel.label} (ID: {channel.id})")
@channel.on("message")
def on_message(message):
print(f"Channel {channel.label} got: {message}")
# Echo message back
channel.send(f"Echo: {message}")async def error_handling():
pc = aiortc.RTCPeerConnection()
channel = pc.createDataChannel("test")
@channel.on("error")
def on_error(error):
print(f"Data channel error: {error}")
@channel.on("close")
def on_close():
print("Data channel closed")
@channel.on("open")
def on_open():
try:
# Try to send data
channel.send("Test message")
except Exception as e:
print(f"Send failed: {e}")
# Monitor channel state
print(f"Initial state: {channel.readyState}")
# Close channel when done
if channel.readyState == "open":
channel.close()
print(f"Final state: {channel.readyState}")Install with Tessl CLI
npx tessl i tessl/pypi-aiortcdocs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10