tessl install tessl/pypi-livekit@1.0.0Python Real-time SDK for LiveKit providing WebRTC-based video, audio, and data streaming capabilities
Data streaming provides high-throughput, chunked delivery of text and binary data between participants. Unlike simple data packets, streams support large data transfers with progress tracking and efficient chunking.
Key concepts:
from livekit import (
TextStreamInfo,
ByteStreamInfo,
TextStreamReader,
TextStreamWriter,
ByteStreamReader,
ByteStreamWriter,
)STREAM_CHUNK_SIZE: int = 15_000
"""Size of stream chunks in bytes.
Default chunk size for data streaming.
Optimal balance between overhead and latency.
"""@dataclass
class BaseStreamInfo:
"""Base information for data streams.
Attributes:
stream_id: Unique stream identifier
Type: str
Auto-generated or custom
mime_type: MIME type of data
Type: str
Examples: "text/plain", "application/pdf"
topic: Topic for categorization
Type: str
Empty string if not specified
timestamp: Stream creation timestamp
Type: int
Unix timestamp in milliseconds
size: Total size in bytes
Type: int | None
None if size unknown
attributes: Key-value attributes
Type: Dict[str, str] | None
None if no attributes
"""
stream_id: str
mime_type: str
topic: str
timestamp: int
size: Optional[int]
attributes: Optional[Dict[str, str]]@dataclass
class TextStreamInfo(BaseStreamInfo):
"""Information for text streams.
Attributes:
attachments: List of attached stream IDs
Type: List[str]
IDs of related streams
Empty list if no attachments
"""
attachments: List[str]@dataclass
class ByteStreamInfo(BaseStreamInfo):
"""Information for byte streams.
Attributes:
name: Stream name
Type: str
Typically filename for files
Display name for streams
"""
name: strclass TextStreamReader(AsyncIterator[str]):
"""Reader for receiving text streams.
Async iterator yielding text chunks.
"""
def __init__(self, header: proto_DataStream.Header) -> None:
"""Initialize TextStreamReader.
Args:
header: Internal stream header
Note:
Created automatically by SDK.
Access via stream handler.
"""
@property
def info(self) -> TextStreamInfo:
"""Stream information.
Returns:
TextStreamInfo: Metadata about the stream
"""
async def read_all(self) -> str:
"""Read all text chunks into a single string.
Returns:
str: Complete text from all chunks
Note:
Buffers all chunks in memory.
For large streams, use async iteration instead.
Example:
>>> async def handle_text(reader, sender):
... text = await reader.read_all()
... print(f"Complete text: {text}")
"""
def __aiter__(self) -> AsyncIterator[str]:
"""Return self as async iterator.
Returns:
AsyncIterator[str]: Self
"""
async def __anext__(self) -> str:
"""Get next text chunk.
Returns:
str: Next chunk of text
Raises:
StopAsyncIteration: When stream complete
Example:
>>> async for chunk in reader:
... print(chunk, end='')
"""class ByteStreamReader(AsyncIterator[bytes]):
"""Reader for receiving byte streams.
Async iterator yielding byte chunks.
"""
def __init__(self, header: proto_DataStream.Header, capacity: int = 0) -> None:
"""Initialize ByteStreamReader.
Args:
header: Internal stream header
capacity: Queue capacity (0 for unbounded)
"""
@property
def info(self) -> ByteStreamInfo:
"""Stream information.
Returns:
ByteStreamInfo: Metadata about the stream
"""
def __aiter__(self) -> AsyncIterator[bytes]:
"""Return self as async iterator.
Returns:
AsyncIterator[bytes]: Self
"""
async def __anext__(self) -> bytes:
"""Get next byte chunk.
Returns:
bytes: Next chunk of bytes
Raises:
StopAsyncIteration: When stream complete
Example:
>>> data = bytearray()
>>> async for chunk in reader:
... data.extend(chunk)
... progress = len(data) / reader.info.size * 100
... print(f"Progress: {progress:.1f}%")
"""class TextStreamWriter:
"""Writer for sending text streams.
Writes text in chunks.
"""
def __init__(
self,
local_participant: LocalParticipant,
*,
topic: str = "",
attributes: Optional[Dict[str, str]] = {},
stream_id: str | None = None,
total_size: int | None = None,
reply_to_id: str | None = None,
destination_identities: Optional[List[str]] = None,
sender_identity: str | None = None
) -> None:
"""Initialize TextStreamWriter.
Args:
local_participant: Local participant instance
topic: Topic for routing (default: "")
attributes: Key-value attributes (default: {})
stream_id: Custom stream ID (default: auto-generated)
total_size: Total size in bytes if known (default: None)
reply_to_id: ID of stream being replied to (default: None)
destination_identities: Target participants (default: None for all)
sender_identity: Override sender identity (default: None)
"""
@property
def info(self) -> TextStreamInfo:
"""Stream information.
Returns:
TextStreamInfo: Metadata about this stream
"""
async def write(self, text: str) -> None:
"""Write text chunk to stream.
Args:
text: Text to write (UTF-8 encoded)
Raises:
RuntimeError: If stream closed or write fails
Example:
>>> await writer.write("Hello ")
>>> await writer.write("world!")
"""
async def aclose(self, *, reason: str = "", attributes: Optional[Dict[str, str]] = None) -> None:
"""Close the stream.
Args:
reason: Optional reason for closing (default: "")
attributes: Final attributes to set (default: None)
Note:
Must be called to signal end of stream.
Receivers will get StopAsyncIteration after this.
Example:
>>> await writer.write("data")
>>> await writer.aclose()
"""class ByteStreamWriter:
"""Writer for sending byte streams.
Writes binary data in chunks.
"""
def __init__(
self,
local_participant: LocalParticipant,
*,
name: str,
topic: str = "",
attributes: Optional[Dict[str, str]] = None,
stream_id: str | None = None,
total_size: int | None = None,
mime_type: str = "application/octet-stream",
destination_identities: Optional[List[str]] = None
) -> None:
"""Initialize ByteStreamWriter.
Args:
local_participant: Local participant instance
name: Stream name (e.g., filename)
topic: Topic for routing (default: "")
attributes: Key-value attributes (default: None)
stream_id: Custom stream ID (default: auto-generated)
total_size: Total size in bytes if known (default: None)
mime_type: MIME type (default: "application/octet-stream")
destination_identities: Target participants (default: None for all)
"""
@property
def info(self) -> ByteStreamInfo:
"""Stream information.
Returns:
ByteStreamInfo: Metadata about this stream
"""
async def write(self, data: bytes) -> None:
"""Write byte chunk to stream.
Args:
data: Bytes to write
Raises:
RuntimeError: If stream closed or write fails
Example:
>>> await writer.write(chunk1)
>>> await writer.write(chunk2)
"""
async def aclose(self, *, reason: str = "", attributes: Optional[Dict[str, str]] = None) -> None:
"""Close the stream.
Args:
reason: Optional reason for closing (default: "")
attributes: Final attributes to set (default: None)
Example:
>>> await writer.write(data)
>>> await writer.aclose()
"""TextStreamHandler = Callable[[TextStreamReader, str], None]
"""Handler for text stream reception.
Args:
reader: TextStreamReader for reading chunks
sender_identity: Identity of sender (str)
Handler should be async function.
Example:
>>> async def handler(reader: TextStreamReader, sender: str):
... text = await reader.read_all()
... print(f"From {sender}: {text}")
"""
ByteStreamHandler = Callable[[ByteStreamReader, str], None]
"""Handler for byte stream reception.
Args:
reader: ByteStreamReader for reading chunks
sender_identity: Identity of sender (str)
Handler should be async function.
Example:
>>> async def handler(reader: ByteStreamReader, sender: str):
... async for chunk in reader:
... process(chunk)
"""from livekit import LocalParticipant
local: LocalParticipant = room.local_participant
# Stream text in chunks
writer = await local.stream_text(topic="chat")
await writer.write("Hello, ")
await writer.write("world!")
await writer.aclose()
# Send complete text (convenience method)
info = await local.send_text("Quick message", topic="chat")
print(f"Sent stream: {info.stream_id}")# Stream bytes in chunks
writer = await local.stream_bytes(
name="document.pdf",
mime_type="application/pdf",
total_size=1024000
)
# Write chunks
with open("document.pdf", "rb") as f:
while chunk := f.read(15000):
await writer.write(chunk)
await writer.aclose()
# Send file (convenience method)
info = await local.send_file("/path/to/file.txt", topic="files")
print(f"Sent file: {info.name} ({info.size} bytes)")from livekit import TextStreamReader, ByteStreamReader
# Register text stream handler
async def handle_text(reader: TextStreamReader, sender: str):
"""Handle text stream."""
print(f"Text stream from {sender}")
print(f"Topic: {reader.info.topic}")
# Read all at once
text = await reader.read_all()
print(f"Complete text: {text}")
room.register_text_stream_handler("chat", handle_text)
# Register byte stream handler
async def handle_bytes(reader: ByteStreamReader, sender: str):
"""Handle byte stream."""
print(f"Receiving {reader.info.name} from {sender}")
print(f"Size: {reader.info.size} bytes")
print(f"MIME: {reader.info.mime_type}")
# Process chunks with progress
received = 0
data = bytearray()
async for chunk in reader:
data.extend(chunk)
received += len(chunk)
if reader.info.size:
progress = (received / reader.info.size) * 100
print(f"Progress: {progress:.1f}%")
print(f"Complete: {len(data)} bytes")
# Save to file
with open(f"received_{reader.info.name}", "wb") as f:
f.write(data)
room.register_byte_stream_handler("files", handle_bytes)import asyncio
from livekit import Room, TextStreamReader, ByteStreamReader
async def main():
room = Room()
# Register stream handlers
async def on_text(reader: TextStreamReader, sender: str):
"""Handle text stream."""
print(f"Text from {sender}:")
print(f" Topic: {reader.info.topic}")
print(f" Stream ID: {reader.info.stream_id}")
# Stream chunks
async for chunk in reader:
print(chunk, end='')
print() # Newline
async def on_bytes(reader: ByteStreamReader, sender: str):
"""Handle byte stream."""
print(f"File '{reader.info.name}' from {sender}")
print(f" Size: {reader.info.size} bytes")
print(f" MIME: {reader.info.mime_type}")
print(f" Topic: {reader.info.topic}")
# Receive with progress
data = bytearray()
async for chunk in reader:
data.extend(chunk)
if reader.info.size:
pct = len(data) / reader.info.size * 100
print(f" Progress: {pct:.1f}%")
print(f" Complete: {len(data)} bytes")
room.register_text_stream_handler("chat", on_text)
room.register_byte_stream_handler("files", on_bytes)
await room.connect(url, token)
# Send text stream
writer = await room.local_participant.stream_text(topic="chat")
await writer.write("Hello ")
await writer.write("everyone!")
await writer.aclose()
# Send file
info = await room.local_participant.send_file(
"/path/to/document.pdf",
topic="files"
)
print(f"Sent {info.name}")
await asyncio.sleep(30)
await room.disconnect()
asyncio.run(main())# Good: Use ~15KB chunks
CHUNK_SIZE = 15000
with open("file.bin", "rb") as f:
while chunk := f.read(CHUNK_SIZE):
await writer.write(chunk)
# Avoid: Too small (high overhead)
# while chunk := f.read(100):
# await writer.write(chunk)
# Avoid: Too large (high latency)
# while chunk := f.read(1_000_000):
# await writer.write(chunk)import os
file_path = "/path/to/file.pdf"
file_size = os.path.getsize(file_path)
# Good: Specify total_size
writer = await local.stream_bytes(
name=os.path.basename(file_path),
total_size=file_size,
mime_type="application/pdf"
)
# Receiver can show progress
async def handle_bytes(reader, sender):
if reader.info.size:
# Can calculate progress
progress = received / reader.info.size * 100writer = None
try:
writer = await local.stream_text(topic="chat")
await writer.write("Hello")
await writer.write("World")
finally:
if writer:
await writer.aclose()
# Or use pattern
writer = await local.stream_text(topic="chat")
try:
await writer.write("data")
except Exception as e:
print(f"Error: {e}")
finally:
await writer.aclose()# Register handlers per topic
room.register_text_stream_handler("chat", handle_chat)
room.register_text_stream_handler("commands", handle_commands)
room.register_byte_stream_handler("images", handle_images)
room.register_byte_stream_handler("documents", handle_documents)
# Send to appropriate topic
await local.stream_text(topic="chat") # Routes to handle_chat
await local.stream_bytes(name="pic.jpg", topic="images") # Routes to handle_imagesasync def safe_stream_handler(reader, sender):
"""Stream handler with error handling."""
try:
async for chunk in reader:
# Process chunk
process(chunk)
except Exception as e:
print(f"Stream error: {e}")
# Cleanup if needed
finally:
# Stream cleanup
passclass ProgressTracker:
"""Track stream progress."""
def __init__(self, total_size: int):
self.total_size = total_size
self.received = 0
def update(self, chunk_size: int):
"""Update progress."""
self.received += chunk_size
@property
def progress(self) -> float:
"""Get progress percentage."""
if self.total_size == 0:
return 0.0
return (self.received / self.total_size) * 100
@property
def complete(self) -> bool:
"""Check if complete."""
return self.received >= self.total_size
# Usage
async def handle_with_progress(reader: ByteStreamReader, sender: str):
if not reader.info.size:
print("Unknown size")
return
tracker = ProgressTracker(reader.info.size)
async for chunk in reader:
tracker.update(len(chunk))
print(f"Progress: {tracker.progress:.1f}%")
if tracker.progress % 10 < 1: # Every 10%
print(f"Milestone: {tracker.progress:.0f}%")import os
async def receive_file(reader: ByteStreamReader, output_dir: str):
"""Receive file and save to disk."""
output_path = os.path.join(output_dir, reader.info.name)
print(f"Receiving: {reader.info.name}")
print(f" Size: {reader.info.size} bytes")
print(f" MIME: {reader.info.mime_type}")
with open(output_path, "wb") as f:
async for chunk in reader:
f.write(chunk)
if reader.info.size:
progress = f.tell() / reader.info.size * 100
print(f" Progress: {progress:.1f}%", end='\r')
print(f"\nSaved to: {output_path}")
return output_path