tessl/pypi-aws-xray-sdk

The AWS X-Ray SDK for Python enables Python developers to record and emit information from within their applications to the AWS X-Ray service for distributed tracing.

Core Recording

Primary tracing functionality for creating and managing X-Ray segments and subsegments. Provides both synchronous and asynchronous recording capabilities through context managers, decorators, and manual management methods.

Capabilities

Global Recorder Instance

The SDK provides a global recorder instance that serves as the primary interface for X-Ray tracing operations.

xray_recorder: AsyncAWSXRayRecorder

The global xray_recorder is an instance of AsyncAWSXRayRecorder that provides both synchronous and asynchronous tracing capabilities.

Recorder Configuration

Configure the X-Ray recorder with sampling settings, plugins, context behavior, and daemon connection details.

def configure(
    sampling: bool = None,
    plugins: tuple = None,
    context_missing: str = None,
    sampling_rules: Union[str, dict] = None,
    daemon_address: str = None,
    service: str = None,
    context: object = None,
    emitter: object = None,
    streaming: object = None,
    dynamic_naming: str = None,
    streaming_threshold: int = None,
    max_trace_back: int = None,
    sampler: object = None,
    stream_sql: bool = None
) -> None:
    """
    Configure the X-Ray recorder.

    Args:
        sampling (bool): Enable/disable sampling; when disabled, every segment is recorded
        plugins (tuple): Plugin names to load ('EC2Plugin', 'ECSPlugin', 'ElasticBeanstalkPlugin')
        context_missing (str): Behavior when no segment is active: 'LOG_ERROR' logs an error, 'RUNTIME_ERROR' raises an exception, 'IGNORE_ERROR' does nothing
        sampling_rules (str | dict): Path to a local sampling rules JSON file, or a dictionary defining the rules
        daemon_address (str): X-Ray daemon address (default: '127.0.0.1:2000')
        service (str): Default segment name, used when a segment is created without a name
        context (object): Context implementation that stores the active trace entities
        emitter (object): Custom emitter for sending segments to the daemon
        streaming (object): Streaming implementation that sends out subsegments when a segment grows too large
        dynamic_naming (str): Host name pattern for dynamic segment naming
        streaming_threshold (int): Subsegment count per segment above which streaming starts
        max_trace_back (int): Maximum stack trace depth
        sampler (object): Custom sampler implementation
        stream_sql (bool): Whether SQL query text is included in traces for supported database integrations
    """

Context Managers

Segment Context Managers

Create and manage segments using context managers for automatic lifecycle management.

def in_segment(name: str, **segment_kwargs) -> SegmentContextManager:
    """
    Context manager for creating and managing a segment.

    Args:
        name (str): Segment name
        **segment_kwargs: Additional segment parameters

    Returns:
        SegmentContextManager: Context manager yielding the segment

    Usage:
        with xray_recorder.in_segment('my-segment') as segment:
            # Traced code here
    """

def in_segment_async(name: str, **segment_kwargs) -> AsyncSegmentContextManager:
    """
    Async context manager for creating and managing a segment.

    Args:
        name (str): Segment name
        **segment_kwargs: Additional segment parameters

    Returns:
        AsyncSegmentContextManager: Async context manager yielding the segment

    Usage:
        async with xray_recorder.in_segment_async('my-segment') as segment:
            # Traced async code here
    """

Subsegment Context Managers

Create and manage subsegments within segments for detailed tracing of operations.

def in_subsegment(name: str, **subsegment_kwargs) -> SubsegmentContextManager:
    """
    Context manager for creating and managing a subsegment.

    Args:
        name (str): Subsegment name
        **subsegment_kwargs: Additional subsegment parameters

    Returns:
        SubsegmentContextManager: Context manager yielding the subsegment

    Usage:
        with xray_recorder.in_subsegment('database-query') as subsegment:
            # Database operation here
    """

def in_subsegment_async(name: str, **subsegment_kwargs) -> AsyncSubsegmentContextManager:
    """
    Async context manager for creating and managing a subsegment.

    Args:
        name (str): Subsegment name
        **subsegment_kwargs: Additional subsegment parameters

    Returns:
        AsyncSubsegmentContextManager: Async context manager yielding the subsegment

    Usage:
        async with xray_recorder.in_subsegment_async('async-operation') as subsegment:
            # Async operation here
    """

Decorators

Function Tracing Decorators

Automatically trace function execution using decorators.

def capture(name: str = None) -> Callable:
    """
    Decorator for automatic tracing of a synchronous function in a subsegment.

    Args:
        name (str): Subsegment name for the traced function (defaults to the function name)

    Returns:
        Callable: Decorated function

    Usage:
        @xray_recorder.capture('process-data')
        def process_data(data):
            return processed_data
    """

def capture_async(name: str = None) -> Callable:
    """
    Decorator for automatic async function tracing in a subsegment.

    Args:
        name (str): Subsegment name for the traced async function (defaults to the function name)

    Returns:
        Callable: Decorated async function

    Usage:
        @xray_recorder.capture_async('async-process')
        async def async_process_data(data):
            return await process_async(data)
    """

Manual Segment Management

Segment Control

Manually create and manage segments for fine-grained control over trace lifecycle.

def begin_segment(
    name: str,
    traceid: str = None,
    parent_id: str = None,
    sampling: int = None
) -> Segment:
    """
    Begin a new segment.

    Args:
        name (str): Segment name
        traceid (str): Optional trace ID (auto-generated if not provided)
        parent_id (str): Optional parent segment ID
        sampling (int): Sampling decision (1=sampled, 0=not sampled)

    Returns:
        Segment: The created segment
    """

def end_segment(end_time: float = None) -> None:
    """
    End the current segment.

    Args:
        end_time (float): Optional end timestamp (defaults to current time)
    """

def current_segment() -> Segment:
    """
    Get the current active segment.

    Returns:
        Segment: Current active segment

    Raises:
        SegmentNotFoundException: If no segment is active and context_missing is 'RUNTIME_ERROR'
    """

Subsegment Control

Manually create and manage subsegments within segments.

def begin_subsegment(name: str, namespace: str = 'local') -> Subsegment:
    """
    Begin a new subsegment under the latest open subsegment, or under the current segment if none is open.

    Args:
        name (str): Subsegment name
        namespace (str): Namespace for categorization: 'local', 'remote', or 'aws'

    Returns:
        Subsegment: The created subsegment
    """

def begin_subsegment_without_sampling(name: str) -> Subsegment:
    """
    Begin a new unsampled subsegment under the current segment.

    Args:
        name (str): Subsegment name

    Returns:
        Subsegment: The created unsampled subsegment
    """

def end_subsegment(end_time: float = None) -> None:
    """
    End the current subsegment.

    Args:
        end_time (float): Optional end timestamp (defaults to current time)
    """

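def current_subsegment() -> Subsegment:
    """
    Get the most recently opened subsegment that is still active.

    Returns:
        Subsegment: Current active subsegment, or None if the active entity is a segment

    Raises:
        SegmentNotFoundException: If no trace entity is active and context_missing is 'RUNTIME_ERROR'
    """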

Trace Entity Management

Control the current trace context and entity hierarchy.

def get_trace_entity() -> Union[Segment, Subsegment]:
    """
    Get the current active trace entity (segment or subsegment).

    Returns:
        Union[Segment, Subsegment]: Current active trace entity
    """

def set_trace_entity(trace_entity: Union[Segment, Subsegment]) -> None:
    """
    Set the current active trace entity.

    Args:
        trace_entity (Union[Segment, Subsegment]): Trace entity to set as active
    """

def clear_trace_entities() -> None:
    """
    Clear all trace entities from the current context.
    Useful for preventing thread pollution in multi-threaded applications.
    """

Streaming Control

Manage subsegment streaming for large traces.

def stream_subsegments() -> None:
    """
    Stream closed subsegments to reduce memory usage.
    Automatically called when streaming is enabled and thresholds are met.
    """

Advanced Recording

Low-level recording functionality for custom integrations.

def record_subsegment(
    wrapped: Callable,
    instance: object,
    args: tuple,
    kwargs: dict,
    name: str,
    namespace: str = None,
    meta_processor: Callable = None
) -> Any:
    """
    Record function execution in a subsegment with metadata processing.

    Args:
        wrapped (Callable): Function to wrap and trace
        instance (object): Instance object for method calls
        args (tuple): Function arguments
        kwargs (dict): Function keyword arguments
        name (str): Subsegment name
        namespace (str): Optional namespace
        meta_processor (Callable): Optional metadata processor function

    Returns:
        Any: Function result
    """

def record_subsegment_async(
    wrapped: Callable,
    instance: object,
    args: tuple,
    kwargs: dict,
    name: str,
    namespace: str = None,
    meta_processor: Callable = None
) -> Any:
    """
    Record async function execution in a subsegment with metadata processing.

    Args:
        wrapped (Callable): Async function to wrap and trace
        instance (object): Instance object for method calls
        args (tuple): Function arguments
        kwargs (dict): Function keyword arguments
        name (str): Subsegment name
        namespace (str): Optional namespace
        meta_processor (Callable): Optional metadata processor function

    Returns:
        Any: Function result
    """

Usage Examples

Basic Segment and Subsegment Usage

from aws_xray_sdk.core import xray_recorder

# Configure the recorder (sampling=False disables sampling, so every segment is recorded)
xray_recorder.configure(
    sampling=False,
    context_missing='LOG_ERROR'
)

# Using context managers
with xray_recorder.in_segment('web-request') as segment:
    segment.put_annotation('user_id', '12345')
    segment.put_metadata('request_info', {'method': 'GET', 'path': '/api/data'})
    
    with xray_recorder.in_subsegment('database-query') as subsegment:
        subsegment.put_annotation('table', 'users')
        # Database query here
        
    with xray_recorder.in_subsegment('external-api-call') as subsegment:
        subsegment.put_annotation('service', 'payment-api')
        # External API call here

Async Usage

import asyncio
from aws_xray_sdk.core import xray_recorder

async def async_operation():
    async with xray_recorder.in_segment_async('async-request') as segment:
        segment.put_annotation('operation', 'data-processing')
        
        async with xray_recorder.in_subsegment_async('async-db-query') as subsegment:
            # Async database operation
            await asyncio.sleep(0.1)  # Simulate async work
            
        async with xray_recorder.in_subsegment_async('async-computation') as subsegment:
            # Async computation
            await asyncio.sleep(0.2)  # Simulate async work

Decorator Usage

from aws_xray_sdk.core import xray_recorder

@xray_recorder.capture('data-processing')
def process_user_data(user_id, data):
    # Annotations and metadata are added to the subsegment created by the decorator
    xray_recorder.put_annotation('user_id', user_id)
    xray_recorder.put_metadata('data_size', len(data))

    # Process data (placeholder transformation)
    processed_data = list(data)
    return processed_data

@xray_recorder.capture_async('async-data-processing')
async def async_process_user_data(user_id, data):
    # Async processing with automatic tracing
    xray_recorder.put_annotation('user_id', user_id)
    # process_async is a placeholder for your own coroutine
    result = await process_async(data)
    return result

Manual Management

import traceback

from aws_xray_sdk.core import xray_recorder

# Manual segment management
segment = xray_recorder.begin_segment('manual-segment')
segment.put_annotation('type', 'manual')

# Start subsegment
subsegment = xray_recorder.begin_subsegment('manual-subsegment')
subsegment.put_metadata('operation', 'manual-process')

try:
    # Your traced code here (perform_operation is a placeholder)
    result = perform_operation()
except Exception as e:
    # Manual management does not capture exceptions automatically; record it explicitly
    subsegment.add_exception(e, traceback.extract_tb(e.__traceback__))
    raise
finally:
    # Clean up: close the subsegment first, then the segment
    xray_recorder.end_subsegment()
    xray_recorder.end_segment()

Thread Management

import concurrent.futures
from aws_xray_sdk.core import xray_recorder

def worker_function(data, trace_entity):
    # Set the trace entity for the worker thread
    xray_recorder.set_trace_entity(trace_entity)
    
    try:
        with xray_recorder.in_subsegment('worker-operation') as subsegment:
            # Process data in worker thread (process_data is a placeholder for your own function)
            result = process_data(data)
            return result
    finally:
        # Prevent thread pollution
        xray_recorder.clear_trace_entities()

# Main thread (data_list is a placeholder for your own work items)
with xray_recorder.in_segment('threaded-operation') as segment:
    current_entity = xray_recorder.get_trace_entity()
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = [
            executor.submit(worker_function, data, current_entity)
            for data in data_list
        ]
        
        results = [future.result() for future in futures]
