docs
tessl install tessl/pypi-pipecat-ai@0.0.0

An open source framework for building real-time voice and multimodal conversational AI agents with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols.
Pipecat provides comprehensive metrics for monitoring performance, tracking usage, and understanding system behavior. Metrics flow as frames through pipelines and can be collected, logged, or sent to analytics systems.
{ .api }
from typing import Optional

from pydantic import BaseModel

from pipecat.metrics.metrics import MetricsData
class MetricsData(BaseModel):
"""Base metrics data container.
All metrics inherit from this class and use Pydantic for validation.
Attributes:
processor (str): Name of the processor generating the metrics
model (Optional[str]): Optional model name associated with the metrics
timestamp (float): Timestamp when the metric was generated
Example:
import time
# Subclass example
class CustomMetricsData(MetricsData):
value: int
metrics = CustomMetricsData(
processor="llm_service",
model="gpt-4",
timestamp=time.time(),
value=123
)
"""
processor: str
model: Optional[str] = None
    timestamp: float = 0.0
{ .api }
from pipecat.metrics.metrics import TTFBMetricsData
class TTFBMetricsData(MetricsData):
"""Time To First Byte metrics.
Measures latency from request to first response byte.
Critical for real-time conversational experiences.
Attributes:
processor (str): Processor name
model (Optional[str]): Optional model name
value (float): TTFB measurement in seconds
Example:
ttfb_metrics = TTFBMetricsData(
processor="openai_llm",
model="gpt-4",
value=0.245 # 245ms in seconds
)
# Access value in milliseconds
ttfb_ms = ttfb_metrics.value * 1000
"""
    value: float
{ .api }
from pipecat.metrics.metrics import ProcessingMetricsData
class ProcessingMetricsData(MetricsData):
"""Processing time metrics.
Measures total processing time for operations.
Attributes:
processor (str): Processor name
model (Optional[str]): Optional model name
value (float): Processing time measurement in seconds
Example:
processing_metrics = ProcessingMetricsData(
processor="tts_service",
model="elevenlabs",
value=0.850 # 850ms in seconds
)
# Access value in milliseconds
duration_ms = processing_metrics.value * 1000
"""
    value: float
{ .api }
from typing import Optional

from pydantic import BaseModel

from pipecat.metrics.metrics import LLMTokenUsage
class LLMTokenUsage(BaseModel):
"""LLM token usage statistics.
Tracks token consumption for cost and quota management.
Attributes:
prompt_tokens (int): Number of tokens in the input prompt
completion_tokens (int): Number of tokens in the generated completion
total_tokens (int): Total number of tokens used (prompt + completion)
cache_read_input_tokens (Optional[int]): Number of tokens read from cache
cache_creation_input_tokens (Optional[int]): Number of tokens used to create cache entries
reasoning_tokens (Optional[int]): Number of tokens used for reasoning (some models)
Example:
usage = LLMTokenUsage(
prompt_tokens=150,
completion_tokens=50,
total_tokens=200,
cache_read_input_tokens=100
)
"""
prompt_tokens: int
completion_tokens: int
total_tokens: int
cache_read_input_tokens: Optional[int] = None
cache_creation_input_tokens: Optional[int] = None
    reasoning_tokens: Optional[int] = None
{ .api }
from pipecat.metrics.metrics import LLMTokenUsage, LLMUsageMetricsData
class LLMUsageMetricsData(MetricsData):
"""LLM token usage metrics data.
Tracks token usage for LLM operations.
Attributes:
processor (str): LLM service name
model (Optional[str]): Model used (from MetricsData)
value (LLMTokenUsage): Token usage statistics for the LLM operation
        tokens (LLMTokenUsage): Token usage statistics (mirrors value)
characters (int): Number of characters processed
Example:
import time
llm_metrics = LLMUsageMetricsData(
processor="openai_llm",
model="gpt-4",
timestamp=time.time(),
value=LLMTokenUsage(
prompt_tokens=100,
completion_tokens=50,
total_tokens=150
),
tokens=LLMTokenUsage(
prompt_tokens=100,
completion_tokens=50,
total_tokens=150
),
characters=200
)
"""
value: LLMTokenUsage
tokens: LLMTokenUsage
    characters: int
{ .api }
from pipecat.metrics.metrics import TTSUsageMetricsData
class TTSUsageMetricsData(MetricsData):
"""TTS usage metrics.
Tracks TTS synthesis usage for billing and monitoring.
Attributes:
processor (str): TTS service name
model (Optional[str]): TTS model name (from MetricsData)
timestamp (float): Metric timestamp (from MetricsData)
characters (int): Characters synthesized
audio_duration_ms (float): Generated audio duration in milliseconds
Example:
import time
tts_metrics = TTSUsageMetricsData(
processor="elevenlabs_tts",
model="eleven_turbo_v2",
timestamp=time.time(),
characters=150,
audio_duration_ms=5400 # 5.4 seconds
)
"""
characters: int
    audio_duration_ms: float
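For per-session billing, a processor can aggregate TTS usage across frames. The following is a minimal sketch, not part of Pipecat: TTSUsageTracker and its per-character rate are illustrative assumptions.
{ .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import MetricsFrame
from pipecat.metrics.metrics import TTSUsageMetricsData

class TTSUsageTracker(FrameProcessor):
    """Aggregate TTS characters and audio duration for a session (illustrative)."""

    # Hypothetical per-character rate; substitute your provider's pricing
    COST_PER_CHARACTER = 0.00003

    def __init__(self):
        super().__init__()
        self._characters = 0
        self._audio_ms = 0.0

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)  # Let the base class handle system frames
        if isinstance(frame, MetricsFrame) and isinstance(frame.data, TTSUsageMetricsData):
            self._characters += frame.data.characters
            self._audio_ms += frame.data.audio_duration_ms
        await self.push_frame(frame, direction)

    def get_usage(self) -> dict:
        """Return aggregated usage and an estimated cost."""
        return {
            "characters": self._characters,
            "audio_seconds": self._audio_ms / 1000,
            "estimated_cost": self._characters * self.COST_PER_CHARACTER,
        }
{ .api }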
from pipecat.metrics.metrics import SmartTurnMetricsData
class SmartTurnMetricsData(MetricsData):
"""Smart turn prediction metrics.
Tracks turn detection analysis performance.
Attributes:
processor (str): Turn analyzer name
model (Optional[str]): Turn analyzer model name (from MetricsData)
timestamp (float): Metric timestamp (from MetricsData)
prediction_score (float): Turn prediction confidence (0.0-1.0)
Example:
import time
turn_metrics = SmartTurnMetricsData(
processor="smart_turn_v3",
timestamp=time.time(),
prediction_score=0.87
)
"""
    prediction_score: float
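As an illustrative sketch (not a Pipecat class), a processor can watch these frames and flag low-confidence turn decisions; TurnPredictionLogger and its 0.5 threshold are assumptions for demonstration.
{ .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import MetricsFrame
from pipecat.metrics.metrics import SmartTurnMetricsData

class TurnPredictionLogger(FrameProcessor):
    """Log turn prediction scores and flag low-confidence decisions (illustrative)."""

    def __init__(self, low_confidence: float = 0.5):
        super().__init__()
        self._low_confidence = low_confidence

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)  # Let the base class handle system frames
        if isinstance(frame, MetricsFrame) and isinstance(frame.data, SmartTurnMetricsData):
            score = frame.data.prediction_score
            label = "low-confidence" if score < self._low_confidence else "ok"
            print(f"Turn prediction: {score:.2f} ({label}) from {frame.data.processor}")
        await self.push_frame(frame, direction)
{ .api }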
from pipecat.frames.frames import MetricsFrame, SystemFrame
class MetricsFrame(SystemFrame):
"""Metrics data frame.
Carries metrics data through pipeline. SystemFrame for
immediate processing and reporting.
Attributes:
data (MetricsData): Metrics data
    Example:
        import time

        metrics_frame = MetricsFrame(
data=TTFBMetricsData(
processor="llm",
timestamp=time.time(),
                value=0.250
)
)
await task.queue_frame(metrics_frame)
"""
def __init__(self, data: MetricsData):
"""Initialize metrics frame.
Args:
data: Metrics data
"""
super().__init__()
        self.data = data
{ .api }
from pipecat.pipeline.task import PipelineTask, PipelineParams
# Enable metrics in pipeline params
task = PipelineTask(
pipeline=pipeline,
params=PipelineParams(
        enable_metrics=True,        # Enable TTFB and processing time metrics
enable_usage_metrics=True # Enable usage metrics
)
)
# Metrics automatically generated and emitted as MetricsFrames
{ .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import MetricsFrame
from pipecat.metrics.metrics import TTFBMetricsData, LLMUsageMetricsData
from pipecat.pipeline.pipeline import Pipeline
class MetricsCollector(FrameProcessor):
"""Collect and log metrics."""
def __init__(self):
super().__init__()
self._ttfb_samples = []
self._token_usage = []
async def process_frame(self, frame, direction):
if isinstance(frame, MetricsFrame):
data = frame.data
if isinstance(data, TTFBMetricsData):
self._ttfb_samples.append(data.value * 1000) # Convert to ms
print(f"TTFB: {data.value * 1000:.2f}ms from {data.processor}")
elif isinstance(data, LLMUsageMetricsData):
self._token_usage.append(data.tokens.total_tokens)
print(f"LLM: {data.tokens.total_tokens} tokens, model={data.model}")
await self.push_frame(frame, direction)
def get_stats(self):
"""Get collected statistics."""
if self._ttfb_samples:
avg_ttfb = sum(self._ttfb_samples) / len(self._ttfb_samples)
else:
avg_ttfb = 0
total_tokens = sum(self._token_usage)
return {
"avg_ttfb_ms": avg_ttfb,
"total_tokens": total_tokens,
"sample_count": len(self._ttfb_samples)
}
# Use in pipeline
collector = MetricsCollector()
pipeline = Pipeline([
user_agg,
llm,
collector, # Collect metrics
tts,
transport.output()
])
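After the task completes, the aggregated statistics can be read from the collector; a brief usage sketch (assuming the task was run to completion, e.g. with a PipelineRunner):
{ .api }
# Read aggregated statistics once the pipeline task has finished
stats = collector.get_stats()
print(f"Average TTFB: {stats['avg_ttfb_ms']:.2f}ms over {stats['sample_count']} samples")
print(f"Total tokens: {stats['total_tokens']}")
{ .api }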
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import MetricsFrame
from pipecat.metrics.metrics import TTFBMetricsData
class LatencyMonitor(FrameProcessor):
"""Monitor and alert on high latency."""
def __init__(self, threshold_ms: float = 500):
super().__init__()
self._threshold_ms = threshold_ms
async def process_frame(self, frame, direction):
if isinstance(frame, MetricsFrame):
if isinstance(frame.data, TTFBMetricsData):
                ttfb_ms = frame.data.value * 1000  # value is in seconds
if ttfb_ms > self._threshold_ms:
print(f"⚠️ High latency: {ttfb_ms:.2f}ms (threshold: {self._threshold_ms}ms)")
        await self.push_frame(frame, direction)
{ .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import MetricsFrame
from pipecat.metrics.metrics import LLMUsageMetricsData
class CostTracker(FrameProcessor):
"""Track LLM token costs."""
# Example pricing (per 1K tokens)
PRICING = {
"gpt-4": {"prompt": 0.03, "completion": 0.06},
"gpt-3.5-turbo": {"prompt": 0.0015, "completion": 0.002}
}
def __init__(self):
super().__init__()
self._total_cost = 0.0
async def process_frame(self, frame, direction):
if isinstance(frame, MetricsFrame):
if isinstance(frame.data, LLMUsageMetricsData):
data = frame.data
model = data.model
if model in self.PRICING:
prompt_cost = (data.tokens.prompt_tokens / 1000) * self.PRICING[model]["prompt"]
completion_cost = (data.tokens.completion_tokens / 1000) * self.PRICING[model]["completion"]
request_cost = prompt_cost + completion_cost
self._total_cost += request_cost
print(f"Request cost: ${request_cost:.4f} (Total: ${self._total_cost:.4f})")
await self.push_frame(frame, direction)
def get_total_cost(self) -> float:
"""Get total cost."""
        return self._total_cost
{ .api }
import asyncio
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import MetricsFrame
class AnalyticsExporter(FrameProcessor):
"""Export metrics to analytics service."""
def __init__(self, analytics_client):
super().__init__()
self._client = analytics_client
async def process_frame(self, frame, direction):
if isinstance(frame, MetricsFrame):
            # Send to the analytics service without blocking the pipeline.
            # Keep a reference to the task if it must not be garbage collected.
            asyncio.create_task(
                self._send_metric(frame.data)
            )
await self.push_frame(frame, direction)
async def _send_metric(self, data):
"""Send metric to analytics."""
try:
await self._client.track_event(
event_type="metric",
properties={
"processor": data.processor,
"timestamp": data.timestamp,
                    **data.model_dump()  # Pydantic v2; use .dict() with Pydantic v1
}
)
except Exception as e:
print(f"Failed to send metric: {e}"){ .api }
import time

from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import MetricsFrame
from pipecat.metrics.metrics import TTFBMetricsData, LLMUsageMetricsData
class DashboardUpdater(FrameProcessor):
"""Update real-time dashboard."""
def __init__(self):
super().__init__()
self._metrics = {
"ttfb_samples": [],
"token_usage": 0,
"requests": 0,
"last_update": time.time()
}
async def process_frame(self, frame, direction):
if isinstance(frame, MetricsFrame):
data = frame.data
if isinstance(data, TTFBMetricsData):
self._metrics["ttfb_samples"].append(data.ttfb_ms)
# Keep last 100 samples
self._metrics["ttfb_samples"] = self._metrics["ttfb_samples"][-100:]
elif isinstance(data, LLMUsageMetricsData):
self._metrics["token_usage"] += data.tokens.total_tokens
self._metrics["requests"] += 1
self._metrics["last_update"] = time.time()
# Update dashboard
await self._update_dashboard()
await self.push_frame(frame, direction)
async def _update_dashboard(self):
"""Update dashboard with current metrics."""
if self._metrics["ttfb_samples"]:
avg_ttfb = sum(self._metrics["ttfb_samples"]) / len(self._metrics["ttfb_samples"])
min_ttfb = min(self._metrics["ttfb_samples"])
max_ttfb = max(self._metrics["ttfb_samples"])
print(f"""
Dashboard Update:
TTFB: avg={avg_ttfb:.2f}ms, min={min_ttfb:.2f}ms, max={max_ttfb:.2f}ms
Tokens: {self._metrics["token_usage"]}
Requests: {self._metrics["requests"]}
"""){ .api }
# Good: Enable metrics
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True
)
)
# Bad: No metrics
task = PipelineTask(pipeline)
# No visibility into performance or usage
{ .api }
# Monitor these metrics in production:
# 1. TTFB (latency)
# 2. Token usage (cost)
# 3. Error rates
# 4. Request counts
# 5. Audio duration (for TTS)
from loguru import logger

from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import MetricsFrame
from pipecat.metrics.metrics import TTFBMetricsData

class CriticalMetricsMonitor(FrameProcessor):
"""Monitor critical metrics."""
async def process_frame(self, frame, direction):
if isinstance(frame, MetricsFrame):
# Log all metrics
logger.info(f"Metric: {frame.data}")
# Alert on issues
            if isinstance(frame.data, TTFBMetricsData):
                ttfb_ms = frame.data.value * 1000  # value is in seconds
                if ttfb_ms > 1000:
                    logger.warning(f"High TTFB: {ttfb_ms:.2f}ms")
        await self.push_frame(frame, direction)
{ .api }
# Good: Aggregate metrics for trends
import time

from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import MetricsFrame
from pipecat.metrics.metrics import TTFBMetricsData

class TimeSeriesAggregator(FrameProcessor):
"""Aggregate metrics over time windows."""
def __init__(self, window_seconds: int = 60):
super().__init__()
self._window = window_seconds
self._samples = []
async def process_frame(self, frame, direction):
if isinstance(frame, MetricsFrame):
now = time.time()
# Add sample
self._samples.append((now, frame.data))
# Remove old samples
cutoff = now - self._window
self._samples = [
(t, d) for t, d in self._samples
if t > cutoff
]
            # Calculate a rolling average over TTFB samples in the window
            ttfb_values = [
                d.value for _, d in self._samples
                if isinstance(d, TTFBMetricsData)
            ]
            if len(ttfb_values) >= 10:
                avg_ms = sum(ttfb_values) / len(ttfb_values) * 1000
                print(f"Rolling average TTFB: {avg_ms:.2f}ms")
await self.push_frame(frame, direction)
# Bad: Only looking at individual metrics
# Misses trends and patterns
{ .api }
# Good: Export metrics for offline analysis
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import MetricsFrame

class MetricsLogger(FrameProcessor):
"""Log metrics to file."""
def __init__(self, log_file: str):
super().__init__()
self._log_file = log_file
async def process_frame(self, frame, direction):
if isinstance(frame, MetricsFrame):
with open(self._log_file, "a") as f:
f.write(f"{frame.data.json()}\n")
await self.push_frame(frame, direction)
# Analyze logs later with pandas, etc.
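The JSON-lines log written above lends itself to offline analysis. The sketch below makes assumptions: it reads a hypothetical metrics.jsonl file with pandas and summarizes the numeric value field (TTFB and processing times, in seconds) per processor.
{ .api }
import json

import pandas as pd

# Load the JSON-lines metrics log written by MetricsLogger
# ("metrics.jsonl" is a hypothetical path)
with open("metrics.jsonl") as f:
    df = pd.DataFrame(json.loads(line) for line in f)

# TTFB and processing metrics store their measurement in seconds under "value";
# keep only rows whose "value" is a plain number (LLM usage rows hold a dict there)
latency = df[df["value"].apply(lambda v: isinstance(v, (int, float)))]

# Summarize latency in milliseconds, grouped by processor
print((latency["value"] * 1000).groupby(latency["processor"]).describe())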