Python Stateful Stream Processing Framework
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Logging, tracing, and monitoring capabilities for observing dataflow execution. Supports integration with Jaeger, OpenTelemetry, and other observability platforms for production monitoring and debugging.
Base classes and implementations for configuring where traces and logs are sent.
class TracingConfig:
def __init__(self): ...
class JaegerConfig(TracingConfig):
def __init__(self, service_name: str, endpoint: Optional[str] = None, sampling_ratio: float = 1.0): ...
@property
def service_name(self) -> str: ...
@property
def endpoint(self) -> Optional[str]: ...
@property
def sampling_ratio(self) -> float: ...
class OtlpTracingConfig(TracingConfig):
def __init__(self, service_name: str, url: Optional[str] = None, sampling_ratio: float = 1.0): ...
@property
def service_name(self) -> str: ...
@property
def url(self) -> Optional[str]: ...
@property
def sampling_ratio(self) -> float: ...

JaegerConfig Parameters:
service_name (str): Identifies this dataflow in Jaeger
endpoint (str): Connection info, defaults to "127.0.0.1:6831" or uses environment variables
sampling_ratio (float): Fraction of traces to send, between 0.0 and 1.0

OtlpTracingConfig Parameters:
service_name (str): Identifies this dataflow in OTLP
url (str): Connection info, defaults to "grpc://127.0.0.1:4317"
sampling_ratio (float): Fraction of traces to send, between 0.0 and 1.0

Usage Examples:
from bytewax.tracing import JaegerConfig, OtlpTracingConfig
# Jaeger configuration
jaeger_config = JaegerConfig(
service_name="my-dataflow",
endpoint="jaeger-collector:6831",
sampling_ratio=0.1 # Sample 10% of traces
)
# OpenTelemetry configuration (recommended)
otlp_config = OtlpTracingConfig(
service_name="my-dataflow",
url="grpc://otel-collector:4317",
sampling_ratio=1.0 # Sample all traces
)

Function to initialize tracing and logging for the Bytewax runtime.
def setup_tracing(tracing_config: Optional[TracingConfig] = None, log_level: Optional[str] = None) -> BytewaxTracer: ...
class BytewaxTracer:
"""Utility class used to handle tracing."""
...

Parameters:
tracing_config (TracingConfig): Specific backend configuration (None for stdout logging)
log_level (str): Log level - "ERROR", "WARN", "INFO", "DEBUG", "TRACE" (default: "ERROR")

Returns:
BytewaxTracer: Tracer object that must be kept alive for tracing to work

Usage Examples:
from bytewax.tracing import setup_tracing, JaegerConfig
# Basic setup with default stdout logging
tracer = setup_tracing(log_level="INFO")
# Jaeger tracing setup
jaeger_config = JaegerConfig("my-dataflow")
tracer = setup_tracing(jaeger_config, log_level="DEBUG")
# Keep tracer alive during dataflow execution
cli_main(flow)
# OpenTelemetry setup
from bytewax.tracing import OtlpTracingConfig
otlp_config = OtlpTracingConfig("production-dataflow")
tracer = setup_tracing(otlp_config, log_level="WARN")
cli_main(flow)

Jaeger configuration can be controlled through environment variables:
# Jaeger agent configuration
export OTEL_EXPORTER_JAEGER_AGENT_HOST="127.0.0.1"
export OTEL_EXPORTER_JAEGER_AGENT_PORT="6831"
# These take precedence over the endpoint parameter

Basic Production Setup:
import os
from bytewax.tracing import setup_tracing, OtlpTracingConfig
def setup_production_tracing():
    """Initialize Bytewax tracing for a production deployment.

    All settings come from environment variables, with sensible
    fallbacks: SERVICE_NAME, OTEL_ENDPOINT, LOG_LEVEL, and
    TRACE_SAMPLING_RATIO.

    Returns:
        BytewaxTracer: keep this object alive for the lifetime of the
        dataflow, or tracing stops.
    """
    env = os.environ
    config = OtlpTracingConfig(
        service_name=env.get("SERVICE_NAME", "bytewax-dataflow"),
        url=env.get("OTEL_ENDPOINT", "grpc://localhost:4317"),
        # Sample a fraction of traces; default 10%.
        sampling_ratio=float(env.get("TRACE_SAMPLING_RATIO", "0.1")),
    )
    return setup_tracing(config, env.get("LOG_LEVEL", "INFO"))
# Use in production
tracer = setup_production_tracing()
cli_main(flow)

Kubernetes Deployment with OpenTelemetry:
def setup_k8s_tracing():
    """Initialize tracing for a Kubernetes deployment.

    Builds a namespace-qualified service identifier from deployment
    metadata exposed as environment variables, and points the OTLP
    exporter at the in-cluster collector service.

    Returns:
        BytewaxTracer: keep this object alive while the dataflow runs.
    """
    env = os.environ
    # Service name from deployment metadata
    service = env.get("K8S_SERVICE_NAME", "bytewax-app")
    namespace = env.get("K8S_NAMESPACE", "default")
    pod = env.get("HOSTNAME", "unknown-pod")
    # Full service identifier
    qualified_name = f"{service}.{namespace}"
    # OpenTelemetry collector endpoint (typically a service)
    collector = env.get("OTEL_COLLECTOR_ENDPOINT", "grpc://otel-collector:4317")
    config = OtlpTracingConfig(
        service_name=qualified_name,
        url=collector,
        sampling_ratio=0.05,  # 5% sampling in production
    )
    # Add pod name as resource attribute if possible
    tracer = setup_tracing(config, log_level="WARN")
    print(f"Tracing initialized for {qualified_name} on pod {pod}")
    return tracer
tracer = setup_k8s_tracing()

Development vs Production Configuration:
def setup_environment_tracing():
    """Configure tracing based on the ENVIRONMENT variable.

    Environments:
        development (default): stdout logging at DEBUG, no backend.
        staging: Jaeger backend, full sampling, INFO logging.
        production: OTLP backend, 1% sampling, WARN logging.
        anything else: stdout logging at ERROR.

    Returns:
        BytewaxTracer: keep this object alive while the dataflow runs.
    """
    environment = os.environ.get("ENVIRONMENT", "development")
    if environment == "development":
        # Verbose logging for development
        return setup_tracing(log_level="DEBUG")
    elif environment == "staging":
        # Jaeger for staging environment
        jaeger_config = JaegerConfig(
            # Plain string literal: the original f-string had no placeholders.
            service_name="bytewax-staging",
            sampling_ratio=1.0,  # Sample everything in staging
        )
        return setup_tracing(jaeger_config, log_level="INFO")
    elif environment == "production":
        # OpenTelemetry for production
        otlp_config = OtlpTracingConfig(
            service_name="bytewax-production",
            sampling_ratio=0.01,  # 1% sampling in production
        )
        return setup_tracing(otlp_config, log_level="WARN")
    else:
        # Default to stdout logging
        return setup_tracing(log_level="ERROR")
tracer = setup_environment_tracing()

Custom Metrics Integration:
import time
import logging
from prometheus_client import Counter, Histogram, start_http_server
# Prometheus metrics
DATAFLOW_MESSAGES = Counter('bytewax_messages_total', 'Total messages processed', ['stage'])
DATAFLOW_LATENCY = Histogram('bytewax_processing_seconds', 'Processing latency', ['stage'])
class MetricsTracer:
    """Pair a BytewaxTracer with Prometheus metrics and throughput logging.

    Holds a reference to the BytewaxTracer (which must be kept alive for
    tracing to work) and provides helpers that update the module-level
    Prometheus counter and histogram.
    """

    def __init__(self, bytewax_tracer):
        # Keeping the tracer referenced here keeps tracing active.
        self.bytewax_tracer = bytewax_tracer
        self.logger = logging.getLogger("metrics")

    def record_message_processed(self, stage_name):
        """Record a message was processed at a stage."""
        DATAFLOW_MESSAGES.labels(stage=stage_name).inc()

    def record_processing_time(self, stage_name, duration_seconds):
        """Record processing time for a stage."""
        DATAFLOW_LATENCY.labels(stage=stage_name).observe(duration_seconds)

    def log_throughput(self, messages_per_second):
        """Log current throughput.

        Uses lazy %-style args so formatting is skipped when INFO is
        disabled (the original built an f-string eagerly on every call).
        """
        self.logger.info("Throughput: %.1f msg/sec", messages_per_second)
def setup_metrics_and_tracing():
    """Set up both Bytewax tracing and custom Prometheus metrics.

    Returns:
        MetricsTracer: wrapper that keeps the BytewaxTracer alive and
        exposes helpers for recording Prometheus metrics.
    """
    # Expose the Prometheus scrape endpoint on :8000 first.
    start_http_server(8000)
    # Route Bytewax tracing to the OTLP collector.
    tracer = setup_tracing(OtlpTracingConfig("bytewax-with-metrics"), log_level="INFO")
    # Hand back both wrapped together.
    return MetricsTracer(tracer)
metrics_tracer = setup_metrics_and_tracing()

Distributed Tracing Context:
import uuid
from datetime import datetime
class DataflowContext:
    """Carry a correlation ID through a dataflow for distributed tracing."""

    def __init__(self):
        # Fresh correlation ID and wall-clock start time per context.
        self.trace_id = str(uuid.uuid4())
        self.start_time = datetime.now()

    def log_event(self, event_name, **fields):
        """Emit an INFO log stamped with this context's trace ID."""
        payload = {"trace_id": self.trace_id, "event": event_name}
        # Caller-supplied fields may override the defaults, matching
        # plain dict-unpacking semantics.
        payload.update(fields)
        logging.info(f"[{self.trace_id}] {event_name}", extra=payload)

    def create_child_context(self, span_name):
        """Spawn a context for a sub-operation, linked back to this one."""
        child = DataflowContext()
        child.parent_trace_id = self.trace_id
        child.span_name = span_name
        return child
# Use in dataflow
context = DataflowContext()
context.log_event("dataflow_started", flow_name="my-flow")
# In operators, pass context through
def traced_map_function(item):
context.log_event("item_processed", item_id=item.get("id"))
return process_item(item)

Health Check Integration:
from http.server import HTTPServer, BaseHTTPRequestHandler
import json
class HealthAndMetricsHandler(BaseHTTPRequestHandler):
    """Serve a liveness endpoint (/health) and Prometheus metrics (/metrics)."""

    def do_GET(self):
        # Route by path; anything unknown is a 404.
        if self.path == "/health":
            self._respond_health()
        elif self.path == "/metrics":
            self._respond_metrics()
        else:
            self.send_response(404)
            self.end_headers()

    def _respond_health(self):
        """Reply 200 with a JSON liveness document."""
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        body = {
            "status": "healthy",
            "timestamp": datetime.now().isoformat(),
            # True when start_monitoring_server attached a tracer.
            "tracing_enabled": hasattr(self.server, 'tracer'),
            "version": "1.0.0"
        }
        self.wfile.write(json.dumps(body).encode())

    def _respond_metrics(self):
        """Reply 200 with the current Prometheus exposition text."""
        # Serve Prometheus metrics
        from prometheus_client import generate_latest
        self.send_response(200)
        self.send_header('Content-type', 'text/plain; version=0.0.4; charset=utf-8')
        self.end_headers()
        self.wfile.write(generate_latest())
def start_monitoring_server(tracer):
    """Start HTTP server for health checks and metrics on port 8080.

    The server runs in a daemon thread so it never blocks process
    shutdown.

    Returns:
        HTTPServer: the running server instance, with the tracer
        attached so the handler can report tracing status.
    """
    import threading

    server = HTTPServer(("0.0.0.0", 8080), HealthAndMetricsHandler)
    server.tracer = tracer
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server
# Complete production setup
tracer = setup_production_tracing()
monitoring_server = start_monitoring_server(tracer)
# Run dataflow
cli_main(flow)

Log Structured Output:
import json
import logging
class StructuredFormatter(logging.Formatter):
    """Format logs as structured JSON."""

    # LogRecord attributes that are internal bookkeeping, not payload.
    _STANDARD_ATTRS = frozenset([
        'name', 'msg', 'args', 'levelname', 'levelno', 'pathname', 'filename',
        'module', 'exc_info', 'exc_text', 'stack_info', 'lineno', 'funcName',
        'created', 'msecs', 'relativeCreated', 'thread', 'threadName',
        'processName', 'process', 'getMessage',
    ])

    def format(self, record):
        """Render *record* as a single-line JSON object."""
        entry = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno
        }
        # Add trace context if available
        if hasattr(record, 'trace_id'):
            entry["trace_id"] = record.trace_id
        # Pass through any caller-supplied extra fields.
        entry.update(
            (key, value)
            for key, value in record.__dict__.items()
            if key not in self._STANDARD_ATTRS
        )
        return json.dumps(entry)
# Configure structured logging
def setup_structured_logging():
    """Attach a JSON-emitting stream handler to the root logger at INFO."""
    handler = logging.StreamHandler()
    handler.setFormatter(StructuredFormatter())
    root = logging.getLogger()
    root.addHandler(handler)
    root.setLevel(logging.INFO)
setup_structured_logging()
tracer = setup_tracing(log_level="INFO")

Install with Tessl CLI
npx tessl i tessl/pypi-bytewax