```
tessl install tessl/pypi-pipecat-ai@0.0.0
```

An open source framework for building real-time voice and multimodal conversational AI agents, with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols.

Text frames carry textual data through the pipeline, including LLM outputs, transcriptions, and TTS inputs.

```python { .api }
from pipecat.frames.frames import TextFrame

class TextFrame(DataFrame):
    """Text content frame.

    Carries text data through the pipeline. Used by LLM services, TTS services,
    and context aggregators.

    Attributes:
        text (str): The text content
        skip_tts (Optional[bool]): Skip TTS synthesis for this text (set after init)
        includes_inter_frame_spaces (bool): Text includes spaces between frames (set after init)

    Args:
        text: The text content to carry

    Example:
        # Create a text frame
        frame = TextFrame(text="Hello, world!")

        # Set skip_tts after creation if needed
        frame.skip_tts = True

    Note:
        skip_tts and includes_inter_frame_spaces are fields marked with
        init=False, meaning they cannot be set during initialization. They must
        be set after the frame is created.
    """
```

```python { .api }
from pipecat.frames.frames import LLMTextFrame

class LLMTextFrame(TextFrame):
    """LLM-generated text.

    Text generated by a Large Language Model. This is the
    primary output from LLM services during streaming.

    Example:
        # Streaming LLM response
        frame = LLMTextFrame(text="Hello, ")
        frame2 = LLMTextFrame(text="how can I help you?")
    """

    pass
```

```python { .api }
from pipecat.frames.frames import VisionTextFrame

class VisionTextFrame(LLMTextFrame):
    """Vision service text output.

    Text generated by vision/multimodal services when
    describing images or videos.

    Example:
        # Image description
        frame = VisionTextFrame(
            text="The image shows a sunset over mountains."
        )
    """

    pass
```

```python { .api }
from pipecat.frames.frames import VisionFullResponseStartFrame

class VisionFullResponseStartFrame(LLMFullResponseStartFrame):
    """Vision model response start.

    Indicates the beginning of a vision model response. Followed by
    one or more VisionTextFrames and a final VisionFullResponseEndFrame.
    Used to mark the start of a complete vision analysis response.

    Example:
        # Vision service emits this before streaming description
        await self.push_frame(VisionFullResponseStartFrame())
        # ... stream VisionTextFrame chunks ...
        await self.push_frame(VisionFullResponseEndFrame())
    """

    pass
```

```python { .api }
from pipecat.frames.frames import VisionFullResponseEndFrame

class VisionFullResponseEndFrame(LLMFullResponseEndFrame):
    """Vision model response end.

    Indicates the end of a vision model response. Marks the completion
    of a vision analysis operation.

    Example:
        # Emitted by vision service after streaming complete
        await self.push_frame(VisionFullResponseEndFrame())
    """

    pass
```

```python { .api }
from pipecat.frames.frames import LLMThoughtTextFrame

class LLMThoughtTextFrame(DataFrame):
    """LLM thought text chunks.

    Internal reasoning or thoughts from the LLM that may or
    may not be exposed to the user. Used for chain-of-thought
    processing.

    Attributes:
        text (str): The thought text
    """

    def __init__(self, text: str):
        """Initialize thought frame.

        Args:
            text: The thought text
        """
        pass
```
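
If thought text should stay internal, a processor can intercept and drop these frames before they reach TTS or transcripts. A minimal sketch using only the frame API documented above; `ThoughtDropper` is a hypothetical name, not part of pipecat:

```python
from pipecat.frames.frames import LLMThoughtTextFrame
from pipecat.processors.frame_processor import FrameProcessor

class ThoughtDropper(FrameProcessor):
    """Hypothetical processor that swallows LLM thought text."""

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)
        if isinstance(frame, LLMThoughtTextFrame):
            # Drop internal reasoning; don't forward it downstream.
            return
        await self.push_frame(frame, direction)
```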

```python { .api }
from pipecat.frames.frames import AggregatedTextFrame, AggregationType
from enum import Enum

class AggregationType(Enum):
    """Text aggregation types.

    Attributes:
        SENTENCE: Aggregate by sentence boundaries
        WORD: Aggregate by word boundaries
    """

    SENTENCE = "sentence"
    WORD = "word"

class AggregatedTextFrame(TextFrame):
    """Aggregated text frame.

    Text that has been aggregated from multiple smaller chunks
    by an aggregator processor.

    Attributes:
        text (str): Aggregated text content
        aggregated_by (AggregationType): How text was aggregated
    """

    def __init__(
        self,
        text: str,
        aggregated_by: AggregationType,
        **kwargs
    ):
        """Initialize aggregated text frame.

        Args:
            text: Aggregated text
            aggregated_by: Aggregation type used
            **kwargs: Additional TextFrame arguments
        """
        pass
```
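
A minimal construction sketch, assuming the constructor signature documented above:

```python
from pipecat.frames.frames import AggregatedTextFrame, AggregationType

frame = AggregatedTextFrame(
    text="This is one complete sentence.",
    aggregated_by=AggregationType.SENTENCE,
)
assert frame.aggregated_by == AggregationType.SENTENCE
```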

```python { .api }
from pipecat.frames.frames import TTSTextFrame

class TTSTextFrame(AggregatedTextFrame):
    """Text for TTS synthesis.

    Text that is ready for text-to-speech synthesis. This is
    typically the output of a sentence aggregator.

    Example:
        # After sentence aggregation
        frame = TTSTextFrame(
            text="This is a complete sentence.",
            aggregated_by=AggregationType.SENTENCE
        )
    """

    pass
```

```python { .api }
from typing import Any, Optional

from pipecat.frames.frames import TranscriptionFrame
from pipecat.transcriptions.language import Language

class TranscriptionFrame(TextFrame):
    """Speech transcription frame.

    Final transcription from a Speech-to-Text service.

    Attributes:
        text (str): Transcribed text
        user_id (str): User ID who spoke
        timestamp (str): ISO8601 timestamp
        language (Optional[Language]): Detected/specified language
        result (Optional[Any]): Raw STT service result
    """

    def __init__(
        self,
        text: str,
        user_id: str,
        timestamp: str,
        language: Optional[Language] = None,
        result: Optional[Any] = None
    ):
        """Initialize transcription frame.

        Args:
            text: Transcribed text
            user_id: ID of speaking user
            timestamp: ISO8601 timestamp string
            language: Language code
            result: Raw STT result object
        """
        pass
```
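
A minimal construction sketch. pipecat ships a `time_now_iso8601()` helper in `pipecat.utils.time`; if your version lacks it, any ISO8601 string works:

```python
from pipecat.frames.frames import TranscriptionFrame
from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601

frame = TranscriptionFrame(
    text="Turn left at the next light.",
    user_id="user1",
    timestamp=time_now_iso8601(),
    language=Language.EN,
)
```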

```python { .api }
from pipecat.frames.frames import InterimTranscriptionFrame

class InterimTranscriptionFrame(TextFrame):
    """Partial transcription frame.

    A text frame with interim transcription-specific data that represents
    partial results before final transcription.

    Attributes:
        text (str): The interim transcribed text
        user_id (str): Identifier for the user who spoke
        timestamp (str): When the interim transcription occurred
        language (Optional[Language]): Detected or specified language of the speech
        result (Optional[Any]): Raw result from the STT service

    Example:
        # Streaming transcription with required fields
        frame1 = InterimTranscriptionFrame(
            text="Hello",
            user_id="user1",
            timestamp="2024-01-01T12:00:00.000Z"
        )
        frame2 = InterimTranscriptionFrame(
            text="Hello world",
            user_id="user1",
            timestamp="2024-01-01T12:00:00.500Z"
        )
        frame3 = TranscriptionFrame(
            text="Hello world!",
            user_id="user1",
            timestamp="2024-01-01T12:00:01.000Z"
        )

    Note:
        user_id and timestamp are required fields, unlike the base TextFrame.
    """
```

```python { .api }
from typing import Optional

from pipecat.frames.frames import TranslationFrame
from pipecat.transcriptions.language import Language

class TranslationFrame(TextFrame):
    """Translated transcription frame.

    Transcription that has been translated to another language.

    Attributes:
        text (str): Translated text
        source_language (Optional[Language]): Original language
        target_language (Optional[Language]): Target language
    """

    def __init__(
        self,
        text: str,
        source_language: Optional[Language] = None,
        target_language: Optional[Language] = None
    ):
        """Initialize translation frame.

        Args:
            text: Translated text
            source_language: Source language
            target_language: Target language
        """
        pass
```
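
A minimal construction sketch, assuming the constructor documented above:

```python
from pipecat.frames.frames import TranslationFrame
from pipecat.transcriptions.language import Language

frame = TranslationFrame(
    text="Hola, ¿cómo estás?",
    source_language=Language.EN,
    target_language=Language.ES,
)
```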

```python { .api }
from pipecat.frames.frames import InputTextRawFrame

class InputTextRawFrame(SystemFrame, TextFrame):
    """Text input from transport.

    Text received directly from the transport (e.g., user typing
    in a chat interface). Being a SystemFrame, it is processed
    immediately.

    Example:
        # Text message from user
        frame = InputTextRawFrame(text="Hello bot")
    """

    pass
```

```python { .api }
from pipecat.frames.frames import TranscriptionMessage

class TranscriptionMessage:
    """Transcription message data structure.

    DEPRECATED: Use LLMUserAggregator's and LLMAssistantAggregator's new events instead.

    .. deprecated:: 0.0.99
        Use LLMUserAggregator's and LLMAssistantAggregator's new events instead.

    A message in a conversation transcript containing the role and content.
    Messages are in standard format with roles normalized to user/assistant.

    Attributes:
        role (Literal["user", "assistant"]): The role of the message sender
        content (str): The message content/text
        user_id (Optional[str]): Optional identifier for the user
        timestamp (Optional[str]): Optional timestamp when message was created

    Example:
        msg = TranscriptionMessage(
            role="user",
            content="Hello, how are you?",
            user_id="user123",
            timestamp="2024-01-01T12:00:00Z"
        )
    """

    pass
```

```python { .api }
from pipecat.frames.frames import ThoughtTranscriptionMessage

class ThoughtTranscriptionMessage:
    """LLM thought message data structure.

    DEPRECATED: Use LLMAssistantAggregator's new events instead.

    .. deprecated:: 0.0.99
        Use LLMAssistantAggregator's new events instead.

    An LLM thought message in a conversation transcript.

    Attributes:
        role (Literal["assistant"]): Always "assistant"
        content (str): The thought content
        timestamp (Optional[str]): Optional timestamp when thought was created

    Example:
        thought = ThoughtTranscriptionMessage(
            content="Let me think about this...",
            timestamp="2024-01-01T12:00:00Z"
        )
    """

    pass
```

```python { .api }
from pipecat.frames.frames import TranscriptionUpdateFrame

class TranscriptionUpdateFrame(DataFrame):
    """DEPRECATED: Transcription update.

    Deprecated transcription frame. Use TranscriptionFrame instead.

    .. deprecated:: 0.0.90
        Use TranscriptionFrame instead.

    A frame containing new messages added to the conversation transcript.
    This frame is emitted when new messages are added to the conversation history,
    containing only the newly added messages rather than the full transcript.

    Attributes:
        messages (List[TranscriptionMessage | ThoughtTranscriptionMessage]): New messages
    """

    pass
```

```python { .api }
from pipecat.services.llm_service import LLMService
from pipecat.frames.frames import LLMRunFrame, LLMTextFrame

class StreamingLLM(LLMService):
    """Stream LLM responses as text frames."""

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)
        if isinstance(frame, LLMRunFrame):
            # Stream the LLM response; each chunk becomes a frame
            async for chunk in self._stream_completion():
                text_frame = LLMTextFrame(text=chunk)
                await self.push_frame(text_frame, direction)
        await self.push_frame(frame, direction)
```

```python { .api }
from pipecat.processors.aggregators.sentence_aggregator import SentenceAggregator
from pipecat.frames.frames import LLMTextFrame, TTSTextFrame

# Aggregate streaming text into sentences
aggregator = SentenceAggregator()

# In pipeline:
#   LLMTextFrame("Hello, ")  -> aggregator
#   LLMTextFrame("how are ") -> aggregator
#   LLMTextFrame("you?")     -> aggregator
#   -> TTSTextFrame("Hello, how are you?")
```
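
In a full pipeline the aggregator sits between the LLM and TTS services. A minimal wiring sketch; `llm` and `tts` are placeholders for configured service instances, not defined here:

```python
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.aggregators.sentence_aggregator import SentenceAggregator

# llm and tts stand in for configured LLM/TTS service instances.
pipeline = Pipeline([
    llm,
    SentenceAggregator(),  # buffers LLMTextFrame chunks into sentences
    tts,
])
```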

```python { .api }
import re

from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import TextFrame

class MarkdownTextFilter(FrameProcessor):
    """Remove markdown from text before TTS."""

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)
        if isinstance(frame, TextFrame) and not frame.skip_tts:
            # Remove markdown formatting
            frame.text = self._remove_markdown(frame.text)
        await self.push_frame(frame, direction)

    def _remove_markdown(self, text: str) -> str:
        """Remove markdown syntax from text."""
        # Remove bold/italic
        text = re.sub(r'\*\*?(.*?)\*\*?', r'\1', text)
        # Remove links [text](url)
        text = re.sub(r'\[(.*?)\]\(.*?\)', r'\1', text)
        return text
```

```python { .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import TranscriptionFrame, InterimTranscriptionFrame

class TranscriptionLogger(FrameProcessor):
    """Log transcriptions."""

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)
        if isinstance(frame, TranscriptionFrame):
            # Final transcription
            print(f"[{frame.timestamp}] {frame.user_id}: {frame.text}")
        elif isinstance(frame, InterimTranscriptionFrame):
            # Interim transcription
            print(f"[interim] {frame.text}", end='\r')
        await self.push_frame(frame, direction)
```

```python { .api }
from typing import List, Optional

from pipecat.processors.frame_processor import StatelessTextTransformer
from pipecat.frames.frames import LLMTextFrame, TextFrame

class UppercaseTransformer(StatelessTextTransformer):
    """Transform text to uppercase.

    Args:
        text_types: Frame types to transform
    """

    def __init__(self, text_types: Optional[List[type]] = None):
        # Avoid a mutable default argument; fall back to TextFrame.
        super().__init__(text_types=text_types or [TextFrame])

    async def process_text(self, text: str) -> str:
        """Transform text to uppercase.

        Args:
            text: Input text

        Returns:
            Uppercase text
        """
        return text.upper()

# Usage
transformer = UppercaseTransformer(text_types=[LLMTextFrame])
# LLMTextFrame("hello") -> LLMTextFrame("HELLO")
```

```python { .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import TextFrame

class ConditionalTTSProcessor(FrameProcessor):
    """Skip TTS for certain patterns."""

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)
        if isinstance(frame, TextFrame):
            # Skip TTS for commands
            if frame.text.startswith("/"):
                frame.skip_tts = True
            # Skip TTS for internal messages
            if frame.text.startswith("[internal]"):
                frame.skip_tts = True
                frame.append_to_context = False
        await self.push_frame(frame, direction)
```

```python { .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import TranscriptionFrame
from pipecat.transcriptions.language import Language

class LanguageDetector(FrameProcessor):
    """Detect and set language for transcriptions."""

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)
        if isinstance(frame, TranscriptionFrame):
            if not frame.language:
                # Detect language
                detected = await self._detect_language(frame.text)
                frame.language = detected
            # Route based on language
            if frame.language == Language.ES:
                # Spanish - use Spanish TTS
                await self._route_to_spanish_tts(frame)
            else:
                # Default - use English TTS
                await self.push_frame(frame, direction)
        else:
            await self.push_frame(frame, direction)
```

```python { .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import TextFrame

class ContextFilter(FrameProcessor):
    """Control what text is added to LLM context."""

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)
        if isinstance(frame, TextFrame):
            # Don't add system messages to context
            if frame.text.startswith("[SYSTEM]"):
                frame.append_to_context = False
            # Don't add error messages to context
            if "error" in frame.text.lower():
                frame.append_to_context = False
        await self.push_frame(frame, direction)
```

```python { .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import LLMTextFrame

class TextBuffer(FrameProcessor):
    """Buffer text frames before processing."""

    def __init__(self, max_length: int = 100):
        super().__init__()
        self._buffer = ""
        self._max_length = max_length

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)
        if isinstance(frame, LLMTextFrame):
            # Add to buffer
            self._buffer += frame.text
            # Flush if buffer is full
            if len(self._buffer) >= self._max_length:
                buffered_frame = LLMTextFrame(text=self._buffer)
                await self.push_frame(buffered_frame, direction)
                self._buffer = ""
        else:
            # Flush buffer on other frames
            if self._buffer:
                buffered_frame = LLMTextFrame(text=self._buffer)
                await self.push_frame(buffered_frame, direction)
                self._buffer = ""
            await self.push_frame(frame, direction)
```

```python { .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import TextFrame

class EfficientTextProcessor(FrameProcessor):
    """Process text efficiently."""

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)
        if isinstance(frame, TextFrame):
            # Don't accumulate unbounded text
            if len(frame.text) > 10000:
                # Truncate or split large text
                frame.text = frame.text[:10000]
        await self.push_frame(frame, direction)
```

```python { .api }
from loguru import logger

from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import TextFrame

class RobustTextProcessor(FrameProcessor):
    """Handle text processing errors gracefully."""

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)
        if isinstance(frame, TextFrame):
            try:
                # Process text
                frame.text = await self._process_text(frame.text)
            except Exception as e:
                # Log error but continue
                logger.error(f"Text processing error: {e}")
                # Optionally add error to metadata
                frame.metadata["processing_error"] = str(e)
        await self.push_frame(frame, direction)
```

```python { .api }
import unicodedata

from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import TextFrame

class UnicodeNormalizer(FrameProcessor):
    """Normalize unicode in text frames."""

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)
        if isinstance(frame, TextFrame):
            # Normalize to NFC form
            frame.text = unicodedata.normalize('NFC', frame.text)
        await self.push_frame(frame, direction)
```