or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypi pkg:pypi/pipecat-ai@0.0.x

docs

core-concepts.md index.md pipeline.md runner.md transports.md turns.md
tile.json

tessl/pypi-pipecat-ai

tessl install tessl/pypi-pipecat-ai@0.0.0

An open source framework for building real-time voice and multimodal conversational AI agents with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols.

docs/utilities/observers.md

Observers

Observers provide monitoring and logging infrastructure for Pipecat pipelines. They track frame flow, processor activity, and system events without blocking pipeline execution.

Base Observer

BaseObserver

{ .api }
from pipecat.observers.base_observer import BaseObserver, FramePushed, FrameProcessed

class BaseObserver:
    """Abstract base that every observer derives from.

    An observer watches pipeline activity. It can follow:
    - How frames flow through processors
    - Frame processing events
    - Performance metrics
    - Changes in system state

    Hooks:
        on_push_frame(): Invoked when a frame is pushed
        on_process_frame(): Invoked when a frame is processed

    Both hooks do nothing in this base class.
    """

    async def on_push_frame(self, frame_pushed: FramePushed):
        """Hook invoked when a frame is pushed through the pipeline.

        Args:
            frame_pushed: Event data carrying the frame and processor info
        """

    async def on_process_frame(self, frame_processed: FrameProcessed):
        """Hook invoked when a processor has processed a frame.

        Args:
            frame_processed: Event data carrying the frame and processor info
        """

Event Data Structures

{ .api }
from pipecat.observers.base_observer import FramePushed, FrameProcessed

class FramePushed:
    """Event payload describing a frame push.

    Attributes:
        frame (Frame): Frame that is being pushed
        processor (FrameProcessor): Processor that pushes the frame
        direction (FrameDirection): Direction the frame is flowing in
        timestamp (float): Timestamp of the event
    """

class FrameProcessed:
    """Event payload describing a frame that was processed.

    Attributes:
        frame (Frame): Frame that is being processed
        processor (FrameProcessor): Processor that handles the frame
        direction (FrameDirection): Direction the frame is flowing in
        timestamp (float): Timestamp of the event
        duration_ms (float): How long processing took, in milliseconds
    """

Task Observer

TaskObserver

{ .api }
from pipecat.pipeline.task_observer import TaskObserver

class TaskObserver(BaseObserver):
    """Observer proxy for pipeline tasks that never blocks the pipeline.

    Holds several observers on behalf of a task and hands observer
    callbacks to them through queues, so pipeline execution is not held up.

    Capabilities:
    - Non-blocking observer pattern
    - Observers can be added and removed dynamically
    - Frames are distributed through queues
    - Lifecycle management

    Example:
        # Build the proxy
        task_observer = TaskObserver()

        # Register the observers it should manage
        task_observer.add_observer(DebugLogObserver())
        task_observer.add_observer(MetricsLogObserver())

        # Hand the proxy to the pipeline task
        task = PipelineTask(
            pipeline=pipeline,
            observer=task_observer
        )
    """

    def add_observer(self, observer: BaseObserver):
        """Register an observer with the task.

        Args:
            observer: The observer to register
        """

    def remove_observer(self, observer: BaseObserver):
        """Unregister an observer from the task.

        Args:
            observer: The observer to unregister
        """

    async def start(self):
        """Start this task observer."""

    async def stop(self):
        """Stop this task observer."""

Specialized Observers

TurnTrackingObserver

{ .api }
from pipecat.observers.turn_tracking_observer import TurnTrackingObserver

class TurnTrackingObserver(BaseObserver):
    """Track conversation turns and transitions.

    Monitors user and bot turns, tracking:
    - Turn start/end events
    - Turn duration
    - Turn transitions
    - Conversation flow

    Use Cases:
    - Conversation analytics
    - Turn-level metrics
    - Interaction patterns
    - Response time tracking

    Example:
        observer = TurnTrackingObserver()

        @observer.event_handler("on_user_turn_start")
        async def handle_user_start():
            print("User turn started")

        @observer.event_handler("on_bot_turn_end")
        async def handle_bot_end(duration_ms: float):
            print(f"Bot turn completed in {duration_ms}ms")

    NOTE(review): the registration form, event names and handler
    signatures above follow the turn-tracking usage example elsewhere in
    this document; confirm them against the installed pipecat release.
    """

Observer Loggers

Specialized observers for logging specific pipeline events.

DebugLogObserver

{ .api }
from pipecat.observers.loggers.debug_log_observer import DebugLogObserver

class DebugLogObserver(BaseObserver):
    """Observer that produces comprehensive debug logs.

    Every frame movement and all processor activity is logged to help
    with debugging.

    Capabilities:
    - Visualizes frame flow
    - Tracks processor state
    - Logs events in detail
    - Debug-level verbosity

    Args:
        log_level: Level to log at (default: DEBUG)
        include_frames: Whether individual frames are logged
        include_processors: Whether processor activity is logged

    Example:
        debug_observer = DebugLogObserver(
            log_level="DEBUG",
            include_frames=True
        )

        task_observer.add_observer(debug_observer)
    """

    def __init__(
        self,
        log_level: str = "DEBUG",
        include_frames: bool = True,
        include_processors: bool = True,
    ):
        """Accept the logging options described in the class docstring."""

LLMLogObserver

{ .api }
from pipecat.observers.loggers.llm_log_observer import LLMLogObserver

class LLMLogObserver(BaseObserver):
    """Observer dedicated to logging LLM activity.

    Follows LLM interactions, covering:
    - Request/response pairs
    - Token usage
    - Function calls
    - Completion times

    Args:
        log_tokens: Whether token counts are logged
        log_function_calls: Whether function executions are logged
        log_prompts: Whether full prompts are logged

    Example:
        llm_observer = LLMLogObserver(
            log_tokens=True,
            log_function_calls=True
        )

        task_observer.add_observer(llm_observer)
    """

    def __init__(
        self,
        log_tokens: bool = True,
        log_function_calls: bool = False,
        log_prompts: bool = False,
    ):
        """Accept the logging options described in the class docstring."""

MetricsLogObserver

{ .api }
from pipecat.observers.loggers.metrics_log_observer import MetricsLogObserver

class MetricsLogObserver(BaseObserver):
    """Observer that logs performance metrics.

    The logged metrics include:
    - Time to first byte (TTFB)
    - Processing durations
    - Token usage
    - Response times

    Args:
        log_ttfb: Whether TTFB metrics are logged
        log_processing: Whether processing times are logged
        log_usage: Whether usage metrics are logged

    Example:
        metrics_observer = MetricsLogObserver(
            log_ttfb=True,
            log_usage=True
        )

        task_observer.add_observer(metrics_observer)
    """

    def __init__(
        self,
        log_ttfb: bool = True,
        log_processing: bool = True,
        log_usage: bool = True,
    ):
        """Accept the logging options described in the class docstring."""

TranscriptionLogObserver

{ .api }
from pipecat.observers.loggers.transcription_log_observer import TranscriptionLogObserver

class TranscriptionLogObserver(BaseObserver):
    """Observer that logs transcriptions.

    Speech-to-text transcription events that get logged:
    - Interim transcriptions
    - Final transcriptions
    - Transcription timing

    Args:
        log_interim: Whether interim transcriptions are logged
        log_final: Whether final transcriptions are logged

    Example:
        transcript_observer = TranscriptionLogObserver(
            log_interim=True,
            log_final=True
        )

        task_observer.add_observer(transcript_observer)
    """

    def __init__(
        self,
        log_interim: bool = False,
        log_final: bool = True,
    ):
        """Accept the logging options described in the class docstring."""

UserBotLatencyLogObserver

{ .api }
from pipecat.observers.loggers.user_bot_latency_log_observer import UserBotLatencyLogObserver

class UserBotLatencyLogObserver(BaseObserver):
    """Observer that measures user-to-bot latency.

    Response latency is measured and logged as:
    - Time from the end of user speech to the start of bot speech
    - Total response time
    - Processing breakdown

    Args:
        detailed_breakdown: Whether a detailed timing breakdown is logged

    Example:
        latency_observer = UserBotLatencyLogObserver(
            detailed_breakdown=True
        )

        task_observer.add_observer(latency_observer)
    """

    def __init__(self, detailed_breakdown: bool = False):
        """Accept the option described in the class docstring."""

Usage Patterns

Basic Observer Setup

{ .api }
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.task_observer import TaskObserver
from pipecat.observers.loggers import DebugLogObserver, MetricsLogObserver

# Create task observer
task_observer = TaskObserver()

# Add specialized observers
task_observer.add_observer(DebugLogObserver())
task_observer.add_observer(MetricsLogObserver())

# Create task with observer
task = PipelineTask(
    pipeline=pipeline,
    observer=task_observer
)

await task.run()

Production Monitoring Setup

{ .api }
from pipecat.observers.loggers import (
    LLMLogObserver,
    MetricsLogObserver,
    UserBotLatencyLogObserver
)

# Observer proxy that will hold the production observers
task_observer = TaskObserver()

# LLM logging: token counts and function calls
llm_logging = LLMLogObserver(log_tokens=True, log_function_calls=True)
task_observer.add_observer(llm_logging)

# Performance metrics: TTFB and usage
metrics_logging = MetricsLogObserver(log_ttfb=True, log_usage=True)
task_observer.add_observer(metrics_logging)

# User-to-bot latency with the detailed breakdown enabled
latency_logging = UserBotLatencyLogObserver(detailed_breakdown=True)
task_observer.add_observer(latency_logging)

# Wire the proxy into the pipeline task
task = PipelineTask(pipeline, observer=task_observer)

Custom Observer

{ .api }
from pipecat.observers.base_observer import BaseObserver, FramePushed

class CustomMetricsObserver(BaseObserver):
    """Example observer that tallies pushed frames by frame class name."""

    def __init__(self):
        super().__init__()
        # Maps frame class name -> number of push events seen so far.
        self.frame_counts = {}

    async def on_push_frame(self, event: FramePushed):
        """Increment the tally for the class of the pushed frame."""
        tally = self.frame_counts
        class_name = type(event.frame).__name__
        if class_name in tally:
            tally[class_name] += 1
        else:
            # First frame of this class.
            tally[class_name] = 1

    def get_stats(self):
        """Return the tally dict itself (the live object, not a copy)."""
        return self.frame_counts

# Instantiate the custom observer and register it on the task observer
# (task_observer is not created in this snippet; it must already exist)
observer = CustomMetricsObserver()
task_observer.add_observer(observer)

# After the pipeline has run, read back the per-frame-type counts.
# get_stats() returns the observer's own dict, not a copy.
stats = observer.get_stats()
print(f"Frame distribution: {stats}")

Turn Tracking Example

{ .api }
from pipecat.observers.turn_tracking_observer import TurnTrackingObserver

# Create the turn-tracking observer
turn_observer = TurnTrackingObserver()

# Register one async callback per event name via the event_handler decorator.
# NOTE(review): event names and handler signatures are as shown in this
# example; confirm them against the installed pipecat release.
@turn_observer.event_handler("on_user_turn_start")
async def handle_user_turn():
    print("User started speaking")

# This handler receives the turn duration in milliseconds
@turn_observer.event_handler("on_user_turn_end")
async def handle_user_turn_end(duration_ms: float):
    print(f"User finished speaking ({duration_ms}ms)")

@turn_observer.event_handler("on_bot_turn_start")
async def handle_bot_turn():
    print("Bot started responding")

# This handler receives the turn duration in milliseconds
@turn_observer.event_handler("on_bot_turn_end")
async def handle_bot_turn_end(duration_ms: float):
    print(f"Bot finished responding ({duration_ms}ms)")

# Register the turn tracker on the task observer
# (task_observer is not created in this snippet; it must already exist)
task_observer.add_observer(turn_observer)

Best Practices

Choose Appropriate Observers

{ .api }
# Good: production-focused observers (metrics, LLM token logging, latency)
task_observer.add_observer(MetricsLogObserver())
task_observer.add_observer(LLMLogObserver(log_tokens=True))
task_observer.add_observer(UserBotLatencyLogObserver())

# Bad: debug observers in production.
# Shown as a counter-example only; do not copy this line into production code.
task_observer.add_observer(DebugLogObserver())  # Too verbose for production

Handle Observer Errors

{ .api }
class RobustObserver(BaseObserver):
    """Observer whose push hook logs failures instead of raising them.

    ``_process`` and ``logger`` are not defined in this snippet; the
    surrounding code is expected to provide them.
    """

    async def on_push_frame(self, event: FramePushed):
        """Process a push event, logging any failure rather than raising.

        Args:
            event: Push event that is forwarded to ``_process``
        """
        try:
            await self._process(event)
        except Exception as e:
            # Deliberately broad so an observer failure never reaches the
            # pipeline. asyncio.CancelledError derives from BaseException
            # (Python 3.8+), so task cancellation still propagates.
            # logger.exception keeps the traceback that logger.error dropped.
            logger.exception(f"Observer error: {e}")

Clean Up Resources

{ .api }
class ResourcefulObserver(BaseObserver):
    """Observer that owns a log file and releases it explicitly.

    ``start()`` opens ``metrics.log`` and ``stop()`` closes it. The handle
    is ``None`` whenever the file is not open.
    """

    def __init__(self):
        super().__init__()
        # Open handle to metrics.log, or None while no file is open.
        self.file = None

    async def start(self):
        """Open ``metrics.log`` for writing unless it is already open.

        Opening with mode ``"w"`` truncates any existing file. The guard
        stops a repeated ``start()`` from leaking the previous handle.
        """
        if self.file is None:
            # Explicit encoding so the output does not depend on the
            # platform's default locale.
            self.file = open("metrics.log", "w", encoding="utf-8")

    async def stop(self):
        """Close the log file if it is open and forget the handle."""
        if self.file is not None:
            self.file.close()
            # Reset so a later start() reopens instead of reusing a closed file.
            self.file = None