docs
tessl install tessl/pypi-pipecat-ai@0.0.0

An open source framework for building real-time voice and multimodal conversational AI agents with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols.
Standard pattern for voice agents:
{ .api }
# 1. Set up transport
transport = DailyTransport(
room_url="...",
token="...",
params=DailyParams(audio_in_enabled=True, audio_out_enabled=True)
)
# 2. Configure services
stt = DeepgramSTTService(api_key="...")
llm = OpenAILLMService(api_key="...", model="gpt-4")
tts = ElevenLabsTTSService(api_key="...", voice_id="...")
# 3. Set LLM context
context = LLMContext(messages=[...])
llm.set_context(context)
# 4. Build pipeline
pipeline = Pipeline([
transport.input(),
stt,
user_aggregator,
llm,
tts,
assistant_aggregator,
transport.output()
])
# 5. Run
task = PipelineTask(pipeline)
await task.run()

Enable users to interrupt bot speech:
{ .api }
# Enable interruptions in transport
params = TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_enabled=True, # Required for interruption detection
vad_analyzer=SileroVADAnalyzer(),
)
# Enable in pipeline task
task = PipelineTask(
pipeline,
params=PipelineParams(allow_interruptions=True)
)
# Pipeline automatically:
# - Preserves SystemFrames
# - Cancels DataFrames/ControlFrames
# - Respects UninterruptibleFrame

Switch services at runtime:
{ .api }
from pipecat.pipeline.service_switcher import (
ServiceSwitcher,
ServiceSwitcherStrategyManual
)
switcher = ServiceSwitcher(
services=[service1, service2, service3],
strategy=ServiceSwitcherStrategyManual(default_index=0)
)
# Switch via frame
from pipecat.frames.frames import ManuallySwitchServiceFrame
await task.queue_frame(ManuallySwitchServiceFrame(service_index=1))

Create custom frame processors:
{ .api }
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection
from pipecat.frames.frames import Frame, TextFrame
class TextTransformProcessor(FrameProcessor):
"""Transform text frames."""
async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, TextFrame):
frame.text = frame.text.upper()
await self.push_frame(frame, direction)Handle errors gracefully:
{ .api }
from pipecat.frames.frames import ErrorFrame, FatalErrorFrame
try:
result = await some_operation()
except RecoverableError as e:
await self.push_frame(ErrorFrame(error=str(e)))
except FatalError as e:
await self.push_frame(FatalErrorFrame(error=str(e)))Automatic failover between services:
{ .api }
from pipecat.pipeline.service_switcher import (
ServiceSwitcher,
ServiceSwitcherStrategy
)
class FallbackStrategy(ServiceSwitcherStrategy):
    """Failover strategy: advance to the next service whenever one errors."""

    def __init__(self):
        self._current = 0
        self._services = []  # populated on the first select() call

    def select(self, services):
        # Remember the service list so on_error() can wrap around its length.
        self._services = services
        return self._current

    def on_error(self):
        # Fix: the original referenced an undefined global `services` here,
        # which would raise NameError; use the list captured in select().
        self._current = (self._current + 1) % len(self._services)
switcher = ServiceSwitcher(
services=[primary, secondary, tertiary],
strategy=FallbackStrategy()
)

Complex processing with sub-pipelines:
{ .api }
# Audio processing sub-pipeline
audio_pipeline = Pipeline([
audio_filter,
audio_resampler,
audio_mixer
])
# Main pipeline
main_pipeline = Pipeline([
transport.input(),
audio_pipeline, # Sub-pipeline
stt,
llm,
tts,
transport.output()
])

Build pipelines based on configuration:
{ .api }
def build_pipeline(use_vision: bool = False, enable_functions: bool = False):
"""Build pipeline with optional features."""
processors = [transport.input(), stt, user_aggregator]
if use_vision:
processors.extend([vision_service, vision_aggregator])
processors.append(llm)
if enable_functions:
processors.append(function_handler)
processors.extend([assistant_aggregator, tts, transport.output()])
return Pipeline(processors)Inject frames into running pipeline:
{ .api }
async def inject_announcement():
    """Inject an urgent announcement into the running pipeline after 5 seconds."""
    await asyncio.sleep(5)
    # queue_frames pushes frames through the pipeline from the task's input side.
    await task.queue_frames([
        TextFrame("Emergency announcement"),
        TTSSpeakFrame("This is urgent")
    ])
# Run alongside pipeline
await asyncio.gather(
task.run(),
inject_announcement()
)

Track pipeline performance:
{ .api }
from pipecat.pipeline.task_observer import TaskObserver
from pipecat.observers.loggers import (
MetricsLogObserver,
LLMLogObserver,
UserBotLatencyLogObserver
)
observer = TaskObserver()
observer.add_observer(MetricsLogObserver())
observer.add_observer(LLMLogObserver(log_tokens=True))
observer.add_observer(UserBotLatencyLogObserver())
task = PipelineTask(pipeline, observer=observer)

Manage pipeline lifecycle:
{ .api }
import asyncio
async def managed_pipeline():
    """Run the pipeline with a 5-minute timeout and explicit cancellation.

    Cancels the task on timeout or Ctrl-C; the finally block reports if the
    pipeline never reached a finished state.
    """
    task = PipelineTask(pipeline)
    try:
        await asyncio.wait_for(task.run(), timeout=300)
    except asyncio.TimeoutError:
        print("Pipeline timeout")
        await task.cancel()
    except KeyboardInterrupt:
        print("User interrupt")
        await task.cancel()
    finally:
        if not task.has_finished():
            print("Pipeline did not complete")