tessl install tessl/pypi-livekit@1.0.0

Python real-time SDK for LiveKit, providing WebRTC-based video, audio, and data streaming capabilities.
Video tracks represent video media streams in a LiveKit room. The SDK provides LocalVideoTrack for publishing local video and RemoteVideoTrack for receiving video from remote participants.
Key concepts (all importable from the top-level `livekit` package):
from livekit import (
    LocalVideoTrack,
    RemoteVideoTrack,
    VideoTrack,
    VideoSource,
    VideoFrame,
    VideoBufferType,
    VideoRotation,
    TrackKind,
)


class LocalVideoTrack(Track):
    """Represents a local video track.

    Local video tracks are created from a VideoSource and can be
    published to the room for other participants to receive.
    """

    def __init__(self, info: proto_track.OwnedTrack) -> None:
        """Initialize a LocalVideoTrack.

        Args:
            info: Internal track information.

        Note:
            Typically created via the create_video_track() static method
            rather than direct instantiation.
        """

    @staticmethod
    def create_video_track(name: str, source: VideoSource) -> LocalVideoTrack:
        """Create a local video track from a video source.

        Args:
            name: Name for the track (str). Displayed to other
                participants. Examples: "camera", "screen", "presentation".
            source: VideoSource instance providing video frames.
                Must already be created.

        Returns:
            LocalVideoTrack: Track ready for publishing.

        Raises:
            ValueError: If name is empty or source is invalid.
            RuntimeError: If track creation fails.

        Example:
            >>> from livekit import VideoSource, LocalVideoTrack
            >>>
            >>> # Create video source
            >>> source = VideoSource(width=1920, height=1080)
            >>>
            >>> # Create track from source
            >>> track = LocalVideoTrack.create_video_track("camera", source)
            >>>
            >>> # Now publish track
            >>> await room.local_participant.publish_track(track)

        Note:
            The track is NOT automatically published; call
            publish_track() to share it with the room. The source
            resolution is fixed at creation — to change resolution,
            create a new source and track.
        """

    def mute(self) -> None:
        """Mute the video track.

        Stops sending video frames to remote participants. The track
        publication remains active but sends black frames.

        Returns:
            None (synchronous operation).

        Example:
            >>> track.mute()
            >>> print(f"Muted: {track.muted}")  # True

        Note:
            Muting is immediate and synchronous. Black frames (not actual
            video) are sent, so the connection is maintained with no
            re-negotiation and bandwidth is saved (black frames compress
            efficiently). Triggers the 'track_muted' event for all
            participants.

            Use cases: camera off (privacy), bandwidth saving,
            screen-share pause.
        """

    def unmute(self) -> None:
        """Unmute the video track.

        Resumes sending video frames to remote participants.

        Returns:
            None (synchronous operation).

        Example:
            >>> track.unmute()
            >>> print(f"Muted: {track.muted}")  # False

        Note:
            Unmuting is immediate and synchronous; actual video resumes
            immediately. Triggers the 'track_unmuted' event for all
            participants.
        """


class RemoteVideoTrack(Track):
    """Represents a remote video track.

    Remote video tracks are received from other participants and can be
    consumed via VideoStream. Read-only — remote tracks cannot be muted
    locally (the publishing participant controls muting).
    """

    def __init__(self, info: proto_track.OwnedTrack) -> None:
        """Initialize a RemoteVideoTrack.

        Args:
            info: Internal track information.

        Note:
            Created automatically by the SDK when subscribing to remote
            participants' video tracks. Access via the track_subscribed
            event.
        """


VideoTrack = Union[LocalVideoTrack, RemoteVideoTrack]
"""Union type for video tracks.

Can be either local or remote. Use for type hints when accepting any
video track.
"""

from livekit import Room, VideoSource, LocalVideoTrack, TrackPublishOptions
async def publish_video(room: Room):
"""Publish a local video track to the room."""
# Create video source
# width, height: Resolution in pixels
source = VideoSource(width=1920, height=1080)
# Create track from source
track = LocalVideoTrack.create_video_track("camera", source)
# Publish track with default options
options = TrackPublishOptions()
publication = await room.local_participant.publish_track(track, options)
print(f"Published video track: {publication.sid}")
return source, track, publicationfrom livekit import (
TrackPublishOptions,
VideoEncoding,
VideoCodec,
TrackSource,
)
# Configure video encoding
options = TrackPublishOptions()
options.video_codec = VideoCodec.VP8 # or H264, AV1, VP9, H265
options.simulcast = True # Enable simulcast for adaptive quality
options.source = TrackSource.SOURCE_CAMERA
# Set encoding parameters
encoding = VideoEncoding()
encoding.max_bitrate = 3_000_000 # 3 Mbps
encoding.max_framerate = 30.0 # 30 fps
options.video_encoding = encoding
publication = await room.local_participant.publish_track(track, options)from livekit import VideoSource, LocalVideoTrack, TrackPublishOptions, TrackSource
# Create screen share source (typically higher resolution)
source = VideoSource(width=1920, height=1080)
track = LocalVideoTrack.create_video_track("screen", source)
# Publish as screen share
options = TrackPublishOptions()
options.source = TrackSource.SOURCE_SCREENSHARE
options.video_codec = VideoCodec.VP8
options.simulcast = False # Screen share typically doesn't need simulcast
# Higher bitrate for screen share (text readability)
encoding = VideoEncoding()
encoding.max_bitrate = 5_000_000 # 5 Mbps for text clarity
encoding.max_framerate = 15.0 # 15 fps sufficient for screen
options.video_encoding = encoding
publication = await room.local_participant.publish_track(track, options)from livekit import LocalVideoTrack
track: LocalVideoTrack = ...
# Mute video (synchronous)
track.mute()
print(f"Track muted: {track.muted}") # True
# Unmute video (synchronous)
track.unmute()
print(f"Track muted: {track.muted}") # False
# Toggle mute
if track.muted:
track.unmute()
else:
track.mute()from livekit import (
Room,
RemoteParticipant,
RemoteTrackPublication,
Track,
TrackKind,
VideoStream,
VideoBufferType,
)
@room.on("track_subscribed")
def on_track_subscribed(
track: Track,
publication: RemoteTrackPublication,
participant: RemoteParticipant
):
"""Handle newly subscribed remote tracks."""
if track.kind == TrackKind.KIND_VIDEO:
print(f"Subscribed to video from {participant.identity}")
print(f" Resolution: {publication.width}x{publication.height}")
print(f" Codec: {publication.mime_type}")
# Create video stream to receive frames
# format: Desired output format (converted if needed)
video_stream = VideoStream(
track,
format=VideoBufferType.RGBA # Request RGBA format for display
)
# Process video in background task
asyncio.create_task(process_video_stream(video_stream))
async def process_video_stream(stream: VideoStream):
"""Process video frames from stream."""
try:
async for event in stream:
frame = event.frame
timestamp_us = event.timestamp_us
rotation = event.rotation
print(f"Received video: {frame.width}x{frame.height}, "
f"format: {frame.type}")
# Access video data
# data: memoryview of pixel data
data = frame.data
# Process video frame
# Examples:
# - Display in window
# - Save to file
# - Apply effects
# - Analyze content
# ...
finally:
await stream.aclose()from livekit import RemoteParticipant, RemoteTrackPublication, TrackKind
participant: RemoteParticipant = ...  # placeholder: a connected remote participant

# Subscribe to all of this participant's video tracks.
for track_sid, publication in participant.track_publications.items():
    if isinstance(publication, RemoteTrackPublication):
        if publication.kind == TrackKind.KIND_VIDEO:
            # Request subscription to this video track.
            publication.set_subscribed(True)
            # The track object is available once the subscription completes.
            if publication.track:
                print(f"Subscribed to: {publication.track.name}")

from livekit import VideoSource, VideoFrame, VideoBufferType, VideoRotation
source = VideoSource(width=1920, height=1080)

# Create an RGBA frame (4 bytes per pixel).
width, height = 1920, 1080
# Fill with a solid red color by repeating one RGBA pixel across the
# whole buffer — a single C-speed multiplication instead of a
# per-pixel Python loop (same bytes, much faster).
frame_data = bytearray(bytes((255, 0, 0, 255)) * (width * height))

frame = VideoFrame(
    width=width,
    height=height,
    type=VideoBufferType.RGBA,
    data=frame_data
)

# Capture frame to source.
# timestamp_us: microsecond timestamp (0 for automatic)
# rotation: frame rotation (0, 90, 180, 270 degrees)
source.capture_frame(
    frame,
    timestamp_us=0,
    rotation=VideoRotation.VIDEO_ROTATION_0
)

import time
from livekit import VideoSource, VideoFrame

source = VideoSource(1280, 720)

# Get the current wall-clock timestamp in microseconds.
timestamp_us = int(time.time() * 1_000_000)

# Capture with an explicit timestamp.
# NOTE(review): `frame` must be a previously built VideoFrame.
source.capture_frame(frame, timestamp_us=timestamp_us)

from livekit import VideoRotation

# Rotate video 90 degrees clockwise.
source.capture_frame(
    frame,
    rotation=VideoRotation.VIDEO_ROTATION_90
)

# Rotate 180 degrees.
source.capture_frame(
    frame,
    rotation=VideoRotation.VIDEO_ROTATION_180
)

import asyncio
import time
from livekit import (
    Room,
    RoomOptions,
    VideoSource,
    LocalVideoTrack,
    RemoteVideoTrack,
    VideoStream,
    VideoFrame,
    VideoBufferType,
    VideoRotation,
    TrackPublishOptions,
    VideoEncoding,
    VideoCodec,
    TrackKind,
    TrackSource,
    RemoteParticipant,
    Track,
    RemoteTrackPublication,
)
async def main():
    """Demo: connect to a room, publish local video, and handle remote video."""
    room = Room()
    # Handle remote video tracks
    @room.on("track_subscribed")
    def on_track_subscribed(
        track: Track,
        publication: RemoteTrackPublication,
        participant: RemoteParticipant
    ):
        if track.kind == TrackKind.KIND_VIDEO:
            print(f"Video track from {participant.identity}: {track.name}")
            print(f" SID: {track.sid}")
            print(f" Muted: {track.muted}")
            print(f" Publication: {publication.width}x{publication.height}")
            print(f" Codec: {publication.mime_type}")
            # Process the video in a background task.
            asyncio.create_task(receive_video(track))
    # Handle track mute changes
    @room.on("track_muted")
    def on_track_muted(participant, publication):
        if publication.kind == TrackKind.KIND_VIDEO:
            print(f"{participant.identity}'s video muted")
    @room.on("track_unmuted")
    def on_track_unmuted(participant, publication):
        if publication.kind == TrackKind.KIND_VIDEO:
            print(f"{participant.identity}'s video unmuted")
    # Connect to room
    # NOTE(review): `url` and `token` must be provided by the caller or
    # environment — they are not defined in this example.
    await room.connect(url, token, RoomOptions(auto_subscribe=True))
    # Create and publish local video track
    source = VideoSource(width=1280, height=720)
    track = LocalVideoTrack.create_video_track("my-camera", source)
    # Publish with options
    options = TrackPublishOptions()
    options.video_codec = VideoCodec.VP8
    options.simulcast = True
    # NOTE(review): TrackSource is used here but missing from the import
    # list above — it must be imported from livekit.
    options.source = TrackSource.SOURCE_CAMERA
    # Configure encoding
    encoding = VideoEncoding()
    encoding.max_bitrate = 2_000_000  # 2 Mbps
    encoding.max_framerate = 30.0
    options.video_encoding = encoding
    publication = await room.local_participant.publish_track(track, options)
    print(f"Published video track: {publication.sid}")
    # Generate and capture video
    asyncio.create_task(generate_video(source))
    # Mute/unmute demonstration
    await asyncio.sleep(5)
    track.mute()
    print("Video muted")
    await asyncio.sleep(2)
    track.unmute()
    print("Video unmuted")
    # Get statistics
    stats = await track.get_stats()
    print(f"Track stats: {len(stats)} items")
    # Keep running
    await asyncio.sleep(30)
    # Cleanup: unpublish first so remote peers stop cleanly, then close
    # the source and disconnect.
    await room.local_participant.unpublish_track(track.sid)
    await source.aclose()
    await room.disconnect()
async def generate_video(source: VideoSource):
    """Generate a cycling-color RGBA test pattern and push it into *source*.

    Runs forever at ~30 fps; cancel the task to stop.

    Args:
        source: VideoSource to capture frames into (1280x720 RGBA).
    """
    width, height = 1280, 720
    fps = 30
    frame_duration = 1.0 / fps
    frame_count = 0
    while True:
        # Cycle the fill color with the frame counter.
        r = (frame_count * 1) % 256
        g = (frame_count * 2) % 256
        b = (frame_count * 3) % 256
        # Build the RGBA buffer by repeating a single 4-byte pixel at C
        # speed — identical bytes to the original per-pixel Python loop,
        # but O(1) Python operations instead of width*height iterations.
        frame_data = bytearray(bytes((r, g, b, 255)) * (width * height))
        frame = VideoFrame(
            width=width,
            height=height,
            type=VideoBufferType.RGBA,
            data=frame_data
        )
        # Capture frame with an explicit microsecond timestamp.
        timestamp_us = int(time.time() * 1_000_000)
        source.capture_frame(
            frame,
            timestamp_us=timestamp_us,
            rotation=VideoRotation.VIDEO_ROTATION_0
        )
        frame_count += 1
        # Wait for the next frame slot (no drift compensation here).
        await asyncio.sleep(frame_duration)
async def receive_video(track: RemoteVideoTrack):
    """Receive and process remote video frames until the stream ends.

    Args:
        track: Subscribed remote video track to consume.
    """
    # Create video stream with the desired output format; the SDK
    # converts incoming frames if the source format differs.
    video_stream = VideoStream(
        track,
        format=VideoBufferType.RGBA  # Convert to RGBA for display
    )
    try:
        async for event in video_stream:
            frame = event.frame
            timestamp_us = event.timestamp_us
            rotation = event.rotation
            print(f"Received video frame: {frame.width}x{frame.height}, "
                  f"format: {frame.type}, timestamp: {timestamp_us}")
            # data: memoryview of raw pixel data.
            # RGBA: 4 bytes per pixel; I420: 1.5 bytes per pixel (planar).
            data = frame.data
            # Process the frame here, e.g. display in UI, save to file,
            # apply filters, detect objects...
    finally:
        # Always release native stream resources, even on error/cancel.
        await video_stream.aclose()


if __name__ == "__main__":
    asyncio.run(main())

from livekit import VideoBufferType
# RGB formats (packed, 4 bytes per pixel for RGBA variants)
VideoBufferType.RGBA   # Red, Green, Blue, Alpha (most common)
VideoBufferType.ABGR   # Alpha, Blue, Green, Red
VideoBufferType.ARGB   # Alpha, Red, Green, Blue
VideoBufferType.BGRA   # Blue, Green, Red, Alpha
VideoBufferType.RGB24  # Red, Green, Blue (3 bytes per pixel, no alpha)

# YUV formats (planar, more efficient for encoding)
VideoBufferType.I420   # YUV 4:2:0 planar (most common for encoding)
VideoBufferType.I420A  # YUV 4:2:0 planar with alpha
VideoBufferType.I422   # YUV 4:2:2 planar (higher chroma resolution)
VideoBufferType.I444   # YUV 4:4:4 planar (full chroma, highest quality)
VideoBufferType.I010   # YUV 4:2:0 10-bit planar (HDR)
VideoBufferType.NV12   # YUV 4:2:0 semi-planar (hardware encoders)

from livekit import VideoFrame, VideoBufferType

# Convert RGBA to I420 (for encoding).
frame_rgba: VideoFrame = ...  # placeholder: an existing RGBA frame
frame_i420 = frame_rgba.convert(VideoBufferType.I420)

# Convert I420 back to RGBA (for display).
frame_display = frame_i420.convert(VideoBufferType.RGBA)

# Convert with vertical flip (for some camera APIs).
frame_flipped = frame_rgba.convert(VideoBufferType.RGBA, flip_y=True)

from livekit import VideoCodec

# Available codecs
VideoCodec.VP8   # VP8 codec (best compatibility, all browsers)
VideoCodec.H264  # H.264 codec (hardware acceleration, widely supported)
VideoCodec.AV1   # AV1 codec (best compression, newer browsers only)
VideoCodec.VP9   # VP9 codec (better than VP8, good support)
VideoCodec.H265  # H.265 codec (better than H264, limited browser support)

# Codec selection guide:
# - VP8: best compatibility, works everywhere
# - H264: best for hardware encoding, good quality
# - AV1: best compression, but CPU intensive
# - VP9: good middle ground
# - H265: good compression, limited support

from livekit import VideoRotation
VideoRotation.VIDEO_ROTATION_0    # No rotation (0 degrees)
VideoRotation.VIDEO_ROTATION_90   # 90 degrees clockwise
VideoRotation.VIDEO_ROTATION_180  # 180 degrees
VideoRotation.VIDEO_ROTATION_270  # 270 degrees clockwise (90 CCW)

# Handle device orientation.
# NOTE(review): is_landscape/is_portrait must be provided by the app.
if is_landscape:
    rotation = VideoRotation.VIDEO_ROTATION_0
elif is_portrait:
    rotation = VideoRotation.VIDEO_ROTATION_90
else:
    # Fallback so `rotation` is always bound (the original left it
    # undefined when neither flag was set).
    rotation = VideoRotation.VIDEO_ROTATION_0
source.capture_frame(frame, rotation=rotation)

# Common resolutions
resolutions = {
    "4K": (3840, 2160),     # Very high quality, very high bandwidth
    "1080p": (1920, 1080),  # High quality, high bandwidth
    "720p": (1280, 720),    # Good quality, moderate bandwidth (recommended)
    "480p": (640, 480),     # Acceptable quality, low bandwidth
    "360p": (640, 360),     # Low quality, very low bandwidth
}

# For screen sharing: match screen resolution or scale down.
screen_width, screen_height = 1920, 1080
source = VideoSource(width=screen_width, height=screen_height)

# For camera: consider network bandwidth.
# Mobile: 640x480 or 1280x720; Desktop: 1280x720 or 1920x1080.
source = VideoSource(width=1280, height=720)

options = TrackPublishOptions()
options.simulcast = True # Send multiple qualities (low, medium, high)
# Simulcast benefits:
# - Receivers select appropriate quality
# - Better for mixed network conditions
# - Bandwidth efficient for multiple receivers
# Simulcast costs:
# - Higher CPU usage (encoding multiple streams)
# - Higher upload bandwidth
# - Not needed for 1-to-1 callsencoding = VideoEncoding()
# For camera video
encoding.max_framerate = 30.0 # 30 fps for smooth video
# For screen share
encoding.max_framerate = 15.0 # 15 fps sufficient for screen content
# For low bandwidth
encoding.max_framerate = 15.0 # Reduce fps to save bandwidthsource = None
track = None
try:
source = VideoSource(1280, 720)
track = LocalVideoTrack.create_video_track("camera", source)
await room.local_participant.publish_track(track)
# ... use track
finally:
if track:
await room.local_participant.unpublish_track(track.sid)
if source:
await source.aclose()# Good: Convert once and reuse
rgba_frame: VideoFrame = ...
i420_frame = rgba_frame.convert(VideoBufferType.I420)
# Use i420_frame multiple times
# Bad: Convert repeatedly in hot path
# for _ in range(100):
# i420 = rgba.convert(VideoBufferType.I420) # Expensive conversion each timeimport asyncio
import time
async def generate_at_fps(source: VideoSource, fps: float):
"""Generate video at specific frame rate."""
frame_duration = 1.0 / fps
width, height = 1280, 720
last_time = time.time()
frame_count = 0
while True:
# Generate frame
frame_data = bytearray(width * height * 4)
frame = VideoFrame(width, height, VideoBufferType.RGBA, frame_data)
# Capture
source.capture_frame(frame)
frame_count += 1
# Calculate next frame time
next_time = last_time + frame_duration
now = time.time()
# Sleep until next frame (with drift compensation)
sleep_time = max(0, next_time - now)
await asyncio.sleep(sleep_time)
last_time = next_time
# Log actual FPS periodically
if frame_count % (fps * 5) == 0: # Every 5 seconds
elapsed = time.time() - (last_time - frame_duration * frame_count)
actual_fps = frame_count / elapsed
print(f"Actual FPS: {actual_fps:.1f}")def scale_frame(frame: VideoFrame, target_width: int, target_height: int) -> VideoFrame:
"""Scale video frame to target resolution.
Note: This is a simplified example. Production code should use
proper image scaling library like PIL or OpenCV.
"""
# Convert to RGBA for processing
if frame.type != VideoBufferType.RGBA:
frame = frame.convert(VideoBufferType.RGBA)
# Use image library for scaling
# from PIL import Image
# img = Image.frombytes("RGBA", (frame.width, frame.height), bytes(frame.data))
# img_scaled = img.resize((target_width, target_height))
# return VideoFrame(target_width, target_height, VideoBufferType.RGBA, img_scaled.tobytes())
# For now, return as-is (implement scaling as needed)
return frame