Core metrics interfaces and implementations for the Apache Flink stream processing framework.
—
Experimental distributed tracing capabilities for capturing execution spans across Flink's distributed runtime. Provides span creation, attribute attachment, and pluggable trace reporting for observability in distributed stream processing applications.
Core interface representing a distributed tracing span that captures something that happened in Flink at a certain point in time.
/**
 * Span represents something that happened in Flink at a certain point in time,
 * that will be reported to a TraceReporter. Currently we don't support traces
 * with multiple spans. Each span is self-contained and represents things like
 * a checkpoint or recovery.
 */
@Experimental
public interface Span {
    /**
     * Creates a new SpanBuilder for constructing Span instances.
     *
     * <p>Must be invoked through the interface name ({@code Span.builder(...)});
     * Java does not allow calling a static interface method via an instance.
     *
     * @param classScope Flink's convention is that the scope of each Span is
     *     defined by the class that is creating it
     * @param name Human readable name of this span
     * @return SpanBuilder for constructing the span
     */
    static SpanBuilder builder(Class<?> classScope, String name) {
        // A static interface method requires a body; delegate to the
        // package-private SpanBuilder constructor.
        return new SpanBuilder(classScope, name);
    }

    /**
     * Returns the scope of this span.
     *
     * @return scope string (typically class canonical name)
     */
    String getScope();

    /**
     * Returns the name of this span.
     *
     * @return human readable span name
     */
    String getName();

    /**
     * Returns the start timestamp in milliseconds.
     *
     * @return start timestamp in milliseconds since epoch
     */
    long getStartTsMillis();

    /**
     * Returns the end timestamp in milliseconds.
     *
     * @return end timestamp in milliseconds since epoch
     */
    long getEndTsMillis();

    /**
     * Returns the attributes attached to this span.
     *
     * <p>Currently returned values can be of type String, Long or Double;
     * however, more types can be added in the future, so consumers should
     * tolerate values of other types.
     *
     * @return map of attribute names to values
     */
    Map<String, Object> getAttributes();
}
Usage Examples:
// Basic span creation
public class CheckpointCoordinator {
    /**
     * Performs a checkpoint and reports a single "checkpoint" span describing
     * its outcome (success flag, duration, or error message).
     *
     * <p>NOTE(review): checkpointId, numberOfTasks and metricGroup are not
     * declared in this listing; presumably they are fields of the coordinator.
     */
    public void triggerCheckpoint() {
        // Capture the start once so both the success and failure paths share it.
        long startTime = System.currentTimeMillis();
        SpanBuilder checkpointSpan = Span.builder(CheckpointCoordinator.class, "checkpoint")
                .setStartTsMillis(startTime)
                .setAttribute("checkpoint-id", checkpointId)
                .setAttribute("num-tasks", numberOfTasks);
        try {
            performCheckpoint();
            long endTime = System.currentTimeMillis();
            // SpanBuilder has no boolean overload, so the flag is recorded as a String.
            checkpointSpan
                    .setEndTsMillis(endTime)
                    .setAttribute("success", "true")
                    .setAttribute("duration-ms", endTime - startTime);
        } catch (Exception e) {
            // Span for failed checkpoint; the failure is recorded, not rethrown.
            checkpointSpan
                    .setEndTsMillis(System.currentTimeMillis())
                    .setAttribute("success", "false")
                    .setAttribute("error", String.valueOf(e.getMessage()));
        }
        // Report the populated builder itself. Creating a fresh Span.builder(...)
        // here would discard every timestamp and attribute set above.
        metricGroup.addSpan(checkpointSpan);
    }
}
// Span for operator lifecycle events
public class StreamOperatorLifecycle {
    /**
     * Opens the operator and reports an "operator-open" span for the outcome.
     * If opening fails, a failure span is reported and the exception rethrown.
     *
     * @throws Exception whatever performOpen() throws
     */
    public void open() throws Exception {
        long startTime = System.currentTimeMillis();
        try {
            performOpen();
        } catch (Exception e) {
            Span failedOpenSpan = Span.builder(this.getClass(), "operator-open")
                    .setStartTsMillis(startTime)
                    .setEndTsMillis(System.currentTimeMillis())
                    .setAttribute("operator-name", getOperatorName())
                    .setAttribute("success", "false")
                    .setAttribute("error", String.valueOf(e.getMessage()))
                    .build();
            reportSpan(failedOpenSpan);
            throw e;
        }
        // Built and reported outside the try, so a failure while reporting is
        // not misrecorded as a failed open.
        Span openSpan = Span.builder(this.getClass(), "operator-open")
                .setStartTsMillis(startTime)
                .setEndTsMillis(System.currentTimeMillis())
                .setAttribute("operator-name", getOperatorName())
                .setAttribute("parallelism", getParallelism())
                .setAttribute("subtask-index", getSubtaskIndex())
                // SpanBuilder has no boolean overload; record the flag as a String.
                .setAttribute("success", "true")
                .build();
        reportSpan(openSpan);
    }
}
Builder for constructing Span instances with fluent API for setting timestamps and attributes.
/**
 * Builder used to construct Span instances.
 *
 * <p>Instances are obtained through {@code Span.builder(Class, String)}; the
 * constructor below has no access modifier, so it is package-private. Every
 * setter returns this builder, so calls can be chained. Attribute values are
 * limited to String, long and double: there is no boolean overload, so flags
 * must be encoded as one of those types (int arguments widen to the long overload).
 *
 * <p>NOTE(review): this listing shows signatures only; method bodies are omitted.
 */
@Experimental
public class SpanBuilder {
    /**
     * Constructor for SpanBuilder.
     * @param classScope Flink's convention is that the scope of each Span is
     * defined by the class that is creating it. If you are
     * building the Span in your class MyClass, as the classScope
     * you should pass MyClass.class.
     * @param name Human readable name of this span, that describes what the
     * built Span will represent.
     */
    SpanBuilder(Class<?> classScope, String name);
    /**
     * Builds the Span instance.
     * @return constructed Span
     */
    public Span build();
    /**
     * Optionally sets the Span's start timestamp. If not specified,
     * System.currentTimeMillis() will be used.
     * @param startTsMillis start timestamp in milliseconds
     * @return this SpanBuilder for method chaining
     */
    public SpanBuilder setStartTsMillis(long startTsMillis);
    /**
     * Optionally sets the Span's end timestamp. If not specified,
     * startTsMillis will be used.
     * @param endTsMillis end timestamp in milliseconds
     * @return this SpanBuilder for method chaining
     */
    public SpanBuilder setEndTsMillis(long endTsMillis);
    /**
     * Attaches an additional String attribute to this Span.
     * @param key attribute key
     * @param value string attribute value
     * @return this SpanBuilder for method chaining
     */
    public SpanBuilder setAttribute(String key, String value);
    /**
     * Attaches an additional long attribute to this Span.
     * @param key attribute key
     * @param value long attribute value
     * @return this SpanBuilder for method chaining
     */
    public SpanBuilder setAttribute(String key, long value);
    /**
     * Attaches an additional double attribute to this Span.
     * @param key attribute key
     * @param value double attribute value
     * @return this SpanBuilder for method chaining
     */
    public SpanBuilder setAttribute(String key, double value);
}
Usage Examples:
// Comprehensive span creation
public class TaskExecutor {
    /**
     * Executes the task and reports a "task-execution" span for the outcome.
     * If execution fails, a failure span is reported and the exception rethrown.
     *
     * @param task the task to execute
     */
    public void executeTask(Task task) {
        SpanBuilder spanBuilder = Span.builder(TaskExecutor.class, "task-execution")
                .setAttribute("task-id", task.getId())
                .setAttribute("task-type", task.getType())
                .setAttribute("parallelism", task.getParallelism())
                .setAttribute("operator-chain-length", task.getOperatorChain().size());
        long startTime = System.currentTimeMillis();
        spanBuilder.setStartTsMillis(startTime);
        Span successSpan;
        try {
            Object result = task.execute();
            long endTime = System.currentTimeMillis();
            long duration = endTime - startTime;
            // SpanBuilder has no boolean overload; record the flag as a String.
            successSpan = spanBuilder
                    .setEndTsMillis(endTime)
                    .setAttribute("success", "true")
                    .setAttribute("duration-ms", duration)
                    .setAttribute("result-size", getResultSize(result))
                    .build();
        } catch (Exception e) {
            long endTime = System.currentTimeMillis();
            Span failureSpan = spanBuilder
                    .setEndTsMillis(endTime)
                    .setAttribute("success", "false")
                    .setAttribute("error-type", e.getClass().getSimpleName())
                    .setAttribute("error-message", String.valueOf(e.getMessage()))
                    .setAttribute("duration-ms", endTime - startTime)
                    .build();
            reportSpan(failureSpan);
            throw e;
        }
        // Reported outside the try so a reporting failure is not recorded as a
        // task failure (which would also emit a second, contradictory span).
        reportSpan(successSpan);
    }
}
// Network operation tracing
public class NetworkClient {
    /**
     * Reports a "network-send" span describing a send of {@code data} to
     * {@code destination}.
     *
     * <p>NOTE(review): this listing never performs the actual send; only the
     * span is reported.
     */
    public void sendData(byte[] data, String destination) {
        // No end timestamp is set, so per SpanBuilder it defaults to the start
        // timestamp: an instantaneous span for an immediate operation.
        Span networkSpan = Span.builder(NetworkClient.class, "network-send")
                .setStartTsMillis(System.currentTimeMillis())
                .setAttribute("destination", destination)
                .setAttribute("data-size", data.length)
                .setAttribute("protocol", "tcp")
                .build();
        reportSpan(networkSpan);
    }

    /**
     * Sends the request asynchronously and reports an "async-request" span once
     * the returned future completes, either successfully or exceptionally.
     */
    public CompletableFuture<Response> sendRequestAsync(Request request) {
        long startTime = System.currentTimeMillis();
        return sendAsync(request)
                .whenComplete((response, throwable) -> {
                    long endTime = System.currentTimeMillis();
                    SpanBuilder spanBuilder = Span.builder(NetworkClient.class, "async-request")
                            .setStartTsMillis(startTime)
                            .setEndTsMillis(endTime)
                            .setAttribute("request-type", request.getType())
                            .setAttribute("request-size", request.getSize())
                            .setAttribute("duration-ms", endTime - startTime);
                    if (throwable == null) {
                        // SpanBuilder has no boolean overload; record the flag as a String.
                        Span successSpan = spanBuilder
                                .setAttribute("success", "true")
                                .setAttribute("response-size", response.getSize())
                                .setAttribute("status-code", response.getStatusCode())
                                .build();
                        reportSpan(successSpan);
                    } else {
                        // Failures from dependent stages may arrive wrapped in a
                        // CompletionException; report the underlying cause's message.
                        Throwable cause = throwable instanceof java.util.concurrent.CompletionException
                                        && throwable.getCause() != null
                                ? throwable.getCause()
                                : throwable;
                        Span errorSpan = spanBuilder
                                .setAttribute("success", "false")
                                .setAttribute("error", String.valueOf(cause.getMessage()))
                                .build();
                        reportSpan(errorSpan);
                    }
                });
    }
}
// State backend operation tracing
public class StateBackendTracing {
    /**
     * Performs a state checkpoint and reports a "state-checkpoint" span for the
     * outcome. If the checkpoint fails, a failure span is reported and the
     * exception rethrown.
     *
     * @param checkpointId identifier of the checkpoint to perform
     */
    public void checkpoint(CheckpointId checkpointId) {
        SpanBuilder checkpointSpan = Span.builder(StateBackendTracing.class, "state-checkpoint")
                .setAttribute("checkpoint-id", checkpointId.getValue())
                .setAttribute("backend-type", getBackendType());
        long startTime = System.currentTimeMillis();
        // Shared by the success and failure paths, so set it once up front.
        checkpointSpan.setStartTsMillis(startTime);
        Span completedSpan;
        try {
            long stateSize = performCheckpoint(checkpointId);
            long endTime = System.currentTimeMillis();
            // SpanBuilder has no boolean overload; record the flag as a String.
            completedSpan = checkpointSpan
                    .setEndTsMillis(endTime)
                    .setAttribute("state-size-bytes", stateSize)
                    .setAttribute("duration-ms", endTime - startTime)
                    .setAttribute("success", "true")
                    .build();
        } catch (Exception e) {
            Span failedSpan = checkpointSpan
                    .setEndTsMillis(System.currentTimeMillis())
                    .setAttribute("success", "false")
                    .setAttribute("error", String.valueOf(e.getMessage()))
                    .build();
            reportSpan(failedSpan);
            throw e;
        }
        // Reported outside the try so a reporting failure is not recorded as a
        // failed checkpoint.
        reportSpan(completedSpan);
    }
}
Default implementation of the Span interface.
/**
 * Default implementation of the Span interface.
 *
 * <p>Declared without an access modifier, so it is package-private and not
 * part of the public API; instances are constructed by SpanBuilder.
 */
class SimpleSpan implements Span {
    // Internal implementation constructed by SpanBuilder
    // NOTE(review): fields and accessor bodies are omitted from this listing.
}
Interface for exporting spans to external tracing systems, similar to MetricReporter but for trace data.
/**
 * Exports Spans to an external tracing backend.
 *
 * <p>Instances are created through a TraceReporterFactory. open() is always
 * the first method invoked on a new reporter; close() releases its resources.
 */
@Experimental
public interface TraceReporter {

    /**
     * Configures this reporter. Reporters that were instantiated generically,
     * without constructor parameters, read their settings from the config here.
     * This is always the first call made on a newly instantiated reporter.
     *
     * @param config properties holding every parameter set for this reporter
     */
    void open(MetricConfig config);

    /**
     * Invoked each time a new Span has been added.
     *
     * @param span the newly added span
     */
    void notifyOfAddedSpan(Span span);

    /**
     * Shuts this reporter down: close channels and streams and release resources.
     */
    void close();
}
Usage Examples:
// Custom trace reporter implementation
public class JaegerTraceReporter implements TraceReporter {
    private JaegerTracer tracer;
    private String serviceName;
    private String jaegerEndpoint;

    /**
     * Reads "service.name" and "jaeger.endpoint" from the config and builds the
     * Jaeger tracer that later spans are exported through.
     */
    @Override
    public void open(MetricConfig config) {
        this.serviceName = config.getString("service.name", "flink-application");
        this.jaegerEndpoint = config.getString("jaeger.endpoint", "http://localhost:14268/api/traces");
        // Initialize Jaeger tracer with a constant sampler. The Configuration
        // method is withSampler(...); there is no withSampling(...).
        this.tracer = Configuration.fromEnv(serviceName)
                .withSampler(Configuration.SamplerConfiguration.fromEnv()
                        .withType(ConstSampler.TYPE)
                        .withParam(1))
                .withReporter(Configuration.ReporterConfiguration.fromEnv()
                        .withSender(Configuration.SenderConfiguration.fromEnv()
                                .withEndpoint(jaegerEndpoint)))
                .getTracer();
    }

    /**
     * Converts the Flink span into a Jaeger span: attributes become tags, the
     * scope becomes a "scope" tag, and timestamps are converted to microseconds.
     */
    @Override
    public void notifyOfAddedSpan(Span span) {
        if (tracer == null) {
            // open() has not run yet, or close() already released the tracer.
            return;
        }
        // Jaeger timestamps are microseconds; Flink spans carry milliseconds.
        io.opentracing.Span jaegerSpan = tracer.buildSpan(span.getName())
                .withStartTimestamp(span.getStartTsMillis() * 1000)
                .start();
        // Add attributes as tags
        Map<String, Object> attributes = span.getAttributes();
        for (Map.Entry<String, Object> entry : attributes.entrySet()) {
            String key = entry.getKey();
            Object value = entry.getValue();
            if (value instanceof String) {
                jaegerSpan.setTag(key, (String) value);
            } else if (value instanceof Number) {
                jaegerSpan.setTag(key, (Number) value);
            } else if (value instanceof Boolean) {
                jaegerSpan.setTag(key, (Boolean) value);
            } else if (value != null) {
                // Span documents that more attribute types may be added; export
                // them as strings instead of dropping them silently.
                jaegerSpan.setTag(key, String.valueOf(value));
            }
        }
        // Set scope as a tag (set last, so it wins over an attribute named "scope")
        jaegerSpan.setTag("scope", span.getScope());
        // Finish span with end timestamp
        jaegerSpan.finish(span.getEndTsMillis() * 1000);
    }

    /** Closes the tracer, if one was created, so later spans are ignored. */
    @Override
    public void close() {
        if (tracer != null) {
            tracer.close();
            tracer = null;
        }
    }
}
// Console trace reporter for debugging
public class ConsoleTraceReporter implements TraceReporter {
    private boolean includeAttributes;
    private String dateFormat;
    // Built once in open(). Unlike SimpleDateFormat, DateTimeFormatter is
    // immutable and safe to share across threads.
    private java.time.format.DateTimeFormatter formatter;

    /**
     * Reads "include.attributes" (default true) and "date.format"
     * (default "yyyy-MM-dd HH:mm:ss.SSS") and builds the timestamp formatter.
     *
     * @throws IllegalArgumentException if "date.format" is not a valid pattern
     */
    @Override
    public void open(MetricConfig config) {
        this.includeAttributes = config.getBoolean("include.attributes", true);
        this.dateFormat = config.getString("date.format", "yyyy-MM-dd HH:mm:ss.SSS");
        // Render in the JVM default time zone, matching SimpleDateFormat's default.
        this.formatter = java.time.format.DateTimeFormatter.ofPattern(dateFormat)
                .withZone(java.time.ZoneId.systemDefault());
    }

    /** Prints a human-readable block describing the span to standard output. */
    @Override
    public void notifyOfAddedSpan(Span span) {
        String newline = System.lineSeparator();
        StringBuilder out = new StringBuilder();
        out.append("=== SPAN ===").append(newline);
        out.append("Name: ").append(span.getName()).append(newline);
        out.append("Scope: ").append(span.getScope()).append(newline);
        out.append("Start: ").append(formatMillis(span.getStartTsMillis())).append(newline);
        out.append("End: ").append(formatMillis(span.getEndTsMillis())).append(newline);
        out.append("Duration: ")
                .append(span.getEndTsMillis() - span.getStartTsMillis())
                .append("ms")
                .append(newline);
        if (includeAttributes && !span.getAttributes().isEmpty()) {
            out.append("Attributes:").append(newline);
            span.getAttributes().forEach((key, value) ->
                    out.append(" ").append(key).append(": ").append(value).append(newline));
        }
        out.append("============").append(newline);
        // One print call, so lines from spans reported concurrently on
        // different threads do not interleave.
        System.out.print(out);
    }

    /** Formats an epoch-millisecond timestamp with the configured pattern. */
    private String formatMillis(long epochMillis) {
        return formatter.format(java.time.Instant.ofEpochMilli(epochMillis));
    }

    @Override
    public void close() {
        System.out.println("Console trace reporter closed");
    }
}
// Batching trace reporter
public class BatchingTraceReporter implements TraceReporter {
    // Upper bound on how long close() waits for queued sends to finish.
    private static final long CLOSE_TIMEOUT_MILLIS = 10_000L;

    // Guarded by "this".
    private final List<Span> spanBuffer = new ArrayList<>();
    // A single daemon thread runs the periodic flush and performs every send.
    // Blocking network I/O therefore stays off the common ForkJoinPool, batches
    // go out in order, and an unclosed reporter cannot keep the JVM alive.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "batching-trace-reporter");
                thread.setDaemon(true);
                return thread;
            });
    private int batchSize;
    private long flushInterval;
    private String endpoint;

    /**
     * Reads "batch.size" (default 100), "flush.interval" in milliseconds
     * (default 5000) and "endpoint", then schedules the periodic flush.
     *
     * @throws IllegalArgumentException if "flush.interval" is not positive
     */
    @Override
    public void open(MetricConfig config) {
        this.batchSize = config.getInteger("batch.size", 100);
        this.flushInterval = config.getLong("flush.interval", 5000); // 5 seconds
        this.endpoint = config.getString("endpoint", "http://localhost:9411/api/v2/spans");
        if (flushInterval <= 0) {
            // scheduleAtFixedRate would reject this too, but without naming the setting.
            throw new IllegalArgumentException(
                    "flush.interval must be positive but was " + flushInterval);
        }
        // Schedule periodic flush
        scheduler.scheduleAtFixedRate(this::flushSpans, flushInterval, flushInterval, TimeUnit.MILLISECONDS);
    }

    /** Buffers the span and flushes immediately once batchSize spans are pending. */
    @Override
    public synchronized void notifyOfAddedSpan(Span span) {
        spanBuffer.add(span);
        if (spanBuffer.size() >= batchSize) {
            flushSpans();
        }
    }

    /** Hands the buffered spans to the sender thread, or sends inline once closed. */
    private synchronized void flushSpans() {
        if (spanBuffer.isEmpty()) {
            return;
        }
        List<Span> toFlush = new ArrayList<>(spanBuffer);
        spanBuffer.clear();
        // Both this check and shutdown() in close() run while holding "this",
        // so execute() can never be rejected here.
        if (scheduler.isShutdown()) {
            // Already closed: send on the caller's thread instead of dropping the batch.
            sendSpans(toFlush);
        } else {
            scheduler.execute(() -> sendSpans(toFlush));
        }
    }

    /** Best-effort export: failures are reported on stderr and the batch is dropped. */
    private void sendSpans(List<Span> spans) {
        try {
            // Convert spans to JSON and send to endpoint
            String json = convertSpansToJson(spans);
            sendToEndpoint(json);
        } catch (Exception e) {
            System.err.println("Failed to send spans: " + e.getMessage());
        }
    }

    /**
     * Flushes remaining spans, stops the periodic flush, and waits (bounded) for
     * queued sends to complete, so the final batch is not lost on shutdown.
     */
    @Override
    public void close() {
        synchronized (this) {
            // Queue the final batch before shutdown; already-queued one-shot tasks
            // still run after shutdown(), while the periodic flush is cancelled.
            flushSpans();
            scheduler.shutdown();
        }
        try {
            // Wait outside the lock: a periodic flush that is already running
            // needs the lock in order to finish.
            if (!scheduler.awaitTermination(CLOSE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                System.err.println("Timed out waiting for pending spans to be sent");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
Factory interface for creating trace reporters.
/**
 * Factory for creating TraceReporter instances.
 *
 * <p>The factory only instantiates the reporter. Per TraceReporter#open,
 * open() is always the first method called on the newly created reporter.
 */
@Experimental
public interface TraceReporterFactory {
    /**
     * Creates a new trace reporter.
     * @param properties configured properties for the reporter
     * @return created trace reporter
     */
    TraceReporter createTraceReporter(Properties properties);
}
Usage Examples:
// Factory implementation
public class JaegerTraceReporterFactory implements TraceReporterFactory {
    /**
     * Returns a fresh, not-yet-configured JaegerTraceReporter. The properties
     * are not inspected here; the reporter reads its settings in open().
     */
    @Override
    public TraceReporter createTraceReporter(Properties properties) {
        final TraceReporter jaegerReporter = new JaegerTraceReporter();
        return jaegerReporter;
    }
}
// Configurable factory
public class ConfigurableTraceReporterFactory implements TraceReporterFactory {
    /**
     * Creates the reporter selected by the "type" property (default "console").
     * Matching ignores case and surrounding whitespace.
     *
     * @param properties configured properties; only "type" is read here
     * @return a new, not-yet-opened reporter of the requested type
     * @throws IllegalArgumentException if the type is not jaeger, zipkin or console
     */
    @Override
    public TraceReporter createTraceReporter(Properties properties) {
        String type = properties.getProperty("type", "console");
        // Locale.ROOT: default-locale lowercasing breaks matching under e.g. a
        // Turkish locale, where "ZIPKIN" lowercases to "zıpkın".
        switch (type.trim().toLowerCase(java.util.Locale.ROOT)) {
            case "jaeger":
                return new JaegerTraceReporter();
            case "zipkin":
                return new ZipkinTraceReporter();
            case "console":
                return new ConsoleTraceReporter();
            default:
                throw new IllegalArgumentException("Unknown trace reporter type: " + type);
        }
    }
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-metrics-core