docs
tessl install tessl/pypi-pipecat-ai@0.0.0An open source framework for building real-time voice and multimodal conversational AI agents with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols
{ .api }
from pipecat.frames.frames import ErrorFrame
class ErrorFrame(SystemFrame):
"""Recoverable error.
Attributes:
error: Error message string
"""
def __init__(self, error: str):
pass
# Usage
try:
result = await operation()
except Exception as e:
await self.push_frame(ErrorFrame(error=str(e)))
# Continue processing{ .api }
from pipecat.frames.frames import FatalErrorFrame
class FatalErrorFrame(ErrorFrame):
"""Unrecoverable error that stops pipeline."""
def __init__(self, error: str):
pass
# Usage
try:
result = await critical_operation()
except CriticalError as e:
await self.push_frame(FatalErrorFrame(error=str(e)))
# Pipeline will stop{ .api }
@service.event_handler("on_connection_error")
async def handle_connection_error(error: Exception):
"""Handle service connection errors."""
logger.error(f"Connection error: {error}")
# Implement retry or fallback
await asyncio.sleep(5)
await service.reconnect(){ .api }
@llm.event_handler("on_completion_timeout")
async def handle_llm_timeout():
"""Handle LLM completion timeout."""
logger.warning("LLM timed out")
await task.queue_frame(
TextFrame("I apologize, I'm taking too long.")
){ .api }
@transport.event_handler("on_transport_error")
async def handle_transport_error(error: Exception):
"""Handle transport errors."""
logger.error(f"Transport error: {error}")
# Cleanup and reconnect
await transport.stop()
await transport.start(){ .api }
async def retry_with_backoff(
func,
max_retries: int = 3,
initial_delay: float = 1.0,
max_delay: float = 60.0
):
"""Retry with exponential backoff."""
delay = initial_delay
last_exception = None
for attempt in range(max_retries):
try:
return await func()
except Exception as e:
last_exception = e
if attempt < max_retries - 1:
await asyncio.sleep(delay)
delay = min(delay * 2, max_delay)
raise last_exception{ .api }
from pipecat.pipeline.service_switcher import (
ServiceSwitcher,
ServiceSwitcherStrategy
)
class FallbackStrategy(ServiceSwitcherStrategy):
"""Automatic fallback on error."""
def __init__(self):
self._current = 0
self._failed = set()
def select(self, services):
for i in range(len(services)):
idx = (self._current + i) % len(services)
if idx not in self._failed:
return idx
self._failed.clear()
return 0
def on_error(self, service_index: int):
self._failed.add(service_index)
self._current = (service_index + 1) % len(services)
# Usage
switcher = ServiceSwitcher(
services=[primary, secondary, tertiary],
strategy=FallbackStrategy()
){ .api }
from enum import Enum
from time import time
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing, reject requests
HALF_OPEN = "half_open" # Testing recovery
class CircuitBreaker:
"""Circuit breaker pattern for service calls."""
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: float = 60.0
):
self._state = CircuitState.CLOSED
self._failure_count = 0
self._failure_threshold = failure_threshold
self._recovery_timeout = recovery_timeout
self._last_failure_time = 0
async def call(self, func):
"""Execute function with circuit breaker."""
if self._state == CircuitState.OPEN:
if time() - self._last_failure_time > self._recovery_timeout:
self._state = CircuitState.HALF_OPEN
else:
raise Exception("Circuit breaker is OPEN")
try:
result = await func()
if self._state == CircuitState.HALF_OPEN:
self._state = CircuitState.CLOSED
self._failure_count = 0
return result
except Exception as e:
self._failure_count += 1
self._last_failure_time = time()
if self._failure_count >= self._failure_threshold:
self._state = CircuitState.OPEN
raise e{ .api }
# Base exceptions (framework-specific)
class PipecatError(Exception):
"""Base Pipecat exception."""
pass
class ServiceError(PipecatError):
"""Service-related error."""
pass
class TransportError(PipecatError):
"""Transport-related error."""
pass
class ProcessorError(PipecatError):
"""Processor-related error."""
pass{ .api }
class RobustProcessor(FrameProcessor):
"""Processor with comprehensive error handling."""
async def process_frame(self, frame, direction):
try:
await self._process(frame)
await self.push_frame(frame, direction)
except asyncio.TimeoutError:
logger.warning("Processing timeout")
await self.push_frame(ErrorFrame(error="Timeout"))
await self.push_frame(frame, direction)
except RecoverableError as e:
logger.error(f"Recoverable: {e}")
await self.push_frame(ErrorFrame(error=str(e)))
await self.push_frame(frame, direction)
except Exception as e:
logger.critical(f"Fatal: {e}")
await self.push_frame(FatalErrorFrame(error=str(e))){ .api }
class ResilientService(LLMService):
"""Service with error recovery."""
async def _with_retry(self, operation):
"""Execute with retry logic."""
for attempt in range(3):
try:
return await operation()
except ConnectionError as e:
if attempt == 2:
raise ServiceError(f"Failed after 3 attempts: {e}")
await asyncio.sleep(2 ** attempt){ .api }
async def run_pipeline_with_error_handling():
"""Run pipeline with comprehensive error handling."""
task = PipelineTask(pipeline)
try:
await task.run()
except asyncio.CancelledError:
logger.info("Pipeline cancelled")
raise
except KeyboardInterrupt:
logger.info("User interrupted")
await task.cancel()
except Exception as e:
logger.error(f"Pipeline error: {e}", exc_info=True)
await task.cancel()
raise
finally:
# Always cleanup
await transport.stop()
logger.info("Cleanup complete"){ .api }
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
){ .api }
from loguru import logger
logger.add(
"pipecat.log",
rotation="500 MB",
retention="10 days",
level="DEBUG",
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}"
)
# Filter specific components
logger.add(
"llm.log",
level="DEBUG",
filter=lambda record: "llm" in record["name"].lower()
){ .api }
from pipecat.observers.loggers import DebugLogObserver
# Log all frame flow
observer = TaskObserver()
observer.add_observer(DebugLogObserver())
task = PipelineTask(pipeline, observer=observer){ .api }
class DebugProcessor(FrameProcessor):
"""Log frame flow for debugging."""
async def process_frame(self, frame, direction):
logger.debug(f"{direction.value}: {frame.__class__.__name__}")
await self.push_frame(frame, direction)