CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-metrics-core

Core metrics interfaces and implementations for Apache Flink stream processing framework

Pending
Overview
Eval results
Files

docs/tracing.md

Tracing Support

Experimental distributed tracing capabilities for capturing execution spans across Flink's distributed runtime. Provides span creation, attribute attachment, and pluggable trace reporting for observability in distributed stream processing applications.

Capabilities

Span Interface

Core interface representing a distributed tracing span that captures something that happened in Flink at a certain point in time.

/**
 * A span describes something that happened in Flink at a certain point in time and
 * that will be reported to a TraceReporter. Traces with multiple spans are currently
 * not supported: each span is self-contained and represents things like a checkpoint
 * or a recovery.
 */
@Experimental
public interface Span {

    /**
     * Creates a new SpanBuilder for constructing Span instances.
     *
     * <p>A static interface method must carry a body; this one simply delegates to the
     * package-private SpanBuilder constructor.
     *
     * @param classScope Flink's convention is that the scope of each Span is
     *                   defined by the class that is creating it
     * @param name human readable name of this span
     * @return SpanBuilder for constructing the span
     */
    static SpanBuilder builder(Class<?> classScope, String name) {
        return new SpanBuilder(classScope, name);
    }

    /**
     * Returns the scope of this span.
     * @return scope string (typically the canonical name of the creating class)
     */
    String getScope();

    /**
     * Returns the name of this span.
     * @return human readable span name
     */
    String getName();

    /**
     * Returns the start timestamp in milliseconds.
     * @return start timestamp in milliseconds since epoch
     */
    long getStartTsMillis();

    /**
     * Returns the end timestamp in milliseconds.
     * @return end timestamp in milliseconds since epoch
     */
    long getEndTsMillis();

    /**
     * Returns the attributes attached to this span.
     * Currently the returned values can be of type String, Long or Double,
     * however more types can be added in the future, so consumers should not
     * assume this list is exhaustive.
     * @return map of attribute names to values
     */
    Map<String, Object> getAttributes();
}

Usage Examples:

// Basic span creation
public class CheckpointCoordinator {

    /**
     * Runs a checkpoint and reports exactly one span describing its outcome.
     *
     * <p>A single SpanBuilder is filled in as the checkpoint progresses and is then
     * handed to the metric group, so the reported span carries the timestamps and
     * attributes collected here. A failure is recorded on the span and not rethrown.
     */
    public void triggerCheckpoint() {
        // Capture the start once so the success and failure paths report the same start.
        long startTime = System.currentTimeMillis();

        SpanBuilder checkpointSpan = Span.builder(CheckpointCoordinator.class, "checkpoint")
            .setStartTsMillis(startTime)
            .setAttribute("checkpoint-id", checkpointId);

        try {
            performCheckpoint();

            // Completion info
            long endTime = System.currentTimeMillis();
            checkpointSpan
                .setEndTsMillis(endTime)
                .setAttribute("num-tasks", numberOfTasks)
                // SpanBuilder has no boolean overload; record the flag as a string.
                .setAttribute("success", "true")
                .setAttribute("duration-ms", endTime - startTime);

        } catch (Exception e) {
            // Failure info
            checkpointSpan
                .setEndTsMillis(System.currentTimeMillis())
                .setAttribute("success", "false")
                .setAttribute("error", e.getMessage());
        }

        // Report span: pass the populated builder, not a freshly created empty one.
        metricGroup.addSpan(checkpointSpan);
    }
}

// Span for operator lifecycle events
public class StreamOperatorLifecycle {

    /**
     * Opens the operator and reports an "operator-open" span for both the successful
     * and the failed case.
     *
     * @throws Exception whatever performOpen() threw, rethrown after the failure span
     *                   has been reported
     */
    public void open() throws Exception {
        long startTime = System.currentTimeMillis();

        try {
            performOpen();

            Span openSpan = Span.builder(this.getClass(), "operator-open")
                .setStartTsMillis(startTime)
                .setEndTsMillis(System.currentTimeMillis())
                .setAttribute("operator-name", getOperatorName())
                .setAttribute("parallelism", getParallelism())
                .setAttribute("subtask-index", getSubtaskIndex())
                // SpanBuilder only accepts String, long and double values.
                .setAttribute("success", "true")
                .build();

            reportSpan(openSpan);

        } catch (Exception e) {
            Span failedOpenSpan = Span.builder(this.getClass(), "operator-open")
                .setStartTsMillis(startTime)
                .setEndTsMillis(System.currentTimeMillis())
                .setAttribute("operator-name", getOperatorName())
                .setAttribute("success", "false")
                .setAttribute("error", e.getMessage())
                .build();

            reportSpan(failedOpenSpan);
            throw e;
        }
    }
}

SpanBuilder Class

Builder for constructing Span instances with fluent API for setting timestamps and attributes.

/**
 * Builder used to construct Span instances.
 *
 * <p>The constructor is not public; callers obtain a builder through
 * Span.builder(Class, String). All setters return this builder so calls can be
 * chained. Attribute values can only be String, long or double; there is no
 * boolean overload, so flags have to be encoded as one of those types.
 */
@Experimental
public class SpanBuilder {
    /**
     * Constructor for SpanBuilder.
     * @param classScope Flink's convention is that the scope of each Span is 
     *                   defined by the class that is creating it. If you are 
     *                   building the Span in your class MyClass, as the classScope 
     *                   you should pass MyClass.class.
     * @param name Human readable name of this span, that describes what the 
     *             built Span will represent.
     */
    SpanBuilder(Class<?> classScope, String name);
    
    /**
     * Builds the Span instance from the values set on this builder so far.
     * @return constructed Span
     */
    public Span build();
    
    /**
     * Optionally sets the Span's start timestamp. If not specified, 
     * System.currentTimeMillis() will be used.
     * @param startTsMillis start timestamp in milliseconds
     * @return this SpanBuilder for method chaining
     */
    public SpanBuilder setStartTsMillis(long startTsMillis);
    
    /**
     * Optionally sets the Span's end timestamp. If not specified, 
     * startTsMillis will be used.
     * @param endTsMillis end timestamp in milliseconds
     * @return this SpanBuilder for method chaining
     */
    public SpanBuilder setEndTsMillis(long endTsMillis);
    
    /**
     * Attaches an additional string attribute to this Span.
     * @param key attribute key
     * @param value string attribute value
     * @return this SpanBuilder for method chaining
     */
    public SpanBuilder setAttribute(String key, String value);
    
    /**
     * Attaches an additional long attribute to this Span.
     * @param key attribute key
     * @param value long attribute value
     * @return this SpanBuilder for method chaining
     */
    public SpanBuilder setAttribute(String key, long value);
    
    /**
     * Attaches an additional double attribute to this Span.
     * @param key attribute key
     * @param value double attribute value
     * @return this SpanBuilder for method chaining
     */
    public SpanBuilder setAttribute(String key, double value);
}

Usage Examples:

// Comprehensive span creation
public class TaskExecutor {

    /**
     * Executes the given task and reports a "task-execution" span describing the
     * outcome. Attributes known up front are set before execution; outcome-specific
     * attributes are added to the same builder afterwards. A failure is rethrown after
     * the failure span has been reported.
     *
     * @param task the task to execute
     */
    public void executeTask(Task task) {
        SpanBuilder spanBuilder = Span.builder(TaskExecutor.class, "task-execution")
            .setAttribute("task-id", task.getId())
            .setAttribute("task-type", task.getType())
            .setAttribute("parallelism", task.getParallelism())
            .setAttribute("operator-chain-length", task.getOperatorChain().size());

        long startTime = System.currentTimeMillis();
        spanBuilder.setStartTsMillis(startTime);

        try {
            Object result = task.execute();

            long endTime = System.currentTimeMillis();
            long duration = endTime - startTime;

            Span successSpan = spanBuilder
                .setEndTsMillis(endTime)
                // SpanBuilder only accepts String, long and double values.
                .setAttribute("success", "true")
                .setAttribute("duration-ms", duration)
                .setAttribute("result-size", getResultSize(result))
                .build();

            reportSpan(successSpan);

        } catch (Exception e) {
            long endTime = System.currentTimeMillis();

            Span failureSpan = spanBuilder
                .setEndTsMillis(endTime)
                .setAttribute("success", "false")
                .setAttribute("error-type", e.getClass().getSimpleName())
                .setAttribute("error-message", e.getMessage())
                .setAttribute("duration-ms", endTime - startTime)
                .build();

            reportSpan(failureSpan);
            throw e;
        }
    }
}

// Network operation tracing
public class NetworkClient {

    /**
     * Reports a "network-send" span for a point-in-time send operation.
     *
     * @param data payload whose length is recorded as "data-size"
     * @param destination recorded as the "destination" attribute
     */
    public void sendData(byte[] data, String destination) {
        Span networkSpan = Span.builder(NetworkClient.class, "network-send")
            .setStartTsMillis(System.currentTimeMillis())
            .setAttribute("destination", destination)
            .setAttribute("data-size", data.length)
            .setAttribute("protocol", "tcp")
            .build();

        // For immediate operations no end timestamp is set; it then defaults to the start.
        reportSpan(networkSpan);
    }

    /**
     * Sends the request asynchronously and reports an "async-request" span once the
     * returned future completes, either normally or exceptionally.
     *
     * @param request the request to send
     * @return the future produced by sendAsync, with span reporting attached
     */
    public CompletableFuture<Response> sendRequestAsync(Request request) {
        long startTime = System.currentTimeMillis();

        return sendAsync(request)
            .whenComplete((response, throwable) -> {
                long endTime = System.currentTimeMillis();

                SpanBuilder spanBuilder = Span.builder(NetworkClient.class, "async-request")
                    .setStartTsMillis(startTime)
                    .setEndTsMillis(endTime)
                    .setAttribute("request-type", request.getType())
                    .setAttribute("request-size", request.getSize())
                    .setAttribute("duration-ms", endTime - startTime);

                if (throwable == null) {
                    Span successSpan = spanBuilder
                        // SpanBuilder only accepts String, long and double values.
                        .setAttribute("success", "true")
                        .setAttribute("response-size", response.getSize())
                        .setAttribute("status-code", response.getStatusCode())
                        .build();
                    reportSpan(successSpan);
                } else {
                    Span errorSpan = spanBuilder
                        .setAttribute("success", "false")
                        .setAttribute("error", throwable.getMessage())
                        .build();
                    reportSpan(errorSpan);
                }
            });
    }
}

// State backend operation tracing
public class StateBackendTracing {

    /**
     * Performs a state checkpoint and reports a "state-checkpoint" span describing
     * the outcome. A failure is rethrown after the failure span has been reported.
     *
     * @param checkpointId identifier of the checkpoint; its value is recorded as an attribute
     */
    public void checkpoint(CheckpointId checkpointId) {
        long startTime = System.currentTimeMillis();

        // The start timestamp is shared by both outcomes, so set it once on the builder.
        SpanBuilder checkpointSpan = Span.builder(StateBackendTracing.class, "state-checkpoint")
            .setAttribute("checkpoint-id", checkpointId.getValue())
            .setAttribute("backend-type", getBackendType())
            .setStartTsMillis(startTime);

        try {
            long stateSize = performCheckpoint(checkpointId);
            long endTime = System.currentTimeMillis();

            Span completedSpan = checkpointSpan
                .setEndTsMillis(endTime)
                .setAttribute("state-size-bytes", stateSize)
                .setAttribute("duration-ms", endTime - startTime)
                // SpanBuilder only accepts String, long and double values.
                .setAttribute("success", "true")
                .build();

            reportSpan(completedSpan);

        } catch (Exception e) {
            Span failedSpan = checkpointSpan
                .setEndTsMillis(System.currentTimeMillis())
                .setAttribute("success", "false")
                .setAttribute("error", e.getMessage())
                .build();

            reportSpan(failedSpan);
            throw e;
        }
    }
}

SimpleSpan Implementation

Default implementation of the Span interface.

/**
 * Default implementation of the Span interface.
 *
 * <p>The class is not public; instances are produced by SpanBuilder rather than
 * created directly. The implementation is elided in this listing.
 */
class SimpleSpan implements Span {
    // Internal implementation constructed by SpanBuilder
}

TraceReporter Interface

Interface for exporting spans to external tracing systems, similar to MetricReporter but for trace data.

/**
 * Trace reporters are used to export Spans to an external backend.
 * Reporters are instantiated via a TraceReporterFactory.
 *
 * <p>Lifecycle as described by the methods below: open(MetricConfig) first, then
 * notifyOfAddedSpan(Span) for each added span, and close() to release resources.
 */
@Experimental
public interface TraceReporter {
    
    /**
     * Configures this reporter. If the reporter was instantiated generically 
     * and hence parameter-less, this method is the place where the reporter 
     * sets its basic fields based on configuration values.
     * This method is always called first on a newly instantiated reporter.
     * @param config a properties object that contains all parameters set for this reporter
     */
    void open(MetricConfig config);
    
    /**
     * Closes this reporter. Should be used to close channels, streams and release resources.
     */
    void close();
    
    /**
     * Called when a new Span is added.
     * @param span the span that was added
     */
    void notifyOfAddedSpan(Span span);
}

Usage Examples:

// Custom trace reporter implementation
public class JaegerTraceReporter implements TraceReporter {
    private JaegerTracer tracer;
    private String serviceName;
    private String jaegerEndpoint;

    /**
     * Reads "service.name" and "jaeger.endpoint" from the configuration (with
     * defaults) and creates the Jaeger tracer used for all subsequent spans.
     *
     * @param config reporter configuration
     */
    @Override
    public void open(MetricConfig config) {
        this.serviceName = config.getString("service.name", "flink-application");
        this.jaegerEndpoint = config.getString("jaeger.endpoint", "http://localhost:14268/api/traces");

        // Initialize Jaeger tracer
        this.tracer = Configuration.fromEnv(serviceName)
            .withSampling(Configuration.SamplerConfiguration.fromEnv()
                .withType(ConstSampler.TYPE)
                .withParam(1))
            .withReporter(Configuration.ReporterConfiguration.fromEnv()
                .withSender(Configuration.SenderConfiguration.fromEnv()
                    .withEndpoint(jaegerEndpoint)))
            .getTracer();
    }

    /**
     * Converts the Flink span into a Jaeger span: the name and timestamps are copied,
     * every attribute becomes a tag, and the scope is added as the "scope" tag.
     *
     * @param span the Flink span to export
     */
    @Override
    public void notifyOfAddedSpan(Span span) {
        // Convert Flink span to Jaeger span
        io.opentracing.Span jaegerSpan = tracer.buildSpan(span.getName())
            .withStartTimestamp(span.getStartTsMillis() * 1000) // Convert to microseconds
            .start();

        // Add attributes as tags
        Map<String, Object> attributes = span.getAttributes();
        for (Map.Entry<String, Object> entry : attributes.entrySet()) {
            String key = entry.getKey();
            Object value = entry.getValue();

            if (value instanceof String) {
                jaegerSpan.setTag(key, (String) value);
            } else if (value instanceof Number) {
                jaegerSpan.setTag(key, (Number) value);
            } else if (value instanceof Boolean) {
                jaegerSpan.setTag(key, (Boolean) value);
            } else if (value != null) {
                // Span may gain further attribute types in the future; export them as
                // text instead of silently dropping them.
                jaegerSpan.setTag(key, value.toString());
            }
        }

        // Set scope as a tag
        jaegerSpan.setTag("scope", span.getScope());

        // Finish span with end timestamp (microseconds)
        jaegerSpan.finish(span.getEndTsMillis() * 1000);
    }

    /** Closes the tracer if open(MetricConfig) created one. */
    @Override
    public void close() {
        if (tracer != null) {
            tracer.close();
        }
    }
}

// Console trace reporter for debugging
public class ConsoleTraceReporter implements TraceReporter {
    private boolean includeAttributes;
    private String dateFormat;

    /**
     * Picks up the "include.attributes" flag and the "date.format" pattern from the
     * reporter configuration, falling back to the defaults given here.
     */
    @Override
    public void open(MetricConfig config) {
        includeAttributes = config.getBoolean("include.attributes", true);
        dateFormat = config.getString("date.format", "yyyy-MM-dd HH:mm:ss.SSS");
    }

    /**
     * Writes a human readable dump of the span to standard output: name, scope,
     * formatted start and end time, duration and, when enabled and present, the
     * attributes.
     */
    @Override
    public void notifyOfAddedSpan(Span span) {
        SimpleDateFormat timestampFormat = new SimpleDateFormat(dateFormat);

        System.out.println("=== SPAN ===");
        System.out.println("Name: " + span.getName());
        System.out.println("Scope: " + span.getScope());

        Date startedAt = new Date(span.getStartTsMillis());
        System.out.println("Start: " + timestampFormat.format(startedAt));

        Date endedAt = new Date(span.getEndTsMillis());
        System.out.println("End: " + timestampFormat.format(endedAt));

        long elapsedMillis = span.getEndTsMillis() - span.getStartTsMillis();
        System.out.println("Duration: " + elapsedMillis + "ms");

        if (includeAttributes) {
            printAttributes(span);
        }

        System.out.println("============");
    }

    /** Prints the "Attributes:" section, or nothing at all when the span has none. */
    private void printAttributes(Span span) {
        if (span.getAttributes().isEmpty()) {
            return;
        }
        System.out.println("Attributes:");
        for (Map.Entry<String, Object> attribute : span.getAttributes().entrySet()) {
            System.out.println("  " + attribute.getKey() + ": " + attribute.getValue());
        }
    }

    /** Announces on standard output that the reporter has been closed. */
    @Override
    public void close() {
        System.out.println("Console trace reporter closed");
    }
}

// Batching trace reporter
public class BatchingTraceReporter implements TraceReporter {
    private final List<Span> spanBuffer = new ArrayList<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private int batchSize;
    private long flushInterval;
    private String endpoint;

    /**
     * Reads "batch.size", "flush.interval" (milliseconds) and "endpoint" from the
     * configuration and starts the periodic flush.
     *
     * @param config reporter configuration
     */
    @Override
    public void open(MetricConfig config) {
        this.batchSize = config.getInteger("batch.size", 100);
        this.flushInterval = config.getLong("flush.interval", 5000); // 5 seconds
        this.endpoint = config.getString("endpoint", "http://localhost:9411/api/v2/spans");

        // Schedule periodic flush
        scheduler.scheduleAtFixedRate(this::flushSpans, flushInterval, flushInterval, TimeUnit.MILLISECONDS);
    }

    /**
     * Buffers the span and triggers a flush as soon as the buffer reaches the
     * configured batch size.
     *
     * @param span the span to buffer
     */
    @Override
    public synchronized void notifyOfAddedSpan(Span span) {
        spanBuffer.add(span);

        if (spanBuffer.size() >= batchSize) {
            flushSpans();
        }
    }

    /** Hands the currently buffered spans to an asynchronous send; no-op when empty. */
    private synchronized void flushSpans() {
        List<Span> toFlush = drainBuffer();
        if (toFlush.isEmpty()) {
            return;
        }

        // Send spans asynchronously so callers are not blocked on the network.
        CompletableFuture.runAsync(() -> sendSpans(toFlush));
    }

    /** Removes and returns everything currently in the buffer. */
    private synchronized List<Span> drainBuffer() {
        List<Span> drained = new ArrayList<>(spanBuffer);
        spanBuffer.clear();
        return drained;
    }

    /** Serializes the spans and sends them; failures are logged to stderr, not thrown. */
    private void sendSpans(List<Span> spans) {
        try {
            // Convert spans to JSON and send to endpoint
            String json = convertSpansToJson(spans);
            sendToEndpoint(json);
        } catch (Exception e) {
            System.err.println("Failed to send spans: " + e.getMessage());
        }
    }

    /**
     * Stops the periodic flush and sends whatever is still buffered.
     *
     * <p>The final batch is sent on the calling thread: an asynchronous send started
     * here could still be pending when the process shuts down and would then be lost.
     */
    @Override
    public void close() {
        scheduler.shutdown();

        List<Span> remaining = drainBuffer();
        if (!remaining.isEmpty()) {
            sendSpans(remaining);
        }
    }
}

TraceReporterFactory Interface

Factory interface for creating trace reporters.

/**
 * Factory for creating TraceReporter instances.
 */
@Experimental
public interface TraceReporterFactory {
    /**
     * Creates a new trace reporter.
     * @param properties configured properties for the reporter
     * @return created trace reporter
     */
    TraceReporter createTraceReporter(Properties properties);
}

Usage Examples:

// Factory implementation
public class JaegerTraceReporterFactory implements TraceReporterFactory {

    /**
     * Produces a fresh JaegerTraceReporter. The supplied properties are not
     * consulted by this factory.
     *
     * @param properties configured properties for the reporter (unused)
     * @return a new, not yet opened JaegerTraceReporter
     */
    @Override
    public TraceReporter createTraceReporter(Properties properties) {
        final TraceReporter reporter = new JaegerTraceReporter();
        return reporter;
    }
}

// Configurable factory
public class ConfigurableTraceReporterFactory implements TraceReporterFactory {

    /**
     * Creates the reporter selected by the "type" property ("jaeger", "zipkin" or
     * "console", matched case-insensitively). Defaults to "console" when the
     * property is absent.
     *
     * @param properties configured properties for the reporter
     * @return a new reporter of the requested type
     * @throws IllegalArgumentException if the configured type is not recognized
     */
    @Override
    public TraceReporter createTraceReporter(Properties properties) {
        String type = properties.getProperty("type", "console");

        // Lower-case with a fixed locale: the default-locale variant can change the
        // letters (e.g. "ZIPKIN" under a Turkish locale) and break the match below.
        switch (type.toLowerCase(java.util.Locale.ROOT)) {
            case "jaeger":
                return new JaegerTraceReporter();
            case "zipkin":
                return new ZipkinTraceReporter();
            case "console":
                return new ConsoleTraceReporter();
            default:
                throw new IllegalArgumentException("Unknown trace reporter type: " + type);
        }
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-metrics-core

docs

configuration.md

core-metrics.md

implementations.md

index.md

metric-groups.md

reporters.md

specialized-groups.md

tracing.md

tile.json