docs
tessl install tessl/pypi-pipecat-ai@0.0.0

An open source framework for building real-time voice and multimodal conversational AI agents, with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols.
Transcript processors convert speech and text frames into structured conversation transcripts with timestamps. These processors enable conversation history tracking, analysis, and logging.
{ .api }
from pipecat.processors.transcript_processor import BaseTranscriptProcessor
class BaseTranscriptProcessor(FrameProcessor):
"""Base class for processing conversation transcripts.
Provides common functionality for handling transcript messages and updates.
Event Handlers:
on_transcript_update: Emitted when new transcript messages are available
Features:
- Timestamped message tracking
- Event-driven updates
- Message history storage
Example:
class CustomTranscriptProcessor(BaseTranscriptProcessor):
async def process_frame(self, frame, direction):
# Custom processing
messages = [TranscriptionMessage(...)]
await self._emit_update(messages)
"""
def __init__(self, **kwargs):
"""Initialize processor with empty message store.
Args:
**kwargs: Additional arguments passed to parent class
"""
pass
async def _emit_update(self, messages: List[TranscriptionMessage]):
"""Emit transcript updates for new messages.
Args:
messages: New messages to emit in update
"""
pass{ .api }
from pipecat.processors.transcript_processor import UserTranscriptProcessor
class UserTranscriptProcessor(BaseTranscriptProcessor):
"""Processes user transcription frames into timestamped conversation messages.
Converts incoming TranscriptionFrame objects from STT services into
structured TranscriptionMessage objects with timestamps and user IDs.
Frames Consumed:
- TranscriptionFrame: User speech transcriptions from STT
Frames Produced:
- TranscriptionUpdateFrame: Contains new user messages
Example:
from pipecat.processors.transcript_processor import UserTranscriptProcessor
user_transcript = UserTranscriptProcessor()
@user_transcript.event_handler("on_transcript_update")
async def handle_update(processor, frame):
for msg in frame.messages:
print(f"[{msg.timestamp}] {msg.role}: {msg.content}")
pipeline = Pipeline([
transport.input(),
stt,
user_transcript, # Track user speech
context_aggregator.user(),
llm,
tts,
transport.output()
])
"""
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process TranscriptionFrames into user conversation messages.
Args:
frame: Input frame to process
direction: Frame processing direction
"""
pass{ .api }
from pipecat.processors.transcript_processor import AssistantTranscriptProcessor
class AssistantTranscriptProcessor(BaseTranscriptProcessor):
"""Processes assistant TTS text frames and LLM thought frames into timestamped messages.
This processor aggregates both TTS text frames and LLM thought frames into
complete utterances and thoughts, emitting them as transcript messages.
Aggregation Behavior:
An assistant utterance is completed when:
- The bot stops speaking (BotStoppedSpeakingFrame)
- The bot is interrupted (InterruptionFrame)
- The pipeline ends (EndFrame, CancelFrame)
A thought is completed when:
- The thought ends (LLMThoughtEndFrame)
- The bot is interrupted (InterruptionFrame)
- The pipeline ends (EndFrame, CancelFrame)
Frames Consumed:
- TTSTextFrame: Text being sent to TTS
- LLMThoughtStartFrame: Start of LLM thought
- LLMThoughtTextFrame: LLM thought text chunks
- LLMThoughtEndFrame: End of LLM thought
- BotStoppedSpeakingFrame: Bot finished speaking
- InterruptionFrame: Bot was interrupted
- EndFrame/CancelFrame: Pipeline ending
Frames Produced:
- TranscriptionUpdateFrame: Contains assistant messages
- ThoughtTranscriptionMessage: LLM thought content (if enabled)
Example:
from pipecat.processors.transcript_processor import AssistantTranscriptProcessor
assistant_transcript = AssistantTranscriptProcessor(
process_thoughts=True # Enable thought tracking
)
@assistant_transcript.event_handler("on_transcript_update")
async def handle_update(processor, frame):
for msg in frame.messages:
if isinstance(msg, ThoughtTranscriptionMessage):
print(f"[THOUGHT] {msg.content}")
else:
print(f"[ASSISTANT] {msg.content}")
pipeline = Pipeline([
transport.input(),
stt,
context_aggregator.user(),
llm,
tts,
transport.output(),
assistant_transcript # Track assistant speech
])
"""
def __init__(self, *, process_thoughts: bool = False, **kwargs):
"""Initialize processor with aggregation state.
Args:
process_thoughts: Whether to process LLM thought frames. Defaults to False
**kwargs: Additional arguments passed to parent class
"""
pass
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames into assistant conversation messages and thought messages.
Handles different frame types:
- TTSTextFrame: Aggregates text for current utterance
- LLMThoughtStartFrame: Begins aggregating a new thought
- LLMThoughtTextFrame: Aggregates text for current thought
- LLMThoughtEndFrame: Completes current thought
- BotStoppedSpeakingFrame: Completes current utterance
- InterruptionFrame: Completes current utterance and thought due to interruption
- EndFrame: Completes current utterance and thought at pipeline end
- CancelFrame: Completes current utterance and thought due to cancellation
Args:
frame: Input frame to process
direction: Frame processing direction
"""
pass{ .api }
from pipecat.processors.transcript_processor import TranscriptProcessor
class TranscriptProcessor:
"""Factory for creating and managing transcript processors.
Provides unified access to user and assistant transcript processors
with shared event handling. The assistant processor handles both TTS text
and LLM thought frames.
.. deprecated:: 0.0.99
`TranscriptProcessor` is deprecated and will be removed in a future version.
Use `LLMUserAggregator`'s and `LLMAssistantAggregator`'s new events instead.
Features:
- Unified event handling for both processors
- Shared configuration
- Simplified pipeline integration
Example:
from pipecat.processors.transcript_processor import TranscriptProcessor
transcript = TranscriptProcessor(process_thoughts=True)
pipeline = Pipeline([
transport.input(),
stt,
transcript.user(), # User transcripts
context_aggregator.user(),
llm,
tts,
transport.output(),
transcript.assistant(), # Assistant transcripts (including thoughts)
context_aggregator.assistant(),
])
@transcript.event_handler("on_transcript_update")
async def handle_update(processor, frame):
for msg in frame.messages:
print(f"[{msg.timestamp}] {msg.role}: {msg.content}")
# Save to database, log, etc.
"""
def __init__(self, *, process_thoughts: bool = False):
"""Initialize factory.
Args:
process_thoughts: Whether the assistant processor should handle LLM thought
frames. Defaults to False
"""
pass
def user(self, **kwargs) -> UserTranscriptProcessor:
"""Get the user transcript processor.
Args:
**kwargs: Arguments specific to UserTranscriptProcessor
Returns:
The user transcript processor instance
"""
pass
def assistant(self, **kwargs) -> AssistantTranscriptProcessor:
"""Get the assistant transcript processor.
Args:
**kwargs: Arguments specific to AssistantTranscriptProcessor
Returns:
The assistant transcript processor instance
"""
pass
def event_handler(self, event_name: str):
"""Register event handler for both processors.
Args:
event_name: Name of event to handle
Returns:
Decorator function that registers handler with both processors
"""
pass{ .api }
from pipecat.frames.frames import TranscriptionMessage
@dataclass
class TranscriptionMessage:
"""A timestamped conversation message.
Parameters:
role: Message role ("user" or "assistant")
content: Message text content
timestamp: ISO 8601 timestamp
user_id: Optional user identifier
"""
role: str
content: str
timestamp: str
user_id: Optional[str] = None{ .api }
from pipecat.frames.frames import ThoughtTranscriptionMessage
@dataclass
class ThoughtTranscriptionMessage:
"""A timestamped LLM thought message.
Represents internal reasoning or thought process from the LLM
that is not directly spoken to the user.
Parameters:
content: Thought text content
timestamp: ISO 8601 timestamp
"""
content: str
timestamp: str{ .api }
from pipecat.frames.frames import TranscriptionUpdateFrame
class TranscriptionUpdateFrame(DataFrame):
"""Frame containing new transcript messages.
Emitted by transcript processors when new messages are available.
Parameters:
messages: List of new TranscriptionMessage or ThoughtTranscriptionMessage objects
"""
def __init__(self, messages: List[Union[TranscriptionMessage, ThoughtTranscriptionMessage]]):
pass{ .api }
from pipecat.processors.transcript_processor import (
UserTranscriptProcessor,
AssistantTranscriptProcessor
)
# Create processors
user_transcript = UserTranscriptProcessor()
assistant_transcript = AssistantTranscriptProcessor()
# Handle updates
@user_transcript.event_handler("on_transcript_update")
async def handle_user_update(processor, frame):
for msg in frame.messages:
print(f"User: {msg.content}")
@assistant_transcript.event_handler("on_transcript_update")
async def handle_assistant_update(processor, frame):
for msg in frame.messages:
print(f"Assistant: {msg.content}")
# Add to pipeline
pipeline = Pipeline([
transport.input(),
stt,
user_transcript,
context_aggregator.user(),
llm,
tts,
transport.output(),
assistant_transcript
]){ .api }
from pipecat.processors.transcript_processor import TranscriptProcessor
# Create unified processor
transcript = TranscriptProcessor()
# Single event handler for both
@transcript.event_handler("on_transcript_update")
async def handle_update(processor, frame):
for msg in frame.messages:
timestamp = msg.timestamp
role = msg.role if hasattr(msg, 'role') else 'thought'
content = msg.content
print(f"[{timestamp}] {role}: {content}")
# Use in pipeline
pipeline = Pipeline([
transport.input(),
stt,
transcript.user(),
context_aggregator.user(),
llm,
tts,
transport.output(),
transcript.assistant()
]){ .api }
from pipecat.processors.transcript_processor import AssistantTranscriptProcessor
from pipecat.frames.frames import ThoughtTranscriptionMessage
# Enable thought processing
assistant_transcript = AssistantTranscriptProcessor(
process_thoughts=True
)
@assistant_transcript.event_handler("on_transcript_update")
async def handle_update(processor, frame):
for msg in frame.messages:
if isinstance(msg, ThoughtTranscriptionMessage):
# LLM internal thought
logger.debug(f"Thought: {msg.content}")
else:
# Spoken utterance
logger.info(f"Said: {msg.content}"){ .api }
from pipecat.processors.transcript_processor import TranscriptProcessor
import asyncpg
# Database connection
db_pool = await asyncpg.create_pool("postgresql://...")
transcript = TranscriptProcessor()
@transcript.event_handler("on_transcript_update")
async def save_to_db(processor, frame):
async with db_pool.acquire() as conn:
for msg in frame.messages:
await conn.execute("""
INSERT INTO transcripts (role, content, timestamp, session_id)
VALUES ($1, $2, $3, $4)
""", msg.role, msg.content, msg.timestamp, session_id)
pipeline = Pipeline([
transport.input(),
stt,
transcript.user(),
context_aggregator.user(),
llm,
tts,
transport.output(),
transcript.assistant()
]){ .api }
from pipecat.processors.transcript_processor import TranscriptProcessor
import asyncio
transcript = TranscriptProcessor()
transcript_queue = asyncio.Queue()
@transcript.event_handler("on_transcript_update")
async def stream_transcript(processor, frame):
for msg in frame.messages:
await transcript_queue.put(msg)
# Consumer coroutine
async def stream_to_client():
while True:
msg = await transcript_queue.get()
await websocket.send_json({
"role": msg.role,
"content": msg.content,
"timestamp": msg.timestamp
}){ .api }
from pipecat.processors.transcript_processor import TranscriptProcessor
from collections import defaultdict
transcript = TranscriptProcessor()
# Track metrics
metrics = {
"user_messages": 0,
"assistant_messages": 0,
"total_user_words": 0,
"total_assistant_words": 0,
"conversation_start": None,
}
@transcript.event_handler("on_transcript_update")
async def track_metrics(processor, frame):
for msg in frame.messages:
if not metrics["conversation_start"]:
metrics["conversation_start"] = msg.timestamp
if msg.role == "user":
metrics["user_messages"] += 1
metrics["total_user_words"] += len(msg.content.split())
elif msg.role == "assistant":
metrics["assistant_messages"] += 1
metrics["total_assistant_words"] += len(msg.content.split())
# Log metrics periodically
logger.info(f"Conversation metrics: {metrics}"){ .api }
# Good: Place user processor after STT
pipeline = Pipeline([
transport.input(),
stt, # Produces TranscriptionFrame
user_transcript, # Consumes TranscriptionFrame
context_aggregator.user(),
llm,
tts,
transport.output(),
assistant_transcript # Consumes TTSTextFrame
])
# Bad: Wrong order (won't capture transcripts)
pipeline = Pipeline([
transport.input(),
user_transcript, # Before STT - won't see TranscriptionFrame
stt,
...
]){ .api }
# Good: Assistant processor handles interruptions
assistant_transcript = AssistantTranscriptProcessor()
@assistant_transcript.event_handler("on_transcript_update")
async def handle_update(processor, frame):
for msg in frame.messages:
# Message will be emitted even if interrupted
save_transcript(msg)
# The processor automatically completes aggregation on interruption{ .api }
# Good: Use timestamps to order messages
@transcript.event_handler("on_transcript_update")
async def handle_update(processor, frame):
# Messages already have ISO 8601 timestamps
for msg in frame.messages:
# Can sort by timestamp if needed
db.insert_with_timestamp(msg.timestamp, msg)
# Bad: Don't rely on arrival order
# Messages may arrive out of order due to async processing{ .api }
# Good: Enable thoughts in development
if debug_mode:
assistant_transcript = AssistantTranscriptProcessor(
process_thoughts=True
)
else:
assistant_transcript = AssistantTranscriptProcessor(
process_thoughts=False
)
# Good: Log thoughts separately
@assistant_transcript.event_handler("on_transcript_update")
async def handle_update(processor, frame):
for msg in frame.messages:
if isinstance(msg, ThoughtTranscriptionMessage):
debug_logger.log(msg.content) # Only in debug logs
else:
transcript_logger.log(msg.content) # Regular transcript{ .api }
# Good: Check for empty content
@transcript.event_handler("on_transcript_update")
async def handle_update(processor, frame):
for msg in frame.messages:
if msg.content.strip(): # Only process non-empty
await save_message(msg)
# The processor already strips whitespace, but check for safety{ .api }
# Good: run the transcript processor alongside context aggregators.
from pipecat.processors.aggregators.llm_context import LLMContextAggregatorPair

transcript = TranscriptProcessor()
context = LLMContext()
aggregators = LLMContextAggregatorPair(context)

# Both track conversation but serve different purposes:
# - Transcript: Historical record with timestamps
# - Context: Active conversation state for LLM
pipeline = Pipeline([
    transport.input(),
    stt,
    transcript.user(),        # Track history
    aggregators.user(),       # Build context
    llm,
    tts,
    transport.output(),
    transcript.assistant(),   # Track history
    aggregators.assistant()   # Build context
])