tessl install tessl/pypi-pipecat-ai@0.0.0

An open source framework for building real-time voice and multimodal conversational AI agents, with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols.
Observers provide monitoring and logging infrastructure for Pipecat pipelines. They track frame flow, processor activity, and system events without blocking pipeline execution.
{ .api }
from pipecat.observers.base_observer import BaseObserver, FramePushed, FrameProcessed
class BaseObserver:
    """Abstract base class for all observers.

    Observers monitor pipeline activity and can track:
    - Frame flow through processors
    - Frame processing events
    - Performance metrics
    - System state changes

    Key Methods:
        on_push_frame(): Called when a frame is pushed
        on_process_frame(): Called when a frame is processed
    """

    async def on_push_frame(self, frame_pushed: FramePushed):
        """Called when a frame is pushed through the pipeline.

        Args:
            frame_pushed: Event data containing frame and processor info
        """
        pass

    async def on_process_frame(self, frame_processed: FrameProcessed):
        """Called when a frame is processed by a processor.

        Args:
            frame_processed: Event data containing frame and processor info
        """
        pass

{ .api }
from pipecat.observers.base_observer import FramePushed, FrameProcessed
class FramePushed:
    """Data for frame push events.

    Attributes:
        frame (Frame): The frame being pushed
        processor (FrameProcessor): Processor pushing the frame
        direction (FrameDirection): Direction of frame flow
        timestamp (float): Event timestamp
    """
    pass

class FrameProcessed:
    """Data for frame processing events.

    Attributes:
        frame (Frame): The frame being processed
        processor (FrameProcessor): Processor handling the frame
        direction (FrameDirection): Direction of frame flow
        timestamp (float): Event timestamp
        duration_ms (float): Processing duration in milliseconds
    """
    pass
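
The event payloads above are what observer callbacks receive. The following is a minimal sketch (FrameFlowLogger is a hypothetical name) that logs each pushed frame's type and direction, and each processing event's duration, using only the attributes listed above.
{ .api }
from loguru import logger

from pipecat.observers.base_observer import BaseObserver, FramePushed, FrameProcessed

class FrameFlowLogger(BaseObserver):
    """Hypothetical sketch: log frame flow using FramePushed/FrameProcessed data."""

    async def on_push_frame(self, event: FramePushed):
        # Which processor pushed which frame, and in which direction
        logger.debug(
            f"{event.processor} pushed {type(event.frame).__name__} "
            f"({event.direction}) at {event.timestamp}"
        )

    async def on_process_frame(self, event: FrameProcessed):
        # duration_ms is only reported on processing events
        logger.debug(
            f"{event.processor} processed {type(event.frame).__name__} "
            f"in {event.duration_ms:.1f}ms"
        )
{ .api }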
from pipecat.pipeline.task_observer import TaskObserver
class TaskObserver(BaseObserver):
    """Non-blocking observer proxy for pipeline tasks.

    Manages multiple observers without blocking pipeline execution.
    Uses queue-based distribution for observer callbacks.

    Features:
    - Non-blocking observer pattern
    - Dynamic observer addition/removal
    - Queue-based frame distribution
    - Lifecycle management

    Example:
        # Create task observer
        task_observer = TaskObserver()

        # Add observers
        task_observer.add_observer(DebugLogObserver())
        task_observer.add_observer(MetricsLogObserver())

        # Integrate with pipeline task
        task = PipelineTask(
            pipeline=pipeline,
            observer=task_observer
        )
    """

    def add_observer(self, observer: BaseObserver):
        """Add an observer to the task.

        Args:
            observer: Observer to add
        """
        pass

    def remove_observer(self, observer: BaseObserver):
        """Remove an observer from the task.

        Args:
            observer: Observer to remove
        """
        pass

    async def start(self):
        """Start the task observer."""
        pass

    async def stop(self):
        """Stop the task observer."""
        pass
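
The non-blocking behaviour described above can be pictured as a small queue-based fan-out. This is an illustrative sketch of the pattern only (QueueFanOutObserver and its internals are hypothetical, not Pipecat's implementation): push events are enqueued on the hot path and delivered to registered observers from a background task.
{ .api }
import asyncio

from pipecat.observers.base_observer import BaseObserver, FramePushed

class QueueFanOutObserver(BaseObserver):
    """Hypothetical sketch of queue-based, non-blocking observer fan-out."""

    def __init__(self):
        super().__init__()
        self._observers = []
        self._queue = asyncio.Queue()
        self._task = None

    def add_observer(self, observer: BaseObserver):
        self._observers.append(observer)

    async def start(self):
        # Drain the queue from a background task so observers never block frame flow
        self._task = asyncio.create_task(self._drain())

    async def stop(self):
        if self._task:
            self._task.cancel()

    async def on_push_frame(self, event: FramePushed):
        # Hot path: enqueue only, never await observer work here
        self._queue.put_nowait(event)

    async def _drain(self):
        while True:
            event = await self._queue.get()
            for observer in self._observers:
                await observer.on_push_frame(event)
{ .api }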
from pipecat.observers.turn_tracking_observer import TurnTrackingObserver
class TurnTrackingObserver(BaseObserver):
    """Track conversation turns and transitions.

    Monitors user and bot turns, tracking:
    - Turn start/end events
    - Turn duration
    - Turn transitions
    - Conversation flow

    Use Cases:
    - Conversation analytics
    - Turn-level metrics
    - Interaction patterns
    - Response time tracking

    Example:
        observer = TurnTrackingObserver()

        @observer.on_user_turn_start
        async def handle_user_start(turn_id: str):
            print(f"User turn started: {turn_id}")

        @observer.on_bot_turn_complete
        async def handle_bot_complete(turn_id: str, duration_ms: float):
            print(f"Bot turn completed in {duration_ms}ms")
    """
    pass

Specialized observers for logging specific pipeline events.
{ .api }
from pipecat.observers.loggers.debug_log_observer import DebugLogObserver
class DebugLogObserver(BaseObserver):
    """Comprehensive debug logging observer.

    Logs all frame flow and processor activity for debugging.

    Features:
    - Frame flow visualization
    - Processor state tracking
    - Detailed event logging
    - Debug-level verbosity

    Args:
        log_level: Logging level (default: DEBUG)
        include_frames: Log individual frames
        include_processors: Log processor activity

    Example:
        debug_observer = DebugLogObserver(
            log_level="DEBUG",
            include_frames=True
        )
        task_observer.add_observer(debug_observer)
    """

    def __init__(
        self,
        log_level: str = "DEBUG",
        include_frames: bool = True,
        include_processors: bool = True
    ):
        pass

{ .api }
from pipecat.observers.loggers.llm_log_observer import LLMLogObserver
class LLMLogObserver(BaseObserver):
    """LLM-specific logging observer.

    Tracks LLM interactions including:
    - Request/response pairs
    - Token usage
    - Function calls
    - Completion times

    Args:
        log_tokens: Log token counts
        log_function_calls: Log function executions
        log_prompts: Log full prompts

    Example:
        llm_observer = LLMLogObserver(
            log_tokens=True,
            log_function_calls=True
        )
        task_observer.add_observer(llm_observer)
    """

    def __init__(
        self,
        log_tokens: bool = True,
        log_function_calls: bool = False,
        log_prompts: bool = False
    ):
        pass

{ .api }
from pipecat.observers.loggers.metrics_log_observer import MetricsLogObserver
class MetricsLogObserver(BaseObserver):
    """Performance metrics logging observer.

    Logs performance metrics including:
    - Time to first byte (TTFB)
    - Processing durations
    - Token usage
    - Response times

    Args:
        log_ttfb: Log TTFB metrics
        log_processing: Log processing times
        log_usage: Log usage metrics

    Example:
        metrics_observer = MetricsLogObserver(
            log_ttfb=True,
            log_usage=True
        )
        task_observer.add_observer(metrics_observer)
    """

    def __init__(
        self,
        log_ttfb: bool = True,
        log_processing: bool = True,
        log_usage: bool = True
    ):
        pass

{ .api }
from pipecat.observers.loggers.transcription_log_observer import TranscriptionLogObserver
class TranscriptionLogObserver(BaseObserver):
    """Transcription logging observer.

    Logs speech-to-text transcription events:
    - Interim transcriptions
    - Final transcriptions
    - Transcription timing

    Args:
        log_interim: Log interim transcriptions
        log_final: Log final transcriptions

    Example:
        transcript_observer = TranscriptionLogObserver(
            log_interim=True,
            log_final=True
        )
        task_observer.add_observer(transcript_observer)
    """

    def __init__(
        self,
        log_interim: bool = False,
        log_final: bool = True
    ):
        pass

{ .api }
from pipecat.observers.loggers.user_bot_latency_log_observer import UserBotLatencyLogObserver
class UserBotLatencyLogObserver(BaseObserver):
    """User-to-bot latency measurement observer.

    Measures and logs response latency:
    - User speech end to bot speech start
    - Total response time
    - Processing breakdown

    Args:
        detailed_breakdown: Log detailed timing breakdown

    Example:
        latency_observer = UserBotLatencyLogObserver(
            detailed_breakdown=True
        )
        task_observer.add_observer(latency_observer)
    """

    def __init__(self, detailed_breakdown: bool = False):
        pass
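
What this observer measures can be sketched with plain frame events: note when the user stops speaking and report the gap when the bot starts speaking. The sketch below assumes Pipecat's UserStoppedSpeakingFrame and BotStartedSpeakingFrame frame types; SimpleLatencyObserver is a hypothetical name, not the library's implementation.
{ .api }
import time

from loguru import logger

from pipecat.frames.frames import BotStartedSpeakingFrame, UserStoppedSpeakingFrame
from pipecat.observers.base_observer import BaseObserver, FramePushed

class SimpleLatencyObserver(BaseObserver):
    """Hypothetical sketch: user-speech-end to bot-speech-start latency."""

    def __init__(self):
        super().__init__()
        self._user_stopped_at = None

    async def on_push_frame(self, event: FramePushed):
        if isinstance(event.frame, UserStoppedSpeakingFrame):
            # Remember when the user finished speaking
            self._user_stopped_at = time.monotonic()
        elif isinstance(event.frame, BotStartedSpeakingFrame) and self._user_stopped_at:
            # Report the gap and reset for the next exchange
            latency_ms = (time.monotonic() - self._user_stopped_at) * 1000
            logger.info(f"User-to-bot latency: {latency_ms:.0f}ms")
            self._user_stopped_at = None
{ .api }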
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.task_observer import TaskObserver
from pipecat.observers.loggers import DebugLogObserver, MetricsLogObserver
# Create task observer
task_observer = TaskObserver()
# Add specialized observers
task_observer.add_observer(DebugLogObserver())
task_observer.add_observer(MetricsLogObserver())
# Create task with observer
task = PipelineTask(
    pipeline=pipeline,
    observer=task_observer
)

await task.run()

{ .api }
from pipecat.observers.loggers import (
    LLMLogObserver,
    MetricsLogObserver,
    UserBotLatencyLogObserver
)

# Create production observers
task_observer = TaskObserver()

# Add production-focused observers
task_observer.add_observer(
    LLMLogObserver(
        log_tokens=True,
        log_function_calls=True
    )
)
task_observer.add_observer(
    MetricsLogObserver(
        log_ttfb=True,
        log_usage=True
    )
)
task_observer.add_observer(
    UserBotLatencyLogObserver(
        detailed_breakdown=True
    )
)

# Use in pipeline
task = PipelineTask(pipeline, observer=task_observer)

{ .api }
from pipecat.observers.base_observer import BaseObserver, FramePushed
class CustomMetricsObserver(BaseObserver):
    """Custom observer for specific metrics."""

    def __init__(self):
        super().__init__()
        self.frame_counts = {}

    async def on_push_frame(self, event: FramePushed):
        """Track frame type distribution."""
        frame_type = type(event.frame).__name__
        self.frame_counts[frame_type] = self.frame_counts.get(frame_type, 0) + 1

    def get_stats(self):
        """Get collected statistics."""
        return self.frame_counts

# Use custom observer
observer = CustomMetricsObserver()
task_observer.add_observer(observer)

# After pipeline runs
stats = observer.get_stats()
print(f"Frame distribution: {stats}")

{ .api }
from pipecat.observers.turn_tracking_observer import TurnTrackingObserver
# Create turn tracker
turn_observer = TurnTrackingObserver()
# Register callbacks
@turn_observer.event_handler("on_user_turn_start")
async def handle_user_turn():
    print("User started speaking")

@turn_observer.event_handler("on_user_turn_end")
async def handle_user_turn_end(duration_ms: float):
    print(f"User finished speaking ({duration_ms}ms)")

@turn_observer.event_handler("on_bot_turn_start")
async def handle_bot_turn():
    print("Bot started responding")

@turn_observer.event_handler("on_bot_turn_end")
async def handle_bot_turn_end(duration_ms: float):
    print(f"Bot finished responding ({duration_ms}ms)")

# Add to task
task_observer.add_observer(turn_observer)
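
Building on the handlers above, turn events can feed simple conversation analytics. This is only a sketch; it registers an additional handler for on_bot_turn_end and assumes multiple handlers per event are allowed.
{ .api }
from statistics import mean

bot_turn_durations = []

@turn_observer.event_handler("on_bot_turn_end")
async def collect_bot_turn(duration_ms: float):
    # Accumulate per-turn durations for later reporting
    bot_turn_durations.append(duration_ms)

def report_turn_stats():
    if bot_turn_durations:
        print(
            f"Bot turns: {len(bot_turn_durations)}, "
            f"avg {mean(bot_turn_durations):.0f}ms, "
            f"max {max(bot_turn_durations):.0f}ms"
        )
{ .api }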
# Good: Production-focused observers
task_observer.add_observer(MetricsLogObserver())
task_observer.add_observer(LLMLogObserver(log_tokens=True))
task_observer.add_observer(UserBotLatencyLogObserver())
# Bad: Debug observers in production
task_observer.add_observer(DebugLogObserver())  # Too verbose for production
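
In practice, the debug observers can be gated behind an environment flag so production runs stay quiet; the PIPECAT_DEBUG variable name below is just an example.
{ .api }
import os

task_observer = TaskObserver()

# Always-on, production-safe observers
task_observer.add_observer(MetricsLogObserver())
task_observer.add_observer(UserBotLatencyLogObserver())

# Opt into verbose logging only when explicitly requested
if os.environ.get("PIPECAT_DEBUG") == "1":
    task_observer.add_observer(DebugLogObserver())
{ .api }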
from loguru import logger

from pipecat.observers.base_observer import BaseObserver, FramePushed

class RobustObserver(BaseObserver):
    """Observer with error handling."""

    async def on_push_frame(self, event: FramePushed):
        try:
            # Process event
            await self._process(event)
        except Exception as e:
            # Log error but don't block pipeline
            logger.error(f"Observer error: {e}")

{ .api }
class ResourcefulObserver(BaseObserver):
    """Observer with resource management."""

    def __init__(self):
        super().__init__()
        self.file = None

    async def start(self):
        """Initialize resources."""
        self.file = open("metrics.log", "w")

    async def stop(self):
        """Clean up resources."""
        if self.file:
            self.file.close()
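
The pattern above acquires a file in start() and releases it in stop(), but nothing is written in between. A hedged, self-contained variant (MetricsFileObserver is a hypothetical name, not part of Pipecat) that also records one line per pushed frame could look like this:
{ .api }
from pipecat.observers.base_observer import BaseObserver, FramePushed

class MetricsFileObserver(BaseObserver):
    """Hypothetical sketch: resource management plus per-frame logging."""

    def __init__(self, path: str = "metrics.log"):
        super().__init__()
        self._path = path
        self._file = None

    async def start(self):
        """Acquire the file when the observer starts."""
        self._file = open(self._path, "w")

    async def on_push_frame(self, event: FramePushed):
        # Only write while the file is open; keep this cheap so the pipeline isn't blocked
        if self._file:
            self._file.write(f"{event.timestamp} {type(event.frame).__name__}\n")

    async def stop(self):
        """Flush and release the file when the observer stops."""
        if self._file:
            self._file.close()
            self._file = None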