tessl install tessl/pypi-livekit@1.0.0

Python real-time SDK for LiveKit providing WebRTC-based video, audio, and data streaming capabilities.

Transcription support for receiving and publishing speech-to-text data for audio tracks. Transcriptions can be generated by server-side services, AI agents, or published directly by clients.

Key concepts: a Transcription is the per-track container, and a TranscriptionSegment is a single transcribed phrase within it; both are defined below.
# Public import path for these types when using the SDK:
#   from livekit import Transcription, TranscriptionSegment
# The snippet previously imported the very names it defines below and was
# missing the imports required by @dataclass / List.
from dataclasses import dataclass
from typing import List


@dataclass
class Transcription:
    """Transcription data for one audio track.

    Contains transcribed audio segments with metadata.

    Attributes:
        participant_identity: Identity of the participant whose audio is
            transcribed. Must match a participant in the room.
        track_sid: Session ID of the audio track being transcribed.
            Format: "TR_" followed by a random string; must be a valid
            audio track SID.
        segments: Transcription segments, ordered by start_time.
            May be an empty list.
    """

    participant_identity: str
    track_sid: str
    # Quoted forward reference: TranscriptionSegment is defined below.
    segments: List["TranscriptionSegment"]
from dataclasses import dataclass


@dataclass
class TranscriptionSegment:
    """A single transcribed phrase or sentence.

    Attributes:
        id: Unique segment identifier. Interim and final results for the
            same speech share the same id, so the final updates the interim.
        text: Transcribed UTF-8 text. Empty string is valid (silence
            detection).
        start_time: Start time in milliseconds from the reference point
            (track start or absolute timestamp).
        end_time: End time in milliseconds; must be >= start_time.
        language: BCP 47 language tag (e.g. "en", "en-US", "es-MX").
            Empty string if the language is unknown.
        final: False for an interim result (may change); True for a final
            result (won't change).
    """

    id: str
    text: str
    start_time: int
    end_time: int
    language: str
    final: bool

# Receive-side imports used by the following example:
#   from livekit import Room, TranscriptionSegment, Participant, TrackPublication
from livekit import Room, TranscriptionSegment, Participant, TrackPublication

room = Room()


@room.on("transcription_received")
def on_transcription(
    segments: list[TranscriptionSegment],
    participant: Participant,
    publication: TrackPublication,
):
    """Print each received transcription segment with its timing metadata.

    Args:
        segments: Transcription segments (may contain multiple).
        participant: Participant whose audio is transcribed.
        publication: Track publication being transcribed.
    """
    print(f"Transcription from {participant.identity} ({publication.name}):")
    for segment in segments:
        # Timing summary: range plus duration in milliseconds.
        span_ms = segment.end_time - segment.start_time
        time_range = f"[{segment.start_time}-{segment.end_time}ms] ({span_ms}ms)"
        # Interim results may still change; finals are settled.
        seg_type = "FINAL" if segment.final else "interim"
        print(f"{time_range} ({seg_type}) [{segment.language}]")
        print(f" {segment.text}")
        print(f" ID: {segment.id}")

from livekit import Transcription, TranscriptionSegment, LocalParticipant
local: LocalParticipant = room.local_participant
# Create transcription with multiple segments
transcription = Transcription(
participant_identity=local.identity,
track_sid="TR_XXXXX", # Your audio track SID
segments=[
TranscriptionSegment(
id="seg1",
text="Hello world",
start_time=0,
end_time=1000,
language="en",
final=True
),
TranscriptionSegment(
id="seg2",
text="How are you today?",
start_time=1000,
end_time=2500,
language="en",
final=True
)
]
)
# Publish to room
await local.publish_transcription(transcription)# Interim transcription (low latency, may change)
interim_segment = TranscriptionSegment(
id="seg-123",
text="Hello wor", # Partial, may be corrected
start_time=0,
end_time=800,
language="en",
final=False # Interim
)
interim_transcription = Transcription(
participant_identity=local.identity,
track_sid=track_sid,
segments=[interim_segment]
)
await local.publish_transcription(interim_transcription)
# Later, publish final transcription with same ID
final_segment = TranscriptionSegment(
id="seg-123", # Same ID as interim
text="Hello world", # Corrected text
start_time=0,
end_time=1000,
language="en",
final=True # Final
)
final_transcription = Transcription(
participant_identity=local.identity,
track_sid=track_sid,
segments=[final_segment]
)
await local.publish_transcription(final_transcription)import asyncio
import asyncio

from livekit import (
    Room,
    Transcription,
    TranscriptionSegment,
    AudioSource,
    LocalAudioTrack,
    Participant,
    TrackPublication,
)

# Fill in your server URL and access token before running.
# (These were used below but never defined in the original example.)
url = "wss://your-livekit-server"
token = "<your-access-token>"


async def main():
    """Connect to a room, publish audio, and send/receive transcriptions."""
    room = Room()
    # Track interim segments by id so final results can supersede them.
    interim_segments = {}

    @room.on("transcription_received")
    def on_transcription(
        segments: list[TranscriptionSegment],
        participant: Participant,
        publication: TrackPublication,
    ):
        """Handle received transcriptions."""
        print(f"\nTranscription from {participant.identity}:")
        print(f"Track: {publication.name} ({publication.sid})")
        for seg in segments:
            duration_ms = seg.end_time - seg.start_time
            if seg.final:
                print(f"[FINAL] {seg.start_time}-{seg.end_time}ms ({duration_ms}ms)")
                print(f" Text: {seg.text}")
                print(f" Lang: {seg.language}")
                print(f" ID: {seg.id}")
                # A final result replaces any tracked interim with this id.
                interim_segments.pop(seg.id, None)
            else:
                print(f"[interim] {seg.start_time}-{seg.end_time}ms ({duration_ms}ms)")
                print(f" Text: {seg.text}")
                # BUG FIX: original referenced `segment.language` — an
                # undefined name; the loop variable is `seg`.
                print(f" Lang: {seg.language}")
                interim_segments[seg.id] = seg

    # Connect
    await room.connect(url, token)

    # Publish an audio track so there is something to transcribe.
    local = room.local_participant
    source = AudioSource(48000, 1)
    track = LocalAudioTrack.create_audio_track("mic", source)
    await local.publish_track(track)
    track_sid = (
        list(local.track_publications.keys())[0]
        if local.track_publications
        else "TR_XXX"
    )

    # Publish transcription example
    transcription = Transcription(
        participant_identity=local.identity,
        track_sid=track_sid,
        segments=[
            TranscriptionSegment(
                id="s1",
                text="This is a test",
                start_time=0,
                end_time=1500,
                language="en-US",
                final=True,
            ),
            TranscriptionSegment(
                id="s2",
                text="of the transcription system",
                start_time=1500,
                end_time=3000,
                language="en-US",
                final=True,
            ),
        ],
    )
    await local.publish_transcription(transcription)
    print("Transcription published")

    # Keep running long enough to receive transcriptions, then clean up.
    await asyncio.sleep(30)
    await room.disconnect()


if __name__ == "__main__":
    asyncio.run(main())

# Transcription with multiple languages
# One Transcription may mix segments in different languages; each segment
# carries its own BCP 47 language tag.
transcription = Transcription(
    participant_identity=local.identity,
    track_sid=track_sid,
    segments=[
        TranscriptionSegment(
            id="s1",
            text="Hello",
            start_time=0,
            end_time=500,
            language="en",
            final=True,
        ),
        TranscriptionSegment(
            id="s2",
            text="Hola",
            start_time=500,
            end_time=1000,
            language="es",
            final=True,
        ),
        TranscriptionSegment(
            id="s3",
            text="Bonjour",
            start_time=1000,
            end_time=1500,
            language="fr",
            final=True,
        ),
    ],
)
await local.publish_transcription(transcription)class TranscriptionManager:
"""Manage interim and final transcriptions."""
def __init__(self, local_participant, track_sid):
self.local = local_participant
self.track_sid = track_sid
self.current_segment_id = 0
async def publish_interim(self, text: str, start_time: int, end_time: int, language: str = "en"):
"""Publish interim transcription."""
segment_id = f"seg-{self.current_segment_id}"
self.current_segment_id += 1
transcription = Transcription(
participant_identity=self.local.identity,
track_sid=self.track_sid,
segments=[
TranscriptionSegment(
id=segment_id,
text=text,
start_time=start_time,
end_time=end_time,
language=language,
final=False
)
]
)
await self.local.publish_transcription(transcription)
return segment_id
async def publish_final(self, segment_id: str, text: str, start_time: int, end_time: int, language: str = "en"):
"""Publish final transcription (update interim)."""
transcription = Transcription(
participant_identity=self.local.identity,
track_sid=self.track_sid,
segments=[
TranscriptionSegment(
id=segment_id,
text=text,
start_time=start_time,
end_time=end_time,
language=language,
final=True
)
]
)
await self.local.publish_transcription(transcription)
# Usage
manager = TranscriptionManager(local, track_sid)
# Publish interim
seg_id = await manager.publish_interim("Hello wor", 0, 800)
# Later, publish final
await manager.publish_final(seg_id, "Hello world", 0, 1000)# Note: TranscriptionSegment doesn't have confidence field
# Can encode confidence in text or use separate data channel
# Option 1: Include in text (for display only)
segment = TranscriptionSegment(
id="s1",
text="Hello world (95%)",
start_time=0,
end_time=1000,
language="en",
final=True
)
# Option 2: Send confidence separately via data
import json
confidence_data = {
"segment_id": "s1",
"confidence": 0.95,
"word_confidences": [
{"word": "Hello", "confidence": 0.97},
{"word": "world", "confidence": 0.93}
]
}
await local.publish_data(
json.dumps(confidence_data),
topic="transcription-confidence"
)# Good: Use consistent IDs to update segments
interim_id = "seg-123"
# Publish interim
await publish_transcription(Transcription(..., segments=[
TranscriptionSegment(id=interim_id, final=False, ...)
]))
# Update with final (same ID)
await publish_transcription(Transcription(..., segments=[
TranscriptionSegment(id=interim_id, final=True, ...)
]))@room.on("transcription_received")
def on_transcription(segments, participant, publication):
if not segments:
print("Empty transcription received")
return
for seg in segments:
if not seg.text:
print(f"Silent segment: {seg.start_time}-{seg.end_time}ms")
else:
print(f"Text: {seg.text}")def validate_segment(seg: TranscriptionSegment) -> bool:
"""Validate transcription segment."""
if seg.end_time < seg.start_time:
print(f"Invalid timing: end ({seg.end_time}) < start ({seg.start_time})")
return False
if seg.start_time < 0:
print(f"Invalid start time: {seg.start_time}")
return False
return True
# Use before publishing
if validate_segment(segment):
await publish_transcription(transcription)# Good: BCP 47 language tags
# TranscriptionSegment.language expects BCP 47 tags like these:
languages = [
    "en",     # English
    "en-US",  # English (US)
    "en-GB",  # English (UK)
    "es",     # Spanish
    "es-MX",  # Spanish (Mexico)
    "fr",     # French
    "zh-CN",  # Chinese (Simplified)
    "ja",     # Japanese
]
# Bad: Non-standard codes
# "english", "ENG", "en_US"

# Good: Publish multiple segments together
transcription = Transcription(
participant_identity=local.identity,
track_sid=track_sid,
segments=[seg1, seg2, seg3] # Multiple segments
)
await local.publish_transcription(transcription)
# Less efficient: Publish one at a time
# for seg in segments:
#     await local.publish_transcription(Transcription(..., segments=[seg]))


class InterimBuffer:
    """Buffer interim transcription segments until final results arrive."""

    def __init__(self, max_age_ms: int = 5000):
        # Interim segments older than this (past their end_time) get purged.
        self.max_age_ms = max_age_ms
        self.interim_segments = {}

    def add_segment(self, seg: "TranscriptionSegment"):
        """Record an interim segment; a final segment clears its interim."""
        if seg.final:
            # The final result supersedes any buffered interim with this id.
            self.interim_segments.pop(seg.id, None)
        else:
            self.interim_segments[seg.id] = seg

    def get_text(self) -> str:
        """Join the buffered interim texts in start_time order."""
        ordered = sorted(
            self.interim_segments.values(),
            key=lambda s: s.start_time,
        )
        return " ".join(s.text for s in ordered)

    def cleanup_old(self, current_time_ms: int):
        """Drop interim segments that ended more than max_age_ms ago."""
        stale = [
            seg_id
            for seg_id, seg in self.interim_segments.items()
            if current_time_ms - seg.end_time > self.max_age_ms
        ]
        for seg_id in stale:
            del self.interim_segments[seg_id]
# Usage
buffer = InterimBuffer()


@room.on("transcription_received")
def on_transcription(segments, participant, publication):
    """Feed received segments into the buffer and show the rolling text."""
    for seg in segments:
        buffer.add_segment(seg)
    print(f"Current text: {buffer.get_text()}")
class CaptionGenerator:
    """Generate display captions from final transcription segments.

    Final segments are word-wrapped to at most max_chars_per_line
    characters per line; interim segments are ignored.
    """

    def __init__(self, max_chars_per_line: int = 50):
        self.max_chars_per_line = max_chars_per_line
        # Captions in arrival order: dicts with text/start_time/end_time/language.
        self.captions = []

    def add_transcription(self, segments: "list[TranscriptionSegment]"):
        """Process transcription segments into captions."""
        for seg in segments:
            if not seg.final:
                continue  # Only use final segments
            lines = self._wrap(seg.text)
            self.captions.append({
                "text": "\n".join(lines),
                "start_time": seg.start_time,
                "end_time": seg.end_time,
                "language": seg.language,
            })

    def _wrap(self, text: str) -> list:
        """Greedy word-wrap of text to max_chars_per_line characters."""
        lines = []
        current_line = []
        current_length = 0
        for word in words_iter if False else text.split():  # split on whitespace
            # BUG FIX: the original charged every word one extra character
            # for a separating space — including the first word of a line —
            # so a line that fit exactly max_chars_per_line wrapped one
            # word early. Only count the space when the line is non-empty.
            needed = len(word) + (1 if current_line else 0)
            if current_line and current_length + needed > self.max_chars_per_line:
                lines.append(" ".join(current_line))
                current_line = []
                current_length = 0
                needed = len(word)
            current_line.append(word)
            current_length += needed
        if current_line:
            lines.append(" ".join(current_line))
        return lines

    def get_caption_at_time(self, time_ms: int) -> str:
        """Return the caption text covering time_ms, or "" if none matches."""
        for caption in self.captions:
            if caption["start_time"] <= time_ms <= caption["end_time"]:
                return caption["text"]
        return ""
# Usage
generator = CaptionGenerator(max_chars_per_line=40)


@room.on("transcription_received")
def on_transcription(segments, participant, publication):
    """Accumulate captions from every incoming transcription."""
    generator.add_transcription(segments)

# Display caption at current time
current_time = 1500  # Example
caption = generator.get_caption_at_time(current_time)
if caption:
    print(f"Caption: {caption}")