tessl install tessl/pypi-livekit@1.0.0

Python real-time SDK for LiveKit, providing WebRTC-based video, audio, and data streaming capabilities.
Low-level video handling for creating video frames, capturing them to sources, and receiving them from streams. Provides direct control over video data for advanced use cases.
Key concepts:
from livekit import (
VideoFrame, VideoSource, VideoStream, VideoFrameEvent,
VideoBufferType, VideoRotation
)

class VideoFrame:
"""Represents a video frame with pixel data.
Stores raw pixel data in specified format.
"""
def __init__(
self,
width: int,
height: int,
type: proto_video.VideoBufferType.ValueType,
data: Union[bytes, bytearray, memoryview]
) -> None:
"""Initialize VideoFrame with pixel data.
Args:
width: Width in pixels
Type: int
Must be positive
Typically multiple of 2 for YUV formats
height: Height in pixels
Type: int
Must be positive
Typically multiple of 2 for YUV formats
type: Format type
Type: VideoBufferType enum
Examples:
- RGBA: 4 bytes per pixel (width * height * 4)
- I420: 1.5 bytes per pixel (width * height * 1.5)
- RGB24: 3 bytes per pixel (width * height * 3)
data: Raw pixel data
Type: bytes | bytearray | memoryview
Size must match format requirements
Raises:
ValueError: If data size doesn't match format requirements
Example:
>>> # RGBA frame
>>> width, height = 1920, 1080
>>> data = bytearray(width * height * 4)
>>> frame = VideoFrame(width, height, VideoBufferType.RGBA, data)
>>>
>>> # I420 frame
>>> data_i420 = bytearray(int(width * height * 1.5))
>>> frame_i420 = VideoFrame(width, height, VideoBufferType.I420, data_i420)
"""
@property
def width(self) -> int:
"""Width in pixels.
Returns:
int: Frame width
"""
@property
def height(self) -> int:
"""Height in pixels.
Returns:
int: Frame height
"""
@property
def type(self) -> proto_video.VideoBufferType.ValueType:
"""Format type.
Returns:
VideoBufferType: Pixel format (RGBA, I420, etc.)
"""
@property
def data(self) -> memoryview:
"""Raw pixel data as memoryview.
Returns:
memoryview: View of pixel data
Can be modified in-place
Example:
>>> frame = VideoFrame(640, 480, VideoBufferType.RGBA, data)
>>> pixels = frame.data
>>>
>>> # Modify pixel at (x=10, y=10)
>>> x, y = 10, 10
>>> idx = (y * frame.width + x) * 4 # RGBA = 4 bytes per pixel
>>> pixels[idx] = 255 # Red
>>> pixels[idx+1] = 0 # Green
>>> pixels[idx+2] = 0 # Blue
>>> pixels[idx+3] = 255 # Alpha
Note:
For packed formats (RGBA): Single buffer
For planar formats (I420): Use get_plane() instead
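Example (NumPy view over the buffer; assumes numpy is installed, which is not part of this SDK):
>>> import numpy as np
>>> rgba = np.frombuffer(frame.data, dtype=np.uint8)
>>> rgba = rgba.reshape(frame.height, frame.width, 4)  # RGBA frames only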
"""
def get_plane(self, plane_nth: int) -> Optional[memoryview]:
"""Get a specific plane for multi-plane formats.
See video-processing.md for details.
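Example (illustrative; assumes the standard I420 plane order Y, U, V):
>>> data_i420 = bytearray(int(640 * 480 * 1.5))
>>> frame = VideoFrame(640, 480, VideoBufferType.I420, data_i420)
>>> y = frame.get_plane(0)  # luma, width * height bytes
>>> u = frame.get_plane(1)  # chroma U, (width/2) * (height/2) bytes
>>> v = frame.get_plane(2)  # chroma V, (width/2) * (height/2) bytes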
"""
def convert(
self,
type: proto_video.VideoBufferType.ValueType,
*,
flip_y: bool = False
) -> VideoFrame:
"""Convert frame to different format.
See video-processing.md for details.
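Example (illustrative; conversion details are in video-processing.md):
>>> frame = VideoFrame(640, 480, VideoBufferType.I420, data_i420)
>>> rgba_frame = frame.convert(VideoBufferType.RGBA)
>>> flipped = frame.convert(VideoBufferType.RGBA, flip_y=True)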
"""class VideoSource:
"""Video source for publishing video frames.
Manages video frame publishing with specified resolution.
"""
def __init__(self, width: int, height: int) -> None:
"""Initialize VideoSource.
Args:
width: Width in pixels
Type: int
Must be positive
Typically multiple of 2
height: Height in pixels
Type: int
Must be positive
Typically multiple of 2
Returns:
VideoSource instance
Raises:
ValueError: If width or height invalid
Example:
>>> # HD resolution
>>> source = VideoSource(width=1920, height=1080)
>>>
>>> # 720p resolution
>>> source = VideoSource(width=1280, height=720)
>>>
>>> # VGA resolution
>>> source = VideoSource(width=640, height=480)
Note:
Resolution is fixed at creation.
To change resolution, create new source.
All captured frames should match source resolution.
Mismatched frames may be rejected or scaled.
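Example (illustrative; switching resolution means replacing the source):
>>> source = VideoSource(1920, 1080)
>>> await source.aclose()
>>> source = VideoSource(1280, 720)  # new source at the new resolution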
"""
def capture_frame(
self,
frame: VideoFrame,
*,
timestamp_us: int = 0,
rotation: proto_video.VideoRotation.ValueType = proto_video.VideoRotation.VIDEO_ROTATION_0
) -> None:
"""Capture a video frame for publishing.
Args:
frame: VideoFrame to capture
Must match source width and height
Can be any supported format
timestamp_us: Timestamp in microseconds
Type: int
Default: 0 (automatic timestamping)
Used for frame timing and synchronization
rotation: Video rotation
Type: VideoRotation enum
Default: VIDEO_ROTATION_0 (no rotation)
Options:
- VIDEO_ROTATION_0: No rotation
- VIDEO_ROTATION_90: 90° clockwise
- VIDEO_ROTATION_180: 180°
- VIDEO_ROTATION_270: 270° clockwise (90° CCW)
Returns:
None (synchronous operation)
Raises:
ValueError: If frame dimensions don't match source
RuntimeError: If capture fails
Example:
>>> source = VideoSource(1920, 1080)
>>> frame = VideoFrame(1920, 1080, VideoBufferType.RGBA, data)
>>>
>>> # Capture with automatic timestamp
>>> source.capture_frame(frame)
>>>
>>> # Capture with explicit timestamp
>>> timestamp = int(time.time() * 1_000_000)
>>> source.capture_frame(frame, timestamp_us=timestamp)
>>>
>>> # Capture with rotation (for portrait video)
>>> source.capture_frame(
... frame,
... rotation=VideoRotation.VIDEO_ROTATION_90
... )
Note:
Synchronous operation (doesn't await).
Frame is published immediately to track.
No internal queue (unlike AudioSource).
Frame rate control is manual:
>>> import asyncio
>>> fps = 30
>>> for frame in frames:
... source.capture_frame(frame)
... await asyncio.sleep(1.0 / fps)
"""
async def aclose(self) -> None:
"""Close the video source and clean up resources.
Returns:
None (awaitable)
Example:
>>> source = VideoSource(1920, 1080)
>>> # ... use source
>>> await source.aclose()
Note:
Always close sources when done.
Releases internal resources.
Source cannot be reused after closing.
"""@dataclass
class VideoFrameEvent:
"""Event representing a received video frame.
Attributes:
frame: The video frame
Type: VideoFrame
timestamp_us: Frame timestamp in microseconds
Type: int
rotation: Frame rotation
Type: VideoRotation enum
"""
frame: VideoFrame
timestamp_us: int
rotation: proto_video_frame.VideoRotation

class VideoStream(AsyncIterator[VideoFrameEvent]):
"""Asynchronous video stream for receiving video frames.
Async iterator that yields VideoFrameEvent objects.
"""
def __init__(
self,
track: Track,
loop: Optional[asyncio.AbstractEventLoop] = None,
capacity: int = 0,
format: Optional[proto_video_frame.VideoBufferType.ValueType] = None,
**kwargs
) -> None:
"""Initialize VideoStream.
Args:
track: Video track to receive from
Type: Track (RemoteVideoTrack or LocalVideoTrack)
loop: Event loop to use
Type: asyncio.AbstractEventLoop | None
Default: None (uses current loop)
capacity: Internal frame queue capacity
Type: int
Default: 0 (unbounded)
>0: Bounded queue (drops frames if full)
format: Desired output format
Type: VideoBufferType | None
Default: None (uses track's native format)
SDK converts frames to this format if specified
Example:
>>> # Basic stream (native format)
>>> stream = VideoStream(track)
>>>
>>> # Stream with format conversion
>>> stream = VideoStream(
... track,
... format=VideoBufferType.RGBA # Convert to RGBA
... )
>>>
>>> # Stream with bounded queue
>>> stream = VideoStream(track, capacity=30) # Buffer 30 frames
"""
@classmethod
def from_participant(
cls,
*,
participant: Participant,
track_source: TrackSource.ValueType,
loop: Optional[asyncio.AbstractEventLoop] = None,
format: Optional[proto_video_frame.VideoBufferType.ValueType] = None,
capacity: int = 0
) -> VideoStream:
"""Create a VideoStream from participant's video track.
Args:
participant: Participant to receive video from
track_source: Track source type (e.g., SOURCE_CAMERA)
... (other args same as __init__)
Returns:
VideoStream: Stream for specified track source
Raises:
ValueError: If participant doesn't have track with specified source
Example:
>>> # Stream camera video from participant
>>> stream = VideoStream.from_participant(
... participant=remote_participant,
... track_source=TrackSource.SOURCE_CAMERA,
... format=VideoBufferType.RGBA
... )
"""
@classmethod
def from_track(
cls,
*,
track: Track,
loop: Optional[asyncio.AbstractEventLoop] = None,
format: Optional[proto_video_frame.VideoBufferType.ValueType] = None,
capacity: int = 0
) -> VideoStream:
"""Create a VideoStream from existing track.
Alternative constructor (same as __init__).
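Example:
>>> stream = VideoStream.from_track(track=track, format=VideoBufferType.RGBA)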
"""
async def aclose(self) -> None:
"""Close the video stream and clean up resources.
Returns:
None (awaitable)
Example:
>>> stream = VideoStream(track)
>>> try:
... async for event in stream:
... process(event.frame)
... finally:
... await stream.aclose()
"""
def __aiter__(self) -> AsyncIterator[VideoFrameEvent]:
"""Return self as async iterator."""
async def __anext__(self) -> VideoFrameEvent:
"""Get next video frame event.
Returns:
VideoFrameEvent: Next frame event with frame, timestamp, rotation
Raises:
StopAsyncIteration: When stream ends
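Example (one manual step; `async for event in stream` is the usual form):
>>> stream = VideoStream(track)
>>> event = await anext(stream)  # Python 3.10+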
"""import asyncio
from livekit import (
Room, VideoSource, LocalVideoTrack, VideoFrame,
VideoBufferType, VideoRotation, VideoStream, TrackKind
)
async def main():
room = Room()
# Publishing video
source = VideoSource(1280, 720)
track = LocalVideoTrack.create_video_track("camera", source)
await room.local_participant.publish_track(track)
# Generate and capture video
asyncio.create_task(generate_video(source))
# Receiving video
@room.on("track_subscribed")
def on_track(track, publication, participant):
if track.kind == TrackKind.KIND_VIDEO:
asyncio.create_task(receive_video(track))
await room.connect(url, token)  # url and token come from your LiveKit deployment
await asyncio.sleep(30)
# Cleanup
await source.aclose()
await room.disconnect()
async def generate_video(source: VideoSource):
"""Generate video frames."""
width, height = 1280, 720
fps = 30
frame_duration = 1.0 / fps
frame_count = 0
while True:
# Create frame data
data = bytearray(width * height * 4) # RGBA
# Fill with color pattern
color = (
(frame_count * 1) % 256, # Red
(frame_count * 2) % 256, # Green
(frame_count * 3) % 256, # Blue
255 # Alpha
)
for i in range(0, len(data), 4):
data[i:i+4] = color
# Create frame
frame = VideoFrame(width, height, VideoBufferType.RGBA, data)
# Capture with timestamp
timestamp_us = int(time.time() * 1_000_000)
source.capture_frame(
frame,
timestamp_us=timestamp_us,
rotation=VideoRotation.VIDEO_ROTATION_0
)
frame_count += 1
# Wait for next frame
await asyncio.sleep(frame_duration)
async def receive_video(track):
"""Receive and process video frames."""
# Create stream with RGBA format
stream = VideoStream(track, format=VideoBufferType.RGBA)
try:
async for event in stream:
frame = event.frame
timestamp_us = event.timestamp_us
rotation = event.rotation
print(f"Received {frame.width}x{frame.height} frame, "
f"format: {frame.type}, rotation: {rotation}")
# Access pixel data
pixels = frame.data
# Process frame
# Examples:
# - Display in window
# - Save to image file
# - Apply filters
# - Detect objects
# - Encode to video file
finally:
await stream.aclose()
if __name__ == "__main__":
asyncio.run(main())

# Calculate buffer sizes correctly
width, height = 1920, 1080
# RGBA: 4 bytes per pixel
rgba_size = width * height * 4
data_rgba = bytearray(rgba_size)
# I420: 1.5 bytes per pixel
i420_size = int(width * height * 1.5)
data_i420 = bytearray(i420_size)
# RGB24: 3 bytes per pixel
rgb24_size = width * height * 3
data_rgb24 = bytearray(rgb24_size)
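
The byte counts above generalize into a small helper; a minimal sketch (buffer_size is a hypothetical name, covering only the three formats documented here):

def buffer_size(width: int, height: int, fmt) -> int:
    """Bytes required for one frame in the given format (hypothetical helper)."""
    if fmt == VideoBufferType.RGBA:
        return width * height * 4       # 4 bytes per pixel
    if fmt == VideoBufferType.RGB24:
        return width * height * 3       # 3 bytes per pixel
    if fmt == VideoBufferType.I420:
        return width * height * 3 // 2  # 1.5 bytes per pixel (full-size Y + quarter-size U and V)
    raise ValueError(f"unhandled format: {fmt}")

data = bytearray(buffer_size(1920, 1080, VideoBufferType.I420))
frame = VideoFrame(1920, 1080, VideoBufferType.I420, data)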
# Source and frames must match resolution
source = VideoSource(width=1280, height=720)
# Good: Matching resolution
frame = VideoFrame(1280, 720, VideoBufferType.RGBA, data)
source.capture_frame(frame)
# Bad: Mismatched resolution
# frame = VideoFrame(1920, 1080, VideoBufferType.RGBA, data)
# source.capture_frame(frame)  # Raises ValueError

import asyncio
import time
async def capture_at_fps(source: VideoSource, width: int, height: int, fps: float):
"""Capture frames at specified FPS."""
frame_interval = 1.0 / fps
next_frame_time = time.time()
while True:
# Generate frame
frame = generate_frame(width, height)  # dimensions must match the source
# Capture
source.capture_frame(frame)
# Calculate next frame time
next_frame_time += frame_interval
# Sleep with drift compensation
now = time.time()
sleep_time = max(0, next_frame_time - now)
await asyncio.sleep(sleep_time)
def generate_frame(width: int, height: int) -> VideoFrame:
"""Generate video frame."""
data = bytearray(width * height * 4)
# Fill with data...
return VideoFrame(width, height, VideoBufferType.RGBA, data)

# Bounded capacity prevents memory buildup
stream = VideoStream(track, capacity=30) # Buffer 30 frames max
# For processing that might lag:
async def process_video_with_drops(stream: VideoStream):
"""Process video, dropping frames if too slow."""
async for event in stream:
# Process frame (may be slow)
await process_frame(event.frame)
# If processing is slow, old frames are dropped from queue
# Unbounded capacity (default) keeps all frames
stream = VideoStream(track, capacity=0)
# For live processing, use bounded capacity
# For recording, use unbounded capacity

# Convert once in stream, not per frame
stream = VideoStream(track, format=VideoBufferType.RGBA)
async for event in stream:
frame = event.frame # Already in RGBA format
# Process directly
# Instead of:
# stream = VideoStream(track) # Native format
# async for event in stream:
#     frame = event.frame.convert(VideoBufferType.RGBA)  # Convert each frame

class FrameTimestampTracker:
"""Track frame timestamps and calculate FPS."""
def __init__(self):
self.timestamps = []
self.max_history = 30
def add_timestamp(self, timestamp_us: int):
"""Add frame timestamp."""
self.timestamps.append(timestamp_us)
# Keep last N timestamps
if len(self.timestamps) > self.max_history:
self.timestamps.pop(0)
def get_fps(self) -> float:
"""Calculate current FPS."""
if len(self.timestamps) < 2:
return 0.0
# Calculate time span
time_span_us = self.timestamps[-1] - self.timestamps[0]
time_span_s = time_span_us / 1_000_000
# Calculate FPS
frame_count = len(self.timestamps) - 1
fps = frame_count / time_span_s if time_span_s > 0 else 0.0
return fps
# Usage
tracker = FrameTimestampTracker()
async def process_with_fps(stream: VideoStream):
async for event in stream:
tracker.add_timestamp(event.timestamp_us)
if len(tracker.timestamps) >= 10:
fps = tracker.get_fps()
print(f"Current FPS: {fps:.1f}")async def process_with_frame_dropping(stream: VideoStream, target_fps: float):
"""Process video with frame dropping to maintain target FPS."""
frame_interval_us = int(1_000_000 / target_fps)
last_processed_us = 0
async for event in stream:
# Check if enough time has passed
if event.timestamp_us - last_processed_us >= frame_interval_us:
# Process this frame
await process_frame(event.frame)
last_processed_us = event.timestamp_us
else:
# Drop frame (too soon)
print(f"Dropped frame at {event.timestamp_us}")
# Example: Process at 15 FPS even if receiving 30 FPS
# (schedule from within a running event loop)
asyncio.create_task(process_with_frame_dropping(stream, target_fps=15.0))