CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-bytewax

Python Stateful Stream Processing Framework

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

tracing.mddocs/

Tracing and Monitoring

Logging, tracing, and monitoring capabilities for observing dataflow execution. Supports integration with Jaeger, OpenTelemetry, and other observability platforms for production monitoring and debugging.

Capabilities

Tracing Configuration

Base classes and implementations for configuring where traces and logs are sent.

class TracingConfig:
    def __init__(self): ...

class JaegerConfig(TracingConfig):
    def __init__(self, service_name: str, endpoint: Optional[str] = None, sampling_ratio: float = 1.0): ...
    
    @property
    def service_name(self) -> str: ...
    
    @property
    def endpoint(self) -> Optional[str]: ...
    
    @property
    def sampling_ratio(self) -> float: ...

class OtlpTracingConfig(TracingConfig):
    def __init__(self, service_name: str, url: Optional[str] = None, sampling_ratio: float = 1.0): ...
    
    @property
    def service_name(self) -> str: ...
    
    @property
    def url(self) -> Optional[str]: ...
    
    @property
    def sampling_ratio(self) -> float: ...

JaegerConfig Parameters:

  • service_name (str): Identifies this dataflow in Jaeger
  • endpoint (str): Connection info, defaults to "127.0.0.1:6831" or uses environment variables
  • sampling_ratio (float): Fraction of traces to send between 0.0 and 1.0

OtlpTracingConfig Parameters:

  • service_name (str): Identifies this dataflow in OTLP
  • url (str): Connection info, defaults to "grpc://127.0.0.1:4317"
  • sampling_ratio (float): Fraction of traces to send between 0.0 and 1.0

Usage Examples:

from bytewax.tracing import JaegerConfig, OtlpTracingConfig

# Jaeger configuration
jaeger_config = JaegerConfig(
    service_name="my-dataflow",
    endpoint="jaeger-collector:6831",
    sampling_ratio=0.1  # Sample 10% of traces
)

# OpenTelemetry configuration (recommended)
otlp_config = OtlpTracingConfig(
    service_name="my-dataflow",
    url="grpc://otel-collector:4317",
    sampling_ratio=1.0  # Sample all traces
)

Tracing Setup

Function to initialize tracing and logging for the Bytewax runtime.

def setup_tracing(tracing_config: Optional[TracingConfig] = None, log_level: Optional[str] = None) -> BytewaxTracer: ...

class BytewaxTracer:
    """Utility class used to handle tracing."""
    ...

Parameters:

  • tracing_config (TracingConfig): Specific backend configuration (None for stdout logging)
  • log_level (str): Log level - "ERROR", "WARN", "INFO", "DEBUG", "TRACE" (default: "ERROR")

Returns:

  • BytewaxTracer: Tracer object that must be kept alive for tracing to work

Usage Examples:

from bytewax.tracing import setup_tracing, JaegerConfig

# Basic setup with default stdout logging
tracer = setup_tracing(log_level="INFO")

# Jaeger tracing setup
jaeger_config = JaegerConfig("my-dataflow")
tracer = setup_tracing(jaeger_config, log_level="DEBUG")

# Keep tracer alive during dataflow execution
cli_main(flow)

# OpenTelemetry setup
from bytewax.tracing import OtlpTracingConfig

otlp_config = OtlpTracingConfig("production-dataflow")
tracer = setup_tracing(otlp_config, log_level="WARN")
cli_main(flow)

Environment Variable Configuration

Jaeger configuration can be controlled through environment variables:

# Jaeger agent configuration
export OTEL_EXPORTER_JAEGER_AGENT_HOST="127.0.0.1"
export OTEL_EXPORTER_JAEGER_AGENT_PORT="6831"

# These take precedence over endpoint parameter

Production Monitoring Patterns

Basic Production Setup:

import os
from bytewax.tracing import setup_tracing, OtlpTracingConfig

def setup_production_tracing():
    """Set up tracing for production environment."""
    service_name = os.environ.get("SERVICE_NAME", "bytewax-dataflow")
    otel_endpoint = os.environ.get("OTEL_ENDPOINT", "grpc://localhost:4317")
    log_level = os.environ.get("LOG_LEVEL", "INFO")
    sampling_ratio = float(os.environ.get("TRACE_SAMPLING_RATIO", "0.1"))
    
    config = OtlpTracingConfig(
        service_name=service_name,
        url=otel_endpoint,
        sampling_ratio=sampling_ratio
    )
    
    return setup_tracing(config, log_level)

# Use in production
tracer = setup_production_tracing()
cli_main(flow)

Kubernetes Deployment with OpenTelemetry:

def setup_k8s_tracing():
    """Set up tracing for Kubernetes deployment."""
    # Service name from deployment metadata
    service_name = os.environ.get("K8S_SERVICE_NAME", "bytewax-app")
    namespace = os.environ.get("K8S_NAMESPACE", "default")
    pod_name = os.environ.get("HOSTNAME", "unknown-pod")
    
    # Full service identifier
    full_service_name = f"{service_name}.{namespace}"
    
    # OpenTelemetry collector endpoint (typically a service)
    otel_endpoint = os.environ.get("OTEL_COLLECTOR_ENDPOINT", "grpc://otel-collector:4317")
    
    config = OtlpTracingConfig(
        service_name=full_service_name,
        url=otel_endpoint,
        sampling_ratio=0.05  # 5% sampling in production
    )
    
    # Add pod name as resource attribute if possible
    tracer = setup_tracing(config, log_level="WARN")
    
    print(f"Tracing initialized for {full_service_name} on pod {pod_name}")
    return tracer

tracer = setup_k8s_tracing()

Development vs Production Configuration:

def setup_environment_tracing():
    """Configure tracing based on environment."""
    environment = os.environ.get("ENVIRONMENT", "development")
    
    if environment == "development":
        # Verbose logging for development
        return setup_tracing(log_level="DEBUG")
    
    elif environment == "staging":
        # Jaeger for staging environment
        jaeger_config = JaegerConfig(
            service_name=f"bytewax-staging",
            sampling_ratio=1.0  # Sample everything in staging
        )
        return setup_tracing(jaeger_config, log_level="INFO")
    
    elif environment == "production":
        # OpenTelemetry for production
        otlp_config = OtlpTracingConfig(
            service_name="bytewax-production",
            sampling_ratio=0.01  # 1% sampling in production
        )
        return setup_tracing(otlp_config, log_level="WARN")
    
    else:
        # Default to stdout logging
        return setup_tracing(log_level="ERROR")

tracer = setup_environment_tracing()

Custom Metrics Integration:

import time
import logging
from prometheus_client import Counter, Histogram, start_http_server

# Prometheus metrics
DATAFLOW_MESSAGES = Counter('bytewax_messages_total', 'Total messages processed', ['stage'])
DATAFLOW_LATENCY = Histogram('bytewax_processing_seconds', 'Processing latency', ['stage'])

class MetricsTracer:
    def __init__(self, bytewax_tracer):
        self.bytewax_tracer = bytewax_tracer
        self.logger = logging.getLogger("metrics")
    
    def record_message_processed(self, stage_name):
        """Record a message was processed at a stage."""
        DATAFLOW_MESSAGES.labels(stage=stage_name).inc()
    
    def record_processing_time(self, stage_name, duration_seconds):
        """Record processing time for a stage."""
        DATAFLOW_LATENCY.labels(stage=stage_name).observe(duration_seconds)
    
    def log_throughput(self, messages_per_second):
        """Log current throughput."""
        self.logger.info(f"Throughput: {messages_per_second:.1f} msg/sec")

def setup_metrics_and_tracing():
    """Set up both Bytewax tracing and custom metrics."""
    # Start Prometheus metrics server
    start_http_server(8000)
    
    # Set up Bytewax tracing
    otlp_config = OtlpTracingConfig("bytewax-with-metrics")
    bytewax_tracer = setup_tracing(otlp_config, log_level="INFO")
    
    # Combine with custom metrics
    return MetricsTracer(bytewax_tracer)

metrics_tracer = setup_metrics_and_tracing()

Distributed Tracing Context:

import uuid
from datetime import datetime

class DataflowContext:
    """Add correlation IDs for distributed tracing."""
    
    def __init__(self):
        self.trace_id = str(uuid.uuid4())
        self.start_time = datetime.now()
    
    def log_event(self, event_name, **kwargs):
        """Log event with trace context."""
        logging.info(f"[{self.trace_id}] {event_name}", extra={
            "trace_id": self.trace_id,
            "event": event_name,
            **kwargs
        })
    
    def create_child_context(self, span_name):
        """Create child context for sub-operations."""
        child = DataflowContext()
        child.parent_trace_id = self.trace_id
        child.span_name = span_name
        return child

# Use in dataflow
context = DataflowContext()
context.log_event("dataflow_started", flow_name="my-flow")

# In operators, pass context through
def traced_map_function(item):
    context.log_event("item_processed", item_id=item.get("id"))
    return process_item(item)

Health Check Integration:

from http.server import HTTPServer, BaseHTTPRequestHandler
import json

class HealthAndMetricsHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == "/health":
            self.send_response(200)
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            
            health_status = {
                "status": "healthy",
                "timestamp": datetime.now().isoformat(),
                "tracing_enabled": hasattr(self.server, 'tracer'),
                "version": "1.0.0"
            }
            
            self.wfile.write(json.dumps(health_status).encode())
        
        elif self.path == "/metrics":
            # Serve Prometheus metrics
            from prometheus_client import generate_latest
            self.send_response(200)
            self.send_header('Content-type', 'text/plain; version=0.0.4; charset=utf-8')
            self.end_headers()
            self.wfile.write(generate_latest())
        
        else:
            self.send_response(404)
            self.end_headers()

def start_monitoring_server(tracer):
    """Start HTTP server for health checks and metrics."""
    server = HTTPServer(("0.0.0.0", 8080), HealthAndMetricsHandler)
    server.tracer = tracer
    
    import threading
    server_thread = threading.Thread(target=server.serve_forever)
    server_thread.daemon = True
    server_thread.start()
    
    return server

# Complete production setup
tracer = setup_production_tracing()
monitoring_server = start_monitoring_server(tracer)

# Run dataflow
cli_main(flow)

Log Structured Output:

import json
import logging

class StructuredFormatter(logging.Formatter):
    """Format logs as structured JSON."""
    
    def format(self, record):
        log_entry = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno
        }
        
        # Add trace context if available
        if hasattr(record, 'trace_id'):
            log_entry["trace_id"] = record.trace_id
        
        # Add any extra fields
        for key, value in record.__dict__.items():
            if key not in ['name', 'msg', 'args', 'levelname', 'levelno', 'pathname', 'filename',
                          'module', 'exc_info', 'exc_text', 'stack_info', 'lineno', 'funcName',
                          'created', 'msecs', 'relativeCreated', 'thread', 'threadName',
                          'processName', 'process', 'getMessage']:
                log_entry[key] = value
        
        return json.dumps(log_entry)

# Configure structured logging
def setup_structured_logging():
    logger = logging.getLogger()
    handler = logging.StreamHandler()
    handler.setFormatter(StructuredFormatter())
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)

setup_structured_logging()
tracer = setup_tracing(log_level="INFO")

Install with Tessl CLI

npx tessl i tessl/pypi-bytewax

docs

connectors.md

dataflow.md

index.md

operators.md

recovery.md

runtime.md

sinks.md

sources.md

stateful.md

testing.md

tracing.md

windowing.md

tile.json