0
# Tracing Support
1
2
Experimental distributed tracing capabilities for capturing execution spans across Flink's distributed runtime. Provides span creation, attribute attachment, and pluggable trace reporting for observability in distributed stream processing applications.
3
4
## Capabilities
5
6
### Span Interface
7
8
Core interface representing a distributed tracing span that captures something that happened in Flink at a certain point in time.
9
10
```java { .api }
11
/**
12
* Span represents something that happened in Flink at certain point of time,
13
* that will be reported to a TraceReporter. Currently we don't support traces
14
* with multiple spans. Each span is self-contained and represents things like
15
* a checkpoint or recovery.
16
*/
17
@Experimental
18
public interface Span {
19
20
/**
21
* Creates a new SpanBuilder for constructing Span instances.
22
* @param classScope Flink's convention is that the scope of each Span is
23
* defined by the class that is creating it
24
* @param name Human readable name of this span
25
* @return SpanBuilder for constructing the span
26
*/
27
static SpanBuilder builder(Class<?> classScope, String name);
28
29
/**
30
* Returns the scope of this span.
31
* @return scope string (typically class canonical name)
32
*/
33
String getScope();
34
35
/**
36
* Returns the name of this span.
37
* @return human readable span name
38
*/
39
String getName();
40
41
/**
42
* Returns the start timestamp in milliseconds.
43
* @return start timestamp in milliseconds since epoch
44
*/
45
long getStartTsMillis();
46
47
/**
48
* Returns the end timestamp in milliseconds.
49
* @return end timestamp in milliseconds since epoch
50
*/
51
long getEndTsMillis();
52
53
/**
54
* Returns the attributes attached to this span.
55
* Currently returned values can be of type String, Long or Double,
56
* however more types can be added in the future.
57
* @return map of attribute names to values
58
*/
59
Map<String, Object> getAttributes();
60
}
61
```
62
63
**Usage Examples:**
64
65
```java
66
// Basic span creation
67
public class CheckpointCoordinator {
68
69
public void triggerCheckpoint() {
70
Span checkpointSpan = Span.builder(CheckpointCoordinator.class, "checkpoint")
71
.setStartTsMillis(System.currentTimeMillis())
72
.setAttribute("checkpoint-id", checkpointId)
73
.setAttribute("num-tasks", numberOfTasks)
74
.build();
75
76
try {
77
performCheckpoint();
78
79
// Update span with completion info
80
checkpointSpan = Span.builder(CheckpointCoordinator.class, "checkpoint")
81
.setStartTsMillis(startTime)
82
.setEndTsMillis(System.currentTimeMillis())
83
.setAttribute("checkpoint-id", checkpointId)
84
.setAttribute("num-tasks", numberOfTasks)
85
.setAttribute("success", true)
86
.setAttribute("duration-ms", System.currentTimeMillis() - startTime)
87
.build();
88
89
} catch (Exception e) {
90
// Span for failed checkpoint
91
checkpointSpan = Span.builder(CheckpointCoordinator.class, "checkpoint")
92
.setStartTsMillis(startTime)
93
.setEndTsMillis(System.currentTimeMillis())
94
.setAttribute("checkpoint-id", checkpointId)
95
.setAttribute("success", false)
96
.setAttribute("error", e.getMessage())
97
.build();
98
}
99
100
// Report span
101
metricGroup.addSpan(checkpointSpan.builder(CheckpointCoordinator.class, "checkpoint"));
102
}
103
}
104
105
// Span for operator lifecycle events
106
public class StreamOperatorLifecycle {
107
108
public void open() throws Exception {
109
long startTime = System.currentTimeMillis();
110
111
try {
112
performOpen();
113
114
Span openSpan = Span.builder(this.getClass(), "operator-open")
115
.setStartTsMillis(startTime)
116
.setEndTsMillis(System.currentTimeMillis())
117
.setAttribute("operator-name", getOperatorName())
118
.setAttribute("parallelism", getParallelism())
119
.setAttribute("subtask-index", getSubtaskIndex())
120
.setAttribute("success", true)
121
.build();
122
123
reportSpan(openSpan);
124
125
} catch (Exception e) {
126
Span failedOpenSpan = Span.builder(this.getClass(), "operator-open")
127
.setStartTsMillis(startTime)
128
.setEndTsMillis(System.currentTimeMillis())
129
.setAttribute("operator-name", getOperatorName())
130
.setAttribute("success", false)
131
.setAttribute("error", e.getMessage())
132
.build();
133
134
reportSpan(failedOpenSpan);
135
throw e;
136
}
137
}
138
}
139
```
140
141
### SpanBuilder Class
142
143
Builder for constructing Span instances, with a fluent API for setting timestamps and attributes.
144
145
```java { .api }
146
/**
147
* Builder used to construct Span instances.
148
*/
149
@Experimental
150
public class SpanBuilder {
151
/**
152
* Constructor for SpanBuilder.
153
* @param classScope Flink's convention is that the scope of each Span is
154
* defined by the class that is creating it. If you are
155
* building the Span in your class MyClass, as the classScope
156
* you should pass MyClass.class.
157
* @param name Human readable name of this span, that describes what the
158
* built Span will represent.
159
*/
160
SpanBuilder(Class<?> classScope, String name);
161
162
/**
163
* Builds the Span instance.
164
* @return constructed Span
165
*/
166
public Span build();
167
168
/**
169
* Optionally you can manually set the Span's startTs. If not specified,
170
* System.currentTimeMillis() will be used.
171
* @param startTsMillis start timestamp in milliseconds
172
* @return this SpanBuilder for method chaining
173
*/
174
public SpanBuilder setStartTsMillis(long startTsMillis);
175
176
/**
177
* Optionally you can manually set the Span's endTs. If not specified,
178
* startTsMillis will be used.
179
* @param endTsMillis end timestamp in milliseconds
180
* @return this SpanBuilder for method chaining
181
*/
182
public SpanBuilder setEndTsMillis(long endTsMillis);
183
184
/**
185
* Additional string attribute to be attached to this Span.
186
* @param key attribute key
187
* @param value string attribute value
188
* @return this SpanBuilder for method chaining
189
*/
190
public SpanBuilder setAttribute(String key, String value);
191
192
/**
193
* Additional long attribute to be attached to this Span.
194
* @param key attribute key
195
* @param value long attribute value
196
* @return this SpanBuilder for method chaining
197
*/
198
public SpanBuilder setAttribute(String key, long value);
199
200
/**
201
* Additional double attribute to be attached to this Span.
202
* @param key attribute key
203
* @param value double attribute value
204
* @return this SpanBuilder for method chaining
205
*/
206
public SpanBuilder setAttribute(String key, double value);
207
}
208
```
209
210
**Usage Examples:**
211
212
```java
213
// Comprehensive span creation
214
public class TaskExecutor {
215
216
public void executeTask(Task task) {
217
SpanBuilder spanBuilder = Span.builder(TaskExecutor.class, "task-execution")
218
.setAttribute("task-id", task.getId())
219
.setAttribute("task-type", task.getType())
220
.setAttribute("parallelism", task.getParallelism())
221
.setAttribute("operator-chain-length", task.getOperatorChain().size());
222
223
long startTime = System.currentTimeMillis();
224
spanBuilder.setStartTsMillis(startTime);
225
226
try {
227
Object result = task.execute();
228
229
long endTime = System.currentTimeMillis();
230
long duration = endTime - startTime;
231
232
Span successSpan = spanBuilder
233
.setEndTsMillis(endTime)
234
.setAttribute("success", true)
235
.setAttribute("duration-ms", duration)
236
.setAttribute("result-size", getResultSize(result))
237
.build();
238
239
reportSpan(successSpan);
240
241
} catch (Exception e) {
242
long endTime = System.currentTimeMillis();
243
244
Span failureSpan = spanBuilder
245
.setEndTsMillis(endTime)
246
.setAttribute("success", false)
247
.setAttribute("error-type", e.getClass().getSimpleName())
248
.setAttribute("error-message", e.getMessage())
249
.setAttribute("duration-ms", endTime - startTime)
250
.build();
251
252
reportSpan(failureSpan);
253
throw e;
254
}
255
}
256
}
257
258
// Network operation tracing
259
public class NetworkClient {
260
261
public void sendData(byte[] data, String destination) {
262
Span networkSpan = Span.builder(NetworkClient.class, "network-send")
263
.setStartTsMillis(System.currentTimeMillis())
264
.setAttribute("destination", destination)
265
.setAttribute("data-size", data.length)
266
.setAttribute("protocol", "tcp")
267
.build();
268
269
// For immediate operations, start and end can be the same
270
reportSpan(networkSpan);
271
}
272
273
public CompletableFuture<Response> sendRequestAsync(Request request) {
274
long startTime = System.currentTimeMillis();
275
276
return sendAsync(request)
277
.whenComplete((response, throwable) -> {
278
long endTime = System.currentTimeMillis();
279
280
SpanBuilder spanBuilder = Span.builder(NetworkClient.class, "async-request")
281
.setStartTsMillis(startTime)
282
.setEndTsMillis(endTime)
283
.setAttribute("request-type", request.getType())
284
.setAttribute("request-size", request.getSize())
285
.setAttribute("duration-ms", endTime - startTime);
286
287
if (throwable == null) {
288
Span successSpan = spanBuilder
289
.setAttribute("success", true)
290
.setAttribute("response-size", response.getSize())
291
.setAttribute("status-code", response.getStatusCode())
292
.build();
293
reportSpan(successSpan);
294
} else {
295
Span errorSpan = spanBuilder
296
.setAttribute("success", false)
297
.setAttribute("error", throwable.getMessage())
298
.build();
299
reportSpan(errorSpan);
300
}
301
});
302
}
303
}
304
305
// State backend operation tracing
306
public class StateBackendTracing {
307
308
public void checkpoint(CheckpointId checkpointId) {
309
SpanBuilder checkpointSpan = Span.builder(StateBackendTracing.class, "state-checkpoint")
310
.setAttribute("checkpoint-id", checkpointId.getValue())
311
.setAttribute("backend-type", getBackendType());
312
313
long startTime = System.currentTimeMillis();
314
315
try {
316
long stateSize = performCheckpoint(checkpointId);
317
long endTime = System.currentTimeMillis();
318
319
Span completedSpan = checkpointSpan
320
.setStartTsMillis(startTime)
321
.setEndTsMillis(endTime)
322
.setAttribute("state-size-bytes", stateSize)
323
.setAttribute("duration-ms", endTime - startTime)
324
.setAttribute("success", true)
325
.build();
326
327
reportSpan(completedSpan);
328
329
} catch (Exception e) {
330
Span failedSpan = checkpointSpan
331
.setStartTsMillis(startTime)
332
.setEndTsMillis(System.currentTimeMillis())
333
.setAttribute("success", false)
334
.setAttribute("error", e.getMessage())
335
.build();
336
337
reportSpan(failedSpan);
338
throw e;
339
}
340
}
341
}
342
```
343
344
### SimpleSpan Implementation
345
346
Default implementation of the Span interface.
347
348
```java { .api }
349
/**
350
* Default implementation of Span interface.
351
*/
352
class SimpleSpan implements Span {
353
// Internal implementation constructed by SpanBuilder
354
}
355
```
356
357
### TraceReporter Interface
358
359
Interface for exporting spans to external tracing systems, similar to MetricReporter but for trace data.
360
361
```java { .api }
362
/**
363
* Trace reporters are used to export Spans to an external backend.
364
* Reporters are instantiated via a TraceReporterFactory.
365
*/
366
@Experimental
367
public interface TraceReporter {
368
369
/**
370
* Configures this reporter. If the reporter was instantiated generically
371
* and hence parameter-less, this method is the place where the reporter
372
* sets its basic fields based on configuration values.
373
* This method is always called first on a newly instantiated reporter.
374
* @param config A properties object that contains all parameters set for this reporter
375
*/
376
void open(MetricConfig config);
377
378
/**
379
* Closes this reporter. Should be used to close channels, streams and release resources.
380
*/
381
void close();
382
383
/**
384
* Called when a new Span is added.
385
* @param span the span that was added
386
*/
387
void notifyOfAddedSpan(Span span);
388
}
389
```
390
391
**Usage Examples:**
392
393
```java
394
// Custom trace reporter implementation
395
public class JaegerTraceReporter implements TraceReporter {
396
private JaegerTracer tracer;
397
private String serviceName;
398
private String jaegerEndpoint;
399
400
@Override
401
public void open(MetricConfig config) {
402
this.serviceName = config.getString("service.name", "flink-application");
403
this.jaegerEndpoint = config.getString("jaeger.endpoint", "http://localhost:14268/api/traces");
404
405
// Initialize Jaeger tracer
406
this.tracer = Configuration.fromEnv(serviceName)
407
.withSampling(Configuration.SamplerConfiguration.fromEnv()
408
.withType(ConstSampler.TYPE)
409
.withParam(1))
410
.withReporter(Configuration.ReporterConfiguration.fromEnv()
411
.withSender(Configuration.SenderConfiguration.fromEnv()
412
.withEndpoint(jaegerEndpoint)))
413
.getTracer();
414
}
415
416
@Override
417
public void notifyOfAddedSpan(Span span) {
418
// Convert Flink span to Jaeger span
419
io.opentracing.Span jaegerSpan = tracer.buildSpan(span.getName())
420
.withStartTimestamp(span.getStartTsMillis() * 1000) // Convert to microseconds
421
.start();
422
423
// Add attributes as tags
424
Map<String, Object> attributes = span.getAttributes();
425
for (Map.Entry<String, Object> entry : attributes.entrySet()) {
426
String key = entry.getKey();
427
Object value = entry.getValue();
428
429
if (value instanceof String) {
430
jaegerSpan.setTag(key, (String) value);
431
} else if (value instanceof Number) {
432
jaegerSpan.setTag(key, (Number) value);
433
} else if (value instanceof Boolean) {
434
jaegerSpan.setTag(key, (Boolean) value);
435
}
436
}
437
438
// Set scope as a tag
439
jaegerSpan.setTag("scope", span.getScope());
440
441
// Finish span with end timestamp
442
jaegerSpan.finish(span.getEndTsMillis() * 1000);
443
}
444
445
@Override
446
public void close() {
447
if (tracer != null) {
448
tracer.close();
449
}
450
}
451
}
452
453
// Console trace reporter for debugging
454
public class ConsoleTraceReporter implements TraceReporter {
455
private boolean includeAttributes;
456
private String dateFormat;
457
458
@Override
459
public void open(MetricConfig config) {
460
this.includeAttributes = config.getBoolean("include.attributes", true);
461
this.dateFormat = config.getString("date.format", "yyyy-MM-dd HH:mm:ss.SSS");
462
}
463
464
@Override
465
public void notifyOfAddedSpan(Span span) {
466
SimpleDateFormat formatter = new SimpleDateFormat(dateFormat);
467
468
System.out.println("=== SPAN ===");
469
System.out.println("Name: " + span.getName());
470
System.out.println("Scope: " + span.getScope());
471
System.out.println("Start: " + formatter.format(new Date(span.getStartTsMillis())));
472
System.out.println("End: " + formatter.format(new Date(span.getEndTsMillis())));
473
System.out.println("Duration: " + (span.getEndTsMillis() - span.getStartTsMillis()) + "ms");
474
475
if (includeAttributes && !span.getAttributes().isEmpty()) {
476
System.out.println("Attributes:");
477
span.getAttributes().forEach((key, value) ->
478
System.out.println(" " + key + ": " + value));
479
}
480
481
System.out.println("============");
482
}
483
484
@Override
485
public void close() {
486
System.out.println("Console trace reporter closed");
487
}
488
}
489
490
// Batching trace reporter
491
public class BatchingTraceReporter implements TraceReporter {
492
private final List<Span> spanBuffer = new ArrayList<>();
493
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
494
private int batchSize;
495
private long flushInterval;
496
private String endpoint;
497
498
@Override
499
public void open(MetricConfig config) {
500
this.batchSize = config.getInteger("batch.size", 100);
501
this.flushInterval = config.getLong("flush.interval", 5000); // 5 seconds
502
this.endpoint = config.getString("endpoint", "http://localhost:9411/api/v2/spans");
503
504
// Schedule periodic flush
505
scheduler.scheduleAtFixedRate(this::flushSpans, flushInterval, flushInterval, TimeUnit.MILLISECONDS);
506
}
507
508
@Override
509
public synchronized void notifyOfAddedSpan(Span span) {
510
spanBuffer.add(span);
511
512
if (spanBuffer.size() >= batchSize) {
513
flushSpans();
514
}
515
}
516
517
private synchronized void flushSpans() {
518
if (spanBuffer.isEmpty()) {
519
return;
520
}
521
522
List<Span> toFlush = new ArrayList<>(spanBuffer);
523
spanBuffer.clear();
524
525
// Send spans asynchronously
526
CompletableFuture.runAsync(() -> sendSpans(toFlush));
527
}
528
529
private void sendSpans(List<Span> spans) {
530
try {
531
// Convert spans to JSON and send to endpoint
532
String json = convertSpansToJson(spans);
533
sendToEndpoint(json);
534
} catch (Exception e) {
535
System.err.println("Failed to send spans: " + e.getMessage());
536
}
537
}
538
539
@Override
540
public void close() {
541
flushSpans(); // Flush remaining spans
542
scheduler.shutdown();
543
}
544
}
545
```
546
547
### TraceReporterFactory Interface
548
549
Factory interface for creating trace reporters.
550
551
```java { .api }
552
/**
553
* Factory for creating TraceReporter instances.
554
*/
555
@Experimental
556
public interface TraceReporterFactory {
557
/**
558
* Creates a new trace reporter.
559
* @param properties configured properties for the reporter
560
* @return created trace reporter
561
*/
562
TraceReporter createTraceReporter(Properties properties);
563
}
564
```
565
566
**Usage Examples:**
567
568
```java
569
// Factory implementation
570
public class JaegerTraceReporterFactory implements TraceReporterFactory {
571
@Override
572
public TraceReporter createTraceReporter(Properties properties) {
573
return new JaegerTraceReporter();
574
}
575
}
576
577
// Configurable factory
578
public class ConfigurableTraceReporterFactory implements TraceReporterFactory {
579
@Override
580
public TraceReporter createTraceReporter(Properties properties) {
581
String type = properties.getProperty("type", "console");
582
583
switch (type.toLowerCase()) {
584
case "jaeger":
585
return new JaegerTraceReporter();
586
case "zipkin":
587
return new ZipkinTraceReporter();
588
case "console":
589
return new ConsoleTraceReporter();
590
default:
591
throw new IllegalArgumentException("Unknown trace reporter type: " + type);
592
}
593
}
594
}
595
```