or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypi: pkg:pypi/pipecat-ai@0.0.x

docs

core-concepts.md, index.md, pipeline.md, runner.md, transports.md, turns.md
tile.json

tessl/pypi-pipecat-ai

tessl install tessl/pypi-pipecat-ai@0.0.0

An open-source framework for building real-time voice and multimodal conversational AI agents, with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols.

docs/utilities/metrics.md

Metrics and Observability

Pipecat provides comprehensive metrics for monitoring performance, tracking usage, and understanding system behavior. Metrics flow as frames through pipelines and can be collected, logged, or sent to analytics systems.

Metrics Data Types

MetricsData

{ .api }
from typing import Optional

from pipecat.metrics.metrics import MetricsData
from pydantic import BaseModel

class MetricsData(BaseModel):
    """Base container shared by every metrics payload.

    Concrete metrics types subclass this model and add their own fields.
    As a Pydantic model, field values are validated on construction.

    Attributes:
        processor (str): Name of the processor generating the metrics.
            Required.
        model (Optional[str]): Optional model name associated with the
            metrics. Defaults to ``None``.
        timestamp (float): Timestamp when the metric was generated.
            Defaults to ``0.0`` when not supplied; the examples pass
            ``time.time()``.

    Example:
        import time

        # Subclass example
        class CustomMetricsData(MetricsData):
            value: int

        metrics = CustomMetricsData(
            processor="llm_service",
            model="gpt-4",
            timestamp=time.time(),
            value=123
        )
    """
    processor: str
    model: Optional[str] = None
    timestamp: float = 0.0

TTFBMetricsData

{ .api }
from pipecat.metrics.metrics import TTFBMetricsData

class TTFBMetricsData(MetricsData):
    """Time To First Byte metrics.

    Measures latency from request to first response byte.
    Critical for real-time conversational experiences.

    Attributes:
        processor (str): Processor name (from MetricsData)
        model (Optional[str]): Optional model name (from MetricsData)
        timestamp (float): Metric timestamp (from MetricsData)
        value (float): TTFB measurement in seconds. Required.

    Example:
        ttfb_metrics = TTFBMetricsData(
            processor="openai_llm",
            model="gpt-4",
            value=0.245  # 245ms in seconds
        )

        # The value is stored in seconds; multiply by 1000 for milliseconds
        ttfb_ms = ttfb_metrics.value * 1000
    """
    value: float

ProcessingMetricsData

{ .api }
from pipecat.metrics.metrics import ProcessingMetricsData

class ProcessingMetricsData(MetricsData):
    """Processing time metrics.

    Measures total processing time for operations.

    Attributes:
        processor (str): Processor name (from MetricsData)
        model (Optional[str]): Optional model name (from MetricsData)
        timestamp (float): Metric timestamp (from MetricsData)
        value (float): Processing time measurement in seconds. Required.

    Example:
        processing_metrics = ProcessingMetricsData(
            processor="tts_service",
            model="elevenlabs",
            value=0.850  # 850ms in seconds
        )

        # The value is stored in seconds; multiply by 1000 for milliseconds
        duration_ms = processing_metrics.value * 1000
    """
    value: float

LLMTokenUsage

{ .api }
from pipecat.metrics.metrics import LLMTokenUsage

class LLMTokenUsage(BaseModel):
    """LLM token usage statistics.

    Tracks token consumption for cost and quota management. The three
    core counters are required; the cache and reasoning counters are
    optional and default to ``None``.

    Attributes:
        prompt_tokens (int): Number of tokens in the input prompt
        completion_tokens (int): Number of tokens in the generated completion
        total_tokens (int): Total number of tokens used (prompt + completion).
            Supplied by the caller; this class does not compute it.
        cache_read_input_tokens (Optional[int]): Number of tokens read from cache
        cache_creation_input_tokens (Optional[int]): Number of tokens used to create cache entries
        reasoning_tokens (Optional[int]): Number of tokens used for reasoning (some models)

    Example:
        usage = LLMTokenUsage(
            prompt_tokens=150,
            completion_tokens=50,
            total_tokens=200,
            cache_read_input_tokens=100
        )
    """
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int
    cache_read_input_tokens: Optional[int] = None
    cache_creation_input_tokens: Optional[int] = None
    reasoning_tokens: Optional[int] = None

LLMUsageMetricsData

{ .api }
from pipecat.metrics.metrics import LLMUsageMetricsData

class LLMUsageMetricsData(MetricsData):
    """LLM token usage metrics data.

    Tracks token usage for LLM operations.

    Attributes:
        processor (str): LLM service name (from MetricsData)
        model (Optional[str]): Model used (from MetricsData)
        timestamp (float): Metric timestamp (from MetricsData)
        value (LLMTokenUsage): Token usage statistics for the LLM operation
        tokens (LLMTokenUsage): Token usage statistics, described as an
            alias for ``value``. As declared here it is a separate required
            field, so the example passes the same usage to both.
        characters (int): Number of characters processed

    Example:
        import time

        llm_metrics = LLMUsageMetricsData(
            processor="openai_llm",
            model="gpt-4",
            timestamp=time.time(),
            value=LLMTokenUsage(
                prompt_tokens=100,
                completion_tokens=50,
                total_tokens=150
            ),
            tokens=LLMTokenUsage(
                prompt_tokens=100,
                completion_tokens=50,
                total_tokens=150
            ),
            characters=200
        )
    """
    value: LLMTokenUsage
    tokens: LLMTokenUsage
    characters: int

TTSUsageMetricsData

{ .api }
from pipecat.metrics.metrics import TTSUsageMetricsData

class TTSUsageMetricsData(MetricsData):
    """TTS usage metrics.

    Tracks TTS synthesis usage for billing and monitoring. Both
    TTS-specific fields are required.

    Attributes:
        processor (str): TTS service name (from MetricsData)
        model (Optional[str]): TTS model name (from MetricsData)
        timestamp (float): Metric timestamp (from MetricsData)
        characters (int): Characters synthesized
        audio_duration_ms (float): Generated audio duration in milliseconds

    Example:
        import time

        tts_metrics = TTSUsageMetricsData(
            processor="elevenlabs_tts",
            model="eleven_turbo_v2",
            timestamp=time.time(),
            characters=150,
            audio_duration_ms=5400  # 5.4 seconds
        )
    """
    characters: int
    audio_duration_ms: float

SmartTurnMetricsData

{ .api }
from pipecat.metrics.metrics import SmartTurnMetricsData

class SmartTurnMetricsData(MetricsData):
    """Smart turn prediction metrics.

    Tracks turn detection analysis performance.

    Attributes:
        processor (str): Turn analyzer name (from MetricsData)
        model (Optional[str]): Turn analyzer model name (from MetricsData);
            omitted in the example, so it takes its ``None`` default
        timestamp (float): Metric timestamp (from MetricsData)
        prediction_score (float): Turn prediction confidence (0.0-1.0).
            Required. The field is declared as a plain float.

    Example:
        import time

        turn_metrics = SmartTurnMetricsData(
            processor="smart_turn_v3",
            timestamp=time.time(),
            prediction_score=0.87
        )
    """
    prediction_score: float

Metrics Frames

MetricsFrame

{ .api }
from pipecat.frames.frames import MetricsFrame

class MetricsFrame(SystemFrame):
    """Metrics data frame.

    Carries metrics data through pipeline. SystemFrame for
    immediate processing and reporting.

    Attributes:
        data (MetricsData): Metrics data

    Example:
        import time

        metrics_frame = MetricsFrame(
            data=TTFBMetricsData(
                processor="llm",
                timestamp=time.time(),
                value=0.250  # TTFB is passed as `value`, in seconds (250ms)
            )
        )

        await task.queue_frame(metrics_frame)
    """

    def __init__(self, data: MetricsData):
        """Initialize metrics frame.

        Args:
            data: Metrics data payload stored on ``self.data``.
        """
        super().__init__()
        self.data = data

Collecting Metrics

Enable Metrics Collection

{ .api }
from pipecat.pipeline.task import PipelineTask, PipelineParams

# Enable metrics through the PipelineParams passed to the task.
# `pipeline` is assumed to be a Pipeline that was built earlier.
task = PipelineTask(
    pipeline=pipeline,
    params=PipelineParams(
        enable_metrics=True,         # Enable TTFB metrics
        enable_usage_metrics=True    # Enable usage metrics
    )
)

# Once enabled, metrics are generated automatically and emitted as MetricsFrames.

Metrics Processor

{ .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import MetricsFrame
from pipecat.metrics.metrics import TTFBMetricsData, LLMUsageMetricsData
from pipecat.pipeline.pipeline import Pipeline

class MetricsCollector(FrameProcessor):
    """Collect and log TTFB and LLM token metrics.

    Every frame is forwarded unchanged; MetricsFrames are additionally
    inspected so their values can be accumulated for `get_stats`.
    """

    def __init__(self):
        super().__init__()
        self._ttfb_samples = []  # TTFB samples, in milliseconds
        self._token_usage = []   # total_tokens per LLM usage metric

    async def process_frame(self, frame, direction):
        """Record metrics carried by `frame`, then pass the frame on.

        Args:
            frame: The frame travelling through the pipeline.
            direction: The direction the frame is travelling in.
        """
        # Custom processors must let the base class handle the frame first.
        await super().process_frame(frame, direction)

        if isinstance(frame, MetricsFrame):
            data = frame.data

            if isinstance(data, TTFBMetricsData):
                ttfb_ms = data.value * 1000  # value is in seconds
                self._ttfb_samples.append(ttfb_ms)
                print(f"TTFB: {ttfb_ms:.2f}ms from {data.processor}")

            elif isinstance(data, LLMUsageMetricsData):
                total = data.tokens.total_tokens
                self._token_usage.append(total)
                print(f"LLM: {total} tokens, model={data.model}")

        await self.push_frame(frame, direction)

    def get_stats(self):
        """Get collected statistics.

        Returns:
            dict with ``avg_ttfb_ms`` (0 when no samples were recorded),
            ``total_tokens`` and ``sample_count`` (number of TTFB samples).
        """
        sample_count = len(self._ttfb_samples)
        avg_ttfb = sum(self._ttfb_samples) / sample_count if sample_count else 0

        return {
            "avg_ttfb_ms": avg_ttfb,
            "total_tokens": sum(self._token_usage),
            "sample_count": sample_count,
        }

# Use in pipeline. `user_agg`, `llm`, `tts` and `transport` are assumed to be
# processors created elsewhere.
# NOTE(review): presumably the collector only sees metrics frames pushed by
# the processors placed before it (here `llm`, not `tts`) — confirm, and move
# it later in the list if TTS metrics should be collected too.
collector = MetricsCollector()
pipeline = Pipeline([
    user_agg,
    llm,
    collector,  # Collect metrics
    tts,
    transport.output()
])

Usage Patterns

Monitor Latency

{ .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import MetricsFrame
from pipecat.metrics.metrics import TTFBMetricsData

class LatencyMonitor(FrameProcessor):
    """Monitor and alert on high latency.

    Prints a warning whenever a TTFB metric exceeds the configured
    threshold. All frames are forwarded unchanged.
    """

    def __init__(self, threshold_ms: float = 500):
        """Initialize the monitor.

        Args:
            threshold_ms: TTFB, in milliseconds, above which a warning is
                printed.
        """
        super().__init__()
        self._threshold_ms = threshold_ms

    async def process_frame(self, frame, direction):
        """Check TTFB metrics against the threshold, then pass the frame on.

        Args:
            frame: The frame travelling through the pipeline.
            direction: The direction the frame is travelling in.
        """
        # Custom processors must let the base class handle the frame first.
        await super().process_frame(frame, direction)

        if isinstance(frame, MetricsFrame):
            if isinstance(frame.data, TTFBMetricsData):
                # TTFBMetricsData stores the measurement in seconds on
                # `value`; there is no `ttfb_ms` attribute.
                ttfb_ms = frame.data.value * 1000

                if ttfb_ms > self._threshold_ms:
                    print(f"⚠️  High latency: {ttfb_ms:.2f}ms (threshold: {self._threshold_ms}ms)")

        await self.push_frame(frame, direction)

Track Token Costs

{ .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import MetricsFrame
from pipecat.metrics.metrics import LLMUsageMetricsData

class CostTracker(FrameProcessor):
    """Track LLM token costs.

    Accumulates an estimated cost from LLM usage metrics using the
    `PRICING` table. Metrics for models missing from the table are
    ignored. All frames are forwarded unchanged.
    """

    # Example pricing (per 1K tokens)
    PRICING = {
        "gpt-4": {"prompt": 0.03, "completion": 0.06},
        "gpt-3.5-turbo": {"prompt": 0.0015, "completion": 0.002}
    }

    def __init__(self):
        super().__init__()
        self._total_cost = 0.0

    async def process_frame(self, frame, direction):
        """Add the cost of any LLM usage metric, then pass the frame on.

        Args:
            frame: The frame travelling through the pipeline.
            direction: The direction the frame is travelling in.
        """
        # Custom processors must let the base class handle the frame first.
        await super().process_frame(frame, direction)

        if isinstance(frame, MetricsFrame) and isinstance(frame.data, LLMUsageMetricsData):
            self._record_cost(frame.data)

        await self.push_frame(frame, direction)

    def _record_cost(self, data):
        """Add the cost of one LLM usage metric to the running total."""
        rates = self.PRICING.get(data.model)
        if rates is None:
            # Unknown (or unset) model: no price to apply.
            return

        prompt_cost = (data.tokens.prompt_tokens / 1000) * rates["prompt"]
        completion_cost = (data.tokens.completion_tokens / 1000) * rates["completion"]
        request_cost = prompt_cost + completion_cost

        self._total_cost += request_cost

        print(f"Request cost: ${request_cost:.4f} (Total: ${self._total_cost:.4f})")

    def get_total_cost(self) -> float:
        """Get total cost accumulated so far."""
        return self._total_cost

Export to Analytics

{ .api }
import asyncio
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import MetricsFrame

class AnalyticsExporter(FrameProcessor):
    """Export metrics to analytics service.

    Each MetricsFrame triggers a background send so that the pipeline is
    not blocked on the analytics client. All frames are forwarded
    unchanged.
    """

    def __init__(self, analytics_client):
        """Initialize the exporter.

        Args:
            analytics_client: Client exposing an awaitable
                ``track_event(event_type=..., properties=...)``.
        """
        super().__init__()
        self._client = analytics_client
        # Strong references to in-flight sends: the event loop only keeps
        # weak references to tasks, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._pending_sends = set()

    async def process_frame(self, frame, direction):
        """Schedule a send for metrics frames, then pass the frame on.

        Args:
            frame: The frame travelling through the pipeline.
            direction: The direction the frame is travelling in.
        """
        # Custom processors must let the base class handle the frame first.
        await super().process_frame(frame, direction)

        if isinstance(frame, MetricsFrame):
            # Send to analytics service without blocking the pipeline
            send_task = asyncio.create_task(
                self._send_metric(frame.data)
            )
            self._pending_sends.add(send_task)
            send_task.add_done_callback(self._pending_sends.discard)

        await self.push_frame(frame, direction)

    async def _send_metric(self, data):
        """Send metric to analytics.

        Failures are reported and swallowed: exporting is best-effort and
        must not disturb the pipeline.
        """
        try:
            await self._client.track_event(
                event_type="metric",
                properties={
                    "processor": data.processor,
                    "timestamp": data.timestamp,
                    **data.dict()
                }
            )
        except Exception as e:
            print(f"Failed to send metric: {e}")

Real-time Dashboard

{ .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import MetricsFrame
import time

class DashboardUpdater(FrameProcessor):
    """Update real-time dashboard.

    Keeps a rolling window of TTFB samples plus running token and request
    counters, and prints a summary after every MetricsFrame. All frames
    are forwarded unchanged.
    """

    def __init__(self):
        super().__init__()
        self._metrics = {
            "ttfb_samples": [],
            "token_usage": 0,
            "requests": 0,
            "last_update": time.time()
        }

    async def process_frame(self, frame, direction):
        """Fold metrics frames into the dashboard state, then pass the frame on.

        Args:
            frame: The frame travelling through the pipeline.
            direction: The direction the frame is travelling in.
        """
        # Custom processors must let the base class handle the frame first.
        await super().process_frame(frame, direction)

        if isinstance(frame, MetricsFrame):
            data = frame.data

            if isinstance(data, TTFBMetricsData):
                # TTFBMetricsData stores seconds on `value`; there is no
                # `ttfb_ms` attribute, so convert here.
                self._metrics["ttfb_samples"].append(data.value * 1000)
                # Keep last 100 samples
                self._metrics["ttfb_samples"] = self._metrics["ttfb_samples"][-100:]

            elif isinstance(data, LLMUsageMetricsData):
                self._metrics["token_usage"] += data.tokens.total_tokens
                self._metrics["requests"] += 1

            self._metrics["last_update"] = time.time()

            # Update dashboard
            await self._update_dashboard()

        await self.push_frame(frame, direction)

    async def _update_dashboard(self):
        """Update dashboard with current metrics.

        Nothing is printed until at least one TTFB sample has been recorded.
        """
        if self._metrics["ttfb_samples"]:
            avg_ttfb = sum(self._metrics["ttfb_samples"]) / len(self._metrics["ttfb_samples"])
            min_ttfb = min(self._metrics["ttfb_samples"])
            max_ttfb = max(self._metrics["ttfb_samples"])

            print(f"""
Dashboard Update:
  TTFB: avg={avg_ttfb:.2f}ms, min={min_ttfb:.2f}ms, max={max_ttfb:.2f}ms
  Tokens: {self._metrics["token_usage"]}
  Requests: {self._metrics["requests"]}
            """)

Best Practices

Always Enable Metrics in Production

{ .api }
# Good: Enable both metrics flags so latency and usage are reported
task = PipelineTask(
    pipeline,
    params=PipelineParams(
        enable_metrics=True,
        enable_usage_metrics=True
    )
)

# Bad: No metrics (shown for contrast only — do not run both assignments)
task = PipelineTask(pipeline)
# No visibility into performance or usage

Monitor Critical Metrics

{ .api }
# Monitor these metrics in production:
# 1. TTFB (latency)
# 2. Token usage (cost)
# 3. Error rates
# 4. Request counts
# 5. Audio duration (for TTS)

class CriticalMetricsMonitor(FrameProcessor):
    """Monitor critical metrics.

    Logs every metric and warns when TTFB exceeds one second. All frames
    are forwarded unchanged. `logger` is expected to be a module-level
    logger defined by the surrounding application.
    """

    async def process_frame(self, frame, direction):
        """Log metrics frames and alert on high TTFB, then pass the frame on.

        Args:
            frame: The frame travelling through the pipeline.
            direction: The direction the frame is travelling in.
        """
        # Custom processors must let the base class handle the frame first.
        await super().process_frame(frame, direction)

        if isinstance(frame, MetricsFrame):
            # Log all metrics
            logger.info(f"Metric: {frame.data}")

            # Alert on issues
            if isinstance(frame.data, TTFBMetricsData):
                # TTFBMetricsData stores seconds on `value`; there is no
                # `ttfb_ms` attribute, so convert here.
                ttfb_ms = frame.data.value * 1000
                if ttfb_ms > 1000:
                    logger.warning(f"High TTFB: {ttfb_ms}ms")

        await self.push_frame(frame, direction)

Aggregate Over Time

{ .api }
# Good: Aggregate metrics for trends
class TimeSeriesAggregator(FrameProcessor):
    """Aggregate metrics over time windows.

    Keeps the metrics received during the last `window_seconds` and prints
    a rolling average once at least 10 samples are in the window. All
    frames are forwarded unchanged.
    """

    def __init__(self, window_seconds: int = 60):
        """Initialize the aggregator.

        Args:
            window_seconds: Length of the rolling window, in seconds.
        """
        super().__init__()
        self._window = window_seconds
        self._samples = []  # (arrival time, metrics data) pairs

    async def process_frame(self, frame, direction):
        """Add metrics frames to the window, then pass the frame on.

        Args:
            frame: The frame travelling through the pipeline.
            direction: The direction the frame is travelling in.
        """
        # Custom processors must let the base class handle the frame first.
        await super().process_frame(frame, direction)

        if isinstance(frame, MetricsFrame):
            now = time.time()

            # Add sample
            self._samples.append((now, frame.data))

            # Remove old samples
            cutoff = now - self._window
            self._samples = [
                (t, d) for t, d in self._samples
                if t > cutoff
            ]

            # Calculate aggregate
            if len(self._samples) >= 10:
                avg = self._calculate_average()
                print(f"Rolling average: {avg}")

        await self.push_frame(frame, direction)

    def _calculate_average(self):
        """Average the numeric `value` of the samples in the window.

        Samples whose data has no numeric `value` (for example usage
        metrics, whose `value` is a token-usage object) are skipped.

        Returns:
            The mean as a float, or None when no sample has a numeric value.
        """
        values = [
            d.value for _, d in self._samples
            if isinstance(getattr(d, "value", None), (int, float))
            and not isinstance(d.value, bool)
        ]
        return sum(values) / len(values) if values else None

# Bad: Only looking at individual metrics
# Misses trends and patterns

Export for Analysis

{ .api }
# Good: Export metrics for offline analysis
class MetricsLogger(FrameProcessor):
    """Log metrics to file.

    Appends one JSON document per line for every MetricsFrame. All frames
    are forwarded unchanged.
    """

    def __init__(self, log_file: str):
        """Initialize the logger.

        Args:
            log_file: Path of the file to append metrics to.
        """
        super().__init__()
        self._log_file = log_file

    async def process_frame(self, frame, direction):
        """Append metrics frames to the log file, then pass the frame on.

        Args:
            frame: The frame travelling through the pipeline.
            direction: The direction the frame is travelling in.
        """
        # Custom processors must let the base class handle the frame first.
        await super().process_frame(frame, direction)

        if isinstance(frame, MetricsFrame):
            # Explicit UTF-8 so the log does not depend on the platform's
            # default encoding.
            with open(self._log_file, "a", encoding="utf-8") as f:
                f.write(f"{frame.data.json()}\n")

        await self.push_frame(frame, direction)

# Analyze logs later with pandas, etc.