or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/pipecat-ai@0.0.x

docs

core-concepts.md, index.md, pipeline.md, runner.md, transports.md, turns.md
tile.json

tessl/pypi-pipecat-ai

tessl install tessl/pypi-pipecat-ai@0.0.0

An open source framework for building real-time voice and multimodal conversational AI agents with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols

docs/frames/text-frames.md

Text Frames

Text frames carry textual data through the pipeline, including LLM outputs, transcriptions, and TTS inputs.

Base Text Frame

{ .api }
from pipecat.frames.frames import TextFrame

class TextFrame(DataFrame):
    """Text content frame.

    Carries text data through the pipeline. Used by LLM services, TTS services,
    and context aggregators.

    Attributes:
        text (str): The text content
        skip_tts (Optional[bool]): Skip TTS synthesis for this text (set after init)
        includes_inter_frame_spaces (bool): Text includes spaces between frames (set after init)

    Args:
        text: The text content to carry

    Example:
        # Create a text frame
        frame = TextFrame(text="Hello, world!")

        # Set skip_tts after creation if needed
        frame.skip_tts = True

    Note:
        skip_tts and includes_inter_frame_spaces are fields marked with
        init=False, meaning they cannot be set during initialization. They must
        be set after the frame is created.
    """

    pass

LLM Text Frames

LLMTextFrame

{ .api }
from pipecat.frames.frames import LLMTextFrame

class LLMTextFrame(TextFrame):
    """Text produced by a Large Language Model.

    Each frame carries one chunk of a streamed response; this is the
    primary output type of LLM services.

    Example:
        # Streaming LLM response, one frame per chunk
        frame = LLMTextFrame(text="Hello, ")
        frame2 = LLMTextFrame(text="how can I help you?")
    """

VisionTextFrame

{ .api }
from pipecat.frames.frames import VisionTextFrame

class VisionTextFrame(LLMTextFrame):
    """Text emitted by vision/multimodal services.

    Carries descriptions of images or videos produced by a vision model.

    Example:
        # Image description
        frame = VisionTextFrame(
            text="The image shows a sunset over mountains."
        )
    """

VisionFullResponseStartFrame

{ .api }
from pipecat.frames.frames import VisionFullResponseStartFrame

class VisionFullResponseStartFrame(LLMFullResponseStartFrame):
    """Marks the start of a vision model response.

    A complete vision analysis is bracketed by this frame, one or more
    VisionTextFrames, and a closing VisionFullResponseEndFrame.

    Example:
        # Vision service emits this before streaming the description
        await self.push_frame(VisionFullResponseStartFrame())
        # ... stream VisionTextFrame chunks ...
        await self.push_frame(VisionFullResponseEndFrame())
    """

VisionFullResponseEndFrame

{ .api }
from pipecat.frames.frames import VisionFullResponseEndFrame

class VisionFullResponseEndFrame(LLMFullResponseEndFrame):
    """Marks the end of a vision model response.

    Signals that a vision analysis operation has finished streaming.

    Example:
        # Emitted by the vision service once streaming completes
        await self.push_frame(VisionFullResponseEndFrame())
    """

LLMThoughtTextFrame

{ .api }
from pipecat.frames.frames import LLMThoughtTextFrame

class LLMThoughtTextFrame(DataFrame):
    """Chunk of internal LLM reasoning text.

    Carries chain-of-thought output from the model, which may or may not
    be surfaced to the end user.

    Attributes:
        text (str): The thought text
    """

    def __init__(self, text: str):
        """Create a thought frame.

        Args:
            text: The thought text
        """
        pass

Aggregated Text Frames

AggregatedTextFrame

{ .api }
from pipecat.frames.frames import AggregatedTextFrame, AggregationType
from enum import Enum

class AggregationType(Enum):
    """How streamed text chunks were combined into one frame.

    Members:
        SENTENCE: Combined on sentence boundaries.
        WORD: Combined on word boundaries.
    """

    SENTENCE = "sentence"
    WORD = "word"


class AggregatedTextFrame(TextFrame):
    """Text assembled from multiple smaller chunks.

    Produced by aggregator processors that combine streamed text into
    larger units.

    Attributes:
        text (str): Aggregated text content
        aggregated_by (AggregationType): How text was aggregated
    """

    def __init__(self, text: str, aggregated_by: AggregationType, **kwargs):
        """Create an aggregated text frame.

        Args:
            text: Aggregated text
            aggregated_by: Aggregation type used
            **kwargs: Additional TextFrame arguments
        """
        pass

TTSTextFrame

{ .api }
from pipecat.frames.frames import TTSTextFrame

class TTSTextFrame(AggregatedTextFrame):
    """Text ready for text-to-speech synthesis.

    Typically produced by a sentence aggregator once a full sentence
    has been assembled.

    Example:
        # After sentence aggregation
        frame = TTSTextFrame(
            text="This is a complete sentence.",
            aggregated_by=AggregationType.SENTENCE
        )
    """

Transcription Frames

TranscriptionFrame

{ .api }
from pipecat.frames.frames import TranscriptionFrame
from pipecat.transcriptions.language import Language

class TranscriptionFrame(TextFrame):
    """Final speech-to-text result.

    Emitted by a Speech-to-Text service once an utterance is complete.

    Attributes:
        text (str): Transcribed text
        user_id (str): User ID who spoke
        timestamp (str): ISO8601 timestamp
        language (Optional[Language]): Detected/specified language
        result (Optional[Any]): Raw STT service result
    """

    def __init__(
        self,
        text: str,
        user_id: str,
        timestamp: str,
        language: Optional[Language] = None,
        result: Optional[Any] = None
    ):
        """Create a transcription frame.

        Args:
            text: Transcribed text
            user_id: ID of speaking user
            timestamp: ISO8601 timestamp string
            language: Language code
            result: Raw STT result object
        """
        pass

InterimTranscriptionFrame

{ .api }
from pipecat.frames.frames import InterimTranscriptionFrame

class InterimTranscriptionFrame(TextFrame):
    """Partial transcription frame.

    A text frame with interim transcription-specific data that represents
    partial results before final transcription.

    Attributes:
        text (str): The interim transcribed text
        user_id (str): Identifier for the user who spoke
        timestamp (str): When the interim transcription occurred
        language (Optional[Language]): Detected or specified language of the speech
        result (Optional[Any]): Raw result from the STT service

    Example:
        # Streaming transcription with required fields
        frame1 = InterimTranscriptionFrame(
            text="Hello",
            user_id="user1",
            timestamp="2024-01-01T12:00:00.000Z"
        )
        frame2 = InterimTranscriptionFrame(
            text="Hello world",
            user_id="user1",
            timestamp="2024-01-01T12:00:00.500Z"
        )
        frame3 = TranscriptionFrame(
            text="Hello world!",
            user_id="user1",
            timestamp="2024-01-01T12:00:01.000Z"
        )

    Note:
        user_id and timestamp are required fields, unlike the base TextFrame.
    """

    pass

TranslationFrame

{ .api }
from pipecat.frames.frames import TranslationFrame

class TranslationFrame(TextFrame):
    """Transcription text translated into another language.

    Attributes:
        text (str): Translated text
        source_language (Optional[Language]): Original language
        target_language (Optional[Language]): Target language
    """

    def __init__(
        self,
        text: str,
        source_language: Optional[Language] = None,
        target_language: Optional[Language] = None
    ):
        """Create a translation frame.

        Args:
            text: Translated text
            source_language: Source language
            target_language: Target language
        """
        pass

Input Text Frames

InputTextRawFrame

{ .api }
from pipecat.frames.frames import InputTextRawFrame

class InputTextRawFrame(SystemFrame, TextFrame):
    """Raw text arriving directly from the transport.

    Covers input such as a user typing in a chat interface. Being a
    SystemFrame, it is processed immediately rather than queued.

    Example:
        # Text message typed by the user
        frame = InputTextRawFrame(text="Hello bot")
    """

Transcription Data Structures

TranscriptionMessage

{ .api }
from pipecat.frames.frames import TranscriptionMessage

class TranscriptionMessage:
    """One message in a conversation transcript.

    DEPRECATED: Use LLMUserAggregator's and LLMAssistantAggregator's new events instead.

    .. deprecated:: 0.0.99
        Use LLMUserAggregator's and LLMAssistantAggregator's new events instead.

    Holds a role and content in standard chat format; roles are
    normalized to user/assistant.

    Attributes:
        role (Literal["user", "assistant"]): The role of the message sender
        content (str): The message content/text
        user_id (Optional[str]): Optional identifier for the user
        timestamp (Optional[str]): Optional timestamp when message was created

    Example:
        msg = TranscriptionMessage(
            role="user",
            content="Hello, how are you?",
            user_id="user123",
            timestamp="2024-01-01T12:00:00Z"
        )
    """

ThoughtTranscriptionMessage

{ .api }
from pipecat.frames.frames import ThoughtTranscriptionMessage

class ThoughtTranscriptionMessage:
    """LLM thought entry in a conversation transcript.

    DEPRECATED: Use LLMAssistantAggregator's new events instead.

    .. deprecated:: 0.0.99
        Use LLMAssistantAggregator's new events instead.

    Attributes:
        role (Literal["assistant"]): Always "assistant"
        content (str): The thought content
        timestamp (Optional[str]): Optional timestamp when thought was created

    Example:
        thought = ThoughtTranscriptionMessage(
            content="Let me think about this...",
            timestamp="2024-01-01T12:00:00Z"
        )
    """

Deprecated Frames

TranscriptionUpdateFrame

{ .api }
from pipecat.frames.frames import TranscriptionUpdateFrame

class TranscriptionUpdateFrame(DataFrame):
    """DEPRECATED: newly added transcript messages.

    .. deprecated:: 0.0.90
        Use TranscriptionFrame instead.

    Emitted whenever messages are appended to the conversation history;
    carries only the newly added messages, not the full transcript.

    Attributes:
        messages (List[TranscriptionMessage | ThoughtTranscriptionMessage]): New messages
    """

Usage Patterns

Streaming LLM Text

{ .api }
from pipecat.services.llm_service import LLMService
from pipecat.frames.frames import LLMTextFrame

class StreamingLLM(LLMService):
    """Emit each streamed LLM completion chunk as an LLMTextFrame."""

    async def process_frame(self, frame, direction):
        if isinstance(frame, LLMRunFrame):
            # Wrap every streamed chunk in its own text frame.
            async for piece in self._stream_completion():
                await self.push_frame(LLMTextFrame(text=piece), direction)

        # Always forward the triggering frame itself.
        await self.push_frame(frame, direction)

Text Aggregation

{ .api }
from pipecat.processors.aggregators.sentence_aggregator import SentenceAggregator
from pipecat.frames.frames import LLMTextFrame, TTSTextFrame

# Aggregate streaming text into sentences
# Aggregate streaming text into sentences
aggregator = SentenceAggregator()

# In the pipeline, successive LLMTextFrame chunks are combined until a
# sentence boundary is seen, then emitted as one TTSTextFrame:
# LLMTextFrame("Hello, ") -> aggregator
# LLMTextFrame("how are ") -> aggregator
# LLMTextFrame("you?") -> aggregator
# -> TTSTextFrame("Hello, how are you?")

Filtering Text for TTS

{ .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import TextFrame

class MarkdownTextFilter(FrameProcessor):
    """Strip markdown formatting from text frames before TTS."""

    async def process_frame(self, frame, direction):
        # Only touch text that will actually be spoken.
        if isinstance(frame, TextFrame) and not frame.skip_tts:
            frame.text = self._remove_markdown(frame.text)

        await self.push_frame(frame, direction)

    def _remove_markdown(self, text: str) -> str:
        """Return text with bold/italic markers and link syntax removed."""
        import re

        # Strip bold/italic asterisks, keeping the inner text.
        text = re.sub(r'\*\*?(.*?)\*\*?', r'\1', text)
        # Collapse links [text](url) down to just the link text.
        text = re.sub(r'\[(.*?)\]\(.*?\)', r'\1', text)
        return text

Transcription Processing

{ .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import TranscriptionFrame, InterimTranscriptionFrame

class TranscriptionLogger(FrameProcessor):
    """Print final and interim transcriptions as they flow through."""

    async def process_frame(self, frame, direction):
        if isinstance(frame, TranscriptionFrame):
            # Final result: one full line per utterance.
            print(f"[{frame.timestamp}] {frame.user_id}: {frame.text}")
        elif isinstance(frame, InterimTranscriptionFrame):
            # Partial result: overwrite the current terminal line.
            print(f"[interim] {frame.text}", end='\r')

        await self.push_frame(frame, direction)

Text Transformation

{ .api }
from pipecat.processors.frame_processor import StatelessTextTransformer
from pipecat.frames.frames import TextFrame

class UppercaseTransformer(StatelessTextTransformer):
    """Transform text to uppercase.

    Args:
        text_types: Frame types to transform. Defaults to [TextFrame].
    """

    def __init__(self, text_types=None):
        # Avoid a mutable default argument ([TextFrame] would be shared
        # across all instances); build a fresh list per instance instead.
        super().__init__(text_types=text_types if text_types is not None else [TextFrame])

    async def process_text(self, text: str) -> str:
        """Transform text to uppercase.

        Args:
            text: Input text

        Returns:
            Uppercase text
        """
        return text.upper()

# Usage
transformer = UppercaseTransformer(text_types=[LLMTextFrame])
# LLMTextFrame("hello") -> LLMTextFrame("HELLO")

Conditional TTS

{ .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import TextFrame

class ConditionalTTSProcessor(FrameProcessor):
    """Mark certain text frames so downstream TTS skips them."""

    async def process_frame(self, frame, direction):
        if isinstance(frame, TextFrame):
            text = frame.text
            # Slash commands are never spoken.
            if text.startswith("/"):
                frame.skip_tts = True
            # Internal messages are neither spoken nor added to context.
            if text.startswith("[internal]"):
                frame.skip_tts = True
                frame.append_to_context = False

        await self.push_frame(frame, direction)

Language Detection

{ .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import TranscriptionFrame
from pipecat.transcriptions.language import Language

class LanguageDetector(FrameProcessor):
    """Fill in missing transcription languages and route by language."""

    async def process_frame(self, frame, direction):
        # Non-transcription frames pass straight through.
        if not isinstance(frame, TranscriptionFrame):
            await self.push_frame(frame, direction)
            return

        # Detect only when the STT service did not supply a language.
        if not frame.language:
            frame.language = await self._detect_language(frame.text)

        if frame.language == Language.ES:
            # Spanish goes to the dedicated Spanish TTS path.
            await self._route_to_spanish_tts(frame)
        else:
            # Everything else continues downstream (English TTS by default).
            await self.push_frame(frame, direction)

Context Management

{ .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import TextFrame, LLMTextFrame

class ContextFilter(FrameProcessor):
    """Keep system and error text out of the LLM context."""

    async def process_frame(self, frame, direction):
        if isinstance(frame, TextFrame):
            is_system = frame.text.startswith("[SYSTEM]")
            mentions_error = "error" in frame.text.lower()
            # Neither system chatter nor error text belongs in the context.
            if is_system or mentions_error:
                frame.append_to_context = False

        await self.push_frame(frame, direction)

Text Buffering

{ .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import LLMTextFrame, TextFrame

class TextBuffer(FrameProcessor):
    """Accumulate LLM text frames and emit them in larger batches."""

    def __init__(self, max_length: int = 100):
        super().__init__()
        self._buffer = ""  # text accumulated so far
        self._max_length = max_length  # flush threshold in characters

    async def process_frame(self, frame, direction):
        if isinstance(frame, LLMTextFrame):
            self._buffer += frame.text
            # Emit once enough text has accumulated.
            if len(self._buffer) >= self._max_length:
                await self._flush(direction)
        else:
            # Any non-text frame forces a flush so ordering is preserved.
            if self._buffer:
                await self._flush(direction)
            await self.push_frame(frame, direction)

    async def _flush(self, direction):
        """Push the buffered text downstream and reset the buffer."""
        await self.push_frame(LLMTextFrame(text=self._buffer), direction)
        self._buffer = ""

Best Practices

Memory Management

{ .api }
class EfficientTextProcessor(FrameProcessor):
    """Keep text frames from growing without bound."""

    # Maximum characters carried per frame.
    _LIMIT = 10000

    async def process_frame(self, frame, direction):
        if isinstance(frame, TextFrame) and len(frame.text) > self._LIMIT:
            # Truncate oversized text rather than accumulate it unbounded.
            frame.text = frame.text[:self._LIMIT]

        await self.push_frame(frame, direction)

Error Handling

{ .api }
class RobustTextProcessor(FrameProcessor):
    """Process text, recording failures instead of crashing the pipeline."""

    async def process_frame(self, frame, direction):
        if isinstance(frame, TextFrame):
            try:
                frame.text = await self._process_text(frame.text)
            except Exception as e:
                # Keep the pipeline alive; record the failure on the frame.
                logger.error(f"Text processing error: {e}")
                frame.metadata["processing_error"] = str(e)

        await self.push_frame(frame, direction)

Unicode Handling

{ .api }
class UnicodeNormalizer(FrameProcessor):
    """Canonicalize text frames to unicode NFC form."""

    async def process_frame(self, frame, direction):
        if isinstance(frame, TextFrame):
            import unicodedata
            # NFC composes combining characters into precomposed forms.
            frame.text = unicodedata.normalize('NFC', frame.text)

        await self.push_frame(frame, direction)