tessl install tessl/pypi-pipecat-ai@0.0.0

An open source framework for building real-time voice and multimodal conversational AI agents, with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols.
Pipecat provides essential core utilities for time handling, object identification, and string processing used throughout the framework.
{ .api }
from pipecat.utils.time import (
time_now_iso8601,
seconds_to_nanoseconds,
nanoseconds_to_seconds,
nanoseconds_to_str
)
def time_now_iso8601() -> str:
"""Get the current UTC time as an ISO8601 formatted string.
Returns:
The current UTC time in ISO8601 format with millisecond precision
Example:
timestamp = time_now_iso8601()
# Returns: "2024-01-15T14:30:45.123+00:00"
"""
pass
def seconds_to_nanoseconds(seconds: float) -> int:
"""Convert seconds to nanoseconds.
Args:
seconds: The number of seconds to convert
Returns:
The equivalent number of nanoseconds as an integer
Example:
ns = seconds_to_nanoseconds(1.5)
# Returns: 1500000000
"""
pass
def nanoseconds_to_seconds(nanoseconds: int) -> float:
"""Convert nanoseconds to seconds.
Args:
nanoseconds: The number of nanoseconds to convert
Returns:
The equivalent number of seconds as a float
Example:
sec = nanoseconds_to_seconds(1500000000)
# Returns: 1.5
"""
pass
def nanoseconds_to_str(nanoseconds: int) -> str:
"""Convert nanoseconds to a human-readable time string.
Args:
nanoseconds: The number of nanoseconds to convert
Returns:
A formatted time string in "H:MM:SS.microseconds" format
Example:
time_str = nanoseconds_to_str(3661500000000)
# Returns: "1:01:01.500000"
"""
pass
{ .api }
from pipecat.utils.utils import obj_id, obj_count
def obj_id() -> int:
"""Generate a unique ID for an object.
Provides a globally unique identifier that increments across all
objects in the system. Thread-safe.
Returns:
A unique integer identifier
Example:
id1 = obj_id() # Returns: 0
id2 = obj_id() # Returns: 1
id3 = obj_id() # Returns: 2
# Use for object tracking
class Processor:
def __init__(self):
self.id = obj_id()
"""
pass
def obj_count(obj) -> int:
"""Generate a unique count for an object based on its class.
Provides a per-class instance counter. Each call increments the
counter for the object's class, so call it once per instance
(typically in __init__). Thread-safe.
Args:
obj: The object instance to count
Returns:
A unique integer count for this class type
Example:
class MyProcessor:
def __init__(self):
self.count = obj_count(self)
p1 = MyProcessor() # count = 0
p2 = MyProcessor() # count = 1
p3 = MyProcessor() # count = 2
# Different class has separate counter
class OtherProcessor:
def __init__(self):
self.count = obj_count(self)
o1 = OtherProcessor() # count = 0 (separate counter)
"""
pass

Comprehensive text processing utilities, including sentence boundary detection, tag parsing, and pattern matching.
{ .api }
from pipecat.utils.string import SENTENCE_ENDING_PUNCTUATION, match_endofsentence
# Comprehensive set of sentence-ending punctuation from multiple writing systems
SENTENCE_ENDING_PUNCTUATION: FrozenSet[str]
"""Set of sentence-ending punctuation marks from multiple writing systems.
Includes punctuation from:
- Latin script: . ! ? ; …
- East Asian: 。 ？ ！ ；
- Indic scripts: । ॥
- Arabic script: ؟ ؛ ۔
- Myanmar/Burmese: ၊ ။
- Khmer: ។ ៕
- Tibetan: ། ༎
- Armenian: ։ ՜ ՞
- Ethiopic: ። ፧ ፨
"""
def match_endofsentence(text: str) -> Optional[int]:
"""Find the end of the first complete sentence in text using NLTK.
Uses NLTK's sentence tokenizer for robust sentence boundary detection
across multiple languages and edge cases. Handles abbreviations, numbers,
and other common false positives.
Args:
text: The text to analyze for sentence boundaries
Returns:
The character index where the first sentence ends, or None if
no complete sentence is found
Example:
text = "Hello world. More text here."
index = match_endofsentence(text)
# Returns: 12
sentence = text[:index] # "Hello world."
remaining = text[index:] # " More text here."
# Handles abbreviations correctly
text = "Dr. Smith arrived."
index = match_endofsentence(text)
# Returns: 18 (doesn't split at "Dr.")
# Handles numbers
text = "The price is $29.99. Buy now!"
index = match_endofsentence(text)
# Returns: 20 (doesn't split at decimal point)
"""
pass
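A quick membership test against SENTENCE_ENDING_PUNCTUATION can serve as a cheap pre-filter before running full boundary detection on every streaming chunk. A minimal sketch (the helper below is illustrative, not part of the API):
from pipecat.utils.string import SENTENCE_ENDING_PUNCTUATION, match_endofsentence
def maybe_sentence_end(chunk: str) -> bool:
    # Cheap check: only run match_endofsentence when the chunk
    # contains at least one sentence-ending character.
    return any(ch in SENTENCE_ENDING_PUNCTUATION for ch in chunk)
buffer = "Hello world"              # no ending punctuation yet
if maybe_sentence_end(buffer):      # False, so the heavier check is skipped
    index = match_endofsentence(buffer)
{ .api }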
from pipecat.utils.string import StartEndTags, parse_start_end_tags
# Type alias for start/end tag pairs
StartEndTags = Tuple[str, str]
def parse_start_end_tags(
text: str,
tags: Sequence[StartEndTags],
current_tag: Optional[StartEndTags],
current_tag_index: int
) -> Tuple[Optional[StartEndTags], int]:
"""Parse text for start/end tag pairs and track current tag state.
Tracks whether we're currently inside a tag pair and updates the
position within the end tag if found. Handles tags that may span
multiple text chunks in streaming scenarios.
Args:
text: The text to parse for tags
tags: Sequence of (start_tag, end_tag) tuples to search for
current_tag: The currently active tag pair, or None if not in a tag
current_tag_index: Current position within the end tag
Returns:
Tuple of (current_tag, current_tag_index) after processing text
Example:
# Define tags to track
tags = [
("<code>", "</code>"),
("<think>", "</think>")
]
current_tag = None
current_index = 0
# First chunk: entering code block
text1 = "Here is <code>print('Hi')"
current_tag, current_index = parse_start_end_tags(
text1, tags, current_tag, current_index
)
# current_tag = ("<code>", "</code>") - inside code block
# current_index = 0
# Second chunk: still inside code block
text2 = " + 'there'"
current_tag, current_index = parse_start_end_tags(
text2, tags, current_tag, current_index
)
# current_tag = ("<code>", "</code>") - still inside
# current_index = 0
# Third chunk: exiting code block
text3 = ")</code> more text"
current_tag, current_index = parse_start_end_tags(
text3, tags, current_tag, current_index
)
# current_tag = None - exited code block
# current_index = 0
"""
pass
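The current_tag_index argument matters when an end tag is itself split across chunks. A hedged sketch of the expected state transitions (the exact index value is inferred from the documented semantics):
from pipecat.utils.string import parse_start_end_tags
tags = [("<code>", "</code>")]
# Chunk ends midway through the end tag "</code>"
current_tag, current_index = parse_start_end_tags(
    "<code>x = 1</co", tags, None, 0
)
# current_tag = ("<code>", "</code>") - still inside the tag
# current_index = how much of "</code>" has been matched so far (here, 4)
# The end tag completes across the chunk boundary
current_tag, current_index = parse_start_end_tags(
    "de>", tags, current_tag, current_index
)
# current_tag = None - tag closed
# current_index = 0
{ .api }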
from pipecat.utils.string import replace_match
def replace_match(text: str, match: re.Match, old: str, new: str) -> str:
"""Replace occurrences of a substring within a matched section of text.
Performs replacements only within the boundaries of a regex match,
leaving text outside the match unchanged.
Args:
text: The input text in which replacements will be made
match: A regex match object representing the section to modify
old: The substring to replace within the matched section
new: The replacement substring
Returns:
The text with replacements applied within the matched section
Example:
import re
text = "Before [remove this remove] after"
pattern = r"\\[.*?\\]"
match = re.search(pattern, text)
if match:
result = replace_match(text, match, "remove", "keep")
# Returns: "Before [keep this keep] after"
"""
pass
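When applying replace_match across several matches, process them right to left so earlier match offsets stay valid even when the replacement changes the text length. An illustrative pattern (not part of the API):
import re
from pipecat.utils.string import replace_match
text = "keep [fix this] and [fix that] but not this fix"
pattern = re.compile(r"\[.*?\]")
# Iterate matches in reverse so length changes inside later matches
# never shift the spans of matches not yet processed.
for match in reversed(list(pattern.finditer(text))):
    text = replace_match(text, match, "fix", "fixed")
# Result: "keep [fixed this] and [fixed that] but not this fix"
{ .api }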
from pipecat.utils.time import (
time_now_iso8601,
seconds_to_nanoseconds,
nanoseconds_to_seconds
)
import time
# Timestamp logging
def log_event(event: str):
timestamp = time_now_iso8601()
print(f"[{timestamp}] {event}")
log_event("Pipeline started")
# Outputs: [2024-01-15T14:30:45.123+00:00] Pipeline started
# Duration measurement
start_ns = seconds_to_nanoseconds(time.time())
await process_data()
end_ns = seconds_to_nanoseconds(time.time())
duration_ns = end_ns - start_ns
duration_sec = nanoseconds_to_seconds(duration_ns)
print(f"Processing took {duration_sec:.3f} seconds")
# Human-readable timing
from pipecat.utils.time import nanoseconds_to_str
elapsed_ns = 3661500000000 # 1 hour, 1 minute, 1.5 seconds
time_str = nanoseconds_to_str(elapsed_ns)
print(f"Elapsed: {time_str}") # "1:01:01.500000"{ .api }
from pipecat.utils.utils import obj_id, obj_count
# Unique processor IDs
class FrameProcessor:
def __init__(self, name: str):
self.id = obj_id()
self.name = name
self.instance_count = obj_count(self)
def __str__(self):
return f"{self.name}_{self.instance_count} (id={self.id})"
# Create processors (obj_count is per class, so all FrameProcessor
# instances share one counter regardless of name)
p1 = FrameProcessor("TTS")  # TTS_0 (id=0)
p2 = FrameProcessor("STT")  # STT_1 (id=1)
p3 = FrameProcessor("TTS")  # TTS_2 (id=2)
# Track by class
class MetricsCollector:
def __init__(self):
self.processor_counts = {}
def register(self, processor: FrameProcessor):
class_name = processor.__class__.__name__
# Note: obj_count() increments the per-class counter on every call,
# so reuse the count captured in __init__ instead of calling it again.
self.processor_counts[class_name] = processor.instance_count + 1
def report(self):
for cls, count in self.processor_counts.items():
print(f"{cls}: {count} instances"){ .api }
from pipecat.utils.string import match_endofsentence
# Split text into sentences
def split_sentences(text: str) -> list[str]:
"""Split text into individual sentences."""
sentences = []
remaining = text
while remaining:
# Find next sentence boundary
index = match_endofsentence(remaining)
if index is None:
# No more complete sentences
if remaining.strip():
sentences.append(remaining.strip())
break
# Extract sentence
sentence = remaining[:index].strip()
if sentence:
sentences.append(sentence)
# Continue with remaining text
remaining = remaining[index:]
return sentences
# Usage
text = "Hello world. How are you? I'm fine thanks!"
sentences = split_sentences(text)
# Returns: ["Hello world.", "How are you?", "I'm fine thanks!"]
# Handles complex cases
text = "Dr. Smith paid $29.99. Great deal!"
sentences = split_sentences(text)
# Returns: ["Dr. Smith paid $29.99.", "Great deal!"]{ .api }
from pipecat.utils.string import parse_start_end_tags, StartEndTags
# Track tags in streaming text
class TagTracker:
def __init__(self, tags: list[StartEndTags]):
self.tags = tags
self.current_tag = None
self.current_index = 0
self.inside_tag = False
async def process_chunk(self, chunk: str) -> str:
"""Process text chunk and track tag state."""
# Update tag state
self.current_tag, self.current_index = parse_start_end_tags(
chunk,
self.tags,
self.current_tag,
self.current_index
)
# Check if inside tag
self.inside_tag = self.current_tag is not None
if self.inside_tag:
print(f"Inside tag: {self.current_tag[0]}")
# Handle tagged content
return "" # Skip content in tags
else:
# Process normal content
return chunk
# Usage
tracker = TagTracker([
("<code>", "</code>"),
("<thinking>", "</thinking>")
])
# Process streaming text
chunks = [
"Normal text ",
"<code>print('hi')",
")</code>",
" more text"
]
for chunk in chunks:
processed = await tracker.process_chunk(chunk)
if processed:
await send_to_tts(processed)
{ .api }
from pipecat.utils.time import time_now_iso8601, nanoseconds_to_seconds
from pipecat.utils.utils import obj_id, obj_count
import time
class ProcessorMetrics:
"""Track processor performance metrics."""
def __init__(self, processor):
self.processor_id = obj_id()
self.processor_class = processor.__class__.__name__
self.instance_num = obj_count(processor)
self.start_time = None
self.frame_count = 0
self.total_processing_ns = 0
def start_frame(self):
"""Mark the start of frame processing."""
self.start_time = time.time_ns()
def end_frame(self):
"""Mark the end of frame processing."""
if self.start_time:
duration_ns = time.time_ns() - self.start_time
self.total_processing_ns += duration_ns
self.frame_count += 1
self.start_time = None
def report(self) -> dict:
"""Generate metrics report."""
avg_ns = (
self.total_processing_ns / self.frame_count
if self.frame_count > 0
else 0
)
return {
"processor_id": self.processor_id,
"processor_class": self.processor_class,
"instance": self.instance_num,
"frames_processed": self.frame_count,
"total_time_sec": nanoseconds_to_seconds(self.total_processing_ns),
"avg_time_sec": nanoseconds_to_seconds(avg_ns),
"timestamp": time_now_iso8601()
}
# Usage
class MyProcessor:
def __init__(self):
self.metrics = ProcessorMetrics(self)
async def process_frame(self, frame):
self.metrics.start_frame()
try:
await self.do_processing(frame)
finally:
self.metrics.end_frame()
def get_metrics(self):
return self.metrics.report()
# Create processors
p1 = MyProcessor()
p2 = MyProcessor()
# Process frames
for frame in frames:
await p1.process_frame(frame)
# Get metrics
print(p1.get_metrics())
# {
# "processor_id": 0,
# "processor_class": "MyProcessor",
# "instance": 0,
# "frames_processed": 100,
# "total_time_sec": 5.234,
# "avg_time_sec": 0.05234,
# "timestamp": "2024-01-15T14:30:45.123+00:00"
# }
{ .api }
from typing import Optional
from pipecat.utils.string import match_endofsentence
class StreamingSentenceBuffer:
"""Buffer for accumulating and yielding complete sentences from streaming text."""
def __init__(self):
self.buffer = ""
async def add_text(self, text: str) -> list[str]:
"""Add text to buffer and return any complete sentences.
Args:
text: New text chunk to add
Returns:
List of complete sentences found
"""
self.buffer += text
sentences = []
# Extract all complete sentences
while True:
index = match_endofsentence(self.buffer)
if index is None:
break
# Found a sentence
sentence = self.buffer[:index].strip()
if sentence:
sentences.append(sentence)
# Remove from buffer
self.buffer = self.buffer[index:]
return sentences
async def flush(self) -> Optional[str]:
"""Flush remaining buffer contents.
Returns:
Any remaining text in buffer
"""
if self.buffer.strip():
text = self.buffer.strip()
self.buffer = ""
return text
return None
# Usage
buffer = StreamingSentenceBuffer()
# Process streaming LLM output
async for chunk in llm_stream:
sentences = await buffer.add_text(chunk)
for sentence in sentences:
# Send complete sentences to TTS immediately
await tts.speak(sentence)
# End of stream - flush remaining
remaining = await buffer.flush()
if remaining:
await tts.speak(remaining)
{ .api }
from pipecat.utils.time import time_now_iso8601
# Good: ISO8601 format for logs and metrics
timestamp = time_now_iso8601()
log_entry = {"event": "started", "timestamp": timestamp}
# {"event": "started", "timestamp": "2024-01-15T14:30:45.123+00:00"}
# Good: Consistent across systems and timezones
# Bad: Using local time without timezone
import datetime
timestamp = datetime.datetime.now().isoformat()  # Missing timezone info!
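Where pipecat is not available, a timezone-aware standard-library equivalent looks like this (a sketch; the exact formatting of time_now_iso8601 may differ):
import datetime
# Timezone-aware UTC timestamp with millisecond precision
timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat(timespec="milliseconds")
# e.g. "2024-01-15T14:30:45.123+00:00"
{ .api }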
import time
from pipecat.utils.time import seconds_to_nanoseconds, nanoseconds_to_seconds
# Good: Use nanoseconds for high-precision timing
start_ns = time.time_ns()
await process()
end_ns = time.time_ns()
duration_ns = end_ns - start_ns
# Good: Convert to seconds for display
duration_sec = nanoseconds_to_seconds(duration_ns)
print(f"Processing took {duration_sec:.6f} seconds")
# Bad: Using float seconds (loses precision)
start = time.time()
await process()
duration = time.time() - start  # Less precise for short operations
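For interval timing specifically, a monotonic clock also avoids jumps from system clock adjustments such as NTP corrections. A minimal sketch (process() stands in for any coroutine):
import time
start_ns = time.perf_counter_ns()
await process()
duration_ns = time.perf_counter_ns() - start_ns  # immune to wall-clock changes
{ .api }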
from pipecat.utils.utils import obj_count
# Good: Per-class instance counting
class Processor:
def __init__(self, name: str):
self.name = name
self.instance = obj_count(self)
def __str__(self):
return f"{self.name}_{self.instance}"
# Creates clear, unique identifiers per class
p1 = Processor("TTS") # TTS_0
p2 = Processor("TTS") # TTS_1
# Bad: Manual counting (error-prone)
class Processor:
_count = 0
def __init__(self):
Processor._count += 1 # Not thread-safe!
self.instance = Processor._count
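If a manual counter is unavoidable, making it thread-safe takes an explicit lock. An illustrative sketch of what obj_count saves you from writing:
import threading
class Processor:
    _count = 0
    _lock = threading.Lock()
    def __init__(self):
        # Guard the read-increment so concurrent constructors cannot
        # observe or skip the same value.
        with self._lock:
            self.instance = Processor._count
            Processor._count += 1
{ .api }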
from pipecat.utils.string import match_endofsentence
# Good: Always flush remaining content
async def process_stream(chunks):
buffer = ""
for chunk in chunks:
buffer += chunk
# Process complete sentences
while True:
index = match_endofsentence(buffer)
if index is None:
break
sentence = buffer[:index]
await process(sentence)
buffer = buffer[index:]
# Critical: Flush remaining buffer
if buffer.strip():
await process(buffer.strip())
# Bad: Forgetting to flush (loses last partial sentence)
# Don't forget this step!
{ .api }
from pipecat.utils.string import StartEndTags, parse_start_end_tags
from typing import Sequence
# Good: Use type hints and constants
SUPPORTED_TAGS: Sequence[StartEndTags] = [
("<code>", "</code>"),
("<thinking>", "</thinking>"),
("<math>", "</math>")
]
def track_tags(text: str, tags: Sequence[StartEndTags]) -> bool:
current_tag = None
current_index = 0
current_tag, current_index = parse_start_end_tags(
text, tags, current_tag, current_index
)
return current_tag is not None
# Bad: Using string tuples without type hints
tags = [("<code>", "</code>")] # What type is this?
# Missing type safety and IDE support