# Exporters and Transports

Export collected traces and metrics to various backends including console output, files, and cloud services. Includes both synchronous and asynchronous transport options with configurable batching and error handling.

## Capabilities

### Base Exporter Interface

Foundation interface for all trace exporters, providing consistent export and emission methods for sending telemetry data to various backends.

```python { .api }
class Exporter:
    """Base class for trace exporters."""

    def export(self, span_datas):
        """
        Export span data to the configured destination.

        Parameters:
        - span_datas: list, list of SpanData objects to export
        """

    def emit(self, span_datas):
        """
        Emit span data immediately.

        Parameters:
        - span_datas: list, list of SpanData objects to emit
        """
```

### Built-in Trace Exporters

Ready-to-use exporters for common output destinations including the console, files, and Python's logging system.

```python { .api }
class PrintExporter(Exporter):
    """
    Console output exporter for debugging and development.

    Exports trace data to stdout in a human-readable format.

    Parameters:
    - transport: Transport, transport mechanism (default: SyncTransport)
    """
    def __init__(self, transport=SyncTransport): ...

    def export(self, span_datas):
        """Export spans to console output."""

    def emit(self, span_datas):
        """Emit spans to the console immediately."""

class FileExporter(Exporter):
    """
    File output exporter for persistent trace storage.

    Parameters:
    - file_name: str, output file path (default: 'opencensus-traces.json')
    - transport: Transport, transport mechanism (default: SyncTransport)
    - file_mode: str, file mode for writing (default: 'w+')
    """
    def __init__(self, file_name='opencensus-traces.json', transport=SyncTransport, file_mode='w+'): ...

    def export(self, span_datas):
        """Export spans to the configured file."""

    def emit(self, span_datas):
        """Write spans to the file immediately."""

class LoggingExporter(Exporter):
    """
    Python logging system exporter.

    Exports trace data through Python's logging system,
    allowing integration with existing log configuration.

    Parameters:
    - handler: logging.Handler, custom log handler (optional)
    - transport: Transport, transport mechanism (default: SyncTransport)
    """
    def __init__(self, handler=None, transport=SyncTransport): ...

    def export(self, span_datas):
        """Export spans through the logging system."""

    def emit(self, span_datas):
        """Log spans immediately."""
```

### Transport Mechanisms

Control how exported data is transmitted, with options for synchronous, asynchronous, and batched delivery.

```python { .api }
class Transport:
    """Base transport class for data transmission."""

    def export(self, datas):
        """
        Export data using this transport.

        Parameters:
        - datas: list, data objects to export
        """

    def flush(self):
        """Flush any pending data."""

class SyncTransport(Transport):
    """
    Synchronous transport for immediate data export.

    Parameters:
    - exporter: Exporter, destination exporter
    """
    def __init__(self, exporter): ...

    def export(self, datas):
        """Export data synchronously through the configured exporter."""

class AsyncTransport(Transport):
    """
    Asynchronous transport with background thread processing.

    Provides batching, buffering, and automatic retry capabilities
    for high-throughput scenarios.

    Parameters:
    - exporter: Exporter, destination exporter
    - grace_period: float, shutdown grace period in seconds (default: 5.0)
    - max_batch_size: int, maximum batch size (default: 600)
    - wait_period: float, batch flush interval in seconds (default: 60.0)
    """
    def __init__(self, exporter, grace_period=5.0, max_batch_size=600, wait_period=60.0): ...

    def export(self, data):
        """Queue data for asynchronous export."""

    def flush(self):
        """
        Flush all pending data and wait for completion.

        Blocks until all queued data has been exported or the
        grace period expires.
        """
```

### Metrics Export Infrastructure

Periodic export of metrics with background processing, aggregation, and error handling for monitoring systems.

```python { .api }
class PeriodicMetricTask:
    """
    Periodic metric export task with background thread.

    Parameters:
    - interval: float, export interval in seconds
    - function: callable, export function to call
    - args: tuple, function arguments
    - kwargs: dict, function keyword arguments
    - name: str, task name for identification
    """
    def __init__(self, interval=None, function=None, args=None, kwargs=None, name=None): ...

    def run(self):
        """Start periodic metric export in a background thread."""

    def close(self):
        """
        Stop periodic export and clean up resources.

        Stops the background thread and waits for completion.
        """

def get_exporter_thread(metric_producers, exporter, interval=None):
    """
    Create and start a metric export task.

    Parameters:
    - metric_producers: list, MetricProducer instances to export from
    - exporter: Exporter, destination for exported metrics
    - interval: float, export interval in seconds (default: 60)

    Returns:
    PeriodicMetricTask: Running export task
    """

class TransportError(Exception):
    """Exception raised for transport-related errors."""

# Constants
DEFAULT_INTERVAL = 60
"""int: Default export interval in seconds"""

GRACE_PERIOD = 5
"""int: Default grace period for shutdown in seconds"""
```

## Usage Examples

### Console Export for Development

```python
from opencensus.trace.tracer import Tracer
from opencensus.trace import print_exporter
from opencensus.trace.samplers import AlwaysOnSampler

# Create tracer with console export
tracer = Tracer(
    sampler=AlwaysOnSampler(),
    exporter=print_exporter.PrintExporter()
)

# Traced operations will print to the console
with tracer.span('development_operation') as span:
    span.add_attribute('debug_info', 'testing console export')
    span.add_annotation('Starting development test')

    # Your application logic
    result = process_data()

    span.add_attribute('result_size', len(result))
    span.add_annotation('Development test completed')

# Console output will show span details:
# Span: development_operation
# Start: 2024-01-15T10:30:15.123456Z
# End: 2024-01-15T10:30:15.234567Z
# Attributes: {'debug_info': 'testing console export', 'result_size': 42}
# Annotations: [{'description': 'Starting development test', ...}, ...]
```

### File Export for Persistent Storage

```python
import functools

from opencensus.trace.tracer import Tracer
from opencensus.trace import file_exporter
from opencensus.trace.samplers import ProbabilitySampler
from opencensus.common.transports.async_ import AsyncTransport

# Create a file exporter with an async transport. The transport argument
# expects a Transport class (the exporter instantiates it with itself),
# so batching options are bound ahead of time with functools.partial.
exporter = file_exporter.FileExporter(
    file_name='./traces.json',
    transport=functools.partial(
        AsyncTransport,
        max_batch_size=100,
        wait_period=30.0
    )
)

tracer = Tracer(
    sampler=ProbabilitySampler(rate=0.1),
    exporter=exporter
)

# High-throughput scenario
for i in range(1000):
    with tracer.span(f'batch_operation_{i}') as span:
        span.add_attribute('batch_id', i)
        span.add_attribute('operation_type', 'batch_process')

        # Process item
        process_batch_item(i)

        if i % 100 == 0:
            span.add_annotation(f'Completed batch {i // 100}')

# Flush remaining data before shutdown
exporter.transport.flush()
```

273

274

### Logging System Integration

275

276

```python

277

import logging

278

from opencensus.trace.tracer import Tracer

279

from opencensus.trace import logging_exporter

280

from opencensus.common.transports import SyncTransport

281

282

# Configure logging system

283

logging.basicConfig(

284

level=logging.INFO,

285

format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',

286

handlers=[

287

logging.FileHandler('application.log'),

288

logging.StreamHandler()

289

]

290

)

291

292

# Create logging exporter

293

exporter = logging_exporter.LoggingExporter(

294

logger_name='opencensus.traces',

295

transport=SyncTransport(logging_exporter.LoggingExporter())

296

)

297

298

tracer = Tracer(exporter=exporter)

299

300

# Traces will appear in logs

301

with tracer.span('user_authentication') as span:

302

span.add_attribute('user_id', 'user123')

303

span.add_attribute('auth_method', 'oauth2')

304

305

try:

306

authenticate_user('user123')

307

span.add_annotation('Authentication successful')

308

309

except AuthenticationError as e:

310

span.set_status(Status.from_exception(e))

311

span.add_annotation('Authentication failed')

312

raise

313

314

# Log entries will include trace information:

315

# 2024-01-15 10:30:15 - opencensus.traces - INFO - Span: user_authentication ...

316

```

317

318

### Custom Exporter Implementation

319

320

```python

321

from opencensus.trace.base_exporter import Exporter

322

import json

323

import requests

324

325

class HTTPExporter(Exporter):

326

"""Custom exporter that sends traces to HTTP endpoint."""

327

328

def __init__(self, endpoint_url, api_key=None, timeout=10):

329

self.endpoint_url = endpoint_url

330

self.api_key = api_key

331

self.timeout = timeout

332

self.session = requests.Session()

333

334

if api_key:

335

self.session.headers.update({'Authorization': f'Bearer {api_key}'})

336

337

def export(self, span_datas):

338

"""Export spans to HTTP endpoint."""

339

try:

340

# Convert spans to JSON format

341

trace_data = []

342

for span_data in span_datas:

343

trace_data.append({

344

'traceId': span_data.context.trace_id,

345

'spanId': span_data.span_id,

346

'name': span_data.name,

347

'startTime': span_data.start_time.isoformat(),

348

'endTime': span_data.end_time.isoformat() if span_data.end_time else None,

349

'attributes': dict(span_data.attributes or {}),

350

'status': {

351

'code': span_data.status.canonical_code if span_data.status else 0,

352

'message': span_data.status.message if span_data.status else None

353

}

354

})

355

356

# Send to endpoint

357

response = self.session.post(

358

self.endpoint_url,

359

json={'traces': trace_data},

360

timeout=self.timeout

361

)

362

response.raise_for_status()

363

364

except Exception as e:

365

print(f"Export failed: {e}")

366

367

def emit(self, span_datas):

368

"""Emit spans immediately."""

369

self.export(span_datas)

370

371

# Usage

372

custom_exporter = HTTPExporter(

373

endpoint_url='https://monitoring.example.com/traces',

374

api_key='your-api-key'

375

)

376

377

tracer = Tracer(exporter=custom_exporter)

378

379

with tracer.span('custom_export_test') as span:

380

span.add_attribute('custom_field', 'test_value')

381

# Span will be sent to HTTP endpoint

382

```

### Periodic Metrics Export

```python
import time

from opencensus.metrics.transport import get_exporter_thread
from opencensus.metrics.export.metric_producer import MetricProducer
from opencensus.stats import stats as stats_module

class CustomMetricProducer(MetricProducer):
    """Custom metric producer for application metrics."""

    def __init__(self):
        self.stats = stats_module.stats

    def get_metrics(self):
        """Get current metrics for export."""
        # The stats singleton produces metrics from the registered views
        return list(self.stats.get_metrics())

class ConsoleMetricExporter:
    """Simple console exporter for metrics."""

    def export(self, metrics):
        """Export metrics to the console."""
        metrics = list(metrics)
        print(f"=== Metrics Export ({len(metrics)} metrics) ===")
        for metric in metrics:
            print(f"Metric: {metric.descriptor.name}")
            print(f"  Description: {metric.descriptor.description}")
            print(f"  Time Series: {len(metric.time_series)}")
        print("=" * 50)

# Set up periodic export
producer = CustomMetricProducer()
exporter = ConsoleMetricExporter()

# Start periodic export every 30 seconds
export_task = get_exporter_thread(
    metric_producers=[producer],
    exporter=exporter,
    interval=30
)

# Let it run for a while
try:
    time.sleep(120)  # Run for 2 minutes
finally:
    # Clean shutdown
    export_task.close()
```

447

448

### Advanced Transport Configuration

449

450

```python

451

from opencensus.trace.tracer import Tracer

452

from opencensus.trace import file_exporter

453

from opencensus.common.transports import AsyncTransport

454

import signal

455

import sys

456

457

class GracefulFileExporter:

458

"""File exporter with graceful shutdown handling."""

459

460

def __init__(self, file_path, batch_size=500, flush_interval=60):

461

self.exporter = file_exporter.FileExporter(file_path)

462

self.transport = AsyncTransport(

463

exporter=self.exporter,

464

max_batch_size=batch_size,

465

wait_period=flush_interval,

466

grace_period=10.0

467

)

468

469

# Set up signal handlers for graceful shutdown

470

signal.signal(signal.SIGINT, self._signal_handler)

471

signal.signal(signal.SIGTERM, self._signal_handler)

472

473

def _signal_handler(self, signum, frame):

474

"""Handle shutdown signals gracefully."""

475

print(f"Received signal {signum}, flushing traces...")

476

self.transport.flush()

477

sys.exit(0)

478

479

def export(self, span_datas):

480

"""Export spans through async transport."""

481

for span_data in span_datas:

482

self.transport.export(span_data)

483

484

# Usage in production application

485

exporter = GracefulFileExporter('./production_traces.jsonl')

486

tracer = Tracer(exporter=exporter)

487

488

# Application will gracefully flush traces on shutdown

489

try:

490

while True: # Main application loop

491

with tracer.span('main_operation') as span:

492

# Your application logic

493

process_requests()

494

495

except KeyboardInterrupt:

496

print("Shutting down gracefully...")

497

# Signal handler will flush remaining traces

498

```