Common Patterns

Building a Voice Agent

Standard pattern for voice agents:

{ .api }
# 1. Set up transport
transport = DailyTransport(
    room_url="...",
    token="...",
    params=DailyParams(audio_in_enabled=True, audio_out_enabled=True)
)

# 2. Configure services
stt = DeepgramSTTService(api_key="...")
llm = OpenAILLMService(api_key="...", model="gpt-4")
tts = ElevenLabsTTSService(api_key="...", voice_id="...")

# 3. Set LLM context
context = LLMContext(messages=[...])
llm.set_context(context)

# 4. Build pipeline
pipeline = Pipeline([
    transport.input(),
    stt,
    user_aggregator,
    llm,
    tts,
    assistant_aggregator,
    transport.output()
])

# 5. Run
task = PipelineTask(pipeline)
await task.run()
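
Alternatively, the task can be driven by a PipelineRunner (covered in runner.md), which also takes care of OS signals and shutdown; a minimal sketch, assuming the upstream import paths:

from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask

task = PipelineTask(pipeline)
runner = PipelineRunner()

# The runner drives the task and handles SIGINT-triggered shutdown
await runner.run(task)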

Handling Interruptions

Enable users to interrupt bot speech:

{ .api }
# Enable interruptions in transport
params = TransportParams(
    audio_in_enabled=True,
    audio_out_enabled=True,
    vad_enabled=True,  # Required for interruption detection
    vad_analyzer=SileroVADAnalyzer(),
)

# Enable in pipeline task
task = PipelineTask(
    pipeline,
    params=PipelineParams(allow_interruptions=True)
)

# Pipeline automatically:
# - Preserves SystemFrames
# - Cancels DataFrames/ControlFrames
# - Respects UninterruptibleFrame

Dynamic Service Switching

Switch services at runtime:

{ .api }
from pipecat.pipeline.service_switcher import (
    ServiceSwitcher,
    ServiceSwitcherStrategyManual
)

switcher = ServiceSwitcher(
    services=[service1, service2, service3],
    strategy=ServiceSwitcherStrategyManual(default_index=0)
)

# Switch via frame
from pipecat.frames.frames import ManuallySwitchServiceFrame
await task.queue_frame(ManuallySwitchServiceFrame(service_index=1))
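
A sketch of where the switcher sits in practice, assuming it can stand in for a single service in the pipeline; tts_primary and tts_backup are hypothetical, pre-configured TTS services:

tts_switcher = ServiceSwitcher(
    services=[tts_primary, tts_backup],
    strategy=ServiceSwitcherStrategyManual(default_index=0)
)

pipeline = Pipeline([
    transport.input(),
    stt,
    llm,
    tts_switcher,  # Acts as whichever TTS service is currently selected
    transport.output()
])

# Switch to the backup TTS at runtime
await task.queue_frame(ManuallySwitchServiceFrame(service_index=1))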

Custom Processors

Create custom frame processors:

{ .api }
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection
from pipecat.frames.frames import Frame, TextFrame

class TextTransformProcessor(FrameProcessor):
    """Transform text frames, passing all other frames through unchanged."""

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        # Let the base class handle StartFrame and other system frames first
        await super().process_frame(frame, direction)

        if isinstance(frame, TextFrame):
            frame.text = frame.text.upper()
        await self.push_frame(frame, direction)
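
The processor then slots into a pipeline like any other element, for example between the LLM and TTS so generated text is transformed before synthesis:

pipeline = Pipeline([
    transport.input(),
    stt,
    llm,
    TextTransformProcessor(),  # Upper-cases LLM output before it reaches TTS
    tts,
    transport.output()
])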

Error Handling

Handle errors gracefully:

{ .api }
from pipecat.frames.frames import ErrorFrame, FatalErrorFrame

# Inside a frame processor: report recoverable errors, escalate fatal ones
try:
    result = await some_operation()
except RecoverableError as e:
    # Non-fatal: report the error and keep the pipeline running
    await self.push_frame(ErrorFrame(error=str(e)))
except FatalError as e:
    # Fatal: signal that the pipeline should shut down
    await self.push_frame(FatalErrorFrame(error=str(e)))
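
Putting this together, a processor can wrap a flaky external call and report failures without stopping the pipeline; a sketch where call_moderation_api is a hypothetical helper used only for illustration:

from pipecat.processors.frame_processor import FrameProcessor, FrameDirection
from pipecat.frames.frames import ErrorFrame, Frame, TextFrame

class ModerationProcessor(FrameProcessor):
    """Moderate text frames, reporting failures as ErrorFrames."""

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, TextFrame):
            try:
                # call_moderation_api is a hypothetical helper (illustration only)
                frame.text = await call_moderation_api(frame.text)
            except Exception as e:
                # Recoverable: report the error, pass the original text through
                await self.push_frame(ErrorFrame(error=str(e)))

        await self.push_frame(frame, direction)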

Service Fallback

Automatic failover between services:

{ .api }
from pipecat.pipeline.service_switcher import (
    ServiceSwitcher,
    ServiceSwitcherStrategy
)

class FallbackStrategy(ServiceSwitcherStrategy):
    def __init__(self):
        self._current = 0
        self._services = []

    def select(self, services):
        self._services = services
        return self._current

    def on_error(self):
        # Rotate to the next service after a failure
        if self._services:
            self._current = (self._current + 1) % len(self._services)

switcher = ServiceSwitcher(
    services=[primary, secondary, tertiary],
    strategy=FallbackStrategy()
)

Nested Pipelines

Complex processing with sub-pipelines:

{ .api }
# Audio processing sub-pipeline
audio_pipeline = Pipeline([
    audio_filter,
    audio_resampler,
    audio_mixer
])

# Main pipeline
main_pipeline = Pipeline([
    transport.input(),
    audio_pipeline,  # Sub-pipeline
    stt,
    llm,
    tts,
    transport.output()
])

Dynamic Pipeline Construction

Build pipelines based on configuration:

{ .api }
def build_pipeline(use_vision: bool = False, enable_functions: bool = False):
    """Build pipeline with optional features."""
    processors = [transport.input(), stt, user_aggregator]
    
    if use_vision:
        processors.extend([vision_service, vision_aggregator])
    
    processors.append(llm)
    
    if enable_functions:
        processors.append(function_handler)
    
    processors.extend([assistant_aggregator, tts, transport.output()])
    
    return Pipeline(processors)
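
Usage then reduces to a single call at startup, reusing the task pattern from above:

pipeline = build_pipeline(use_vision=True, enable_functions=False)
task = PipelineTask(pipeline)
await task.run()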

Frame Injection

Inject frames into a running pipeline:

{ .api }
import asyncio

from pipecat.frames.frames import TextFrame, TTSSpeakFrame

async def inject_announcement():
    """Inject an urgent announcement into the running pipeline."""
    await asyncio.sleep(5)
    await task.queue_frames([
        TextFrame("Emergency announcement"),
        TTSSpeakFrame("This is urgent")
    ])

# Run alongside pipeline
await asyncio.gather(
    task.run(),
    inject_announcement()
)

Monitoring and Metrics

Track pipeline performance:

{ .api }
from pipecat.pipeline.task_observer import TaskObserver
from pipecat.observers.loggers import (
    MetricsLogObserver,
    LLMLogObserver,
    UserBotLatencyLogObserver
)

observer = TaskObserver()
observer.add_observer(MetricsLogObserver())
observer.add_observer(LLMLogObserver(log_tokens=True))
observer.add_observer(UserBotLatencyLogObserver())

task = PipelineTask(pipeline, observer=observer)

Task Lifecycle Management

Manage the pipeline lifecycle:

{ .api }
import asyncio

async def managed_pipeline():
    task = PipelineTask(pipeline)
    
    try:
        await asyncio.wait_for(task.run(), timeout=300)
    except asyncio.TimeoutError:
        print("Pipeline timeout")
        await task.cancel()
    except KeyboardInterrupt:
        print("User interrupt")
        await task.cancel()
    finally:
        if not task.has_finished():
            print("Pipeline did not complete")

See Also

  • Real-World Scenarios - Complete application examples
  • Edge Cases - Advanced scenarios
  • API Reference - Complete API documentation