# Exporters and Transports

Export collected traces and metrics to various backends including console output, files, and cloud services. Includes both synchronous and asynchronous transport options with configurable batching and error handling.

## Capabilities

### Base Exporter Interface

Foundation interface for all trace exporters, providing consistent export and emission methods for sending telemetry data to various backends.

```python { .api }
class Exporter:
    """Base class for trace exporters."""

    def export(self, span_datas):
        """
        Export span data to configured destination.

        Parameters:
        - span_datas: list, list of SpanData objects to export
        """

    def emit(self, span_datas):
        """
        Emit span data immediately.

        Parameters:
        - span_datas: list, list of SpanData objects to emit
        """
```
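
The split between `export` and `emit` may be easier to see in a toy implementation. The sketch below is not the library's code: `DirectTransport` and `InMemoryExporter` are hypothetical names, and plain dicts stand in for SpanData objects. It assumes the pattern suggested by the signatures in this section, where `export` hands spans to a transport and the transport calls `emit` to perform the actual write.

```python
class DirectTransport:
    """Hypothetical transport: forwards each batch straight to the exporter."""

    def __init__(self, exporter):
        self.exporter = exporter

    def export(self, datas):
        # A synchronous transport calls emit in the caller's thread.
        self.exporter.emit(datas)


class InMemoryExporter:
    """Hypothetical exporter that stores spans in a list instead of a backend."""

    def __init__(self, transport=DirectTransport):
        self.emitted = []
        # The transport is passed as a class and bound to this exporter.
        self.transport = transport(self)

    def export(self, span_datas):
        # Entry point: delegate delivery to the transport.
        self.transport.export(span_datas)

    def emit(self, span_datas):
        # Final write step: here, just remember what was sent.
        self.emitted.extend(span_datas)


exporter = InMemoryExporter()
exporter.export([{"name": "span-a"}, {"name": "span-b"}])
names = [span["name"] for span in exporter.emitted]
```

Keeping delivery policy in the transport means the same `emit` logic can be reused whether spans are sent inline or from a background thread.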

### Built-in Trace Exporters

Ready-to-use exporters for common output destinations, including the console, files, and the Python logging system.

```python { .api }
class PrintExporter(Exporter):
    """
    Console output exporter for debugging and development.

    Exports trace data to stdout in human-readable format.

    Parameters:
    - transport: Transport, transport mechanism (default: SyncTransport)
    """
    def __init__(self, transport=SyncTransport): ...

    def export(self, span_datas):
        """Export spans to console output."""

    def emit(self, span_datas):
        """Emit spans to console immediately."""

class FileExporter(Exporter):
    """
    File output exporter for persistent trace storage.

    Parameters:
    - file_name: str, output file path (default: 'opencensus-traces.json')
    - transport: Transport, transport mechanism (default: SyncTransport)
    - file_mode: str, file mode for writing (default: 'w+')
    """
    def __init__(self, file_name='opencensus-traces.json', transport=SyncTransport, file_mode='w+'): ...

    def export(self, span_datas):
        """Export spans to configured file."""

    def emit(self, span_datas):
        """Write spans to file immediately."""

class LoggingExporter(Exporter):
    """
    Python logging system exporter.

    Exports trace data through Python's logging system,
    allowing integration with existing log configuration.

    Parameters:
    - handler: logging.Handler, custom log handler (optional)
    - transport: Transport, transport mechanism (default: SyncTransport)
    """
    def __init__(self, handler=None, transport=SyncTransport): ...

    def export(self, span_datas):
        """Export spans through logging system."""

    def emit(self, span_datas):
        """Log spans immediately."""
```
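
To illustrate how a custom `handler` could receive exported spans, here is a minimal standard-library sketch. `ListHandler`, `SketchLoggingExporter`, and the logger name `sketch.trace_exporter` are hypothetical and chosen for illustration only; the real `LoggingExporter` may format and route records differently.

```python
import logging


class ListHandler(logging.Handler):
    """Hypothetical handler that keeps formatted records in memory."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(self.format(record))


class SketchLoggingExporter:
    """Hypothetical exporter: writes one INFO record per batch of spans."""

    def __init__(self, handler=None):
        self.logger = logging.getLogger("sketch.trace_exporter")
        self.logger.setLevel(logging.INFO)
        self.logger.propagate = False  # keep the demo isolated from root config
        self.logger.addHandler(handler or logging.StreamHandler())

    def emit(self, span_datas):
        self.logger.info("spans: %s", span_datas)

    def export(self, span_datas):
        # No transport in this sketch: export writes directly.
        self.emit(span_datas)


handler = ListHandler()
handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
SketchLoggingExporter(handler).export([{"name": "login"}])
messages = handler.messages
```

Because the exporter only talks to a `logging.Logger`, any existing handler (file, syslog, stream) can be attached without changing the export path.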

### Transport Mechanisms

Control how exported data is transmitted, with options for synchronous, asynchronous, and batched delivery.

```python { .api }
class Transport:
    """Base transport class for data transmission."""

    def export(self, datas):
        """
        Export data using this transport.

        Parameters:
        - datas: list, data objects to export
        """

    def flush(self):
        """Flush any pending data."""

class SyncTransport(Transport):
    """
    Synchronous transport for immediate data export.

    Parameters:
    - exporter: Exporter, destination exporter
    """
    def __init__(self, exporter): ...

    def export(self, datas):
        """Export data synchronously through configured exporter."""

class AsyncTransport(Transport):
    """
    Asynchronous transport with background thread processing.

    Provides batching, buffering, and automatic retry capabilities
    for high-throughput scenarios.

    Parameters:
    - exporter: Exporter, destination exporter
    - grace_period: float, shutdown grace period in seconds (default: 5.0)
    - max_batch_size: int, maximum batch size (default: 600)
    - wait_period: float, batch flush interval in seconds (default: 60.0)
    """
    def __init__(self, exporter, grace_period=5.0, max_batch_size=600, wait_period=60.0): ...

    def export(self, data):
        """Queue data for asynchronous export."""

    def flush(self):
        """
        Flush all pending data and wait for completion.

        Blocks until all queued data has been exported or
        grace period expires.
        """
```
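
The interaction between `max_batch_size`, `wait_period`, and `flush` can be sketched with a queue and a worker thread. This is an illustrative stand-in, not the library's `AsyncTransport`: `BatchingTransport` is a hypothetical class, it takes a plain `emit` callable instead of an exporter, it has no retry logic, and its `flush` blocks without a timeout.

```python
import queue
import threading


class BatchingTransport:
    """Hypothetical background transport; not the library's AsyncTransport."""

    def __init__(self, emit, max_batch_size=600, wait_period=60.0):
        self._emit = emit                  # callable that receives each batch
        self._max_batch_size = max_batch_size
        self._wait_period = wait_period
        self._queue = queue.Queue()
        self._wake = threading.Event()     # set by flush() and close()
        self._closed = False
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def export(self, data):
        # Returns immediately; the worker delivers the item later.
        self._queue.put(data)

    def _drain(self):
        # Empty the queue, emitting batches of at most max_batch_size items.
        while True:
            batch = []
            while len(batch) < self._max_batch_size:
                try:
                    batch.append(self._queue.get_nowait())
                except queue.Empty:
                    break
            if not batch:
                return
            self._emit(batch)
            for _ in batch:
                self._queue.task_done()

    def _run(self):
        while True:
            # Sleep until the interval elapses or flush()/close() wakes us.
            self._wake.wait(self._wait_period)
            self._wake.clear()
            self._drain()
            if self._closed:
                return

    def flush(self):
        # Wake the worker and block until every queued item was emitted.
        self._wake.set()
        self._queue.join()

    def close(self, grace_period=5.0):
        self._closed = True
        self._wake.set()
        self._thread.join(timeout=grace_period)


batches = []
transport = BatchingTransport(batches.append, max_batch_size=2)
for i in range(5):
    transport.export(i)
transport.flush()
transport.close()
```

With `max_batch_size=2`, the five queued items are delivered as three batches once `flush` wakes the worker. Without the `flush` call they would wait until `wait_period` elapsed.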

### Metrics Export Infrastructure

Periodic export of metrics with background processing, aggregation, and error handling for monitoring systems.

```python { .api }
class PeriodicMetricTask:
    """
    Periodic metric export task with background thread.

    Parameters:
    - interval: float, export interval in seconds
    - function: callable, export function to call
    - args: tuple, function arguments
    - kwargs: dict, function keyword arguments
    - name: str, task name for identification
    """
    def __init__(self, interval=None, function=None, args=None, kwargs=None, name=None): ...

    def run(self):
        """Start periodic metric export in background thread."""

    def close(self):
        """
        Stop periodic export and cleanup resources.

        Stops background thread and waits for completion.
        """

def get_exporter_thread(metric_producers, exporter, interval=None):
    """
    Create and start metric export task.

    Parameters:
    - metric_producers: list, MetricProducer instances to export from
    - exporter: Exporter, destination for exported metrics
    - interval: float, export interval in seconds (default: 60)

    Returns:
    PeriodicMetricTask: Running export task
    """

class TransportError(Exception):
    """Exception raised for transport-related errors."""
    pass

# Constants
DEFAULT_INTERVAL = 60
"""int: Default export interval in seconds"""

GRACE_PERIOD = 5
"""int: Default grace period for shutdown in seconds"""
```
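
The periodic export loop can be approximated with a thread that waits on an event between runs. The following is a sketch under stated assumptions, not the library's implementation: `PeriodicTask` and `start_export` are hypothetical names, producers are plain callables returning lists, and errors raised by the export function are not handled.

```python
import threading


class PeriodicTask(threading.Thread):
    """Hypothetical periodic runner; not the library's PeriodicMetricTask."""

    def __init__(self, interval, function, name=None):
        super().__init__(name=name, daemon=True)
        self.interval = interval
        self.function = function
        self._stop_event = threading.Event()

    def run(self):
        # Event.wait returns False on timeout and True once close() is called.
        while not self._stop_event.wait(self.interval):
            self.function()

    def close(self):
        # Stop the loop and wait for the thread to finish.
        self._stop_event.set()
        self.join()


def start_export(producers, export, interval=60):
    """Pull metrics from every producer and pass them to `export`."""
    def export_all():
        metrics = []
        for producer in producers:
            metrics.extend(producer())
        export(metrics)

    task = PeriodicTask(interval, export_all, name="metric-export")
    task.start()
    return task


received = []
first_export = threading.Event()


def record(metrics):
    received.append(metrics)
    first_export.set()


task = start_export([lambda: ["cpu"], lambda: ["mem"]], record, interval=0.01)
completed = first_export.wait(timeout=5)
task.close()
```

Waiting on an event rather than calling `time.sleep` lets `close` interrupt the interval immediately, so shutdown does not have to wait for a full export period.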

## Usage Examples

### Console Export for Development

```python
from opencensus.trace.tracer import Tracer
from opencensus.trace import print_exporter
from opencensus.trace.samplers import AlwaysOnSampler

# Create tracer with console export
tracer = Tracer(
    sampler=AlwaysOnSampler(),
    exporter=print_exporter.PrintExporter()
)

# Traced operations will print to console
with tracer.span('development_operation') as span:
    span.add_attribute('debug_info', 'testing console export')
    span.add_annotation('Starting development test')

    # Your application logic (process_data is a placeholder)
    result = process_data()

    span.add_attribute('result_size', len(result))
    span.add_annotation('Development test completed')

# Console output will show span details (illustrative; the exact
# format depends on the exporter):
# Span: development_operation
#   Start: 2024-01-15T10:30:15.123456Z
#   End: 2024-01-15T10:30:15.234567Z
#   Attributes: {'debug_info': 'testing console export', 'result_size': 42}
#   Annotations: [{'description': 'Starting development test', ...}, ...]
```

### File Export for Persistent Storage

```python
from functools import partial

from opencensus.trace.tracer import Tracer
from opencensus.trace import file_exporter
from opencensus.trace.samplers import ProbabilitySampler
from opencensus.common.transports.async_ import AsyncTransport

# Create file exporter with async transport.
# `transport` is a transport class (or factory) that the exporter calls
# with itself, so batching options are bound with functools.partial
# instead of constructing a second FileExporter.
exporter = file_exporter.FileExporter(
    file_name='./traces.json',
    transport=partial(AsyncTransport, max_batch_size=100, wait_period=30.0)
)

tracer = Tracer(
    sampler=ProbabilitySampler(rate=0.1),
    exporter=exporter
)

# High-throughput scenario
for i in range(1000):
    with tracer.span(f'batch_operation_{i}') as span:
        span.add_attribute('batch_id', i)
        span.add_attribute('operation_type', 'batch_process')

        # Process item (process_batch_item is a placeholder)
        process_batch_item(i)

        if i % 100 == 0:
            span.add_annotation(f'Completed batch {i//100}')

# Flush remaining data before shutdown
exporter.transport.flush()
```

### Logging System Integration

```python
import logging
from opencensus.trace.tracer import Tracer
from opencensus.trace import logging_exporter
from opencensus.trace.status import Status
from opencensus.common.transports.sync import SyncTransport

# Configure logging system
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('application.log'),
        logging.StreamHandler()
    ]
)

# Create logging exporter; the transport is passed as a class and
# bound to the exporter internally
exporter = logging_exporter.LoggingExporter(transport=SyncTransport)

tracer = Tracer(exporter=exporter)

# Traces will appear in logs.
# authenticate_user and AuthenticationError are application-defined placeholders.
with tracer.span('user_authentication') as span:
    span.add_attribute('user_id', 'user123')
    span.add_attribute('auth_method', 'oauth2')

    try:
        authenticate_user('user123')
        span.add_annotation('Authentication successful')

    except AuthenticationError as e:
        span.set_status(Status.from_exception(e))
        span.add_annotation('Authentication failed')
        raise

# Log entries will include trace information (illustrative):
# 2024-01-15 10:30:15 - opencensus.traces - INFO - Span: user_authentication ...
```

### Custom Exporter Implementation

```python
from opencensus.trace.base_exporter import Exporter
from opencensus.trace.tracer import Tracer
import requests

class HTTPExporter(Exporter):
    """Custom exporter that sends traces to HTTP endpoint."""

    def __init__(self, endpoint_url, api_key=None, timeout=10):
        self.endpoint_url = endpoint_url
        self.api_key = api_key
        self.timeout = timeout
        self.session = requests.Session()

        if api_key:
            self.session.headers.update({'Authorization': f'Bearer {api_key}'})

    def export(self, span_datas):
        """Export spans to HTTP endpoint."""
        try:
            # Convert spans to JSON format
            trace_data = []
            for span_data in span_datas:
                trace_data.append({
                    'traceId': span_data.context.trace_id,
                    'spanId': span_data.span_id,
                    'name': span_data.name,
                    # SpanData timestamps are already ISO 8601 strings
                    'startTime': span_data.start_time,
                    'endTime': span_data.end_time,
                    'attributes': dict(span_data.attributes or {}),
                    'status': {
                        'code': span_data.status.canonical_code if span_data.status else 0,
                        'message': span_data.status.message if span_data.status else None
                    }
                })

            # Send to endpoint
            response = self.session.post(
                self.endpoint_url,
                json={'traces': trace_data},
                timeout=self.timeout
            )
            response.raise_for_status()

        except Exception as e:
            # Never let an export failure break the traced application
            print(f"Export failed: {e}")

    def emit(self, span_datas):
        """Emit spans immediately."""
        self.export(span_datas)

# Usage
custom_exporter = HTTPExporter(
    endpoint_url='https://monitoring.example.com/traces',
    api_key='your-api-key'
)

tracer = Tracer(exporter=custom_exporter)

with tracer.span('custom_export_test') as span:
    span.add_attribute('custom_field', 'test_value')
    # Span will be sent to HTTP endpoint
```

### Periodic Metrics Export

```python
from opencensus.metrics.transport import get_exporter_thread
from opencensus.metrics.export.metric_producer import MetricProducer
from opencensus.stats import stats as stats_module
import time

class CustomMetricProducer(MetricProducer):
    """Custom metric producer for application metrics."""

    def __init__(self):
        self.stats = stats_module.stats

    def get_metrics(self):
        """Get current metrics for export."""
        # Collect current view data
        view_datas = self.stats.view_manager.get_all_exported_data()

        metrics = []
        for view_data in view_datas:
            # Convert view data to metrics
            metric = self._convert_view_to_metric(view_data)
            if metric:
                metrics.append(metric)

        return metrics

    def _convert_view_to_metric(self, view_data):
        """Convert view data to metric format."""
        # Implementation depends on your specific metric format
        pass

class ConsoleMetricExporter:
    """Simple console exporter for metrics."""

    def export_metrics(self, metrics):
        """Export metrics to console (invoked by the export thread)."""
        metrics = list(metrics)  # may arrive as an iterator
        print(f"=== Metrics Export ({len(metrics)} metrics) ===")
        for metric in metrics:
            print(f"Metric: {metric.descriptor.name}")
            print(f"  Description: {metric.descriptor.description}")
            print(f"  Time Series: {len(metric.time_series)}")
        print("=" * 50)

# Set up periodic export
producer = CustomMetricProducer()
exporter = ConsoleMetricExporter()

# Start periodic export every 30 seconds
export_task = get_exporter_thread(
    metric_producers=[producer],
    exporter=exporter,
    interval=30
)

# Let it run for a while
try:
    time.sleep(120)  # Run for 2 minutes
finally:
    # Clean shutdown
    export_task.close()
```

### Advanced Transport Configuration

```python
from opencensus.trace.tracer import Tracer
from opencensus.trace import file_exporter
from opencensus.common.transports.async_ import AsyncTransport
import signal
import sys

class GracefulFileExporter:
    """File exporter with graceful shutdown handling."""

    def __init__(self, file_path, batch_size=500, flush_interval=60):
        self.exporter = file_exporter.FileExporter(file_path)
        self.transport = AsyncTransport(
            exporter=self.exporter,
            max_batch_size=batch_size,
            wait_period=flush_interval,
            grace_period=10.0
        )

        # Set up signal handlers for graceful shutdown
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Handle shutdown signals gracefully."""
        print(f"Received signal {signum}, flushing traces...")
        self.transport.flush()
        sys.exit(0)

    def export(self, span_datas):
        """Export spans through async transport."""
        # Queue the whole list; the transport batches it in the background
        self.transport.export(span_datas)

# Usage in production application
exporter = GracefulFileExporter('./production_traces.jsonl')
tracer = Tracer(exporter=exporter)

# Application will gracefully flush traces on shutdown
try:
    while True:  # Main application loop
        with tracer.span('main_operation') as span:
            # Your application logic (process_requests is a placeholder)
            process_requests()

except KeyboardInterrupt:
    print("Shutting down gracefully...")
    # Signal handler will flush remaining traces
```