or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/pipecat-ai@0.0.x

docs

core-concepts.md, index.md, pipeline.md, runner.md, transports.md, turns.md
tile.json

tessl/pypi-pipecat-ai

tessl install tessl/pypi-pipecat-ai@0.0.0

An open source framework for building real-time voice and multimodal conversational AI agents with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols

docs/processors/transcript-processors.md

Transcript Processors

Transcript processors convert speech and text frames into structured conversation transcripts with timestamps. These processors enable conversation history tracking, analysis, and logging.

Base Transcript Processor

BaseTranscriptProcessor

{ .api }
from pipecat.processors.transcript_processor import BaseTranscriptProcessor

class BaseTranscriptProcessor(FrameProcessor):
    """Shared foundation for transcript-producing processors.

    Keeps a running store of timestamped transcript messages and notifies
    registered listeners whenever fresh messages arrive.

    Event Handlers:
        on_transcript_update: Fired each time new transcript messages
            become available.

    Features:
        - Messages tracked with timestamps
        - Event-driven update notifications
        - In-memory message history

    Example:
        class CustomTranscriptProcessor(BaseTranscriptProcessor):
            async def process_frame(self, frame, direction):
                # Custom processing
                messages = [TranscriptionMessage(...)]
                await self._emit_update(messages)
    """

    def __init__(self, **kwargs):
        """Set up the processor with an empty message history.

        Args:
            **kwargs: Forwarded unchanged to the parent class.
        """
        pass

    async def _emit_update(self, messages: List[TranscriptionMessage]):
        """Broadcast newly collected transcript messages to listeners.

        Args:
            messages: Messages to include in the emitted update.
        """
        pass

User Transcript Processor

UserTranscriptProcessor

{ .api }
from pipecat.processors.transcript_processor import UserTranscriptProcessor

class UserTranscriptProcessor(BaseTranscriptProcessor):
    """Turns user STT transcriptions into timestamped conversation messages.

    Incoming TranscriptionFrame objects produced by a speech-to-text service
    are converted into structured TranscriptionMessage entries that carry a
    timestamp and the speaking user's ID.

    Frames Consumed:
        - TranscriptionFrame: Speech transcriptions emitted by STT

    Frames Produced:
        - TranscriptionUpdateFrame: Carries newly created user messages

    Example:
        from pipecat.processors.transcript_processor import UserTranscriptProcessor

        user_transcript = UserTranscriptProcessor()

        @user_transcript.event_handler("on_transcript_update")
        async def handle_update(processor, frame):
            for msg in frame.messages:
                print(f"[{msg.timestamp}] {msg.role}: {msg.content}")

        pipeline = Pipeline([
            transport.input(),
            stt,
            user_transcript,  # Track user speech
            context_aggregator.user(),
            llm,
            tts,
            transport.output()
        ])
    """

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Convert incoming TranscriptionFrames into user conversation messages.

        Args:
            frame: Frame to inspect.
            direction: Direction the frame is travelling in the pipeline.
        """
        pass

Assistant Transcript Processor

AssistantTranscriptProcessor

{ .api }
from pipecat.processors.transcript_processor import AssistantTranscriptProcessor

class AssistantTranscriptProcessor(BaseTranscriptProcessor):
    """Processes assistant TTS text frames and LLM thought frames into timestamped messages.

    This processor aggregates both TTS text frames and LLM thought frames into
    complete utterances and thoughts, emitting them as transcript messages.

    Aggregation Behavior:

    An assistant utterance is completed when:
        - The bot stops speaking (BotStoppedSpeakingFrame)
        - The bot is interrupted (InterruptionFrame)
        - The pipeline ends (EndFrame, CancelFrame)

    A thought is completed when:
        - The thought ends (LLMThoughtEndFrame)
        - The bot is interrupted (InterruptionFrame)
        - The pipeline ends (EndFrame, CancelFrame)

    Frames Consumed:
        - TTSTextFrame: Text being sent to TTS
        - LLMThoughtStartFrame: Start of LLM thought
        - LLMThoughtTextFrame: LLM thought text chunks
        - LLMThoughtEndFrame: End of LLM thought
        - BotStoppedSpeakingFrame: Bot finished speaking
        - InterruptionFrame: Bot was interrupted
        - EndFrame/CancelFrame: Pipeline ending

    Frames Produced:
        - TranscriptionUpdateFrame: Contains assistant messages; when
          process_thoughts is enabled, its messages list may also include
          ThoughtTranscriptionMessage entries for LLM thought content
          (ThoughtTranscriptionMessage is a message type carried inside the
          update frame, not a frame itself)

    Example:
        from pipecat.processors.transcript_processor import AssistantTranscriptProcessor

        assistant_transcript = AssistantTranscriptProcessor(
            process_thoughts=True  # Enable thought tracking
        )

        @assistant_transcript.event_handler("on_transcript_update")
        async def handle_update(processor, frame):
            for msg in frame.messages:
                if isinstance(msg, ThoughtTranscriptionMessage):
                    print(f"[THOUGHT] {msg.content}")
                else:
                    print(f"[ASSISTANT] {msg.content}")

        pipeline = Pipeline([
            transport.input(),
            stt,
            context_aggregator.user(),
            llm,
            tts,
            transport.output(),
            assistant_transcript  # Track assistant speech
        ])
    """

    def __init__(self, *, process_thoughts: bool = False, **kwargs):
        """Initialize processor with aggregation state.

        Args:
            process_thoughts: Whether to process LLM thought frames. Defaults to False
            **kwargs: Additional arguments passed to parent class
        """
        pass

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Process frames into assistant conversation messages and thought messages.

        Handles different frame types:
            - TTSTextFrame: Aggregates text for current utterance
            - LLMThoughtStartFrame: Begins aggregating a new thought
            - LLMThoughtTextFrame: Aggregates text for current thought
            - LLMThoughtEndFrame: Completes current thought
            - BotStoppedSpeakingFrame: Completes current utterance
            - InterruptionFrame: Completes current utterance and thought due to interruption
            - EndFrame: Completes current utterance and thought at pipeline end
            - CancelFrame: Completes current utterance and thought due to cancellation

        Args:
            frame: Input frame to process
            direction: Frame processing direction
        """
        pass

Transcript Processor Factory

TranscriptProcessor

{ .api }
from pipecat.processors.transcript_processor import TranscriptProcessor

class TranscriptProcessor:
    """Factory that builds and coordinates user/assistant transcript processors.

    Gives a single point of access to both the user and assistant transcript
    processors, with event handlers registered once and applied to both. The
    assistant processor covers TTS text as well as LLM thought frames.

    .. deprecated:: 0.0.99
        `TranscriptProcessor` is deprecated and will be removed in a future version.
        Use `LLMUserAggregator`'s and `LLMAssistantAggregator`'s new events instead.

    Features:
        - One event-handler registration covering both processors
        - Shared configuration
        - Straightforward pipeline wiring

    Example:
        from pipecat.processors.transcript_processor import TranscriptProcessor

        transcript = TranscriptProcessor(process_thoughts=True)

        pipeline = Pipeline([
            transport.input(),
            stt,
            transcript.user(),              # User transcripts
            context_aggregator.user(),
            llm,
            tts,
            transport.output(),
            transcript.assistant(),         # Assistant transcripts (including thoughts)
            context_aggregator.assistant(),
        ])

        @transcript.event_handler("on_transcript_update")
        async def handle_update(processor, frame):
            for msg in frame.messages:
                print(f"[{msg.timestamp}] {msg.role}: {msg.content}")
                # Save to database, log, etc.
    """

    def __init__(self, *, process_thoughts: bool = False):
        """Create the factory.

        Args:
            process_thoughts: When True, the assistant processor also handles
                LLM thought frames. Defaults to False.
        """
        pass

    def user(self, **kwargs) -> UserTranscriptProcessor:
        """Return the user transcript processor.

        Args:
            **kwargs: Options forwarded to UserTranscriptProcessor.

        Returns:
            The UserTranscriptProcessor instance managed by this factory.
        """
        pass

    def assistant(self, **kwargs) -> AssistantTranscriptProcessor:
        """Return the assistant transcript processor.

        Args:
            **kwargs: Options forwarded to AssistantTranscriptProcessor.

        Returns:
            The AssistantTranscriptProcessor instance managed by this factory.
        """
        pass

    def event_handler(self, event_name: str):
        """Register one handler on both managed processors.

        Args:
            event_name: Event to subscribe to.

        Returns:
            A decorator that attaches the wrapped handler to both processors.
        """
        pass

Transcript Message Types

TranscriptionMessage

{ .api }
from pipecat.frames.frames import TranscriptionMessage

@dataclass
class TranscriptionMessage:
    """One timestamped entry in a conversation transcript.

    Parameters:
        role: Who produced the message ("user" or "assistant")
        content: The message text
        timestamp: ISO 8601 timestamp of when the message was produced
        user_id: Optional identifier of the speaking user
    """

    role: str
    content: str
    timestamp: str
    user_id: Optional[str] = None

ThoughtTranscriptionMessage

{ .api }
from pipecat.frames.frames import ThoughtTranscriptionMessage

@dataclass
class ThoughtTranscriptionMessage:
    """One timestamped LLM thought entry.

    Captures internal reasoning produced by the LLM that is never spoken
    aloud to the user.

    Parameters:
        content: The thought text
        timestamp: ISO 8601 timestamp of when the thought was produced
    """

    content: str
    timestamp: str

TranscriptionUpdateFrame

{ .api }
from pipecat.frames.frames import TranscriptionUpdateFrame

class TranscriptionUpdateFrame(DataFrame):
    """Frame announcing freshly produced transcript messages.

    Sent downstream by transcript processors whenever new messages exist.

    Parameters:
        messages: Newly created TranscriptionMessage or
            ThoughtTranscriptionMessage objects
    """

    def __init__(self, messages: List[Union[TranscriptionMessage, ThoughtTranscriptionMessage]]):
        pass

Usage Patterns

Basic Transcript Tracking

{ .api }
from pipecat.processors.transcript_processor import (
    UserTranscriptProcessor,
    AssistantTranscriptProcessor
)

# Create processors
user_transcript = UserTranscriptProcessor()
assistant_transcript = AssistantTranscriptProcessor()

# Handle updates
@user_transcript.event_handler("on_transcript_update")
async def handle_user_update(processor, frame):
    for msg in frame.messages:
        print(f"User: {msg.content}")

@assistant_transcript.event_handler("on_transcript_update")
async def handle_assistant_update(processor, frame):
    for msg in frame.messages:
        print(f"Assistant: {msg.content}")

# Add to pipeline
pipeline = Pipeline([
    transport.input(),
    stt,
    user_transcript,
    context_aggregator.user(),
    llm,
    tts,
    transport.output(),
    assistant_transcript
])

Unified Transcript Handling

{ .api }
from pipecat.processors.transcript_processor import TranscriptProcessor

# Create unified processor
transcript = TranscriptProcessor()

# Single event handler for both
@transcript.event_handler("on_transcript_update")
async def handle_update(processor, frame):
    for msg in frame.messages:
        timestamp = msg.timestamp
        role = msg.role if hasattr(msg, 'role') else 'thought'
        content = msg.content
        print(f"[{timestamp}] {role}: {content}")

# Use in pipeline
pipeline = Pipeline([
    transport.input(),
    stt,
    transcript.user(),
    context_aggregator.user(),
    llm,
    tts,
    transport.output(),
    transcript.assistant()
])

Tracking LLM Thoughts

{ .api }
from pipecat.processors.transcript_processor import AssistantTranscriptProcessor
from pipecat.frames.frames import ThoughtTranscriptionMessage

# Enable thought processing
assistant_transcript = AssistantTranscriptProcessor(
    process_thoughts=True
)

@assistant_transcript.event_handler("on_transcript_update")
async def handle_update(processor, frame):
    for msg in frame.messages:
        if isinstance(msg, ThoughtTranscriptionMessage):
            # LLM internal thought
            logger.debug(f"Thought: {msg.content}")
        else:
            # Spoken utterance
            logger.info(f"Said: {msg.content}")

Saving Transcripts to Database

{ .api }
from pipecat.processors.transcript_processor import TranscriptProcessor
import asyncpg

# Database connection
db_pool = await asyncpg.create_pool("postgresql://...")

transcript = TranscriptProcessor()

@transcript.event_handler("on_transcript_update")
async def save_to_db(processor, frame):
    async with db_pool.acquire() as conn:
        for msg in frame.messages:
            await conn.execute("""
                INSERT INTO transcripts (role, content, timestamp, session_id)
                VALUES ($1, $2, $3, $4)
            """, msg.role, msg.content, msg.timestamp, session_id)

pipeline = Pipeline([
    transport.input(),
    stt,
    transcript.user(),
    context_aggregator.user(),
    llm,
    tts,
    transport.output(),
    transcript.assistant()
])

Real-Time Transcript Streaming

{ .api }
from pipecat.processors.transcript_processor import TranscriptProcessor
import asyncio

transcript = TranscriptProcessor()
transcript_queue = asyncio.Queue()

@transcript.event_handler("on_transcript_update")
async def stream_transcript(processor, frame):
    for msg in frame.messages:
        await transcript_queue.put(msg)

# Consumer coroutine
async def stream_to_client():
    while True:
        msg = await transcript_queue.get()
        await websocket.send_json({
            "role": msg.role,
            "content": msg.content,
            "timestamp": msg.timestamp
        })

Conversation Analytics

{ .api }
from pipecat.processors.transcript_processor import TranscriptProcessor
from collections import defaultdict

transcript = TranscriptProcessor()

# Track metrics
metrics = {
    "user_messages": 0,
    "assistant_messages": 0,
    "total_user_words": 0,
    "total_assistant_words": 0,
    "conversation_start": None,
}

@transcript.event_handler("on_transcript_update")
async def track_metrics(processor, frame):
    for msg in frame.messages:
        if not metrics["conversation_start"]:
            metrics["conversation_start"] = msg.timestamp

        if msg.role == "user":
            metrics["user_messages"] += 1
            metrics["total_user_words"] += len(msg.content.split())
        elif msg.role == "assistant":
            metrics["assistant_messages"] += 1
            metrics["total_assistant_words"] += len(msg.content.split())

    # Log metrics periodically
    logger.info(f"Conversation metrics: {metrics}")

Best Practices

Place Processors Correctly in Pipeline

{ .api }
# Good: Place user processor after STT
pipeline = Pipeline([
    transport.input(),
    stt,                      # Produces TranscriptionFrame
    user_transcript,          # Consumes TranscriptionFrame
    context_aggregator.user(),
    llm,
    tts,
    transport.output(),
    assistant_transcript      # Consumes TTSTextFrame
])

# Bad: Wrong order (won't capture transcripts)
pipeline = Pipeline([
    transport.input(),
    user_transcript,    # Before STT - won't see TranscriptionFrame
    stt,
    ...
])

Handle Interruptions Properly

{ .api }
# Good: Assistant processor handles interruptions
assistant_transcript = AssistantTranscriptProcessor()

@assistant_transcript.event_handler("on_transcript_update")
async def handle_update(processor, frame):
    for msg in frame.messages:
        # Message will be emitted even if interrupted
        save_transcript(msg)

# The processor automatically completes aggregation on interruption

Use Timestamps for Ordering

{ .api }
# Good: Use timestamps to order messages
@transcript.event_handler("on_transcript_update")
async def handle_update(processor, frame):
    # Messages already have ISO 8601 timestamps
    for msg in frame.messages:
        # Can sort by timestamp if needed
        db.insert_with_timestamp(msg.timestamp, msg)

# Bad: Don't rely on arrival order
# Messages may arrive out of order due to async processing

Enable Thoughts for Debugging

{ .api }
# Good: Enable thoughts in development
if debug_mode:
    assistant_transcript = AssistantTranscriptProcessor(
        process_thoughts=True
    )
else:
    assistant_transcript = AssistantTranscriptProcessor(
        process_thoughts=False
    )

# Good: Log thoughts separately
@assistant_transcript.event_handler("on_transcript_update")
async def handle_update(processor, frame):
    for msg in frame.messages:
        if isinstance(msg, ThoughtTranscriptionMessage):
            debug_logger.log(msg.content)  # Only in debug logs
        else:
            transcript_logger.log(msg.content)  # Regular transcript

Handle Empty Content

{ .api }
# Good: Check for empty content
@transcript.event_handler("on_transcript_update")
async def handle_update(processor, frame):
    for msg in frame.messages:
        if msg.content.strip():  # Only process non-empty
            await save_message(msg)

# The processor already strips whitespace, but check for safety

Combine with Context Aggregators

{ .api }
# Good: Use transcript alongside context aggregators
from pipecat.processors.aggregators.llm_context import LLMContextAggregatorPair

transcript = TranscriptProcessor()
context = LLMContext()
aggregators = LLMContextAggregatorPair(context)

# Both track conversation but serve different purposes:
# - Transcript: Historical record with timestamps
# - Context: Active conversation state for LLM

pipeline = Pipeline([
    transport.input(),
    stt,
    transcript.user(),        # Track history
    aggregators.user(),       # Build context
    llm,
    tts,
    transport.output(),
    transcript.assistant(),   # Track history
    aggregators.assistant()   # Build context
])