CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-langfuse

Comprehensive Python SDK for AI application observability and experimentation with OpenTelemetry-based tracing, automatic instrumentation, and dataset management.

Overview
Eval results
Files

docs/advanced.md

Advanced Features

Support for media uploads, data masking, multi-project setups, and advanced configuration options for complex production environments and specialized use cases.

Capabilities

Media Handling

Support for uploading and managing media files with traces and observations.

class LangfuseMedia:
    """Wrapper for a media payload (image, file, ...) attached to traces and observations.

    This is an API summary: only signatures and contracts are shown.
    """

    def __init__(self, *, obj: Optional[object] = None, base64_data_uri: Optional[str] = None,
                 content_type: Optional[MediaContentType] = None, content_bytes: Optional[bytes] = None,
                 file_path: Optional[str] = None):
        """Initialize media object for Langfuse.

        All arguments are keyword-only and default to None.

        Args:
            obj: Source object to wrap (images, files, etc.)
            base64_data_uri: Base64 encoded data URI
            content_type: MIME type of the media content; pairs with content_bytes
            content_bytes: Raw bytes content; must be accompanied by content_type
            file_path: Path to file to upload

        Note:
            Provide exactly one source: obj, base64_data_uri,
            content_bytes together with content_type, or file_path.
        """

    @staticmethod
    def parse_reference_string(reference: str) -> ParsedMediaReference:
        """Parse media reference string into components.

        Args:
            reference: Media reference string from Langfuse

        Returns:
            ParsedMediaReference mapping with the keys 'type', 'id' and 'url'
        """

    @staticmethod
    def resolve_media_references(data: Any) -> Any:
        """Replace media references in data with actual content.

        Args:
            data: Data structure potentially containing media references

        Returns:
            Data with media references resolved to content

        Note:
            NOTE(review): the signature shown here looks abbreviated; confirm the
            exact keyword parameters against the installed SDK version before use.
        """

Trace and Context Management

Advanced utilities for working with trace context and IDs.

class Langfuse:
    """Trace-context helpers of the Langfuse client (API summary, signatures only)."""

    def create_trace_id(self) -> str:
        """Generate a unique trace ID.

        Returns:
            New trace ID string
        """

    def get_current_trace_id(self) -> Optional[str]:
        """Get current trace ID from execution context.

        Returns:
            Current trace ID, or None if no active trace

        Note:
            NOTE(review): this page previously also listed a generic ``Exception``
            for the "no current trace" case, which contradicts the None return
            documented above. Treat None as the signal and confirm against the
            installed SDK version.
        """

    def get_current_observation_id(self) -> Optional[str]:
        """Get current observation ID from execution context.

        Returns:
            Current observation ID, or None if no active observation

        Note:
            NOTE(review): this page previously also listed a generic ``Exception``
            for the "no current observation" case, which contradicts the None
            return documented above. Treat None as the signal and confirm against
            the installed SDK version.
        """

    def get_trace_url(self, trace_id: str) -> str:
        """Get URL to view trace in Langfuse UI.

        Args:
            trace_id: Trace ID to generate URL for

        Returns:
            URL string for viewing trace in Langfuse dashboard
        """

Authentication and Health Checks

Methods for validating client configuration and connectivity.

class Langfuse:
    """Connectivity and credential checks of the Langfuse client (API summary)."""

    def auth_check(self) -> bool:
        """Verify API authentication credentials.

        Use this to validate client configuration and connectivity, e.g. at startup
        or from a health probe.

        Returns:
            True if authentication is valid, False otherwise

        Raises:
            Exception: If the authentication check itself could not be performed
        """

Multi-Project Support

Support for managing multiple Langfuse projects within a single application.

def get_client(*, public_key: Optional[str] = None) -> Langfuse:
    """Get or create Langfuse client for multi-project setups.

    Args:
        public_key: Public key identifying the project whose client is wanted;
            keyword-only, defaults to None

    Returns:
        Langfuse client instance for the specified project

    Note:
        - Without public_key: returns the single existing client, or a disabled
          client if multiple clients exist (the project would be ambiguous)
        - With public_key: returns the client for that specific project
        - Multi-project support is experimental
    """

# Extra keyword understood by @observe-decorated functions in multi-project setups
def decorated_function(data, langfuse_public_key=None):
    """Illustrate the ``langfuse_public_key`` keyword accepted by functions decorated with @observe.

    Args:
        data: Arbitrary payload; unused by this placeholder.
        langfuse_public_key: Public key of the project to route to (default None).

    Returns:
        None; the function is a signature illustration only.
    """

Data Masking and Privacy

Protocol and utilities for implementing data masking in traces.

# Masking function protocol: any callable with this keyword-only signature qualifies
class MaskFunction(Protocol):
    """Structural type for callables that mask sensitive values in trace data."""

    def __call__(self, *, data: Any) -> Any:
        """Transform data to mask sensitive information.

        Args:
            data: Original data to mask (passed by keyword)

        Returns:
            Masked version of the data
        """

class Langfuse:
    """Langfuse client, shown here only with its masking-related constructor argument."""

    def __init__(self, *, mask: Optional[MaskFunction] = None, **kwargs):
        """Initialize Langfuse client with optional data masking.

        Args:
            mask: Function to apply data masking to all trace data; None (the
                default) means no mask function is supplied
            **kwargs: Other initialization parameters
        """

Advanced Configuration Options

Extended configuration for production and specialized environments.

class Langfuse:
    """Langfuse client, shown with its full set of constructor options (API summary)."""

    def __init__(self, *, public_key: Optional[str] = None, secret_key: Optional[str] = None,
                 host: str = "https://cloud.langfuse.com", tracing_enabled: bool = True,
                 environment: Optional[str] = None, timeout: int = 60, flush_at: int = 15,
                 flush_interval: float = 0.5, release: Optional[str] = None,
                 sample_rate: float = 1.0, mask: Optional[MaskFunction] = None,
                 additional_headers: Optional[Dict[str, str]] = None,
                 blocked_instrumentation_scopes: Optional[List[str]] = None,
                 media_upload_thread_count: int = 3):
        """Initialize Langfuse client with advanced configuration.

        All arguments are keyword-only.

        Args:
            public_key: Project public key
            secret_key: Project secret key
            host: Langfuse server URL (default "https://cloud.langfuse.com")
            tracing_enabled: Global tracing enable/disable (default True)
            environment: Environment tag for all traces
            timeout: Request timeout in seconds (default 60)
            flush_at: Number of events to batch before flushing (default 15)
            flush_interval: Time interval between flushes in seconds (default 0.5)
            release: Release/version identifier
            sample_rate: Sampling rate (0.0 to 1.0) for traces (default 1.0)
            mask: Data masking function
            additional_headers: Additional HTTP headers for requests
            blocked_instrumentation_scopes: OpenTelemetry scopes to block
            media_upload_thread_count: Number of threads for media uploads (default 3)
        """

Type Definitions for Advanced Features

Supporting types for advanced functionality.

# MIME type string describing media content, e.g. "image/jpeg"
MediaContentType = str


# Components of a parsed media reference
class ParsedMediaReference(TypedDict):
    type: str
    id: str
    url: str


# Trace context used to attach work to an existing trace
class TraceContext(TypedDict):
    trace_id: str
    parent_span_id: Optional[str]


# Value type accepted for flexible attribute maps
MapValue = Union[str, int, float, bool, List[Any], Dict[str, Any]]

Usage Examples

Media Upload and Handling

from langfuse import Langfuse, LangfuseMedia
import base64
from PIL import Image

# Client constructed without explicit arguments
langfuse = Langfuse()

# Upload image file: the media object is built from a path on disk
image_media = LangfuseMedia(file_path="path/to/image.jpg")

with langfuse.start_as_current_span(name="image-analysis") as span:
    # Include media in trace by placing the LangfuseMedia object inside the span input
    span.update(
        input={"image": image_media, "prompt": "Analyze this image"},
        output={"description": "A beautiful landscape with mountains"}
    )

# Upload from bytes: read the raw file content first
with open("image.jpg", "rb") as f:
    image_bytes = f.read()

# content_bytes must be accompanied by the matching content_type
bytes_media = LangfuseMedia(
    content_bytes=image_bytes,
    content_type="image/jpeg"
)

# Upload from base64 data URI
def image_to_base64_uri(image_path):
    """Encode a file as a ``data:<mime>;base64,<payload>`` URI.

    The MIME type is guessed from the file name so that non-JPEG files
    (e.g. ``.png``) are labelled correctly; when the extension is unknown the
    function falls back to ``image/jpeg``.

    Args:
        image_path: Path of the file to encode.

    Returns:
        The data URI as a string.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    import mimetypes  # local import keeps this snippet self-contained

    content_type, _ = mimetypes.guess_type(image_path)
    if content_type is None:
        content_type = "image/jpeg"

    with open(image_path, "rb") as f:
        encoded = base64.b64encode(f.read()).decode("ascii")
    return f"data:{content_type};base64,{encoded}"

uri_media = LangfuseMedia(base64_data_uri=image_to_base64_uri("image.jpg"))

# Upload Python object (e.g., matplotlib figure)
import matplotlib.pyplot as plt

fig, ax = plt.subplots()
ax.plot([1, 2, 3, 4], [1, 4, 2, 3])

figure_media = LangfuseMedia(obj=fig)

# The "image-analysis" span above ended when its `with` block exited, so the
# plot is attached to a span of its own instead of updating the finished one.
with langfuse.start_as_current_span(name="plot-analysis") as span:
    span.update(input={"plot": figure_media})

Data Masking Implementation

import re
from typing import Any

class DataMasker:
    """Custom data masking implementation.

    Instances are callables taking a single keyword argument ``data`` and
    returning a copy in which e-mail addresses, phone numbers, SSNs and credit
    card numbers found in strings are replaced by ``[MASKED_<KIND>]`` markers.
    Strings are searched wherever they appear as values inside dicts, lists
    and tuples; dictionary keys and other value types are left untouched.
    """

    def __init__(self):
        # Common sensitive patterns, applied in insertion order.
        self.patterns = {
            # The TLD class is [A-Za-z]: inside [...] a "|" is a literal pipe,
            # not alternation, so it must not be listed there.
            'email': re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'),
            'phone': re.compile(r'\b\d{3}-\d{3}-\d{4}\b'),
            'ssn': re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
            'credit_card': re.compile(r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b')
        }

    def __call__(self, *, data: Any) -> Any:
        """Mask sensitive data according to privacy requirements.

        Args:
            data: Value to mask; may be a string or a nested dict/list/tuple.

        Returns:
            A masked copy for strings and containers; any other value unchanged.
        """
        return self._mask_recursive(data)

    def _mask_recursive(self, obj):
        """Recursively mask data in nested structures."""
        if isinstance(obj, str):
            return self._mask_string(obj)
        if isinstance(obj, dict):
            return {k: self._mask_recursive(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [self._mask_recursive(item) for item in obj]
        if isinstance(obj, tuple):
            # Tuples previously slipped through unmasked; rebuild them so that
            # sensitive strings inside are covered too.
            masked_items = [self._mask_recursive(item) for item in obj]
            if hasattr(obj, "_fields"):
                # namedtuple: constructor takes positional fields, keep the type
                return type(obj)(*masked_items)
            return tuple(masked_items)
        return obj

    def _mask_string(self, text: str) -> str:
        """Apply masking patterns to string content."""
        masked = text
        for pattern_name, pattern in self.patterns.items():
            masked = pattern.sub(f'[MASKED_{pattern_name.upper()}]', masked)
        return masked

# Initialize Langfuse with data masking: the DataMasker instance is passed as `mask`
masker = DataMasker()
langfuse = Langfuse(mask=masker)

# All trace data automatically masked
with langfuse.start_as_current_span(name="process-user-data") as span:
    user_input = "My email is john@example.com and phone is 555-123-4567"
    span.update(input=user_input)
    # Stored as: "My email is [MASKED_EMAIL] and phone is [MASKED_PHONE]"

Multi-Project Configuration

# Initialize multiple clients for different projects; each client carries its
# own key pair and environment tag
project_a = Langfuse(
    public_key="project-a-key",
    secret_key="project-a-secret",
    environment="production"
)

project_b = Langfuse(
    public_key="project-b-key",
    secret_key="project-b-secret",
    environment="staging"
)

# Use specific clients: spans started from project_a are created through that client
# (process_for_project_a is a placeholder for application code)
with project_a.start_as_current_span(name="project-a-task") as span:
    result_a = process_for_project_a()
    span.update(output=result_a)

# Dynamic project selection: functions decorated with @observe accept a
# langfuse_public_key keyword naming the project to use for that call
# (observe, process_data and data are assumed to be defined/imported by the application)
@observe  # Uses default client resolution
def shared_function(data, langfuse_public_key=None):
    """Process data under whichever project the caller selects via langfuse_public_key."""
    # Function uses client associated with langfuse_public_key
    return process_data(data)

# Call with specific project
result = shared_function(data, langfuse_public_key="project-a-key")

Advanced Client Configuration

# Production configuration with all options
import os  # needed for os.getenv below; keeps the snippet self-contained

# Credentials are read from the environment rather than hard-coded
production_langfuse = Langfuse(
    public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
    secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
    host="https://your-langfuse-instance.com",
    environment="production",
    release="v1.2.3",
    sample_rate=0.1,  # Sample 10% of traces
    flush_at=50,  # Batch 50 events before flushing
    flush_interval=2.0,  # Flush every 2 seconds
    timeout=30,  # 30 second timeout
    media_upload_thread_count=5,  # 5 threads for media uploads
    additional_headers={
        "X-Custom-Header": "custom-value",
        "Authorization-Proxy": "proxy-token"
    },
    blocked_instrumentation_scopes=[
        "httpx",  # Block httpx instrumentation
        "requests"  # Block requests instrumentation
    ]
)

Trace Context Management

# Manual trace context management
trace_id = langfuse.create_trace_id()
print(f"Created trace: {trace_id}")

# Use explicit trace context: the pre-generated ID is handed over as a
# TraceContext mapping ({"trace_id": ...}) rather than as a bare trace_id keyword
with langfuse.start_as_current_span(
    name="main-process",
    trace_context={"trace_id": trace_id}
) as main_span:
    # Get current context; both getters may return None when nothing is active
    current_trace = langfuse.get_current_trace_id()
    current_observation = langfuse.get_current_observation_id()

    print(f"Current trace: {current_trace}")
    print(f"Current observation: {current_observation}")

    # Create child with explicit parent
    child_span = main_span.start_observation(
        name="child-process",
        as_type="span"
    )

    # The child is not managed by a `with` block, so end it explicitly even on failure
    # (child_process is a placeholder for application code)
    try:
        result = child_process()
        child_span.update(output=result)
    finally:
        child_span.end()

# Generate UI link
trace_url = langfuse.get_trace_url(trace_id)
print(f"View trace: {trace_url}")

Sampling and Performance Optimization

class SmartSampler:
    """Custom sampling logic for different scenarios.

    Three rates are applied depending on the request: ``error_rate`` for
    requests that failed, ``slow_request_rate`` for requests slower than
    ``slow_threshold`` seconds, and ``base_rate`` for everything else.
    """

    def __init__(self, base_rate=0.1, slow_threshold=5.0):
        """Configure the sampler.

        Args:
            base_rate: Probability (0.0-1.0) of sampling an ordinary request.
            slow_threshold: Execution time in seconds above which a request
                counts as slow.
        """
        self.base_rate = base_rate
        self.error_rate = 1.0  # Always sample errors
        self.slow_request_rate = 0.5  # Sample 50% of slow requests
        self.slow_threshold = slow_threshold

    def should_sample(self, context):
        """Determine if request should be sampled.

        Args:
            context: Mapping that may contain ``"has_error"`` (truthy on
                failure) and ``"execution_time"`` (seconds; treated as 0 when
                missing).

        Returns:
            True if the request should be traced, False otherwise.
        """
        import random

        # Errors take precedence over every other rule. error_rate is honoured
        # here; at its default of 1.0 this is always True because
        # random.random() is strictly below 1.0.
        if context.get("has_error"):
            return random.random() < self.error_rate

        # Higher sampling for slow requests
        if context.get("execution_time", 0) > self.slow_threshold:
            return random.random() < self.slow_request_rate

        # Base sampling rate
        return random.random() < self.base_rate

# Implementation with conditional tracing
sampler = SmartSampler()

def traced_function(input_data):
    """Run ``process_data`` and record a span afterwards if the sampler selects the call.

    The sampling decision is made after execution so that it can take the
    outcome (error or not) and the measured duration into account.

    Args:
        input_data: Value forwarded to ``process_data`` and stored as span input.

    Returns:
        Whatever ``process_data`` returns.

    Raises:
        Exception: Any exception raised by ``process_data`` is re-raised after
            the tracing decision has been made.
    """
    import time  # local import so the example does not rely on a module-level one

    # perf_counter is monotonic, so the duration is immune to wall-clock adjustments
    start_time = time.perf_counter()
    has_error = False
    result = None

    try:
        result = process_data(input_data)
    except Exception:
        has_error = True
        raise
    finally:
        execution_time = time.perf_counter() - start_time

        # Determine if we should trace this execution
        context = {
            "has_error": has_error,
            "execution_time": execution_time
        }

        if sampler.should_sample(context):
            # Create trace retrospectively if needed
            with langfuse.start_as_current_span(name="sampled-function") as span:
                span.update(
                    input=input_data,
                    output=result,
                    metadata={
                        "execution_time": execution_time,
                        "sampled": True
                    }
                )

                if has_error:
                    span.update(level="ERROR")

    return result

Health Monitoring and Diagnostics

class LangfuseHealthMonitor:
    """Monitor Langfuse client health and connectivity.

    The monitor only relies on three client methods: ``auth_check()``,
    ``create_trace_id()`` and ``start_as_current_span(name=...)``.
    """

    def __init__(self, langfuse_client):
        self.client = langfuse_client

    def run_diagnostics(self):
        """Run comprehensive health checks.

        Returns:
            A dict with the keys ``"timestamp"`` (ISO-8601 string), ``"checks"``
            (one ``{"status", "message"}`` entry per check, where status is
            ``"pass"``, ``"fail"`` or ``"error"``) and ``"overall_status"``
            (``"healthy"`` only if every check passed, else ``"unhealthy"``).
        """
        from datetime import datetime  # local import keeps the class self-contained

        diagnostics = {
            "timestamp": datetime.now().isoformat(),
            "checks": {}
        }

        # Each probe runs independently so one failure does not hide the others.
        probes = (
            ("authentication", self._check_authentication, "Auth check failed"),
            ("trace_creation", self._check_trace_creation, "Trace creation failed"),
            ("span_creation", self._check_span_creation, "Span creation failed"),
        )
        for check_name, probe, error_prefix in probes:
            diagnostics["checks"][check_name] = self._run_check(probe, error_prefix)

        # Overall status
        all_passed = all(
            check["status"] == "pass"
            for check in diagnostics["checks"].values()
        )
        diagnostics["overall_status"] = "healthy" if all_passed else "unhealthy"

        return diagnostics

    @staticmethod
    def _run_check(probe, error_prefix):
        """Run one probe, converting any exception into an "error" result."""
        try:
            return probe()
        except Exception as e:
            return {
                "status": "error",
                "message": f"{error_prefix}: {str(e)}"
            }

    def _check_authentication(self):
        """Report whether the client's credentials are accepted."""
        auth_ok = self.client.auth_check()
        return {
            "status": "pass" if auth_ok else "fail",
            "message": "Authentication successful" if auth_ok else "Authentication failed"
        }

    def _check_trace_creation(self):
        """Report whether a trace ID can be generated."""
        trace_id = self.client.create_trace_id()
        return {
            "status": "pass",
            "message": f"Trace ID generated: {trace_id[:8]}..."
        }

    def _check_span_creation(self):
        """Report whether a span can be opened, updated and closed."""
        with self.client.start_as_current_span(name="health-check") as span:
            span.update(output="Health check successful")
        return {
            "status": "pass",
            "message": "Span creation and management successful"
        }

    def continuous_monitoring(self, interval=300, max_iterations=None):
        """Run continuous health monitoring.

        Prints the overall status after every round and, when unhealthy, the
        message of each check that did not pass. Stops cleanly on Ctrl-C,
        including while waiting between rounds.

        Args:
            interval: Seconds to wait after each round.
            max_iterations: Optional number of rounds to run; None (the
                default) runs until interrupted.
        """
        import time  # local import keeps the class self-contained

        completed = 0
        try:
            while max_iterations is None or completed < max_iterations:
                try:
                    results = self.run_diagnostics()
                    print(f"Health check: {results['overall_status']}")

                    if results["overall_status"] != "healthy":
                        for check_name, check_result in results["checks"].items():
                            if check_result["status"] != "pass":
                                print(f"  {check_name}: {check_result['message']}")
                except Exception as e:
                    # A failing round must not stop the monitor
                    print(f"Health monitoring error: {e}")

                completed += 1
                time.sleep(interval)
        except KeyboardInterrupt:
            # The handler wraps the sleep as well: most of the time is spent
            # waiting, so that is where Ctrl-C usually arrives.
            return

# Usage: run a single round of checks and print the resulting report dict
monitor = LangfuseHealthMonitor(langfuse)
health_report = monitor.run_diagnostics()
print(health_report)

Environment-Specific Configuration

import os
from typing import Optional

class LangfuseFactory:
    """Builds Langfuse clients whose settings depend on the deployment environment."""

    @staticmethod
    def _environment_presets():
        """Return a fresh mapping of environment name to client keyword arguments."""
        development = {
            "tracing_enabled": True,
            "sample_rate": 1.0,
            "flush_at": 1,  # Immediate flushing for dev
            "timeout": 10
        }
        staging = {
            "tracing_enabled": True,
            "sample_rate": 0.5,
            "flush_at": 10,
            "timeout": 20
        }
        production = {
            "tracing_enabled": True,
            "sample_rate": 0.1,  # Lower sampling in prod
            "flush_at": 50,
            "timeout": 30,
            "additional_headers": {"X-Service": "ai-service"}
        }
        test = {
            "tracing_enabled": False  # Disable for tests
        }
        return {
            "development": development,
            "staging": staging,
            "production": production,
            "test": test
        }

    @classmethod
    def create_for_environment(cls, environment: Optional[str] = None) -> Langfuse:
        """Return a Langfuse client configured for the given environment.

        Args:
            environment: Environment name. When omitted or empty, the
                ENVIRONMENT variable is consulted, defaulting to "development".

        Returns:
            A client tagged with the resolved environment name. Names without
            a preset reuse the "development" settings.
        """
        if environment:
            env_name = environment
        else:
            env_name = os.getenv("ENVIRONMENT", "development")

        presets = cls._environment_presets()
        if env_name in presets:
            settings = presets[env_name]
        else:
            settings = presets["development"]

        return Langfuse(environment=env_name, **settings)

# Usage: with no argument the environment name comes from the ENVIRONMENT
# variable, falling back to "development"
langfuse = LangfuseFactory.create_for_environment()

# Override for specific cases by naming the environment explicitly
production_client = LangfuseFactory.create_for_environment("production")

Install with Tessl CLI

npx tessl i tessl/pypi-langfuse

docs

advanced.md

core-tracing.md

datasets.md

experiments.md

index.md

integrations.md

observation-types.md

prompts.md

scoring.md

tile.json