# Tracing and Monitoring

Logging, tracing, and monitoring capabilities for observing dataflow execution. Supports integration with Jaeger, OpenTelemetry, and other observability platforms for production monitoring and debugging.

## Capabilities

### Tracing Configuration

Base classes and implementations for configuring where traces and logs are sent.

```python { .api }
class TracingConfig:
    def __init__(self): ...

class JaegerConfig(TracingConfig):
    def __init__(self, service_name: str, endpoint: Optional[str] = None, sampling_ratio: float = 1.0): ...

    @property
    def service_name(self) -> str: ...

    @property
    def endpoint(self) -> Optional[str]: ...

    @property
    def sampling_ratio(self) -> float: ...

class OtlpTracingConfig(TracingConfig):
    def __init__(self, service_name: str, url: Optional[str] = None, sampling_ratio: float = 1.0): ...

    @property
    def service_name(self) -> str: ...

    @property
    def url(self) -> Optional[str]: ...

    @property
    def sampling_ratio(self) -> float: ...
```
**JaegerConfig Parameters:**
- `service_name` (str): Identifies this dataflow in Jaeger
- `endpoint` (str): Connection info, defaults to "127.0.0.1:6831" or uses environment variables
- `sampling_ratio` (float): Fraction of traces to send, between 0.0 and 1.0

**OtlpTracingConfig Parameters:**
- `service_name` (str): Identifies this dataflow in OTLP
- `url` (str): Connection info, defaults to "grpc://127.0.0.1:4317"
- `sampling_ratio` (float): Fraction of traces to send, between 0.0 and 1.0

**Usage Examples:**
```python
from bytewax.tracing import JaegerConfig, OtlpTracingConfig

# Jaeger configuration
jaeger_config = JaegerConfig(
    service_name="my-dataflow",
    endpoint="jaeger-collector:6831",
    sampling_ratio=0.1  # Sample 10% of traces
)

# OpenTelemetry configuration (recommended)
otlp_config = OtlpTracingConfig(
    service_name="my-dataflow",
    url="grpc://otel-collector:4317",
    sampling_ratio=1.0  # Sample all traces
)
```
### Tracing Setup

Function to initialize tracing and logging for the Bytewax runtime.

```python { .api }
def setup_tracing(tracing_config: Optional[TracingConfig] = None, log_level: Optional[str] = None) -> BytewaxTracer: ...

class BytewaxTracer:
    """Utility class used to handle tracing."""
    ...
```

**Parameters:**
- `tracing_config` (TracingConfig): Specific backend configuration (None for stdout logging)
- `log_level` (str): Log level - "ERROR", "WARN", "INFO", "DEBUG", "TRACE" (default: "ERROR")

**Returns:**
- `BytewaxTracer`: Tracer object that must be kept alive for tracing to work

**Usage Examples:**
```python
from bytewax.tracing import setup_tracing, JaegerConfig

# Basic setup with default stdout logging
tracer = setup_tracing(log_level="INFO")

# Jaeger tracing setup
jaeger_config = JaegerConfig("my-dataflow")
tracer = setup_tracing(jaeger_config, log_level="DEBUG")

# Keep tracer alive during dataflow execution
cli_main(flow)

# OpenTelemetry setup
from bytewax.tracing import OtlpTracingConfig

otlp_config = OtlpTracingConfig("production-dataflow")
tracer = setup_tracing(otlp_config, log_level="WARN")
cli_main(flow)
```
### Environment Variable Configuration

Jaeger configuration can be controlled through environment variables:

```bash
# Jaeger agent configuration
export OTEL_EXPORTER_JAEGER_AGENT_HOST="127.0.0.1"
export OTEL_EXPORTER_JAEGER_AGENT_PORT="6831"

# These take precedence over the endpoint parameter
```

### Production Monitoring Patterns

**Basic Production Setup:**
```python
import os
from bytewax.tracing import setup_tracing, OtlpTracingConfig

def setup_production_tracing():
    """Set up tracing for production environment."""
    service_name = os.environ.get("SERVICE_NAME", "bytewax-dataflow")
    otel_endpoint = os.environ.get("OTEL_ENDPOINT", "grpc://localhost:4317")
    log_level = os.environ.get("LOG_LEVEL", "INFO")
    sampling_ratio = float(os.environ.get("TRACE_SAMPLING_RATIO", "0.1"))

    config = OtlpTracingConfig(
        service_name=service_name,
        url=otel_endpoint,
        sampling_ratio=sampling_ratio
    )

    return setup_tracing(config, log_level)

# Use in production
tracer = setup_production_tracing()
cli_main(flow)
```
**Kubernetes Deployment with OpenTelemetry:**
```python
import os

def setup_k8s_tracing():
    """Set up tracing for Kubernetes deployment."""
    # Service name from deployment metadata
    service_name = os.environ.get("K8S_SERVICE_NAME", "bytewax-app")
    namespace = os.environ.get("K8S_NAMESPACE", "default")
    pod_name = os.environ.get("HOSTNAME", "unknown-pod")

    # Full service identifier
    full_service_name = f"{service_name}.{namespace}"

    # OpenTelemetry collector endpoint (typically a service)
    otel_endpoint = os.environ.get("OTEL_COLLECTOR_ENDPOINT", "grpc://otel-collector:4317")

    config = OtlpTracingConfig(
        service_name=full_service_name,
        url=otel_endpoint,
        sampling_ratio=0.05  # 5% sampling in production
    )

    # Add pod name as resource attribute if possible
    tracer = setup_tracing(config, log_level="WARN")

    print(f"Tracing initialized for {full_service_name} on pod {pod_name}")
    return tracer

tracer = setup_k8s_tracing()
```
**Development vs Production Configuration:**
```python
def setup_environment_tracing():
    """Configure tracing based on environment."""
    environment = os.environ.get("ENVIRONMENT", "development")

    if environment == "development":
        # Verbose logging for development
        return setup_tracing(log_level="DEBUG")

    elif environment == "staging":
        # Jaeger for staging environment
        jaeger_config = JaegerConfig(
            service_name="bytewax-staging",
            sampling_ratio=1.0  # Sample everything in staging
        )
        return setup_tracing(jaeger_config, log_level="INFO")

    elif environment == "production":
        # OpenTelemetry for production
        otlp_config = OtlpTracingConfig(
            service_name="bytewax-production",
            sampling_ratio=0.01  # 1% sampling in production
        )
        return setup_tracing(otlp_config, log_level="WARN")

    else:
        # Default to stdout logging
        return setup_tracing(log_level="ERROR")

tracer = setup_environment_tracing()
```
**Custom Metrics Integration:**
```python
import time
import logging
from prometheus_client import Counter, Histogram, start_http_server

# Prometheus metrics
DATAFLOW_MESSAGES = Counter('bytewax_messages_total', 'Total messages processed', ['stage'])
DATAFLOW_LATENCY = Histogram('bytewax_processing_seconds', 'Processing latency', ['stage'])

class MetricsTracer:
    def __init__(self, bytewax_tracer):
        self.bytewax_tracer = bytewax_tracer
        self.logger = logging.getLogger("metrics")

    def record_message_processed(self, stage_name):
        """Record a message was processed at a stage."""
        DATAFLOW_MESSAGES.labels(stage=stage_name).inc()

    def record_processing_time(self, stage_name, duration_seconds):
        """Record processing time for a stage."""
        DATAFLOW_LATENCY.labels(stage=stage_name).observe(duration_seconds)

    def log_throughput(self, messages_per_second):
        """Log current throughput."""
        self.logger.info(f"Throughput: {messages_per_second:.1f} msg/sec")

def setup_metrics_and_tracing():
    """Set up both Bytewax tracing and custom metrics."""
    # Start Prometheus metrics server
    start_http_server(8000)

    # Set up Bytewax tracing
    otlp_config = OtlpTracingConfig("bytewax-with-metrics")
    bytewax_tracer = setup_tracing(otlp_config, log_level="INFO")

    # Combine with custom metrics
    return MetricsTracer(bytewax_tracer)

metrics_tracer = setup_metrics_and_tracing()
```
**Distributed Tracing Context:**
```python
import logging
import uuid
from datetime import datetime

class DataflowContext:
    """Add correlation IDs for distributed tracing."""

    def __init__(self):
        self.trace_id = str(uuid.uuid4())
        self.start_time = datetime.now()

    def log_event(self, event_name, **kwargs):
        """Log event with trace context."""
        logging.info(f"[{self.trace_id}] {event_name}", extra={
            "trace_id": self.trace_id,
            "event": event_name,
            **kwargs
        })

    def create_child_context(self, span_name):
        """Create child context for sub-operations."""
        child = DataflowContext()
        child.parent_trace_id = self.trace_id
        child.span_name = span_name
        return child

# Use in dataflow
context = DataflowContext()
context.log_event("dataflow_started", flow_name="my-flow")

# In operators, pass context through
def traced_map_function(item):
    context.log_event("item_processed", item_id=item.get("id"))
    return process_item(item)
```
**Health Check Integration:**
```python
from datetime import datetime
from http.server import HTTPServer, BaseHTTPRequestHandler
import json

class HealthAndMetricsHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == "/health":
            self.send_response(200)
            self.send_header('Content-type', 'application/json')
            self.end_headers()

            health_status = {
                "status": "healthy",
                "timestamp": datetime.now().isoformat(),
                "tracing_enabled": hasattr(self.server, 'tracer'),
                "version": "1.0.0"
            }

            self.wfile.write(json.dumps(health_status).encode())

        elif self.path == "/metrics":
            # Serve Prometheus metrics
            from prometheus_client import generate_latest
            self.send_response(200)
            self.send_header('Content-type', 'text/plain; version=0.0.4; charset=utf-8')
            self.end_headers()
            self.wfile.write(generate_latest())

        else:
            self.send_response(404)
            self.end_headers()

def start_monitoring_server(tracer):
    """Start HTTP server for health checks and metrics."""
    server = HTTPServer(("0.0.0.0", 8080), HealthAndMetricsHandler)
    server.tracer = tracer

    import threading
    server_thread = threading.Thread(target=server.serve_forever)
    server_thread.daemon = True
    server_thread.start()

    return server

# Complete production setup
tracer = setup_production_tracing()
monitoring_server = start_monitoring_server(tracer)

# Run dataflow
cli_main(flow)
```
**Structured Log Output:**
```python
import json
import logging

class StructuredFormatter(logging.Formatter):
    """Format logs as structured JSON."""

    def format(self, record):
        log_entry = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno
        }

        # Add trace context if available
        if hasattr(record, 'trace_id'):
            log_entry["trace_id"] = record.trace_id

        # Add any extra fields
        for key, value in record.__dict__.items():
            if key not in ['name', 'msg', 'args', 'levelname', 'levelno', 'pathname', 'filename',
                           'module', 'exc_info', 'exc_text', 'stack_info', 'lineno', 'funcName',
                           'created', 'msecs', 'relativeCreated', 'thread', 'threadName',
                           'processName', 'process', 'getMessage']:
                log_entry[key] = value

        return json.dumps(log_entry)

# Configure structured logging
def setup_structured_logging():
    logger = logging.getLogger()
    handler = logging.StreamHandler()
    handler.setFormatter(StructuredFormatter())
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)

setup_structured_logging()
tracer = setup_tracing(log_level="INFO")
```