or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/livekit@1.0.x

docs

audio-frames-sources.md, audio-processing.md, audio-tracks.md, data-streaming.md, e2ee.md, events.md, index.md, participants.md, room.md, rpc.md, track-publications.md, transcription.md, types-enums.md, utilities.md, video-frames-sources.md, video-processing.md, video-tracks.md
tile.json

tessl/pypi-livekit

tessl install tessl/pypi-livekit@1.0.0

Python Real-time SDK for LiveKit providing WebRTC-based video, audio, and data streaming capabilities

docs/data-streaming.md

Data Streaming

Overview

Data streaming provides high-throughput, chunked delivery of text and binary data between participants. Unlike simple data packets, streams support large data transfers with progress tracking and efficient chunking.

Key concepts:

  • Streams: Chunked data delivery for large files or continuous data
  • Topics: Optional categorization for routing
  • Attributes: Key-value metadata for streams
  • Text streams: UTF-8 encoded text data
  • Byte streams: Binary data (files, images, etc.)
  • Chunk size: Default 15KB for optimal performance

Import

from typing import Awaitable

from livekit import (
    TextStreamInfo,
    ByteStreamInfo,
    TextStreamReader,
    TextStreamWriter,
    ByteStreamReader,
    ByteStreamWriter,
)

Constants

# Default payload size (bytes) used when a stream is split into chunks.
STREAM_CHUNK_SIZE: int = 15_000
"""Default chunk size, in bytes, for data streaming.

Chosen as a balance between per-chunk overhead and delivery latency.
"""

Classes

BaseStreamInfo

@dataclass
class BaseStreamInfo:
    """Common metadata shared by text and byte streams.

    Attributes:
        stream_id: Unique identifier of the stream (auto-generated or custom).
        mime_type: MIME type of the payload, e.g. "text/plain" or
            "application/pdf".
        topic: Routing topic; empty string when unspecified.
        timestamp: Creation time as a Unix timestamp in milliseconds.
        size: Total payload size in bytes, or None when unknown.
        attributes: Optional key-value metadata, or None when absent.
    """

    stream_id: str
    mime_type: str
    topic: str
    timestamp: int
    size: Optional[int]
    attributes: Optional[Dict[str, str]]

TextStreamInfo

@dataclass
class TextStreamInfo(BaseStreamInfo):
    """Metadata for a text stream.

    Attributes:
        attachments: IDs of related (attached) streams; an empty list when
            the stream has no attachments.
    """

    attachments: List[str]

ByteStreamInfo

@dataclass
class ByteStreamInfo(BaseStreamInfo):
    """Metadata for a byte stream.

    Attributes:
        name: Display name of the stream — typically the filename when the
            stream carries a file.
    """

    name: str

TextStreamReader

class TextStreamReader(AsyncIterator[str]):
    """Receives an incoming text stream as an async iterator of chunks."""

    def __init__(self, header: proto_DataStream.Header) -> None:
        """Build a reader from an internal stream header.

        Args:
            header: Internal protocol header describing the stream.

        Note:
            Instances are created by the SDK and passed to registered
            stream handlers; user code does not construct them directly.
        """

    @property
    def info(self) -> TextStreamInfo:
        """TextStreamInfo: Metadata describing this stream."""

    async def read_all(self) -> str:
        """Concatenate every remaining chunk into one string.

        Returns:
            str: The complete text of the stream.

        Note:
            The whole stream is buffered in memory; prefer async
            iteration for very large streams.

        Example:
            >>> async def handle_text(reader, sender):
            ...     text = await reader.read_all()
            ...     print(f"Complete text: {text}")
        """

    def __aiter__(self) -> AsyncIterator[str]:
        """Return this reader as its own async iterator."""

    async def __anext__(self) -> str:
        """Yield the next chunk of text.

        Returns:
            str: The next text chunk.

        Raises:
            StopAsyncIteration: Once the stream has ended.

        Example:
            >>> async for chunk in reader:
            ...     print(chunk, end='')
        """

ByteStreamReader

class ByteStreamReader(AsyncIterator[bytes]):
    """Receives an incoming byte stream as an async iterator of chunks."""

    def __init__(self, header: proto_DataStream.Header, capacity: int = 0) -> None:
        """Build a reader from an internal stream header.

        Args:
            header: Internal protocol header describing the stream.
            capacity: Internal queue capacity; 0 means unbounded.
        """

    @property
    def info(self) -> ByteStreamInfo:
        """ByteStreamInfo: Metadata describing this stream."""

    def __aiter__(self) -> AsyncIterator[bytes]:
        """Return this reader as its own async iterator."""

    async def __anext__(self) -> bytes:
        """Yield the next chunk of bytes.

        Returns:
            bytes: The next binary chunk.

        Raises:
            StopAsyncIteration: Once the stream has ended.

        Example:
            >>> data = bytearray()
            >>> async for chunk in reader:
            ...     data.extend(chunk)
            ...     progress = len(data) / reader.info.size * 100
            ...     print(f"Progress: {progress:.1f}%")
        """

TextStreamWriter

class TextStreamWriter:
    """Writer for sending text streams.

    Splits outgoing UTF-8 text into chunks and delivers them to receivers.
    """

    def __init__(
        self,
        local_participant: LocalParticipant,
        *,
        topic: str = "",
        attributes: Optional[Dict[str, str]] = None,
        stream_id: str | None = None,
        total_size: int | None = None,
        reply_to_id: str | None = None,
        destination_identities: Optional[List[str]] = None,
        sender_identity: str | None = None
    ) -> None:
        """Initialize TextStreamWriter.

        Args:
            local_participant: Local participant instance used to send data.
            topic: Topic for routing (default: "").
            attributes: Key-value attributes (default: None).
                NOTE(review): this previously defaulted to a shared mutable
                ``{}`` — a Python mutable-default pitfall; ``None`` is safe
                and matches ByteStreamWriter's signature.
            stream_id: Custom stream ID (default: auto-generated).
            total_size: Total size in bytes if known (default: None).
            reply_to_id: ID of stream being replied to (default: None).
            destination_identities: Target participants (default: None for all).
            sender_identity: Override sender identity (default: None).
        """

    @property
    def info(self) -> TextStreamInfo:
        """TextStreamInfo: Metadata describing this stream."""

    async def write(self, text: str) -> None:
        """Write a chunk of text to the stream.

        Args:
            text: Text to write (UTF-8 encoded).

        Raises:
            RuntimeError: If the stream is closed or the write fails.

        Example:
            >>> await writer.write("Hello ")
            >>> await writer.write("world!")
        """

    async def aclose(self, *, reason: str = "", attributes: Optional[Dict[str, str]] = None) -> None:
        """Close the stream.

        Args:
            reason: Optional reason for closing (default: "").
            attributes: Final attributes to set (default: None).

        Note:
            Must be called to signal end of stream.
            Receivers will get StopAsyncIteration after this.

        Example:
            >>> await writer.write("data")
            >>> await writer.aclose()
        """

ByteStreamWriter

class ByteStreamWriter:
    """Writer for sending binary streams.

    Delivers binary payloads (files, images, etc.) to receivers in chunks.
    """

    def __init__(
        self,
        local_participant: LocalParticipant,
        *,
        name: str,
        topic: str = "",
        attributes: Optional[Dict[str, str]] = None,
        stream_id: str | None = None,
        total_size: int | None = None,
        mime_type: str = "application/octet-stream",
        destination_identities: Optional[List[str]] = None
    ) -> None:
        """Create a byte-stream writer.

        Args:
            local_participant: Local participant used to send the data.
            name: Display name of the stream, typically a filename.
            topic: Routing topic (default: "").
            attributes: Key-value metadata (default: None).
            stream_id: Custom stream ID; auto-generated when None.
            total_size: Total payload size in bytes, when known.
            mime_type: MIME type of the payload
                (default: "application/octet-stream").
            destination_identities: Identities to deliver to; None sends
                to every participant.
        """

    @property
    def info(self) -> ByteStreamInfo:
        """ByteStreamInfo: Metadata describing this stream."""

    async def write(self, data: bytes) -> None:
        """Append a chunk of bytes to the stream.

        Args:
            data: Bytes to send.

        Raises:
            RuntimeError: If the stream is closed or the write fails.

        Example:
            >>> await writer.write(chunk1)
            >>> await writer.write(chunk2)
        """

    async def aclose(self, *, reason: str = "", attributes: Optional[Dict[str, str]] = None) -> None:
        """Finish the stream.

        Args:
            reason: Optional close reason (default: "").
            attributes: Final attributes to attach (default: None).

        Example:
            >>> await writer.write(data)
            >>> await writer.aclose()
        """

Type Aliases

TextStreamHandler = Callable[[TextStreamReader, str], Awaitable[None]]
"""Handler invoked when a text stream is received.

Args:
    reader: TextStreamReader for reading chunks
    sender_identity: Identity of sender (str)

The handler must be an async function; its return type is therefore
``Awaitable[None]`` (an async function returns a coroutine, not ``None``).

Example:
    >>> async def handler(reader: TextStreamReader, sender: str):
    ...     text = await reader.read_all()
    ...     print(f"From {sender}: {text}")
"""

ByteStreamHandler = Callable[[ByteStreamReader, str], Awaitable[None]]
"""Handler invoked when a byte stream is received.

Args:
    reader: ByteStreamReader for reading chunks
    sender_identity: Identity of sender (str)

The handler must be an async function; its return type is therefore
``Awaitable[None]`` (an async function returns a coroutine, not ``None``).

Example:
    >>> async def handler(reader: ByteStreamReader, sender: str):
    ...     async for chunk in reader:
    ...         process(chunk)
"""

Usage Examples

Sending Text

from livekit import LocalParticipant

# NOTE(review): assumes `room` is an already-connected Room instance.
local: LocalParticipant = room.local_participant

# Stream text in chunks
writer = await local.stream_text(topic="chat")
await writer.write("Hello, ")
await writer.write("world!")
# Close to signal end of stream; receivers stop iterating after this.
await writer.aclose()

# Send complete text (convenience method)
info = await local.send_text("Quick message", topic="chat")
print(f"Sent stream: {info.stream_id}")

Sending Bytes

# Stream bytes in chunks
writer = await local.stream_bytes(
    name="document.pdf",
    mime_type="application/pdf",
    total_size=1024000
)

# Write chunks (15000 bytes matches STREAM_CHUNK_SIZE, the SDK default)
with open("document.pdf", "rb") as f:
    while chunk := f.read(15000):
        await writer.write(chunk)

# Signal end of stream to receivers.
await writer.aclose()

# Send file (convenience method)
info = await local.send_file("/path/to/file.txt", topic="files")
print(f"Sent file: {info.name} ({info.size} bytes)")

Receiving Streams

from livekit import TextStreamReader, ByteStreamReader

# Handler for incoming text streams on the "chat" topic.
async def handle_text(reader: TextStreamReader, sender: str):
    """Print an incoming text stream after buffering it fully."""
    print(f"Text stream from {sender}")
    print(f"Topic: {reader.info.topic}")

    # Buffer the whole stream in a single call
    text = await reader.read_all()
    print(f"Complete text: {text}")

room.register_text_stream_handler("chat", handle_text)

# Handler for incoming byte streams on the "files" topic.
async def handle_bytes(reader: ByteStreamReader, sender: str):
    """Receive a byte stream chunk by chunk, then save it to disk."""
    print(f"Receiving {reader.info.name} from {sender}")
    print(f"Size: {reader.info.size} bytes")
    print(f"MIME: {reader.info.mime_type}")

    received = 0
    data = bytearray()

    async for chunk in reader:
        data.extend(chunk)
        received += len(chunk)

        # Progress is only computable when the sender declared a size
        if reader.info.size:
            progress = received / reader.info.size * 100
            print(f"Progress: {progress:.1f}%")

    print(f"Complete: {len(data)} bytes")

    # Persist the received payload
    with open(f"received_{reader.info.name}", "wb") as out:
        out.write(data)

room.register_byte_stream_handler("files", handle_bytes)

Complete Example

import asyncio
from livekit import Room, TextStreamReader, ByteStreamReader

async def main():
    """Connect to a room, register stream handlers, and send sample streams."""
    room = Room()

    # Register stream handlers
    async def on_text(reader: TextStreamReader, sender: str):
        """Handle an incoming text stream by printing its chunks."""
        print(f"Text from {sender}:")
        print(f"  Topic: {reader.info.topic}")
        print(f"  Stream ID: {reader.info.stream_id}")

        # Stream chunks
        async for chunk in reader:
            print(chunk, end='')
        print()  # Newline

    async def on_bytes(reader: ByteStreamReader, sender: str):
        """Handle an incoming byte stream, reporting progress as it arrives."""
        print(f"File '{reader.info.name}' from {sender}")
        print(f"  Size: {reader.info.size} bytes")
        print(f"  MIME: {reader.info.mime_type}")
        print(f"  Topic: {reader.info.topic}")

        # Receive with progress (size may be None, hence the guard)
        data = bytearray()
        async for chunk in reader:
            data.extend(chunk)
            if reader.info.size:
                pct = len(data) / reader.info.size * 100
                print(f"  Progress: {pct:.1f}%")

        print(f"  Complete: {len(data)} bytes")

    room.register_text_stream_handler("chat", on_text)
    room.register_byte_stream_handler("files", on_bytes)

    # NOTE(review): `url` and `token` must be defined before this runs.
    await room.connect(url, token)

    # Send text stream
    writer = await room.local_participant.stream_text(topic="chat")
    await writer.write("Hello ")
    await writer.write("everyone!")
    # Close so receivers see end-of-stream.
    await writer.aclose()

    # Send file
    info = await room.local_participant.send_file(
        "/path/to/document.pdf",
        topic="files"
    )
    print(f"Sent {info.name}")

    # Stay connected briefly so incoming streams can be handled.
    await asyncio.sleep(30)
    await room.disconnect()

asyncio.run(main())

Best Practices

1. Use Appropriate Chunk Size

# Good: Use ~15KB chunks (matches STREAM_CHUNK_SIZE, the SDK default)
CHUNK_SIZE = 15000
with open("file.bin", "rb") as f:
    while chunk := f.read(CHUNK_SIZE):
        await writer.write(chunk)

# Avoid: Too small (high overhead)
# while chunk := f.read(100):
#     await writer.write(chunk)

# Avoid: Too large (high latency)
# while chunk := f.read(1_000_000):
#     await writer.write(chunk)

2. Provide Total Size for Progress

import os

file_path = "/path/to/file.pdf"
file_size = os.path.getsize(file_path)

# Good: Specify total_size
writer = await local.stream_bytes(
    name=os.path.basename(file_path),
    total_size=file_size,
    mime_type="application/pdf"
)

# Receiver can show progress
async def handle_bytes(reader, sender):
    if reader.info.size:
        # Can calculate progress
        progress = received / reader.info.size * 100

3. Always Close Streams

# Initialize to None so the finally block can tell whether the
# writer was ever created before trying to close it.
writer = None
try:
    writer = await local.stream_text(topic="chat")
    await writer.write("Hello")
    await writer.write("World")
finally:
    if writer:
        await writer.aclose()

# Or use pattern
writer = await local.stream_text(topic="chat")
try:
    await writer.write("data")
except Exception as e:
    print(f"Error: {e}")
finally:
    # aclose() must run even on error so receivers see end-of-stream.
    await writer.aclose()

4. Use Topics for Routing

# Register handlers per topic — streams are routed by their topic,
# so each handler only sees the category it registered for.
room.register_text_stream_handler("chat", handle_chat)
room.register_text_stream_handler("commands", handle_commands)
room.register_byte_stream_handler("images", handle_images)
room.register_byte_stream_handler("documents", handle_documents)

# Send to appropriate topic
await local.stream_text(topic="chat")  # Routes to handle_chat
await local.stream_bytes(name="pic.jpg", topic="images")  # Routes to handle_images

5. Handle Stream Errors

async def safe_stream_handler(reader, sender):
    """Stream handler that isolates failures instead of propagating them."""
    try:
        # Consume the stream one chunk at a time
        async for chunk in reader:
            process(chunk)
    except Exception as e:
        # Report the failure without crashing the caller
        print(f"Stream error: {e}")
    finally:
        # Always-run cleanup hook
        pass

Advanced Patterns

Progress Tracking

class ProgressTracker:
    """Accumulate received byte counts against a known total."""

    def __init__(self, total_size: int):
        self.total_size = total_size
        self.received = 0

    def update(self, chunk_size: int):
        """Add one chunk's size to the running total."""
        self.received += chunk_size

    @property
    def progress(self) -> float:
        """Percentage received so far; 0.0 when the total is zero."""
        if self.total_size == 0:
            return 0.0
        return self.received / self.total_size * 100

    @property
    def complete(self) -> bool:
        """True once at least total_size bytes have been counted."""
        return self.received >= self.total_size

# Usage
async def handle_with_progress(reader: ByteStreamReader, sender: str):
    if not reader.info.size:
        print("Unknown size")
        return

    tracker = ProgressTracker(reader.info.size)

    async for chunk in reader:
        tracker.update(len(chunk))
        print(f"Progress: {tracker.progress:.1f}%")

        if tracker.progress % 10 < 1:  # Every 10%
            print(f"Milestone: {tracker.progress:.0f}%")

File Assembly

import os

async def receive_file(reader: ByteStreamReader, output_dir: str):
    """Receive a byte stream and write it to disk under output_dir."""
    output_path = os.path.join(output_dir, reader.info.name)

    print(f"Receiving: {reader.info.name}")
    print(f"  Size: {reader.info.size} bytes")
    print(f"  MIME: {reader.info.mime_type}")

    with open(output_path, "wb") as out:
        async for piece in reader:
            out.write(piece)

            # Progress is only meaningful when the sender declared a size
            if reader.info.size:
                progress = out.tell() / reader.info.size * 100
                print(f"  Progress: {progress:.1f}%", end='\r')

    print(f"\nSaved to: {output_path}")
    return output_path

See Also

  • Participants - Sending data and streams
  • Room and Connection Management - Registering stream handlers
  • RPC - Alternative communication pattern