
# Tracing and Monitoring

Logging, tracing, and monitoring capabilities for observing dataflow execution. Supports integration with Jaeger, OpenTelemetry, and other observability platforms for production monitoring and debugging.

## Capabilities

### Tracing Configuration

Base classes and implementations for configuring where traces and logs are sent.

```python { .api }
class TracingConfig:
    def __init__(self): ...

class JaegerConfig(TracingConfig):
    def __init__(self, service_name: str, endpoint: Optional[str] = None, sampling_ratio: float = 1.0): ...

    @property
    def service_name(self) -> str: ...

    @property
    def endpoint(self) -> Optional[str]: ...

    @property
    def sampling_ratio(self) -> float: ...

class OtlpTracingConfig(TracingConfig):
    def __init__(self, service_name: str, url: Optional[str] = None, sampling_ratio: float = 1.0): ...

    @property
    def service_name(self) -> str: ...

    @property
    def url(self) -> Optional[str]: ...

    @property
    def sampling_ratio(self) -> float: ...
```

**JaegerConfig Parameters:**

- `service_name` (str): Identifies this dataflow in Jaeger
- `endpoint` (str): Connection info, defaults to "127.0.0.1:6831" or uses environment variables
- `sampling_ratio` (float): Fraction of traces to send, between 0.0 and 1.0

**OtlpTracingConfig Parameters:**

- `service_name` (str): Identifies this dataflow in OTLP
- `url` (str): Connection info, defaults to "grpc://127.0.0.1:4317"
- `sampling_ratio` (float): Fraction of traces to send, between 0.0 and 1.0

**Usage Examples:**

```python
from bytewax.tracing import JaegerConfig, OtlpTracingConfig

# Jaeger configuration
jaeger_config = JaegerConfig(
    service_name="my-dataflow",
    endpoint="jaeger-collector:6831",
    sampling_ratio=0.1,  # Sample 10% of traces
)

# OpenTelemetry configuration (recommended)
otlp_config = OtlpTracingConfig(
    service_name="my-dataflow",
    url="grpc://otel-collector:4317",
    sampling_ratio=1.0,  # Sample all traces
)
```

### Tracing Setup

Function to initialize tracing and logging for the Bytewax runtime.

```python { .api }
def setup_tracing(tracing_config: Optional[TracingConfig] = None, log_level: Optional[str] = None) -> BytewaxTracer: ...

class BytewaxTracer:
    """Utility class used to handle tracing."""
    ...
```

**Parameters:**

- `tracing_config` (TracingConfig): Specific backend configuration (None for stdout logging)
- `log_level` (str): Log level - "ERROR", "WARN", "INFO", "DEBUG", "TRACE" (default: "ERROR")

**Returns:**

- `BytewaxTracer`: Tracer object that must be kept alive for tracing to work

**Usage Examples:**

```python
from bytewax.tracing import setup_tracing, JaegerConfig

# Basic setup with default stdout logging
tracer = setup_tracing(log_level="INFO")

# Jaeger tracing setup
jaeger_config = JaegerConfig("my-dataflow")
tracer = setup_tracing(jaeger_config, log_level="DEBUG")

# Keep tracer alive during dataflow execution
cli_main(flow)

# OpenTelemetry setup
from bytewax.tracing import OtlpTracingConfig

otlp_config = OtlpTracingConfig("production-dataflow")
tracer = setup_tracing(otlp_config, log_level="WARN")
cli_main(flow)
```

### Environment Variable Configuration

Jaeger configuration can be controlled through environment variables:

```bash
# Jaeger agent configuration
export OTEL_EXPORTER_JAEGER_AGENT_HOST="127.0.0.1"
export OTEL_EXPORTER_JAEGER_AGENT_PORT="6831"

# These take precedence over the endpoint parameter
```

### Production Monitoring Patterns

**Basic Production Setup:**
```python
import os
from bytewax.tracing import setup_tracing, OtlpTracingConfig

def setup_production_tracing():
    """Set up tracing for the production environment."""
    service_name = os.environ.get("SERVICE_NAME", "bytewax-dataflow")
    otel_endpoint = os.environ.get("OTEL_ENDPOINT", "grpc://localhost:4317")
    log_level = os.environ.get("LOG_LEVEL", "INFO")
    sampling_ratio = float(os.environ.get("TRACE_SAMPLING_RATIO", "0.1"))

    config = OtlpTracingConfig(
        service_name=service_name,
        url=otel_endpoint,
        sampling_ratio=sampling_ratio,
    )

    return setup_tracing(config, log_level)

# Use in production
tracer = setup_production_tracing()
cli_main(flow)
```

**Kubernetes Deployment with OpenTelemetry:**
```python
def setup_k8s_tracing():
    """Set up tracing for a Kubernetes deployment."""
    # Service name from deployment metadata
    service_name = os.environ.get("K8S_SERVICE_NAME", "bytewax-app")
    namespace = os.environ.get("K8S_NAMESPACE", "default")
    pod_name = os.environ.get("HOSTNAME", "unknown-pod")

    # Full service identifier
    full_service_name = f"{service_name}.{namespace}"

    # OpenTelemetry collector endpoint (typically a service)
    otel_endpoint = os.environ.get("OTEL_COLLECTOR_ENDPOINT", "grpc://otel-collector:4317")

    config = OtlpTracingConfig(
        service_name=full_service_name,
        url=otel_endpoint,
        sampling_ratio=0.05,  # 5% sampling in production
    )

    # Add pod name as resource attribute if possible
    tracer = setup_tracing(config, log_level="WARN")

    print(f"Tracing initialized for {full_service_name} on pod {pod_name}")
    return tracer

tracer = setup_k8s_tracing()
```

**Development vs Production Configuration:**
```python
def setup_environment_tracing():
    """Configure tracing based on environment."""
    environment = os.environ.get("ENVIRONMENT", "development")

    if environment == "development":
        # Verbose logging for development
        return setup_tracing(log_level="DEBUG")

    elif environment == "staging":
        # Jaeger for staging environment
        jaeger_config = JaegerConfig(
            service_name="bytewax-staging",
            sampling_ratio=1.0,  # Sample everything in staging
        )
        return setup_tracing(jaeger_config, log_level="INFO")

    elif environment == "production":
        # OpenTelemetry for production
        otlp_config = OtlpTracingConfig(
            service_name="bytewax-production",
            sampling_ratio=0.01,  # 1% sampling in production
        )
        return setup_tracing(otlp_config, log_level="WARN")

    else:
        # Default to stdout logging
        return setup_tracing(log_level="ERROR")

tracer = setup_environment_tracing()
```

**Custom Metrics Integration:**
```python
import time
import logging
from prometheus_client import Counter, Histogram, start_http_server

# Prometheus metrics
DATAFLOW_MESSAGES = Counter('bytewax_messages_total', 'Total messages processed', ['stage'])
DATAFLOW_LATENCY = Histogram('bytewax_processing_seconds', 'Processing latency', ['stage'])

class MetricsTracer:
    def __init__(self, bytewax_tracer):
        self.bytewax_tracer = bytewax_tracer
        self.logger = logging.getLogger("metrics")

    def record_message_processed(self, stage_name):
        """Record that a message was processed at a stage."""
        DATAFLOW_MESSAGES.labels(stage=stage_name).inc()

    def record_processing_time(self, stage_name, duration_seconds):
        """Record processing time for a stage."""
        DATAFLOW_LATENCY.labels(stage=stage_name).observe(duration_seconds)

    def log_throughput(self, messages_per_second):
        """Log current throughput."""
        self.logger.info(f"Throughput: {messages_per_second:.1f} msg/sec")

def setup_metrics_and_tracing():
    """Set up both Bytewax tracing and custom metrics."""
    # Start Prometheus metrics server
    start_http_server(8000)

    # Set up Bytewax tracing
    otlp_config = OtlpTracingConfig("bytewax-with-metrics")
    bytewax_tracer = setup_tracing(otlp_config, log_level="INFO")

    # Combine with custom metrics
    return MetricsTracer(bytewax_tracer)

metrics_tracer = setup_metrics_and_tracing()
```

**Distributed Tracing Context:**
```python
import uuid
from datetime import datetime

class DataflowContext:
    """Add correlation IDs for distributed tracing."""

    def __init__(self):
        self.trace_id = str(uuid.uuid4())
        self.start_time = datetime.now()

    def log_event(self, event_name, **kwargs):
        """Log an event with trace context."""
        logging.info(f"[{self.trace_id}] {event_name}", extra={
            "trace_id": self.trace_id,
            "event": event_name,
            **kwargs
        })

    def create_child_context(self, span_name):
        """Create a child context for sub-operations."""
        child = DataflowContext()
        child.parent_trace_id = self.trace_id
        child.span_name = span_name
        return child

# Use in dataflow
context = DataflowContext()
context.log_event("dataflow_started", flow_name="my-flow")

# In operators, pass context through
def traced_map_function(item):
    context.log_event("item_processed", item_id=item.get("id"))
    return process_item(item)
```

**Health Check Integration:**
```python
from http.server import HTTPServer, BaseHTTPRequestHandler
import json

class HealthAndMetricsHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == "/health":
            self.send_response(200)
            self.send_header('Content-type', 'application/json')
            self.end_headers()

            health_status = {
                "status": "healthy",
                "timestamp": datetime.now().isoformat(),
                "tracing_enabled": hasattr(self.server, 'tracer'),
                "version": "1.0.0"
            }

            self.wfile.write(json.dumps(health_status).encode())

        elif self.path == "/metrics":
            # Serve Prometheus metrics
            from prometheus_client import generate_latest
            self.send_response(200)
            self.send_header('Content-type', 'text/plain; version=0.0.4; charset=utf-8')
            self.end_headers()
            self.wfile.write(generate_latest())

        else:
            self.send_response(404)
            self.end_headers()

def start_monitoring_server(tracer):
    """Start an HTTP server for health checks and metrics."""
    server = HTTPServer(("0.0.0.0", 8080), HealthAndMetricsHandler)
    server.tracer = tracer

    import threading
    server_thread = threading.Thread(target=server.serve_forever)
    server_thread.daemon = True
    server_thread.start()

    return server

# Complete production setup
tracer = setup_production_tracing()
monitoring_server = start_monitoring_server(tracer)

# Run dataflow
cli_main(flow)
```

**Structured Log Output:**
```python
import json
import logging

class StructuredFormatter(logging.Formatter):
    """Format logs as structured JSON."""

    def format(self, record):
        log_entry = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno
        }

        # Add trace context if available
        if hasattr(record, 'trace_id'):
            log_entry["trace_id"] = record.trace_id

        # Add any extra fields
        for key, value in record.__dict__.items():
            if key not in ['name', 'msg', 'args', 'levelname', 'levelno', 'pathname', 'filename',
                           'module', 'exc_info', 'exc_text', 'stack_info', 'lineno', 'funcName',
                           'created', 'msecs', 'relativeCreated', 'thread', 'threadName',
                           'processName', 'process', 'getMessage']:
                log_entry[key] = value

        return json.dumps(log_entry)

# Configure structured logging
def setup_structured_logging():
    logger = logging.getLogger()
    handler = logging.StreamHandler()
    handler.setFormatter(StructuredFormatter())
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)

setup_structured_logging()
tracer = setup_tracing(log_level="INFO")
```