or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/livekit@1.0.x

docs

audio-frames-sources.md audio-processing.md audio-tracks.md data-streaming.md e2ee.md events.md index.md participants.md room.md rpc.md track-publications.md transcription.md types-enums.md utilities.md video-frames-sources.md video-processing.md video-tracks.md
tile.json

tessl/pypi-livekit

tessl install tessl/pypi-livekit@1.0.0

Python Real-time SDK for LiveKit providing WebRTC-based video, audio, and data streaming capabilities

docs/video-tracks.md

Video Tracks

Overview

Video tracks represent video media streams in a LiveKit room. The SDK provides LocalVideoTrack for publishing local video and RemoteVideoTrack for receiving video from remote participants.

Key concepts:

  • LocalVideoTrack: Created from VideoSource for publishing video
  • RemoteVideoTrack: Received from remote participants for consumption
  • VideoSource: Provides video frames to LocalVideoTrack
  • VideoStream: Consumes video frames from RemoteVideoTrack
  • Mute/Unmute: Control video transmission (sends black frames when muted)
  • Resolution: Width and height in pixels
  • Pixel formats: RGBA, I420, NV12, etc.
  • Codecs: VP8, H264, AV1, VP9, H265

Import

from livekit import (
    LocalVideoTrack,
    RemoteVideoTrack,
    VideoTrack,
    VideoSource,
    VideoFrame,
    VideoBufferType,
    VideoRotation,
    TrackKind,
)

Classes

LocalVideoTrack

class LocalVideoTrack(Track):
    """Represents a local video track.

    Local video tracks are created from a VideoSource and can be
    published to the room for other participants to receive.
    """

    def __init__(self, info: proto_track.OwnedTrack) -> None:
        """Initialize a LocalVideoTrack.

        Args:
            info: Internal track information

        Note:
            Typically created via create_video_track() static method
            rather than direct instantiation.
        """

Static Methods

@staticmethod
def create_video_track(name: str, source: VideoSource) -> LocalVideoTrack:
    """Create a local video track from a video source.

    Args:
        name: Name for the track
             Type: str
             Displayed to other participants
             Examples: "camera", "screen", "presentation"
        source: VideoSource instance providing video frames
               Must be already created
               Provides video frames to track

    Returns:
        LocalVideoTrack: Track ready for publishing
        
    Raises:
        ValueError: If name empty or source invalid
        RuntimeError: If track creation fails

    Example:
        >>> from livekit import VideoSource, LocalVideoTrack
        >>> 
        >>> # Create video source
        >>> source = VideoSource(width=1920, height=1080)
        >>> 
        >>> # Create track from source
        >>> track = LocalVideoTrack.create_video_track("camera", source)
        >>> 
        >>> # Now publish track
        >>> await room.local_participant.publish_track(track)
        
    Note:
        Track is NOT automatically published.
        Must call publish_track() to share with room.
        
        Source resolution is fixed at creation.
        To change resolution, create new source and track.
    """

Methods

def mute(self) -> None:
    """Mute the video track.

    Stops sending video frames to remote participants.
    The track publication remains active but sends black frames.
    
    Returns:
        None (synchronous operation)

    Example:
        >>> track.mute()
        >>> print(f"Muted: {track.muted}")  # True
        
    Note:
        Muting is immediate and synchronous.
        Sends black frames (not actual video).
        Connection maintained (no re-negotiation).
        Saves bandwidth (black frames compress efficiently).
        
        Triggers 'track_muted' event for all participants.
        
        Use cases:
        - Camera off (privacy)
        - Bandwidth saving
        - Screen share pause
    """

def unmute(self) -> None:
    """Unmute the video track.

    Resumes sending video frames to remote participants.
    
    Returns:
        None (synchronous operation)

    Example:
        >>> track.unmute()
        >>> print(f"Muted: {track.muted}")  # False
        
    Note:
        Unmuting is immediate and synchronous.
        Resumes sending actual video immediately.
        
        Triggers 'track_unmuted' event for all participants.
    """

RemoteVideoTrack

class RemoteVideoTrack(Track):
    """Represents a remote video track.

    Remote video tracks are received from other participants
    and can be consumed via VideoStream.
    
    Read-only - cannot mute remote tracks (they control muting).
    """

    def __init__(self, info: proto_track.OwnedTrack) -> None:
        """Initialize a RemoteVideoTrack.

        Args:
            info: Internal track information

        Note:
            Created automatically by the SDK when subscribing
            to remote participants' video tracks.
            Access via track_subscribed event.
        """

Type Aliases

VideoTrack = Union[LocalVideoTrack, RemoteVideoTrack]
"""Union type for video tracks.

Can be either local or remote.
Use for type hints when accepting any video track.
"""

Creating and Publishing Video Tracks

Basic Video Track Publishing

from livekit import Room, VideoSource, LocalVideoTrack, TrackPublishOptions

async def publish_video(room: Room):
    """Create a local video track and publish it to the room.

    Returns the source, track, and resulting publication so the caller
    can keep feeding frames and later unpublish/clean up.
    """
    # A VideoSource supplies frames; its resolution is fixed at creation.
    video_source = VideoSource(width=1920, height=1080)

    # Wrap the source in a publishable local track.
    camera_track = LocalVideoTrack.create_video_track("camera", video_source)

    # Publish using default options.
    publish_options = TrackPublishOptions()
    publication = await room.local_participant.publish_track(
        camera_track, publish_options
    )

    print(f"Published video track: {publication.sid}")

    return video_source, camera_track, publication

Publishing with Encoding Options

from livekit import (
    TrackPublishOptions,
    VideoEncoding,
    VideoCodec,
    TrackSource,
)

# Configure video encoding
options = TrackPublishOptions()
options.video_codec = VideoCodec.VP8  # or H264, AV1, VP9, H265
options.simulcast = True   # Enable simulcast for adaptive quality
options.source = TrackSource.SOURCE_CAMERA

# Set encoding parameters
encoding = VideoEncoding()
encoding.max_bitrate = 3_000_000   # 3 Mbps
encoding.max_framerate = 30.0       # 30 fps

options.video_encoding = encoding

publication = await room.local_participant.publish_track(track, options)

Screen Share Track

from livekit import VideoSource, LocalVideoTrack, TrackPublishOptions, TrackSource

# Create screen share source (typically higher resolution)
source = VideoSource(width=1920, height=1080)
track = LocalVideoTrack.create_video_track("screen", source)

# Publish as screen share
options = TrackPublishOptions()
options.source = TrackSource.SOURCE_SCREENSHARE
options.video_codec = VideoCodec.VP8
options.simulcast = False  # Screen share typically doesn't need simulcast

# Higher bitrate for screen share (text readability)
encoding = VideoEncoding()
encoding.max_bitrate = 5_000_000  # 5 Mbps for text clarity
encoding.max_framerate = 15.0      # 15 fps sufficient for screen

options.video_encoding = encoding

publication = await room.local_participant.publish_track(track, options)

Muting and Unmuting

from livekit import LocalVideoTrack

track: LocalVideoTrack = ...

# Mute video (synchronous)
track.mute()
print(f"Track muted: {track.muted}")  # True

# Unmute video (synchronous)
track.unmute()
print(f"Track muted: {track.muted}")  # False

# Toggle mute
if track.muted:
    track.unmute()
else:
    track.mute()

Receiving Remote Video

Subscribe to Remote Video

import asyncio

from livekit import (
    Room,
    RemoteParticipant,
    RemoteTrackPublication,
    Track,
    TrackKind,
    VideoStream,
    VideoBufferType,
)

@room.on("track_subscribed")
def on_track_subscribed(
    track: Track,
    publication: RemoteTrackPublication,
    participant: RemoteParticipant
):
    """Handle newly subscribed remote tracks.

    Registered on an existing `room` instance (assumed created/connected
    earlier in the application).
    """
    if track.kind == TrackKind.KIND_VIDEO:
        print(f"Subscribed to video from {participant.identity}")
        print(f"  Resolution: {publication.width}x{publication.height}")
        print(f"  Codec: {publication.mime_type}")

        # Create video stream to receive frames
        # format: Desired output format (converted if needed)
        video_stream = VideoStream(
            track,
            format=VideoBufferType.RGBA  # Request RGBA format for display
        )

        # Process frames in a background task so this event callback
        # returns immediately (event handlers must not block).
        asyncio.create_task(process_video_stream(video_stream))

async def process_video_stream(stream: VideoStream):
    """Drain a VideoStream, logging each received frame.

    Runs until the stream ends or the task is cancelled; always closes
    the stream on the way out.
    """
    try:
        async for frame_event in stream:
            frame = frame_event.frame
            timestamp_us = frame_event.timestamp_us
            rotation = frame_event.rotation

            print(f"Received video: {frame.width}x{frame.height}, "
                  f"format: {frame.type}")

            # Pixel payload as a memoryview; layout depends on frame.type.
            data = frame.data

            # Hook point: display the frame, persist it, apply effects,
            # or run analysis here.
    finally:
        # Release native stream resources even on cancellation.
        await stream.aclose()

Manual Track Subscription

from livekit import RemoteParticipant, RemoteTrackPublication, TrackKind

participant: RemoteParticipant = ...

# Subscribe to all video tracks
for track_sid, publication in participant.track_publications.items():
    if isinstance(publication, RemoteTrackPublication):
        if publication.kind == TrackKind.KIND_VIDEO:
            # Subscribe to video track
            publication.set_subscribed(True)

            # Check if track is available
            if publication.track:
                print(f"Subscribed to: {publication.track.name}")

Capturing Video Frames

Capture Basic Frame

from livekit import VideoSource, VideoFrame, VideoBufferType, VideoRotation

source = VideoSource(width=1920, height=1080)

# Create RGBA frame
width, height = 1920, 1080
frame_data = bytearray(width * height * 4)  # RGBA = 4 bytes per pixel

# Fill with pixel data (example: solid color)
for i in range(0, len(frame_data), 4):
    frame_data[i] = 255     # Red
    frame_data[i+1] = 0     # Green
    frame_data[i+2] = 0     # Blue
    frame_data[i+3] = 255   # Alpha

frame = VideoFrame(
    width=width,
    height=height,
    type=VideoBufferType.RGBA,
    data=frame_data
)

# Capture frame to source
# timestamp_us: Microsecond timestamp (0 for automatic)
# rotation: Frame rotation (0, 90, 180, 270 degrees)
source.capture_frame(
    frame,
    timestamp_us=0,
    rotation=VideoRotation.VIDEO_ROTATION_0
)

Capture with Timestamp

import time
from livekit import VideoSource, VideoFrame

source = VideoSource(1280, 720)

# Get current timestamp in microseconds
timestamp_us = int(time.time() * 1_000_000)

# Capture with explicit timestamp
source.capture_frame(frame, timestamp_us=timestamp_us)

Capture with Rotation

from livekit import VideoRotation

# Rotate video 90 degrees clockwise
source.capture_frame(
    frame,
    rotation=VideoRotation.VIDEO_ROTATION_90
)

# Rotate 180 degrees
source.capture_frame(
    frame,
    rotation=VideoRotation.VIDEO_ROTATION_180
)

Complete Example

import asyncio
import time

from livekit import (
    Room,
    RoomOptions,
    VideoSource,
    LocalVideoTrack,
    RemoteVideoTrack,
    VideoStream,
    VideoFrame,
    VideoBufferType,
    VideoRotation,
    TrackPublishOptions,
    VideoEncoding,
    VideoCodec,
    TrackKind,
    TrackSource,
    RemoteParticipant,
    Track,
    RemoteTrackPublication,
)

async def main():
    """End-to-end example: connect, publish video, generate frames, mute,
    collect stats, and clean up.

    Note:
        `url` and `token` are assumed to be defined elsewhere (server URL
        and access token) — they are not defined in this example.
    """
    room = Room()

    # Handle remote video tracks
    @room.on("track_subscribed")
    def on_track_subscribed(
        track: Track,
        publication: RemoteTrackPublication,
        participant: RemoteParticipant
    ):
        if track.kind == TrackKind.KIND_VIDEO:
            print(f"Video track from {participant.identity}: {track.name}")
            print(f"  SID: {track.sid}")
            print(f"  Muted: {track.muted}")
            print(f"  Publication: {publication.width}x{publication.height}")
            print(f"  Codec: {publication.mime_type}")

            # Consume frames in a background task (callback must not block)
            asyncio.create_task(receive_video(track))

    # Handle track mute changes
    @room.on("track_muted")
    def on_track_muted(participant, publication):
        if publication.kind == TrackKind.KIND_VIDEO:
            print(f"{participant.identity}'s video muted")

    @room.on("track_unmuted")
    def on_track_unmuted(participant, publication):
        if publication.kind == TrackKind.KIND_VIDEO:
            print(f"{participant.identity}'s video unmuted")

    # Connect to room
    # NOTE(review): url and token must be supplied by the caller/environment.
    await room.connect(url, token, RoomOptions(auto_subscribe=True))

    # Create and publish local video track
    source = VideoSource(width=1280, height=720)
    track = LocalVideoTrack.create_video_track("my-camera", source)

    # Publish with options
    # NOTE(review): requires TrackSource imported from livekit.
    options = TrackPublishOptions()
    options.video_codec = VideoCodec.VP8
    options.simulcast = True
    options.source = TrackSource.SOURCE_CAMERA

    # Configure encoding
    encoding = VideoEncoding()
    encoding.max_bitrate = 2_000_000  # 2 Mbps
    encoding.max_framerate = 30.0
    options.video_encoding = encoding

    publication = await room.local_participant.publish_track(track, options)

    print(f"Published video track: {publication.sid}")

    # Generate and capture video in the background
    asyncio.create_task(generate_video(source))

    # Mute/unmute demonstration (synchronous calls)
    await asyncio.sleep(5)
    track.mute()
    print("Video muted")

    await asyncio.sleep(2)
    track.unmute()
    print("Video unmuted")

    # Get statistics
    stats = await track.get_stats()
    print(f"Track stats: {len(stats)} items")

    # Keep running
    await asyncio.sleep(30)

    # Cleanup: unpublish first, then close the source, then disconnect.
    await room.local_participant.unpublish_track(track.sid)
    await source.aclose()
    await room.disconnect()

async def generate_video(source: VideoSource):
    """Generate RGBA test-pattern frames and capture them to *source*.

    Produces a solid color that cycles with the frame count at ~30 fps,
    until the task is cancelled.

    Args:
        source: VideoSource created with matching resolution (1280x720).
    """
    width, height = 1280, 720
    fps = 30
    frame_duration = 1.0 / fps

    frame_count = 0
    # Deadline-based pacing: sleeping a fixed frame_duration each iteration
    # would drift below the target fps as frame-construction time accumulates.
    next_deadline = time.monotonic()

    while True:
        # Cycling solid color; each channel advances at a different rate.
        r = (frame_count * 1) % 256
        g = (frame_count * 2) % 256
        b = (frame_count * 3) % 256

        # Build the whole RGBA plane with one C-level byte multiply instead
        # of a per-pixel Python loop (identical bytes, vastly faster).
        frame_data = bytearray(bytes((r, g, b, 255)) * (width * height))

        frame = VideoFrame(
            width=width,
            height=height,
            type=VideoBufferType.RGBA,
            data=frame_data
        )

        # Capture with an explicit wall-clock timestamp in microseconds.
        timestamp_us = int(time.time() * 1_000_000)
        source.capture_frame(
            frame,
            timestamp_us=timestamp_us,
            rotation=VideoRotation.VIDEO_ROTATION_0
        )

        frame_count += 1

        # Sleep until the next frame deadline (drift-compensated).
        next_deadline += frame_duration
        await asyncio.sleep(max(0.0, next_deadline - time.monotonic()))

async def receive_video(track: RemoteVideoTrack):
    """Receive and process remote video.

    Args:
        track: Subscribed remote video track to consume frames from.
    """
    # Create video stream with desired format
    # format: Output format (SDK converts if needed)
    video_stream = VideoStream(
        track,
        format=VideoBufferType.RGBA  # Convert to RGBA for display
    )

    try:
        async for event in video_stream:
            frame = event.frame
            timestamp_us = event.timestamp_us
            rotation = event.rotation  # apply before display if non-zero

            print(f"Received video frame: {frame.width}x{frame.height}, "
                  f"format: {frame.type}, timestamp: {timestamp_us}")

            # Access video data
            # data: memoryview of pixel data
            # RGBA: 4 bytes per pixel
            # I420: 1.5 bytes per pixel (planar)
            data = frame.data

            # Process video frame
            # Examples:
            # - Display in UI
            # - Save to file
            # - Apply filters
            # - Detect objects
            # ...

    finally:
        # Release the native stream even if this consumer task is cancelled.
        await video_stream.aclose()

if __name__ == "__main__":
    asyncio.run(main())

Video Formats

Supported Video Buffer Types

from livekit import VideoBufferType

# RGB formats (packed, 4 bytes per pixel for RGBA variants)
VideoBufferType.RGBA   # Red, Green, Blue, Alpha (most common)
VideoBufferType.ABGR   # Alpha, Blue, Green, Red
VideoBufferType.ARGB   # Alpha, Red, Green, Blue
VideoBufferType.BGRA   # Blue, Green, Red, Alpha
VideoBufferType.RGB24  # Red, Green, Blue (3 bytes per pixel, no alpha)

# YUV formats (planar, more efficient for encoding)
VideoBufferType.I420   # YUV 4:2:0 planar (most common for encoding)
VideoBufferType.I420A  # YUV 4:2:0 planar with alpha
VideoBufferType.I422   # YUV 4:2:2 planar (higher chroma resolution)
VideoBufferType.I444   # YUV 4:4:4 planar (full chroma, highest quality)
VideoBufferType.I010   # YUV 4:2:0 10-bit planar (HDR)
VideoBufferType.NV12   # YUV 4:2:0 semi-planar (hardware encoders)

Format Conversion

from livekit import VideoFrame, VideoBufferType

# Convert RGBA to I420 (for encoding)
frame_rgba: VideoFrame = ...
frame_i420 = frame_rgba.convert(VideoBufferType.I420)

# Convert I420 to RGBA (for display)
frame_display = frame_i420.convert(VideoBufferType.RGBA)

# Convert with vertical flip (for some camera APIs)
frame_flipped = frame_rgba.convert(VideoBufferType.RGBA, flip_y=True)

Video Codecs

from livekit import VideoCodec

# Available codecs
VideoCodec.VP8    # VP8 codec (best compatibility, all browsers)
VideoCodec.H264   # H.264 codec (hardware acceleration, widely supported)
VideoCodec.AV1    # AV1 codec (best compression, newer browsers only)
VideoCodec.VP9    # VP9 codec (better than VP8, good support)
VideoCodec.H265   # H.265 codec (better than H264, limited browser support)

# Codec selection guide:
# - VP8: Best compatibility, works everywhere
# - H264: Best for hardware encoding, good quality
# - AV1: Best compression, but CPU intensive
# - VP9: Good middle ground
# - H265: Good compression, limited support

Video Rotation

from livekit import VideoRotation

VideoRotation.VIDEO_ROTATION_0    # No rotation (0 degrees)
VideoRotation.VIDEO_ROTATION_90   # 90 degrees clockwise
VideoRotation.VIDEO_ROTATION_180  # 180 degrees
VideoRotation.VIDEO_ROTATION_270  # 270 degrees clockwise (90 CCW)

# Handle device orientation
if is_landscape:
    rotation = VideoRotation.VIDEO_ROTATION_0
elif is_portrait:
    rotation = VideoRotation.VIDEO_ROTATION_90

source.capture_frame(frame, rotation=rotation)

Best Practices

1. Choose Appropriate Resolution

# Common resolutions
resolutions = {
    "4K": (3840, 2160),      # Very high quality, very high bandwidth
    "1080p": (1920, 1080),   # High quality, high bandwidth
    "720p": (1280, 720),     # Good quality, moderate bandwidth (recommended)
    "480p": (640, 480),      # Acceptable quality, low bandwidth
    "360p": (640, 360),      # Low quality, very low bandwidth
}

# For screen sharing: Match screen resolution or scale down
screen_width, screen_height = 1920, 1080
source = VideoSource(width=screen_width, height=screen_height)

# For camera: Consider network bandwidth
# Mobile: 640x480 or 1280x720
# Desktop: 1280x720 or 1920x1080
source = VideoSource(width=1280, height=720)

2. Enable Simulcast for Adaptive Quality

options = TrackPublishOptions()
options.simulcast = True  # Send multiple qualities (low, medium, high)

# Simulcast benefits:
# - Receivers select appropriate quality
# - Better for mixed network conditions
# - Bandwidth efficient for multiple receivers

# Simulcast costs:
# - Higher CPU usage (encoding multiple streams)
# - Higher upload bandwidth
# - Not needed for 1-to-1 calls

3. Set Appropriate Frame Rate

encoding = VideoEncoding()

# For camera video
encoding.max_framerate = 30.0  # 30 fps for smooth video

# For screen share
encoding.max_framerate = 15.0  # 15 fps sufficient for screen content

# For low bandwidth
encoding.max_framerate = 15.0  # Reduce fps to save bandwidth

4. Clean Up Resources

source = None
track = None
try:
    source = VideoSource(1280, 720)
    track = LocalVideoTrack.create_video_track("camera", source)
    await room.local_participant.publish_track(track)
    # ... use track
finally:
    if track:
        await room.local_participant.unpublish_track(track.sid)
    if source:
        await source.aclose()

5. Handle Format Conversion Efficiently

# Good: Convert once and reuse
rgba_frame: VideoFrame = ...
i420_frame = rgba_frame.convert(VideoBufferType.I420)
# Use i420_frame multiple times

# Bad: Convert repeatedly in hot path
# for _ in range(100):
#     i420 = rgba.convert(VideoBufferType.I420)  # Expensive conversion each time

Advanced Patterns

Frame Rate Control

import asyncio
import time

async def generate_at_fps(source: VideoSource, fps: float):
    """Generate blank RGBA frames and capture them to *source* at *fps*.

    Uses deadline-based scheduling with drift compensation: each frame's
    target time derives from the previous deadline rather than from when
    the loop actually woke up, so timing errors do not accumulate.

    Args:
        source: VideoSource to capture into (1280x720 frames).
        fps: Target frame rate; must be > 0.

    Raises:
        ValueError: If fps is not positive.
    """
    if fps <= 0:
        raise ValueError("fps must be positive")

    frame_duration = 1.0 / fps
    width, height = 1280, 720

    start_time = time.time()
    next_time = start_time + frame_duration
    frame_count = 0
    # Log interval in whole frames. int() keeps the modulo an integer test;
    # the original float modulo (fps * 5) never fires for non-integer fps.
    log_every = max(1, int(fps * 5))  # roughly every 5 seconds

    while True:
        # Generate a blank RGBA frame (all zero bytes).
        frame_data = bytearray(width * height * 4)
        frame = VideoFrame(width, height, VideoBufferType.RGBA, frame_data)

        # Capture
        source.capture_frame(frame)

        frame_count += 1

        # Sleep until the next scheduled frame time (clamped at 0 if late).
        sleep_time = max(0.0, next_time - time.time())
        await asyncio.sleep(sleep_time)
        next_time += frame_duration

        # Report measured throughput, not the scheduled rate.
        if frame_count % log_every == 0:
            elapsed = time.time() - start_time
            actual_fps = frame_count / elapsed
            print(f"Actual FPS: {actual_fps:.1f}")

Resolution Scaling

def scale_frame(frame: VideoFrame, target_width: int, target_height: int) -> VideoFrame:
    """Scale video frame to target resolution.

    Note: This is a simplified example. Production code should use
    proper image scaling library like PIL or OpenCV.
    """
    # Normalize to RGBA so a scaling library can consume the pixels directly.
    rgba_frame = (
        frame
        if frame.type == VideoBufferType.RGBA
        else frame.convert(VideoBufferType.RGBA)
    )

    # Plug in a real scaler here, e.g. with Pillow:
    #   from PIL import Image
    #   img = Image.frombytes("RGBA", (rgba_frame.width, rgba_frame.height),
    #                         bytes(rgba_frame.data))
    #   img = img.resize((target_width, target_height))
    #   return VideoFrame(target_width, target_height,
    #                     VideoBufferType.RGBA, img.tobytes())

    # Placeholder: return the (unscaled) RGBA frame until scaling is implemented.
    return rgba_frame

See Also

  • Video Frames and Sources - Low-level video handling
  • Video Processing - Video frame conversion
  • Track Publications - Track publication management
  • Participants - Publishing and managing tracks