Python implementation of WebRTC and ORTC for real-time peer-to-peer communication
87
Connection statistics, media quality metrics, and transport performance monitoring for debugging and quality assurance.
Container for WebRTC statistics providing dictionary-like access to stats objects.
class RTCStatsReport:
    """Statistics report container.

    Provides dictionary-like access to statistics objects: lookup by
    statistics ID, iteration over IDs, and ``keys()`` / ``values()`` /
    ``items()`` accessors.
    """

    def __init__(self):
        """Create empty statistics report."""

    def add(self, stats) -> None:
        """
        Add statistics object to report.

        Parameters:
        - stats: Statistics object (RTCInboundRtpStreamStats,
          RTCOutboundRtpStreamStats, etc.)
        """

    def __getitem__(self, key: str):
        """Get statistics object by ID."""

    def __iter__(self):
        """Iterate over statistics IDs."""

    def keys(self):
        """Get all statistics IDs."""

    def values(self):
        """Get all statistics objects."""

    def items(self):
        """Get (ID, stats) pairs."""

Base class for all statistics objects with common properties.
class RTCStats:
    """Base class for statistics objects.

    Declares the properties common to every statistics object: ``id``,
    ``timestamp`` and ``type``.
    """

    @property
    def id(self) -> str:
        """Unique statistics identifier"""

    @property
    def timestamp(self) -> float:
        """Timestamp when statistics were gathered (seconds since epoch)"""

    @property
    def type(self) -> str:
        """Statistics type identifier"""

Statistics for incoming and outgoing RTP media streams.
class RTCInboundRtpStreamStats(RTCStats):
    """Statistics for inbound RTP streams.

    Groups stream identification, packet counters, byte counters and
    timing figures for one received RTP stream.
    """

    @property
    def type(self) -> str:
        """Always "inbound-rtp" """

    # Stream identification
    @property
    def ssrc(self) -> int:
        """Synchronization source identifier"""

    @property
    def kind(self) -> str:
        """Media kind: "audio" or "video" """

    @property
    def trackId(self) -> str:
        """Associated track identifier"""

    @property
    def transportId(self) -> str:
        """Associated transport identifier"""

    @property
    def codecId(self) -> str:
        """Associated codec identifier"""

    # Packet statistics
    @property
    def packetsReceived(self) -> int:
        """Total packets received"""

    @property
    def packetsLost(self) -> int:
        """Total packets lost"""

    @property
    def packetsDiscarded(self) -> int:
        """Total packets discarded"""

    @property
    def packetsRepaired(self) -> int:
        """Total packets repaired"""

    # Byte statistics
    @property
    def bytesReceived(self) -> int:
        """Total bytes received"""

    @property
    def headerBytesReceived(self) -> int:
        """Total header bytes received"""

    # Timing statistics
    @property
    def jitter(self) -> float:
        """Packet jitter in seconds"""

    @property
    def fractionLost(self) -> float:
        """Fraction of packets lost (0.0 to 1.0)"""
class RTCOutboundRtpStreamStats(RTCStats):
    """Statistics for outbound RTP streams.

    Groups stream identification, packet counters, byte counters and
    encoder quality figures for one sent RTP stream.
    """

    @property
    def type(self) -> str:
        """Always "outbound-rtp" """

    # Stream identification
    @property
    def ssrc(self) -> int:
        """Synchronization source identifier"""

    @property
    def kind(self) -> str:
        """Media kind: "audio" or "video" """

    @property
    def trackId(self) -> str:
        """Associated track identifier"""

    @property
    def transportId(self) -> str:
        """Associated transport identifier"""

    @property
    def codecId(self) -> str:
        """Associated codec identifier"""

    # Packet statistics
    @property
    def packetsSent(self) -> int:
        """Total packets sent"""

    @property
    def packetsLost(self) -> int:
        """Total packets lost (from RTCP feedback)"""

    @property
    def retransmittedPacketsSent(self) -> int:
        """Total retransmitted packets sent"""

    # Byte statistics
    @property
    def bytesSent(self) -> int:
        """Total bytes sent"""

    @property
    def headerBytesSent(self) -> int:
        """Total header bytes sent"""

    @property
    def retransmittedBytesSent(self) -> int:
        """Total retransmitted bytes sent"""

    # Quality statistics
    @property
    def targetBitrate(self) -> float:
        """Target bitrate in bits per second"""

    @property
    def framesEncoded(self) -> int:
        """Total frames encoded (video only)"""

    @property
    def keyFramesEncoded(self) -> int:
        """Total key frames encoded (video only)"""
class RTCRemoteInboundRtpStreamStats(RTCStats):
    """Statistics for remote inbound RTP streams (from RTCP feedback).

    Describes how the remote peer is receiving a stream: loss, round trip
    time and jitter.
    """

    @property
    def type(self) -> str:
        """Always "remote-inbound-rtp" """

    @property
    def ssrc(self) -> int:
        """Synchronization source identifier"""

    @property
    def kind(self) -> str:
        """Media kind: "audio" or "video" """

    @property
    def packetsLost(self) -> int:
        """Total packets lost at remote"""

    @property
    def fractionLost(self) -> float:
        """Fraction lost at remote (0.0 to 1.0)"""

    @property
    def roundTripTime(self) -> float:
        """Round trip time in seconds"""

    @property
    def jitter(self) -> float:
        """Jitter at remote in seconds"""
class RTCRemoteOutboundRtpStreamStats(RTCStats):
    """Statistics for remote outbound RTP streams (from RTCP feedback).

    Describes what the remote peer reports having sent: packet and byte
    totals plus the remote timestamp.
    """

    @property
    def type(self) -> str:
        """Always "remote-outbound-rtp" """

    @property
    def ssrc(self) -> int:
        """Synchronization source identifier"""

    @property
    def kind(self) -> str:
        """Media kind: "audio" or "video" """

    @property
    def packetsSent(self) -> int:
        """Total packets sent by remote"""

    @property
    def bytesSent(self) -> int:
        """Total bytes sent by remote"""

    @property
    def remoteTimestamp(self) -> float:
        """Remote timestamp"""

Statistics for transport layer performance.
class RTCTransportStats(RTCStats):
    """Statistics for transport layer.

    Covers byte totals, DTLS and ICE state, the selected candidate pair
    IDs, and the negotiated TLS/DTLS version and cipher suites.
    """

    @property
    def type(self) -> str:
        """Always "transport" """

    # Byte statistics
    @property
    def bytesSent(self) -> int:
        """Total bytes sent over transport"""

    @property
    def bytesReceived(self) -> int:
        """Total bytes received over transport"""

    # Connection state
    @property
    def dtlsState(self) -> str:
        """DTLS connection state"""

    @property
    def iceRole(self) -> str:
        """ICE role: "controlling" or "controlled" """

    @property
    def iceState(self) -> str:
        """ICE connection state"""

    # Selected candidate pair
    @property
    def localCandidateId(self) -> str:
        """Selected local candidate ID"""

    @property
    def remoteCandidateId(self) -> str:
        """Selected remote candidate ID"""

    # Security parameters
    @property
    def tlsVersion(self) -> str:
        """TLS/DTLS version used"""

    @property
    def dtlsCipher(self) -> str:
        """DTLS cipher suite"""

    @property
    def srtpCipher(self) -> str:
        """SRTP cipher suite"""


import aiortc
import asyncio
async def collect_basic_stats():
pc = aiortc.RTCPeerConnection()
# Add media tracks
audio_track = aiortc.AudioStreamTrack()
video_track = aiortc.VideoStreamTrack()
audio_sender = pc.addTrack(audio_track)
video_sender = pc.addTrack(video_track)
# Simulate connection setup (simplified)
offer = await pc.createOffer()
await pc.setLocalDescription(offer)
# Get connection statistics
stats_report = await pc.getStats()
print(f"Total statistics objects: {len(list(stats_report.keys()))}")
# Iterate through all statistics
for stats_id, stats in stats_report.items():
print(f"Stats ID: {stats_id}")
print(f" Type: {stats.type}")
print(f" Timestamp: {stats.timestamp}")
# Print type-specific information
if hasattr(stats, 'ssrc'):
print(f" SSRC: {stats.ssrc}")
if hasattr(stats, 'kind'):
print(f" Kind: {stats.kind}")async def monitor_rtp_stats():
pc = aiortc.RTCPeerConnection()
# Add tracks
audio_sender = pc.addTrack(aiortc.AudioStreamTrack())
video_sender = pc.addTrack(aiortc.VideoStreamTrack())
async def print_rtp_stats():
while True:
try:
# Get sender statistics
audio_stats = await audio_sender.getStats()
video_stats = await video_sender.getStats()
print("=== RTP Statistics ===")
# Process audio sender stats
for stats in audio_stats.values():
if isinstance(stats, aiortc.RTCOutboundRtpStreamStats):
print(f"Audio Outbound:")
print(f" Packets sent: {stats.packetsSent}")
print(f" Bytes sent: {stats.bytesSent}")
print(f" Packets lost: {stats.packetsLost}")
# Process video sender stats
for stats in video_stats.values():
if isinstance(stats, aiortc.RTCOutboundRtpStreamStats):
print(f"Video Outbound:")
print(f" Packets sent: {stats.packetsSent}")
print(f" Bytes sent: {stats.bytesSent}")
print(f" Frames encoded: {stats.framesEncoded}")
print(f" Key frames: {stats.keyFramesEncoded}")
await asyncio.sleep(5) # Update every 5 seconds
except Exception as e:
print(f"Error getting stats: {e}")
break
# Start monitoring
monitor_task = asyncio.create_task(print_rtp_stats())
# Let it run for a while
await asyncio.sleep(30)
monitor_task.cancel()async def analyze_receiver_stats():
pc = aiortc.RTCPeerConnection()
@pc.on("track")
def on_track(track):
print(f"Received {track.kind} track")
# Find the receiver for this track
receiver = None
for transceiver in pc.getTransceivers():
if transceiver.receiver.track == track:
receiver = transceiver.receiver
break
if receiver:
async def monitor_receiver():
while True:
try:
stats_report = await receiver.getStats()
for stats in stats_report.values():
if isinstance(stats, aiortc.RTCInboundRtpStreamStats):
print(f"{track.kind.capitalize()} Inbound:")
print(f" Packets received: {stats.packetsReceived}")
print(f" Packets lost: {stats.packetsLost}")
print(f" Bytes received: {stats.bytesReceived}")
print(f" Jitter: {stats.jitter:.4f}s")
print(f" Fraction lost: {stats.fractionLost:.2%}")
await asyncio.sleep(3)
except Exception as e:
print(f"Error monitoring receiver: {e}")
break
asyncio.create_task(monitor_receiver())async def monitor_transport_stats():
pc = aiortc.RTCPeerConnection()
async def print_transport_stats():
stats_report = await pc.getStats()
for stats in stats_report.values():
if isinstance(stats, aiortc.RTCTransportStats):
print("Transport Statistics:")
print(f" Bytes sent: {stats.bytesSent}")
print(f" Bytes received: {stats.bytesReceived}")
print(f" DTLS state: {stats.dtlsState}")
print(f" ICE state: {stats.iceState}")
print(f" ICE role: {stats.iceRole}")
print(f" TLS version: {stats.tlsVersion}")
print(f" DTLS cipher: {stats.dtlsCipher}")
print(f" SRTP cipher: {stats.srtpCipher}")
print(f" Local candidate: {stats.localCandidateId}")
print(f" Remote candidate: {stats.remoteCandidateId}")
# Monitor transport stats
await print_transport_stats()async def quality_metrics_dashboard():
"""Create a comprehensive quality metrics dashboard."""
pc = aiortc.RTCPeerConnection()
# Add tracks
audio_sender = pc.addTrack(aiortc.AudioStreamTrack())
video_sender = pc.addTrack(aiortc.VideoStreamTrack())
class QualityMetrics:
def __init__(self):
self.reset()
def reset(self):
self.total_packets_sent = 0
self.total_packets_lost = 0
self.total_bytes_sent = 0
self.average_jitter = 0.0
self.packet_loss_rate = 0.0
self.bitrate = 0.0
self.last_timestamp = None
self.last_bytes = 0
audio_metrics = QualityMetrics()
video_metrics = QualityMetrics()
async def update_quality_metrics():
nonlocal audio_metrics, video_metrics
try:
# Get all connection statistics
stats_report = await pc.getStats()
current_time = asyncio.get_event_loop().time()
for stats in stats_report.values():
if isinstance(stats, aiortc.RTCOutboundRtpStreamStats):
metrics = audio_metrics if stats.kind == "audio" else video_metrics
# Update packet statistics
metrics.total_packets_sent = stats.packetsSent
metrics.total_packets_lost = stats.packetsLost
metrics.total_bytes_sent = stats.bytesSent
# Calculate packet loss rate
total_packets = metrics.total_packets_sent + metrics.total_packets_lost
if total_packets > 0:
metrics.packet_loss_rate = metrics.total_packets_lost / total_packets
# Calculate bitrate
if metrics.last_timestamp:
time_diff = current_time - metrics.last_timestamp
byte_diff = stats.bytesSent - metrics.last_bytes
if time_diff > 0:
metrics.bitrate = (byte_diff * 8) / time_diff # bits per second
metrics.last_timestamp = current_time
metrics.last_bytes = stats.bytesSent
elif isinstance(stats, aiortc.RTCInboundRtpStreamStats):
metrics = audio_metrics if stats.kind == "audio" else video_metrics
metrics.average_jitter = stats.jitter
except Exception as e:
print(f"Error updating metrics: {e}")
def print_dashboard():
print("\n" + "="*50)
print("QUALITY METRICS DASHBOARD")
print("="*50)
print(f"AUDIO:")
print(f" Packets sent: {audio_metrics.total_packets_sent}")
print(f" Packets lost: {audio_metrics.total_packets_lost}")
print(f" Loss rate: {audio_metrics.packet_loss_rate:.2%}")
print(f" Bitrate: {audio_metrics.bitrate/1000:.1f} kbps")
print(f" Jitter: {audio_metrics.average_jitter*1000:.1f} ms")
print(f"VIDEO:")
print(f" Packets sent: {video_metrics.total_packets_sent}")
print(f" Packets lost: {video_metrics.total_packets_lost}")
print(f" Loss rate: {video_metrics.packet_loss_rate:.2%}")
print(f" Bitrate: {video_metrics.bitrate/1000:.1f} kbps")
print(f" Jitter: {video_metrics.average_jitter*1000:.1f} ms")
# Run dashboard updates
for i in range(10): # Run for 10 iterations
await update_quality_metrics()
print_dashboard()
await asyncio.sleep(2)async def export_statistics():
"""Export statistics to different formats."""
pc = aiortc.RTCPeerConnection()
# Add some tracks
pc.addTrack(aiortc.AudioStreamTrack())
pc.addTrack(aiortc.VideoStreamTrack())
# Get statistics
stats_report = await pc.getStats()
# Export to JSON
import json
stats_data = {}
for stats_id, stats in stats_report.items():
stats_dict = {
"id": stats.id,
"type": stats.type,
"timestamp": stats.timestamp
}
# Add type-specific fields
if hasattr(stats, 'ssrc'):
stats_dict['ssrc'] = stats.ssrc
if hasattr(stats, 'kind'):
stats_dict['kind'] = stats.kind
if hasattr(stats, 'packetsSent'):
stats_dict['packetsSent'] = stats.packetsSent
if hasattr(stats, 'bytesSent'):
stats_dict['bytesSent'] = stats.bytesSent
# Add more fields as needed
stats_data[stats_id] = stats_dict
# Save to file
with open("webrtc_stats.json", "w") as f:
json.dump(stats_data, f, indent=2)
print("Statistics exported to webrtc_stats.json")
# Export to CSV
import csv
with open("webrtc_stats.csv", "w", newline="") as f:
writer = csv.writer(f)
# Write header
writer.writerow(["ID", "Type", "Timestamp", "SSRC", "Kind", "PacketsSent", "BytesSent"])
# Write data
for stats_id, stats in stats_report.items():
row = [
stats.id,
stats.type,
stats.timestamp,
getattr(stats, 'ssrc', ''),
getattr(stats, 'kind', ''),
getattr(stats, 'packetsSent', ''),
getattr(stats, 'bytesSent', '')
]
writer.writerow(row)
print("Statistics exported to webrtc_stats.csv")async def statistics_alerts():
"""Monitor statistics and trigger alerts for issues."""
pc = aiortc.RTCPeerConnection()
audio_sender = pc.addTrack(aiortc.AudioStreamTrack())
video_sender = pc.addTrack(aiortc.VideoStreamTrack())
# Alert thresholds
PACKET_LOSS_THRESHOLD = 0.05 # 5%
HIGH_JITTER_THRESHOLD = 0.050 # 50ms
LOW_BITRATE_THRESHOLD = 50000 # 50 kbps
async def check_alerts():
while True:
try:
stats_report = await pc.getStats()
for stats in stats_report.values():
if isinstance(stats, aiortc.RTCOutboundRtpStreamStats):
# Check packet loss
total_packets = stats.packetsSent + stats.packetsLost
if total_packets > 0:
loss_rate = stats.packetsLost / total_packets
if loss_rate > PACKET_LOSS_THRESHOLD:
print(f"🚨 HIGH PACKET LOSS: {loss_rate:.2%} on {stats.kind}")
# Check bitrate (simplified calculation)
if hasattr(stats, 'targetBitrate') and stats.targetBitrate < LOW_BITRATE_THRESHOLD:
print(f"⚠️ LOW BITRATE: {stats.targetBitrate/1000:.1f} kbps on {stats.kind}")
elif isinstance(stats, aiortc.RTCInboundRtpStreamStats):
# Check jitter
if stats.jitter > HIGH_JITTER_THRESHOLD:
print(f"⚠️ HIGH JITTER: {stats.jitter*1000:.1f} ms on {stats.kind}")
# Check fraction lost
if stats.fractionLost > PACKET_LOSS_THRESHOLD:
print(f"🚨 HIGH LOSS FRACTION: {stats.fractionLost:.2%} on {stats.kind}")
await asyncio.sleep(5) # Check every 5 seconds
except Exception as e:
print(f"Error checking alerts: {e}")
break
# Start alert monitoring
await check_alerts()Install with Tessl CLI
npx tessl i tessl/pypi-aiortc
docs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10