tessl/pypi-livekit

tessl install tessl/pypi-livekit@1.0.0

Python Real-time SDK for LiveKit providing WebRTC-based video, audio, and data streaming capabilities

docs/video-frames-sources.md

Video Frames and Sources

Overview

Low-level video handling: create and manage video frames, capture them to sources for publishing, and receive them from streams. Provides direct control over video data for advanced use cases.

Key concepts:

  • VideoFrame: Container for pixel data in various formats
  • VideoSource: Real-time video output for publishing
  • VideoStream: Async iterator for receiving video frames
  • Pixel formats: RGBA (packed), I420 (planar), NV12 (semi-planar), etc.
  • Resolution: Width and height in pixels
  • Rotation: 0, 90, 180, 270 degrees
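
Rotation is metadata describing how a frame should be displayed; if you remap pixel coordinates yourself, the clockwise mappings are simple index arithmetic. A minimal sketch (pure Python, independent of the SDK; `rotate_point` is an illustrative helper, not part of livekit):

```python
def rotate_point(x: int, y: int, width: int, height: int, degrees: int) -> tuple[int, int]:
    """Map a pixel coordinate in a width x height frame to its position
    after rotating the frame clockwise by the given angle."""
    if degrees == 0:
        return x, y
    if degrees == 90:    # rotated frame is height x width
        return height - 1 - y, x
    if degrees == 180:
        return width - 1 - x, height - 1 - y
    if degrees == 270:   # 90 degrees counter-clockwise
        return y, width - 1 - x
    raise ValueError("rotation must be 0, 90, 180, or 270")

# The top-left pixel of a 640x480 frame lands in the top-right column
# after a 90-degree clockwise rotation:
print(rotate_point(0, 0, 640, 480, 90))  # (479, 0)
```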

Import

from livekit import (
    VideoFrame, VideoSource, VideoStream, VideoFrameEvent,
    VideoBufferType, VideoRotation
)

VideoFrame

class VideoFrame:
    """Represents a video frame with pixel data.
    
    Stores raw pixel data in specified format.
    """

    def __init__(
        self,
        width: int,
        height: int,
        type: proto_video.VideoBufferType.ValueType,
        data: Union[bytes, bytearray, memoryview]
    ) -> None:
        """Initialize VideoFrame with pixel data.
        
        Args:
            width: Width in pixels
                  Type: int
                  Must be positive
                  Typically multiple of 2 for YUV formats
                  
            height: Height in pixels
                   Type: int
                   Must be positive
                   Typically multiple of 2 for YUV formats
                   
            type: Format type
                 Type: VideoBufferType enum
                 Examples:
                 - RGBA: 4 bytes per pixel (width * height * 4)
                 - I420: 1.5 bytes per pixel (width * height * 1.5)
                 - RGB24: 3 bytes per pixel (width * height * 3)
                 
            data: Raw pixel data
                 Type: bytes | bytearray | memoryview
                 Size must match format requirements
                 
        Raises:
            ValueError: If data size doesn't match format requirements
            
        Example:
            >>> # RGBA frame
            >>> width, height = 1920, 1080
            >>> data = bytearray(width * height * 4)
            >>> frame = VideoFrame(width, height, VideoBufferType.RGBA, data)
            >>> 
            >>> # I420 frame
            >>> data_i420 = bytearray(int(width * height * 1.5))
            >>> frame_i420 = VideoFrame(width, height, VideoBufferType.I420, data_i420)
        """

    @property
    def width(self) -> int:
        """Width in pixels.
        
        Returns:
            int: Frame width
        """

    @property
    def height(self) -> int:
        """Height in pixels.
        
        Returns:
            int: Frame height
        """

    @property
    def type(self) -> proto_video.VideoBufferType.ValueType:
        """Format type.
        
        Returns:
            VideoBufferType: Pixel format (RGBA, I420, etc.)
        """

    @property
    def data(self) -> memoryview:
        """Raw pixel data as memoryview.
        
        Returns:
            memoryview: View of pixel data
                       Can be modified in-place
                       
        Example:
            >>> frame = VideoFrame(640, 480, VideoBufferType.RGBA, data)
            >>> pixels = frame.data
            >>> 
            >>> # Modify pixel at (x=10, y=10)
            >>> x, y = 10, 10
            >>> idx = (y * frame.width + x) * 4  # RGBA = 4 bytes per pixel
            >>> pixels[idx] = 255      # Red
            >>> pixels[idx+1] = 0      # Green
            >>> pixels[idx+2] = 0      # Blue
            >>> pixels[idx+3] = 255    # Alpha
            
        Note:
            For packed formats (RGBA): Single buffer
            For planar formats (I420): Use get_plane() instead
        """

    def get_plane(self, plane_nth: int) -> Optional[memoryview]:
        """Get a specific plane for multi-plane formats.
        
        See video-processing.md for details.
        """

    def convert(
        self,
        type: proto_video.VideoBufferType.ValueType,
        *,
        flip_y: bool = False
    ) -> VideoFrame:
        """Convert frame to different format.
        
        See video-processing.md for details.
        """
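
For I420 the three planes sit back-to-back in a single buffer: a full-resolution Y plane followed by quarter-resolution U and V planes. The offsets can be computed by hand, though real buffers may carry row-stride padding, so prefer get_plane() when available. A sketch assuming the tightly packed layout (`i420_plane_offsets` is an illustrative helper, not an SDK function):

```python
def i420_plane_offsets(width: int, height: int) -> dict[str, tuple[int, int]]:
    """Return (offset, size) in bytes for each plane of a packed I420 buffer."""
    y_size = width * height                  # full-resolution luma
    uv_size = (width // 2) * (height // 2)   # quarter-resolution chroma
    return {
        "y": (0, y_size),
        "u": (y_size, uv_size),
        "v": (y_size + uv_size, uv_size),
    }

# 640x480: Y is 307200 bytes, U and V are 76800 bytes each
print(i420_plane_offsets(640, 480)["v"])  # (384000, 76800)
```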

VideoSource

class VideoSource:
    """Video source for publishing video frames.
    
    Manages video frame publishing with specified resolution.
    """

    def __init__(self, width: int, height: int) -> None:
        """Initialize VideoSource.
        
        Args:
            width: Width in pixels
                  Type: int
                  Must be positive
                  Typically multiple of 2
                  
            height: Height in pixels
                   Type: int
                   Must be positive
                   Typically multiple of 2
                   
        Returns:
            VideoSource instance
            
        Raises:
            ValueError: If width or height invalid
            
        Example:
            >>> # HD resolution
            >>> source = VideoSource(width=1920, height=1080)
            >>> 
            >>> # 720p resolution
            >>> source = VideoSource(width=1280, height=720)
            >>> 
            >>> # VGA resolution
            >>> source = VideoSource(width=640, height=480)
            
        Note:
            Resolution is fixed at creation.
            To change resolution, create new source.
            
            All captured frames should match source resolution.
            Mismatched frames may be rejected or scaled.
        """

    def capture_frame(
        self,
        frame: VideoFrame,
        *,
        timestamp_us: int = 0,
        rotation: proto_video.VideoRotation.ValueType = proto_video.VideoRotation.VIDEO_ROTATION_0
    ) -> None:
        """Capture a video frame for publishing.
        
        Args:
            frame: VideoFrame to capture
                  Must match source width and height
                  Can be any supported format
                  
            timestamp_us: Timestamp in microseconds
                         Type: int
                         Default: 0 (automatic timestamping)
                         Used for frame timing and synchronization
                         
            rotation: Video rotation
                     Type: VideoRotation enum
                     Default: VIDEO_ROTATION_0 (no rotation)
                     Options:
                     - VIDEO_ROTATION_0: No rotation
                     - VIDEO_ROTATION_90: 90° clockwise
                     - VIDEO_ROTATION_180: 180°
                     - VIDEO_ROTATION_270: 270° clockwise (90° CCW)
        
        Returns:
            None (synchronous operation)
            
        Raises:
            ValueError: If frame dimensions don't match source
            RuntimeError: If capture fails
            
        Example:
            >>> source = VideoSource(1920, 1080)
            >>> frame = VideoFrame(1920, 1080, VideoBufferType.RGBA, data)
            >>> 
            >>> # Capture with automatic timestamp
            >>> source.capture_frame(frame)
            >>> 
            >>> # Capture with explicit timestamp
            >>> timestamp = int(time.time() * 1_000_000)
            >>> source.capture_frame(frame, timestamp_us=timestamp)
            >>> 
            >>> # Capture with rotation (for portrait video)
            >>> source.capture_frame(
            ...     frame,
            ...     rotation=VideoRotation.VIDEO_ROTATION_90
            ... )
            
        Note:
            Synchronous operation (doesn't await).
            Frame is published immediately to track.
            No internal queue (unlike AudioSource).
            
            Frame rate control is manual:
            >>> import asyncio
            >>> fps = 30
            >>> for frame in frames:
            ...     source.capture_frame(frame)
            ...     await asyncio.sleep(1.0 / fps)
        """

    async def aclose(self) -> None:
        """Close the video source and clean up resources.
        
        Returns:
            None (awaitable)
            
        Example:
            >>> source = VideoSource(1920, 1080)
            >>> # ... use source
            >>> await source.aclose()
            
        Note:
            Always close sources when done.
            Releases internal resources.
            Source cannot be reused after closing.
        """

VideoStream

@dataclass
class VideoFrameEvent:
    """Event representing a received video frame.
    
    Attributes:
        frame: The video frame
              Type: VideoFrame
              
        timestamp_us: Frame timestamp in microseconds
                     Type: int
                     
        rotation: Frame rotation
                 Type: VideoRotation enum
    """
    frame: VideoFrame
    timestamp_us: int
    rotation: proto_video_frame.VideoRotation

class VideoStream(AsyncIterator[VideoFrameEvent]):
    """Asynchronous video stream for receiving video frames.
    
    Async iterator that yields VideoFrameEvent objects.
    """

    def __init__(
        self,
        track: Track,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        capacity: int = 0,
        format: Optional[proto_video_frame.VideoBufferType.ValueType] = None,
        **kwargs
    ) -> None:
        """Initialize VideoStream.
        
        Args:
            track: Video track to receive from
                  Type: Track (RemoteVideoTrack or LocalVideoTrack)
                  
            loop: Event loop to use
                 Type: asyncio.AbstractEventLoop | None
                 Default: None (uses current loop)
                 
            capacity: Internal frame queue capacity
                     Type: int
                     Default: 0 (unbounded)
                     >0: Bounded queue (drops frames if full)
                     
            format: Desired output format
                   Type: VideoBufferType | None
                   Default: None (uses track's native format)
                   SDK converts frames to this format if specified
                   
        Example:
            >>> # Basic stream (native format)
            >>> stream = VideoStream(track)
            >>> 
            >>> # Stream with format conversion
            >>> stream = VideoStream(
            ...     track,
            ...     format=VideoBufferType.RGBA  # Convert to RGBA
            ... )
            >>> 
            >>> # Stream with bounded queue
            >>> stream = VideoStream(track, capacity=30)  # Buffer 30 frames
        """

    @classmethod
    def from_participant(
        cls,
        *,
        participant: Participant,
        track_source: TrackSource.ValueType,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        format: Optional[proto_video_frame.VideoBufferType.ValueType] = None,
        capacity: int = 0
    ) -> VideoStream:
        """Create a VideoStream from participant's video track.
        
        Args:
            participant: Participant to receive video from
            track_source: Track source type (e.g., SOURCE_CAMERA)
            ... (other args same as __init__)
            
        Returns:
            VideoStream: Stream for specified track source
            
        Raises:
            ValueError: If participant doesn't have track with specified source
            
        Example:
            >>> # Stream camera video from participant
            >>> stream = VideoStream.from_participant(
            ...     participant=remote_participant,
            ...     track_source=TrackSource.SOURCE_CAMERA,
            ...     format=VideoBufferType.RGBA
            ... )
        """

    @classmethod
    def from_track(
        cls,
        *,
        track: Track,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        format: Optional[proto_video_frame.VideoBufferType.ValueType] = None,
        capacity: int = 0
    ) -> VideoStream:
        """Create a VideoStream from existing track.
        
        Alternative constructor (same as __init__).
        """

    async def aclose(self) -> None:
        """Close the video stream and clean up resources.
        
        Returns:
            None (awaitable)
            
        Example:
            >>> stream = VideoStream(track)
            >>> try:
            ...     async for event in stream:
            ...         process(event.frame)
            ... finally:
            ...     await stream.aclose()
        """

    def __aiter__(self) -> AsyncIterator[VideoFrameEvent]:
        """Return self as async iterator."""

    async def __anext__(self) -> VideoFrameEvent:
        """Get next video frame event.
        
        Returns:
            VideoFrameEvent: Next frame event with frame, timestamp, rotation
            
        Raises:
            StopAsyncIteration: When stream ends
        """

Complete Example

import asyncio
import time

from livekit import (
    Room, VideoSource, LocalVideoTrack, VideoFrame,
    VideoBufferType, VideoRotation, VideoStream, TrackKind
)

async def main():
    room = Room()
    
    # Publishing video
    source = VideoSource(1280, 720)
    track = LocalVideoTrack.create_video_track("camera", source)
    await room.local_participant.publish_track(track)
    
    # Generate and capture video
    asyncio.create_task(generate_video(source))
    
    # Receiving video
    @room.on("track_subscribed")
    def on_track(track, publication, participant):
        if track.kind == TrackKind.KIND_VIDEO:
            asyncio.create_task(receive_video(track))
    
    await room.connect(url, token)  # your LiveKit server URL and access token
    await asyncio.sleep(30)
    
    # Cleanup
    await source.aclose()
    await room.disconnect()

async def generate_video(source: VideoSource):
    """Generate video frames."""
    width, height = 1280, 720
    fps = 30
    frame_duration = 1.0 / fps
    
    frame_count = 0
    
    while True:
        # Create frame data
        data = bytearray(width * height * 4)  # RGBA
        
        # Fill with color pattern
        color = (
            (frame_count * 1) % 256,  # Red
            (frame_count * 2) % 256,  # Green
            (frame_count * 3) % 256,  # Blue
            255                        # Alpha
        )
        
        for i in range(0, len(data), 4):
            data[i:i+4] = color
        
        # Create frame
        frame = VideoFrame(width, height, VideoBufferType.RGBA, data)
        
        # Capture with timestamp
        timestamp_us = int(time.time() * 1_000_000)
        source.capture_frame(
            frame,
            timestamp_us=timestamp_us,
            rotation=VideoRotation.VIDEO_ROTATION_0
        )
        
        frame_count += 1
        
        # Wait for next frame
        await asyncio.sleep(frame_duration)

async def receive_video(track):
    """Receive and process video frames."""
    # Create stream with RGBA format
    stream = VideoStream(track, format=VideoBufferType.RGBA)
    
    try:
        async for event in stream:
            frame = event.frame
            timestamp_us = event.timestamp_us
            rotation = event.rotation
            
            print(f"Received {frame.width}x{frame.height} frame, "
                  f"format: {frame.type}, rotation: {rotation}")
            
            # Access pixel data
            pixels = frame.data
            
            # Process frame
            # Examples:
            # - Display in window
            # - Save to image file
            # - Apply filters
            # - Detect objects
            # - Encode to video file
            
    finally:
        await stream.aclose()

if __name__ == "__main__":
    asyncio.run(main())

Best Practices

1. Use Correct Buffer Size

# Calculate buffer sizes correctly
width, height = 1920, 1080

# RGBA: 4 bytes per pixel
rgba_size = width * height * 4
data_rgba = bytearray(rgba_size)

# I420: 1.5 bytes per pixel
i420_size = int(width * height * 1.5)
data_i420 = bytearray(i420_size)

# RGB24: 3 bytes per pixel
rgb24_size = width * height * 3
data_rgb24 = bytearray(rgb24_size)
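
The per-format arithmetic generalizes to a small helper (pure Python; the keys mirror the VideoBufferType members used in this doc, and `frame_buffer_size` is an illustrative name):

```python
# Bytes per pixel for the formats referenced in this doc
BYTES_PER_PIXEL = {
    "RGBA": 4.0,
    "RGB24": 3.0,
    "I420": 1.5,   # planar YUV 4:2:0
    "NV12": 1.5,   # semi-planar YUV 4:2:0
}

def frame_buffer_size(width: int, height: int, fmt: str) -> int:
    """Exact byte count a VideoFrame buffer must have for the given format."""
    return int(width * height * BYTES_PER_PIXEL[fmt])

print(frame_buffer_size(1920, 1080, "I420"))  # 3110400
```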

2. Match Source Resolution

# Source and frames must match resolution
source = VideoSource(width=1280, height=720)

# Good: Matching resolution
frame = VideoFrame(1280, 720, VideoBufferType.RGBA, data)
source.capture_frame(frame)

# Bad: Mismatched resolution
# frame = VideoFrame(1920, 1080, VideoBufferType.RGBA, data)
# source.capture_frame(frame)  # Raises ValueError

3. Control Frame Rate

import asyncio
import time

async def capture_at_fps(source: VideoSource, width: int, height: int, fps: float):
    """Capture frames at specified FPS with drift compensation.
    
    Dimensions are passed explicitly because VideoSource does not
    expose its resolution as properties.
    """
    frame_interval = 1.0 / fps
    next_frame_time = time.time()
    
    while True:
        # Generate frame at the source's resolution
        frame = generate_frame(width, height)
        
        # Capture
        source.capture_frame(frame)
        
        # Schedule relative to the ideal timeline so slow
        # iterations don't accumulate drift
        next_frame_time += frame_interval
        sleep_time = max(0, next_frame_time - time.time())
        await asyncio.sleep(sleep_time)

def generate_frame(width: int, height: int) -> VideoFrame:
    """Generate video frame."""
    data = bytearray(width * height * 4)
    # Fill with data...
    return VideoFrame(width, height, VideoBufferType.RGBA, data)

4. Handle Stream Capacity

# Bounded capacity prevents memory buildup
stream = VideoStream(track, capacity=30)  # Buffer 30 frames max

# For processing that might lag:
async def process_video_with_drops(stream: VideoStream):
    """Process video; frames beyond the queue capacity are dropped."""
    async for event in stream:
        # Process frame (may be slow)
        await process_frame(event.frame)
        # If processing lags, frames that overflow the 30-frame queue are dropped

# Unbounded capacity (default) keeps all frames
stream = VideoStream(track, capacity=0)

# For live processing, use bounded capacity
# For recording, use unbounded capacity

5. Convert Format Efficiently

# Convert once in stream, not per frame
stream = VideoStream(track, format=VideoBufferType.RGBA)

async for event in stream:
    frame = event.frame  # Already in RGBA format
    # Process directly
    
# Instead of:
# stream = VideoStream(track)  # Native format
# async for event in stream:
#     frame = event.frame.convert(VideoBufferType.RGBA)  # Convert each frame

Advanced Patterns

Frame Timestamp Tracking

class FrameTimestampTracker:
    """Track frame timestamps and calculate FPS."""
    
    def __init__(self):
        self.timestamps = []
        self.max_history = 30
    
    def add_timestamp(self, timestamp_us: int):
        """Add frame timestamp."""
        self.timestamps.append(timestamp_us)
        
        # Keep last N timestamps
        if len(self.timestamps) > self.max_history:
            self.timestamps.pop(0)
    
    def get_fps(self) -> float:
        """Calculate current FPS."""
        if len(self.timestamps) < 2:
            return 0.0
        
        # Calculate time span
        time_span_us = self.timestamps[-1] - self.timestamps[0]
        time_span_s = time_span_us / 1_000_000
        
        # Calculate FPS
        frame_count = len(self.timestamps) - 1
        fps = frame_count / time_span_s if time_span_s > 0 else 0.0
        
        return fps

# Usage
tracker = FrameTimestampTracker()

async def process_with_fps(stream: VideoStream):
    async for event in stream:
        tracker.add_timestamp(event.timestamp_us)
        
        if len(tracker.timestamps) >= 10:
            fps = tracker.get_fps()
            print(f"Current FPS: {fps:.1f}")
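
The FPS formula that get_fps() implements can be sanity-checked offline with synthetic timestamps, no stream required:

```python
# 31 timestamps spaced exactly 1/30 s apart -> 30 intervals spanning 1 s
timestamps = [i * 1_000_000 // 30 for i in range(31)]

# Same math as get_fps(): frame count over elapsed span
time_span_s = (timestamps[-1] - timestamps[0]) / 1_000_000
fps = (len(timestamps) - 1) / time_span_s
print(fps)  # 30.0
```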

Frame Dropping Strategy

async def process_with_frame_dropping(stream: VideoStream, target_fps: float):
    """Process video with frame dropping to maintain target FPS."""
    frame_interval_us = int(1_000_000 / target_fps)
    last_processed_us = 0
    
    async for event in stream:
        # Check if enough time has passed
        if event.timestamp_us - last_processed_us >= frame_interval_us:
            # Process this frame
            await process_frame(event.frame)
            last_processed_us = event.timestamp_us
        else:
            # Drop frame (too soon)
            print(f"Dropped frame at {event.timestamp_us}")

# Example: Process at 15 FPS even if receiving 30 FPS
asyncio.create_task(process_with_frame_dropping(stream, target_fps=15.0))

See Also

  • Video Tracks - Publishing and managing video tracks
  • Video Processing - Video frame conversion
  • Utilities - Audio/video synchronization