or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/pipecat-ai@0.0.x

docs

core-concepts.md index.md pipeline.md runner.md transports.md turns.md
tile.json

tessl/pypi-pipecat-ai

tessl install tessl/pypi-pipecat-ai@0.0.0

An open source framework for building real-time voice and multimodal conversational AI agents with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols

core-utilities.mddocs/utilities/

Core Utilities

Pipecat provides essential core utilities for time handling, object identification, and string processing used throughout the framework.

Time Utilities

{ .api }
from pipecat.utils.time import (
    time_now_iso8601,
    seconds_to_nanoseconds,
    nanoseconds_to_seconds,
    nanoseconds_to_str
)

def time_now_iso8601() -> str:
    """Get the current UTC time as an ISO8601 formatted string.

    Returns:
        The current UTC time in ISO8601 format with millisecond precision

    Example:
        timestamp = time_now_iso8601()
        # Returns: "2024-01-15T14:30:45.123+00:00"
    """
    pass

def seconds_to_nanoseconds(seconds: float) -> int:
    """Convert seconds to nanoseconds.

    Args:
        seconds: The number of seconds to convert

    Returns:
        The equivalent number of nanoseconds as an integer

    Example:
        ns = seconds_to_nanoseconds(1.5)
        # Returns: 1500000000
    """
    pass

def nanoseconds_to_seconds(nanoseconds: int) -> float:
    """Convert nanoseconds to seconds.

    Args:
        nanoseconds: The number of nanoseconds to convert

    Returns:
        The equivalent number of seconds as a float

    Example:
        sec = nanoseconds_to_seconds(1500000000)
        # Returns: 1.5
    """
    pass

def nanoseconds_to_str(nanoseconds: int) -> str:
    """Convert nanoseconds to a human-readable time string.

    Args:
        nanoseconds: The number of nanoseconds to convert

    Returns:
        A formatted time string in "H:MM:SS.microseconds" format

    Example:
        time_str = nanoseconds_to_str(3661500000000)
        # Returns: "1:01:01.500000"
    """
    pass

Object Utilities

{ .api }
from pipecat.utils.utils import obj_id, obj_count

def obj_id() -> int:
    """Generate a unique ID for an object.

    Provides a globally unique identifier that increments across all
    objects in the system. Thread-safe.

    Returns:
        A unique integer identifier

    Example:
        id1 = obj_id()  # Returns: 0
        id2 = obj_id()  # Returns: 1
        id3 = obj_id()  # Returns: 2

        # Use for object tracking
        class Processor:
            def __init__(self):
                self.id = obj_id()
    """
    pass

def obj_count(obj) -> int:
    """Generate a unique count for an object based on its class.

    Provides a per-class instance counter that increments for each
    instance of a specific class type. Thread-safe.

    Args:
        obj: The object instance to count

    Returns:
        A unique integer count for this class type

    Example:
        class MyProcessor:
            def __init__(self):
                self.count = obj_count(self)

        p1 = MyProcessor()  # count = 0
        p2 = MyProcessor()  # count = 1
        p3 = MyProcessor()  # count = 2

        # Different class has separate counter
        class OtherProcessor:
            def __init__(self):
                self.count = obj_count(self)

        o1 = OtherProcessor()  # count = 0 (separate counter)
    """
    pass

String Utilities

Comprehensive text processing utilities including sentence boundary detection, tag parsing, and pattern matching.

Sentence Detection

{ .api }
from pipecat.utils.string import SENTENCE_ENDING_PUNCTUATION, match_endofsentence

# Comprehensive set of sentence-ending punctuation from multiple writing systems
SENTENCE_ENDING_PUNCTUATION: FrozenSet[str]
"""Set of sentence-ending punctuation marks from multiple writing systems.

Includes punctuation from:
    - Latin script: . ! ? ; …
    - East Asian: 。 ？ ！ ； ． ｡
    - Indic scripts: । ॥
    - Arabic script: ؟ ؛ ۔ ؏
    - Thai: ။
    - Myanmar/Burmese: ၊ ။
    - Khmer: ។ ៕
    - Lao: ໌ ༎
    - Tibetan: ། ༎
    - Armenian: ։ ՜ ՞
    - Ethiopic: ። ፧ ፨
"""

def match_endofsentence(text: str) -> Optional[int]:
    """Find the end of the first complete sentence in text using NLTK.

    Uses NLTK's sentence tokenizer for robust sentence boundary detection
    across multiple languages and edge cases. Handles abbreviations, numbers,
    and other common false positives.

    Args:
        text: The text to analyze for sentence boundaries

    Returns:
        The character index where the first sentence ends, or None if
        no complete sentence is found. The index is exclusive: it equals
        the length of the first sentence, so ``text[:index]`` is the
        sentence and ``text[index:]`` is everything after it.

    Example:
        text = "Hello world. More text here."
        index = match_endofsentence(text)
        # Returns: 12

        sentence = text[:index]   # "Hello world."
        remaining = text[index:]  # " More text here."

        # Handles abbreviations correctly
        text = "Dr. Smith arrived."
        index = match_endofsentence(text)
        # Returns: 18 (doesn't split at "Dr.")

        # Handles numbers
        text = "The price is $29.99. Buy now!"
        index = match_endofsentence(text)
        # Returns: 20 (doesn't split at decimal point)
    """
    pass

Tag Parsing

{ .api }
from pipecat.utils.string import StartEndTags, parse_start_end_tags

# Type alias for start/end tag pairs
StartEndTags = Tuple[str, str]

def parse_start_end_tags(
    text: str,
    tags: Sequence[StartEndTags],
    current_tag: Optional[StartEndTags],
    current_tag_index: int
) -> Tuple[Optional[StartEndTags], int]:
    """Parse text for start/end tag pairs and track current tag state.

    Tracks whether we're currently inside a tag pair and updates the
    position within the end tag if found. Handles tags that may span
    multiple text chunks in streaming scenarios.

    Args:
        text: The text to parse for tags
        tags: Sequence of (start_tag, end_tag) tuples to search for
        current_tag: The currently active tag pair, or None if not in a tag
        current_tag_index: Current position within the end tag

    Returns:
        Tuple of (current_tag, current_tag_index) after processing text

    Example:
        # Define tags to track
        tags = [
            ("<code>", "</code>"),
            ("<think>", "</think>")
        ]

        current_tag = None
        current_index = 0

        # First chunk: entering code block
        text1 = "Here is <code>print('Hi')"
        current_tag, current_index = parse_start_end_tags(
            text1, tags, current_tag, current_index
        )
        # current_tag = ("<code>", "</code>") - inside code block
        # current_index = 0

        # Second chunk: still inside code block
        text2 = " + 'there'"
        current_tag, current_index = parse_start_end_tags(
            text2, tags, current_tag, current_index
        )
        # current_tag = ("<code>", "</code>") - still inside
        # current_index = 0

        # Third chunk: exiting code block
        text3 = ")</code> more text"
        current_tag, current_index = parse_start_end_tags(
            text3, tags, current_tag, current_index
        )
        # current_tag = None - exited code block
        # current_index = 0
    """
    pass

Pattern Replacement

{ .api }
from pipecat.utils.string import replace_match

def replace_match(text: str, match: re.Match, old: str, new: str) -> str:
    r"""Replace occurrences of a substring within a matched section of text.

    Performs replacements only within the boundaries of a regex match,
    leaving text outside the match unchanged.

    Args:
        text: The input text in which replacements will be made
        match: A regex match object representing the section to modify
        old: The substring to replace within the matched section
        new: The replacement substring

    Returns:
        The text with replacements applied within the matched section

    Example:
        import re

        text = "Before [remove this remove] after"
        # In a raw string a single backslash escapes each bracket.
        pattern = r"\[.*?\]"
        match = re.search(pattern, text)

        if match:
            result = replace_match(text, match, "remove", "keep")
            # Returns: "Before [keep this keep] after"
    """
    pass

Usage Patterns

Time Tracking

{ .api }
from pipecat.utils.time import (
    time_now_iso8601,
    seconds_to_nanoseconds,
    nanoseconds_to_seconds
)

# Timestamp logging
def log_event(event: str):
    """Print ``event`` prefixed with the current timestamp in square brackets.

    Args:
        event: Human-readable description of what happened.
    """
    stamp = time_now_iso8601()
    print(f"[{stamp}] {event}")

log_event("Pipeline started")
# Outputs: [2024-01-15T14:30:45.123+00:00] Pipeline started

# Duration measurement
start_ns = seconds_to_nanoseconds(time.time())
await process_data()
end_ns = seconds_to_nanoseconds(time.time())

duration_ns = end_ns - start_ns
duration_sec = nanoseconds_to_seconds(duration_ns)
print(f"Processing took {duration_sec:.3f} seconds")

# Human-readable timing
from pipecat.utils.time import nanoseconds_to_str

elapsed_ns = 3661500000000  # 1 hour, 1 minute, 1.5 seconds
time_str = nanoseconds_to_str(elapsed_ns)
print(f"Elapsed: {time_str}")  # "1:01:01.500000"

Object Identification

{ .api }
from pipecat.utils.utils import obj_id, obj_count

# Unique processor IDs
class FrameProcessor:
    """Example processor labelled with a global ID and a per-class number."""

    def __init__(self, name: str):
        """Store the display name and draw both identifiers.

        Args:
            name: Display name used as the prefix of the string form.
        """
        self.name = name
        # Global identifier first, then the per-class instance number.
        self.id = obj_id()
        self.instance_count = obj_count(self)

    def __str__(self):
        """Return ``<name>_<instance number> (id=<global id>)``."""
        return f"{self.name}_{self.instance_count} (id={self.id})"

# Create processors
p1 = FrameProcessor("TTS")      # TTS_0 (id=0)
p2 = FrameProcessor("STT")      # STT_0 (id=1)
p3 = FrameProcessor("TTS")      # TTS_1 (id=2)

# Track by class
class MetricsCollector:
    """Tally how many instances of each processor class have been registered."""

    def __init__(self):
        # Maps class name -> number of instances seen so far.
        self.processor_counts = {}

    def register(self, processor: "FrameProcessor"):
        """Record ``processor`` in the per-class tally.

        The instance number is read from ``processor.instance_count``, which
        the processor captured when it was constructed. ``obj_count`` is not
        called again here: it is documented as handing out the next number
        on each call, so calling it would consume a count and overstate the
        tally.

        Args:
            processor: A processor exposing a zero-based ``instance_count``.
        """
        class_name = processor.__class__.__name__
        # instance_count is zero-based, so the number of instances is +1.
        seen = processor.instance_count + 1
        # Keep the highest value so out-of-order registration cannot
        # lower a tally that was already recorded.
        previous = self.processor_counts.get(class_name, 0)
        self.processor_counts[class_name] = max(previous, seen)

    def report(self):
        """Print one ``<class>: <n> instances`` line per registered class."""
        for cls, count in self.processor_counts.items():
            print(f"{cls}: {count} instances")

Sentence Boundary Detection

{ .api }
from pipecat.utils.string import match_endofsentence

# Split text into sentences
def split_sentences(text: str) -> list[str]:
    """Split text into individual sentences.

    Repeatedly asks ``match_endofsentence`` for the end of the first
    sentence in the unprocessed text and slices it off. Whatever is left
    when no further boundary is reported is kept as a final entry.

    Args:
        text: The text to split.

    Returns:
        The stripped, non-empty sentences in their original order.
    """
    sentences = []
    remaining = text

    while remaining:
        # Find next sentence boundary
        index = match_endofsentence(remaining)

        # None means no complete sentence. A zero index would slice off
        # nothing and repeat forever, so it is treated the same way.
        if not index:
            tail = remaining.strip()
            if tail:
                sentences.append(tail)
            break

        # Extract sentence
        sentence = remaining[:index].strip()
        if sentence:
            sentences.append(sentence)

        # Continue with remaining text
        remaining = remaining[index:]

    return sentences

# Usage
text = "Hello world. How are you? I'm fine thanks!"
sentences = split_sentences(text)
# Returns: ["Hello world.", "How are you?", "I'm fine thanks!"]

# Handles complex cases
text = "Dr. Smith paid $29.99. Great deal!"
sentences = split_sentences(text)
# Returns: ["Dr. Smith paid $29.99.", "Great deal!"]

Tag-Based Text Processing

{ .api }
from pipecat.utils.string import parse_start_end_tags, StartEndTags

# Track tags in streaming text
class TagTracker:
    """Filter streamed text chunks, dropping those that belong to a tag pair.

    Filtering is per chunk, not per character: a chunk is either returned
    whole or suppressed whole. A chunk that both opens and closes a tag, or
    that mixes plain text with an opening tag, is therefore not split.
    """

    def __init__(self, tags: list[StartEndTags]):
        """Start outside any tag.

        Args:
            tags: The (start_tag, end_tag) pairs to watch for.
        """
        self.tags = tags
        self.current_tag = None
        self.current_index = 0
        self.inside_tag = False

    async def process_chunk(self, chunk: str) -> str:
        """Process text chunk and track tag state.

        Args:
            chunk: The next piece of streamed text.

        Returns:
            ``chunk`` unchanged when it lies entirely outside the tracked
            tags, otherwise an empty string.
        """
        # Remember the state before parsing: a chunk that closes a tag
        # still carries tagged content and the closing tag itself.
        was_inside = self.current_tag is not None

        # Update tag state
        self.current_tag, self.current_index = parse_start_end_tags(
            chunk,
            self.tags,
            self.current_tag,
            self.current_index
        )

        # Check if inside tag
        self.inside_tag = self.current_tag is not None

        if self.inside_tag:
            print(f"Inside tag: {self.current_tag[0]}")
            # Handle tagged content
            return ""  # Skip content in tags

        if was_inside:
            # This chunk ended the tag; suppress it so the tail of the
            # tagged content and the end tag are not emitted.
            return ""

        # Process normal content
        return chunk

# Usage
tracker = TagTracker([
    ("<code>", "</code>"),
    ("<thinking>", "</thinking>")
])

# Process streaming text
chunks = [
    "Normal text ",
    "<code>print('hi')",
    ")</code>",
    " more text"
]

for chunk in chunks:
    processed = await tracker.process_chunk(chunk)
    if processed:
        await send_to_tts(processed)

Combining Utilities for Metrics

{ .api }
from pipecat.utils.time import time_now_iso8601, nanoseconds_to_seconds
from pipecat.utils.utils import obj_id, obj_count
import time

class ProcessorMetrics:
    """Track processor performance metrics.

    Accumulates the time spent between matching ``start_frame`` and
    ``end_frame`` calls and summarizes it in ``report``.
    """

    def __init__(self, processor):
        """Capture identity information for ``processor`` and reset counters.

        Args:
            processor: The object being measured; used for its class name
                and its per-class instance number.
        """
        self.processor_id = obj_id()
        self.processor_class = processor.__class__.__name__
        self.instance_num = obj_count(processor)
        # Clock reading taken by start_frame, or None when no frame is open.
        self.start_time = None
        self.frame_count = 0
        self.total_processing_ns = 0

    def start_frame(self):
        """Mark the start of frame processing."""
        # perf_counter_ns is monotonic, so a wall-clock adjustment during
        # processing cannot produce a negative or inflated duration.
        self.start_time = time.perf_counter_ns()

    def end_frame(self):
        """Mark the end of frame processing.

        Does nothing when no frame is open (``start_frame`` was not called
        since the last ``end_frame``).
        """
        # Compare against None explicitly: the counter's reference point is
        # undefined, so a reading of 0 is a valid start time.
        if self.start_time is not None:
            duration_ns = time.perf_counter_ns() - self.start_time
            self.total_processing_ns += duration_ns
            self.frame_count += 1
            self.start_time = None

    def report(self) -> dict:
        """Generate metrics report.

        Returns:
            A dict with the processor identity, the number of frames
            processed, total and average processing time in seconds, and
            the time the report was generated.
        """
        avg_ns = (
            self.total_processing_ns / self.frame_count
            if self.frame_count > 0
            else 0
        )

        return {
            "processor_id": self.processor_id,
            "processor_class": self.processor_class,
            "instance": self.instance_num,
            "frames_processed": self.frame_count,
            "total_time_sec": nanoseconds_to_seconds(self.total_processing_ns),
            "avg_time_sec": nanoseconds_to_seconds(avg_ns),
            "timestamp": time_now_iso8601()
        }

# Usage
class MyProcessor:
    """Example processor that times every frame with ``ProcessorMetrics``."""

    def __init__(self):
        # One metrics tracker per processor instance.
        self.metrics = ProcessorMetrics(self)

    async def process_frame(self, frame):
        """Process ``frame`` and record how long it took.

        NOTE: ``do_processing`` is not defined in this snippet; a real
        processor must provide it.
        """
        self.metrics.start_frame()
        try:
            await self.do_processing(frame)
        finally:
            # Close the measurement even if processing raised.
            self.metrics.end_frame()

    def get_metrics(self):
        """Return the current metrics report for this processor."""
        return self.metrics.report()

# Create processors
p1 = MyProcessor()
p2 = MyProcessor()

# Process frames
for frame in frames:
    await p1.process_frame(frame)

# Get metrics
print(p1.get_metrics())
# {
#   "processor_id": 0,
#   "processor_class": "MyProcessor",
#   "instance": 0,
#   "frames_processed": 100,
#   "total_time_sec": 5.234,
#   "avg_time_sec": 0.05234,
#   "timestamp": "2024-01-15T14:30:45.123+00:00"
# }

Streaming Text with Sentence Detection

{ .api }
from typing import Optional

from pipecat.utils.string import match_endofsentence

class StreamingSentenceBuffer:
    """Buffer for accumulating and yielding complete sentences from streaming text."""

    def __init__(self):
        # Text received so far that has not yet been returned as a sentence.
        self.buffer = ""

    async def add_text(self, text: str) -> list[str]:
        """Add text to buffer and return any complete sentences.

        Args:
            text: New text chunk to add

        Returns:
            List of complete sentences found
        """
        self.buffer += text
        sentences = []

        # Extract all complete sentences
        while True:
            index = match_endofsentence(self.buffer)
            # None means no complete sentence yet. A zero index would
            # remove nothing from the buffer and spin forever, so it is
            # treated the same way.
            if not index:
                break

            # Found a sentence
            sentence = self.buffer[:index].strip()
            if sentence:
                sentences.append(sentence)

            # Remove from buffer
            self.buffer = self.buffer[index:]

        return sentences

    async def flush(self) -> Optional[str]:
        """Flush remaining buffer contents.

        The buffer is always emptied, including when it held only
        whitespace.

        Returns:
            Any remaining text in buffer, stripped, or None if there was
            nothing but whitespace
        """
        text = self.buffer.strip()
        self.buffer = ""
        return text or None

# Usage
buffer = StreamingSentenceBuffer()

# Process streaming LLM output
async for chunk in llm_stream:
    sentences = await buffer.add_text(chunk)
    for sentence in sentences:
        # Send complete sentences to TTS immediately
        await tts.speak(sentence)

# End of stream - flush remaining
remaining = await buffer.flush()
if remaining:
    await tts.speak(remaining)

Best Practices

Use ISO8601 for Timestamps

{ .api }
from pipecat.utils.time import time_now_iso8601

# Good: ISO8601 format for logs and metrics
timestamp = time_now_iso8601()
log_entry = {"event": "started", "timestamp": timestamp}
# {"event": "started", "timestamp": "2024-01-15T14:30:45.123+00:00"}

# Good: Consistent across systems and timezones
# Bad: Using local time without timezone
import datetime
timestamp = datetime.datetime.now().isoformat()  # Missing timezone info!

Use Nanoseconds for Precision Timing

{ .api }
from pipecat.utils.time import seconds_to_nanoseconds, nanoseconds_to_seconds

# Good: Use nanoseconds for high-precision timing
start_ns = time.time_ns()
await process()
end_ns = time.time_ns()
duration_ns = end_ns - start_ns

# Good: Convert to seconds for display
duration_sec = nanoseconds_to_seconds(duration_ns)
print(f"Processing took {duration_sec:.6f} seconds")

# Bad: Using float seconds (loses precision)
start = time.time()
await process()
duration = time.time() - start  # Less precise for short operations

Use obj_count for Instance Tracking

{ .api }
from pipecat.utils.utils import obj_count

# Good: Per-class instance counting
class Processor:
    """Example processor numbered per class via ``obj_count``."""

    def __init__(self, name: str):
        """Store the display name and draw this instance's number.

        Args:
            name: Display name used as the prefix of the string form.
        """
        self.instance = obj_count(self)
        self.name = name

    def __str__(self):
        """Return ``<name>_<instance number>``."""
        return f"{self.name}_{self.instance}"

# Creates clear, unique identifiers per class
p1 = Processor("TTS")  # TTS_0
p2 = Processor("TTS")  # TTS_1

# Bad: Manual counting (error-prone)
class Processor:
    _count = 0
    def __init__(self):
        Processor._count += 1  # Not thread-safe!
        self.instance = Processor._count

Always Flush Buffers

{ .api }
from pipecat.utils.string import match_endofsentence

# Good: Always flush remaining content
async def process_stream(chunks):
    """Feed each complete sentence found in ``chunks`` to ``process``.

    Chunks are appended to a buffer; every time the buffer contains a
    complete sentence it is passed on and removed. Text left over after
    the last chunk is passed on as well so it is not lost.

    Args:
        chunks: Iterable of text fragments in arrival order.
    """
    buffer = ""

    for chunk in chunks:
        buffer += chunk
        # Process complete sentences
        while True:
            index = match_endofsentence(buffer)
            # None means no complete sentence yet. A zero index would
            # leave the buffer unchanged and loop forever, so stop on
            # either.
            if not index:
                break
            sentence = buffer[:index]
            await process(sentence)
            buffer = buffer[index:]

    # Critical: Flush remaining buffer
    tail = buffer.strip()
    if tail:
        await process(tail)

# Bad: Forgetting to flush (loses last partial sentence)
# Don't forget this step!

Use Type-Safe Tag Tracking

{ .api }
from pipecat.utils.string import StartEndTags, parse_start_end_tags
from typing import Sequence

# Good: Use type hints and constants
SUPPORTED_TAGS: Sequence[StartEndTags] = [
    ("<code>", "</code>"),
    ("<thinking>", "</thinking>"),
    ("<math>", "</math>")
]

def track_tags(text: str, tags: Sequence[StartEndTags]) -> bool:
    """Report whether ``text`` ends inside one of the given tag pairs.

    Parsing starts from a clean state (no active tag, index 0).

    Args:
        text: The text to scan.
        tags: The (start_tag, end_tag) pairs to look for.

    Returns:
        True if a tag is still open after scanning ``text``, else False.
    """
    active_tag, _ = parse_start_end_tags(text, tags, None, 0)
    return active_tag is not None

# Bad: Using string tuples without type hints
tags = [("<code>", "</code>")]  # What type is this?
# Missing type safety and IDE support