or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/pipecat-ai@0.0.x

docs

core-concepts.mdindex.mdpipeline.mdrunner.mdtransports.mdturns.md
tile.json

tessl/pypi-pipecat-ai

tessl install tessl/pypi-pipecat-ai@0.0.0

An open source framework for building real-time voice and multimodal conversational AI agents with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols

error-handling.mddocs/reference/

Error Handling Reference

Error Frame Types

ErrorFrame

{ .api }
from pipecat.frames.frames import ErrorFrame

class ErrorFrame(SystemFrame):
    """Recoverable error.
    
    Attributes:
        error: Error message string
    """
    
    def __init__(self, error: str):
        pass

# Usage
try:
    result = await operation()
except Exception as e:
    await self.push_frame(ErrorFrame(error=str(e)))
    # Continue processing

FatalErrorFrame

{ .api }
from pipecat.frames.frames import FatalErrorFrame

class FatalErrorFrame(ErrorFrame):
    """Unrecoverable error that stops pipeline."""
    
    def __init__(self, error: str):
        pass

# Usage
try:
    result = await critical_operation()
except CriticalError as e:
    await self.push_frame(FatalErrorFrame(error=str(e)))
    # Pipeline will stop

Common Error Patterns

Service Connection Errors

{ .api }
@service.event_handler("on_connection_error")
async def handle_connection_error(error: Exception):
    """Handle service connection errors."""
    logger.error(f"Connection error: {error}")
    # Implement retry or fallback
    await asyncio.sleep(5)
    await service.reconnect()

LLM Timeout Errors

{ .api }
@llm.event_handler("on_completion_timeout")
async def handle_llm_timeout():
    """Handle LLM completion timeout."""
    logger.warning("LLM timed out")
    await task.queue_frame(
        TextFrame("I apologize, I'm taking too long.")
    )

Transport Errors

{ .api }
@transport.event_handler("on_transport_error")
async def handle_transport_error(error: Exception):
    """Handle transport errors."""
    logger.error(f"Transport error: {error}")
    # Cleanup and reconnect
    await transport.stop()
    await transport.start()

Error Recovery Strategies

Retry with Exponential Backoff

{ .api }
async def retry_with_backoff(
    func,
    max_retries: int = 3,
    initial_delay: float = 1.0,
    max_delay: float = 60.0
):
    """Retry with exponential backoff."""
    delay = initial_delay
    last_exception = None
    
    for attempt in range(max_retries):
        try:
            return await func()
        except Exception as e:
            last_exception = e
            if attempt < max_retries - 1:
                await asyncio.sleep(delay)
                delay = min(delay * 2, max_delay)
    
    raise last_exception

Service Fallback

{ .api }
from pipecat.pipeline.service_switcher import (
    ServiceSwitcher,
    ServiceSwitcherStrategy
)

class FallbackStrategy(ServiceSwitcherStrategy):
    """Automatic fallback on error."""
    
    def __init__(self):
        self._current = 0
        self._failed = set()
    
    def select(self, services):
        for i in range(len(services)):
            idx = (self._current + i) % len(services)
            if idx not in self._failed:
                return idx
        self._failed.clear()
        return 0
    
    def on_error(self, service_index: int):
        self._failed.add(service_index)
        self._current = (service_index + 1) % len(services)

# Usage
switcher = ServiceSwitcher(
    services=[primary, secondary, tertiary],
    strategy=FallbackStrategy()
)

Circuit Breaker

{ .api }
from enum import Enum
from time import time

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, reject requests
    HALF_OPEN = "half_open" # Testing recovery

class CircuitBreaker:
    """Circuit breaker pattern for service calls."""
    
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0
    ):
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._failure_threshold = failure_threshold
        self._recovery_timeout = recovery_timeout
        self._last_failure_time = 0
    
    async def call(self, func):
        """Execute function with circuit breaker."""
        if self._state == CircuitState.OPEN:
            if time() - self._last_failure_time > self._recovery_timeout:
                self._state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN")
        
        try:
            result = await func()
            if self._state == CircuitState.HALF_OPEN:
                self._state = CircuitState.CLOSED
                self._failure_count = 0
            return result
        
        except Exception as e:
            self._failure_count += 1
            self._last_failure_time = time()
            
            if self._failure_count >= self._failure_threshold:
                self._state = CircuitState.OPEN
            
            raise e

Exception Hierarchy

Pipecat Exceptions

{ .api }
# Base exceptions (framework-specific)
class PipecatError(Exception):
    """Base Pipecat exception."""
    pass

class ServiceError(PipecatError):
    """Service-related error."""
    pass

class TransportError(PipecatError):
    """Transport-related error."""
    pass

class ProcessorError(PipecatError):
    """Processor-related error."""
    pass

Error Handling Best Practices

Processor Error Handling

{ .api }
class RobustProcessor(FrameProcessor):
    """Processor with comprehensive error handling."""
    
    async def process_frame(self, frame, direction):
        try:
            await self._process(frame)
            await self.push_frame(frame, direction)
        
        except asyncio.TimeoutError:
            logger.warning("Processing timeout")
            await self.push_frame(ErrorFrame(error="Timeout"))
            await self.push_frame(frame, direction)
        
        except RecoverableError as e:
            logger.error(f"Recoverable: {e}")
            await self.push_frame(ErrorFrame(error=str(e)))
            await self.push_frame(frame, direction)
        
        except Exception as e:
            logger.critical(f"Fatal: {e}")
            await self.push_frame(FatalErrorFrame(error=str(e)))

Service Error Handling

{ .api }
class ResilientService(LLMService):
    """Service with error recovery."""
    
    async def _with_retry(self, operation):
        """Execute with retry logic."""
        for attempt in range(3):
            try:
                return await operation()
            except ConnectionError as e:
                if attempt == 2:
                    raise ServiceError(f"Failed after 3 attempts: {e}")
                await asyncio.sleep(2 ** attempt)

Pipeline Error Handling

{ .api }
async def run_pipeline_with_error_handling():
    """Run pipeline with comprehensive error handling."""
    task = PipelineTask(pipeline)
    
    try:
        await task.run()
    
    except asyncio.CancelledError:
        logger.info("Pipeline cancelled")
        raise
    
    except KeyboardInterrupt:
        logger.info("User interrupted")
        await task.cancel()
    
    except Exception as e:
        logger.error(f"Pipeline error: {e}", exc_info=True)
        await task.cancel()
        raise
    
    finally:
        # Always cleanup
        await transport.stop()
        logger.info("Cleanup complete")

Logging Configuration

Basic Logging

{ .api }
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

Loguru Configuration

{ .api }
from loguru import logger

logger.add(
    "pipecat.log",
    rotation="500 MB",
    retention="10 days",
    level="DEBUG",
    format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}"
)

# Filter specific components
logger.add(
    "llm.log",
    level="DEBUG",
    filter=lambda record: "llm" in record["name"].lower()
)

Debugging Configuration

Debug Observers

{ .api }
from pipecat.observers.loggers import DebugLogObserver

# Log all frame flow
observer = TaskObserver()
observer.add_observer(DebugLogObserver())

task = PipelineTask(pipeline, observer=observer)

Frame Flow Debugging

{ .api }
class DebugProcessor(FrameProcessor):
    """Log frame flow for debugging."""
    
    async def process_frame(self, frame, direction):
        logger.debug(f"{direction.value}: {frame.__class__.__name__}")
        await self.push_frame(frame, direction)

See Also

  • API Reference - Complete API
  • Examples - Error handling examples
  • Services Reference - Service-specific errors