Python implementation of WebRTC and ORTC for real-time peer-to-peer communication
87
Audio and video track management with abstract base classes for custom media sources and built-in track implementations for testing and development.
Abstract base class for all media tracks providing common interface and event handling.
class MediaStreamTrack:
    """Abstract base class for media tracks.

    Provides the common interface (id, kind, readyState) and event
    handling shared by all track implementations.
    """

    def __init__(self):
        """Initialize media track with unique ID and event emitter."""

    @property
    def id(self) -> str:
        """Unique track identifier"""

    @property
    def kind(self) -> str:
        """Media kind: "audio", "video", or "unknown" """

    @property
    def readyState(self) -> str:
        """Track state: "live" or "ended" """

    async def recv(self):
        """
        Receive next media frame or audio data.

        This is an abstract method that must be implemented by subclasses.

        Returns:
            AudioFrame or VideoFrame: Next media data
        """

    def stop(self) -> None:
        """Stop the track and emit 'ended' event."""


Concrete audio track implementation that generates silence by default, suitable for testing or as a base class for custom audio sources.
class AudioStreamTrack(MediaStreamTrack):
    """Audio track that generates silence by default."""

    def __init__(self):
        """Initialize audio track."""

    @property
    def kind(self) -> str:
        """Always returns "audio" """

    async def recv(self):
        """
        Generate silent audio frame.

        Returns:
            AudioFrame: 20ms of silence at 8kHz mono
        """


Concrete video track implementation that generates green frames by default, suitable for testing or as a base class for custom video sources.
class VideoStreamTrack(MediaStreamTrack):
    """Video track that generates green frames by default."""

    def __init__(self):
        """Initialize video track."""

    @property
    def kind(self) -> str:
        """Always returns "video" """

    async def recv(self):
        """
        Generate green video frame.

        Returns:
            VideoFrame: 640x480 green frame at 30fps
        """

    async def next_timestamp(self):
        """
        Get next timestamp for frame timing.

        Note: this helper exists only on VideoStreamTrack, not on
        AudioStreamTrack or the MediaStreamTrack base class.

        Returns:
            tuple: (pts, time_base) for frame timing
        """


Audio and video frame objects used by media tracks.
class AudioFrame:
    """Audio frame containing PCM data."""

    def __init__(self, format, layout, samples):
        """
        Create audio frame.

        Parameters:
        - format: Audio format (e.g., "s16")
        - layout: Channel layout (e.g., "mono", "stereo")
        - samples: Number of samples per channel
        """

    @property
    def pts(self) -> int:
        """Presentation timestamp"""

    @property
    def time_base(self):
        """Time base for timestamp"""
class VideoFrame:
    """Video frame containing image data."""

    def __init__(self, width, height, format="yuv420p"):
        """
        Create video frame.

        Parameters:
        - width (int): Frame width in pixels
        - height (int): Frame height in pixels
        - format (str): Pixel format (default: "yuv420p")
        """

    @property
    def pts(self) -> int:
        """Presentation timestamp"""

    @property
    def time_base(self):
        """Time base for timestamp"""

    @property
    def width(self) -> int:
        """Frame width in pixels"""

    @property
    def height(self) -> int:
        """Frame height in pixels"""


Timing and format constants for media processing.
# Audio timing constants
AUDIO_PTIME = 0.020 # 20ms audio packet time
# Video timing constants
VIDEO_CLOCK_RATE = 90000 # 90kHz video clock
VIDEO_PTIME = 1/30 # 30fps frame timeException raised when media stream operations fail.
class MediaStreamError(Exception):
    """Exception for media stream errors."""


import aiortc
import asyncio
import fractions

import numpy as np
class SineWaveAudioTrack(aiortc.AudioStreamTrack):
    """Custom audio track generating a continuous sine wave.

    Parameters:
    - frequency (float): Tone frequency in Hz (default: 440, concert A)
    - sample_rate (int): Samples per second (default: 8000)
    """

    def __init__(self, frequency=440, sample_rate=8000):
        super().__init__()
        self.frequency = frequency
        self.sample_rate = sample_rate
        self.samples = 0  # total samples emitted so far; doubles as next pts

    async def recv(self):
        """
        Generate the next 20ms frame of sine-wave audio.

        Returns:
            AudioFrame: 20ms of 16-bit mono PCM at self.sample_rate
        """
        samples_per_frame = int(self.sample_rate * 0.020)  # 20ms
        # Continue the waveform phase from where the previous frame ended
        # so consecutive frames join without clicks.
        t = (np.arange(samples_per_frame) + self.samples) / self.sample_rate
        # Generate sine wave and scale float [-1, 1] to signed 16-bit PCM
        audio_data = np.sin(2 * np.pi * self.frequency * t)
        audio_data = (audio_data * 32767).astype(np.int16)
        # Create audio frame
        frame = AudioFrame(format="s16", layout="mono", samples=samples_per_frame)
        frame.planes[0].update(audio_data.tobytes())
        # Receivers need the sample rate to interpret/resample the PCM.
        frame.sample_rate = self.sample_rate
        # AudioStreamTrack has no next_timestamp() helper (that exists only on
        # VideoStreamTrack), so derive timing from the running sample counter:
        # pts counts samples, with a time base of 1/sample_rate seconds.
        frame.pts = self.samples
        frame.time_base = fractions.Fraction(1, self.sample_rate)
        self.samples += samples_per_frame
        return frame
# Use custom audio track
async def use_custom_audio():
    """Create a peer connection and attach the custom sine-wave track."""
    pc = aiortc.RTCPeerConnection()
    # Add custom sine wave audio track
    audio_track = SineWaveAudioTrack(frequency=880)  # A5 note
    pc.addTrack(audio_track)
    print(f"Added {audio_track.kind} track with ID: {audio_track.id}")


import aiortc
import cv2
import numpy as np
class WebcamVideoTrack(aiortc.VideoStreamTrack):
    """Custom video track that captures frames from a local webcam.

    Parameters:
    - device_id (int): OpenCV capture device index (default: 0)
    """

    def __init__(self, device_id=0):
        super().__init__()
        self.cap = cv2.VideoCapture(device_id)
        self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

    async def recv(self):
        """
        Capture and return the next webcam frame as an RGB VideoFrame.

        Raises:
            MediaStreamError: If the capture device is unavailable or a
                frame cannot be read.
        """
        # Guard against use after stop() or a camera that never opened;
        # a bare cv2.VideoCapture object is always truthy, so isOpened()
        # is the only reliable check.
        if self.cap is None or not self.cap.isOpened():
            raise MediaStreamError("Failed to read from webcam")
        # Read frame from webcam
        ret, frame = self.cap.read()
        if not ret:
            raise MediaStreamError("Failed to read from webcam")
        # Convert BGR (OpenCV's native order) to RGB
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        # Create video frame
        video_frame = VideoFrame.from_ndarray(frame_rgb, format="rgb24")
        # Set timing
        pts, time_base = await self.next_timestamp()
        video_frame.pts = pts
        video_frame.time_base = time_base
        return video_frame

    def stop(self):
        """Stop the track and release the capture device."""
        super().stop()
        # Release exactly once and drop the handle so a later recv() raises
        # MediaStreamError instead of reading from a released device; this
        # also makes repeated stop() calls safe.
        if self.cap is not None:
            self.cap.release()
            self.cap = None
# Use custom video track
async def use_custom_video():
pc = aiortc.RTCPeerConnection()
try:
# Add webcam video track
video_track = WebcamVideoTrack(device_id=0)
pc.addTrack(video_track)
print(f"Added {video_track.kind} track with ID: {video_track.id}")
# Process some frames
for i in range(10):
frame = await video_track.recv()
print(f"Frame {i}: {frame.width}x{frame.height}")
finally:
video_track.stop()async def use_builtin_tracks():
pc = aiortc.RTCPeerConnection()
# Add default tracks (silence and green frames)
audio_track = aiortc.AudioStreamTrack()
video_track = aiortc.VideoStreamTrack()
pc.addTrack(audio_track)
pc.addTrack(video_track)
print(f"Audio track ready state: {audio_track.readyState}")
print(f"Video track ready state: {video_track.readyState}")
# Test receiving frames
audio_frame = await audio_track.recv()
video_frame = await video_track.recv()
print(f"Audio frame: {audio_frame}")
print(f"Video frame: {video_frame.width}x{video_frame.height}")
# Stop tracks when done
audio_track.stop()
video_track.stop()
print(f"Audio track final state: {audio_track.readyState}")
print(f"Video track final state: {video_track.readyState}")async def handle_track_events():
track = aiortc.VideoStreamTrack()
# Listen for track events
@track.on("ended")
def on_ended():
print("Track ended")
# Use track
print(f"Track state: {track.readyState}")
# Receive a few frames
for i in range(3):
frame = await track.recv()
print(f"Received frame {i+1}")
# Stop track (triggers 'ended' event)
track.stop()Install with Tessl CLI
npx tessl i tessl/pypi-aiortcdocs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10