or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

catalog-apis.mddata-source-v2-apis.mddistributions-api.mdexpression-apis.mdindex.mdlegacy-data-source-v1.mdmetrics-api.mdstreaming-apis.mdutilities-helpers.mdvectorized-processing.md
tile.json

metrics-api.mddocs/

Apache Spark Catalyst Metrics API

The Apache Spark Catalyst Metrics API provides a comprehensive framework for custom metrics collection in data sources. This API enables data sources to define, collect, and aggregate custom metrics during query execution, providing valuable insights into data source performance and behavior.

Core Metrics Interfaces

CustomMetric

The base interface for defining custom metrics that aggregate task-level metrics at the driver:

package org.apache.spark.sql.connector.metric;

public interface CustomMetric {
    /**
     * Returns the name of custom metric
     */
    String name();
    
    /**
     * Returns the description of custom metric
     */
    String description();
    
    /**
     * The initial value of this metric
     */
    long initialValue = 0L;
    
    /**
     * Given an array of task metric values, returns aggregated final metric value
     */
    String aggregateTaskMetrics(long[] taskMetrics);
}

CustomTaskMetric

Task-level metric representation collected at the executor side:

package org.apache.spark.sql.connector.metric;

public interface CustomTaskMetric {
    /**
     * Returns the name of custom task metric
     */
    String name();
    
    /**
     * Returns the long value of custom task metric
     */
    long value();
}

Built-in Metric Implementations

CustomSumMetric

Abstract base class for metrics that sum up values across tasks:

package org.apache.spark.sql.connector.metric;

public abstract class CustomSumMetric implements CustomMetric {
    @Override
    public String aggregateTaskMetrics(long[] taskMetrics) {
        long sum = 0L;
        for (long taskMetric : taskMetrics) {
            sum += taskMetric;
        }
        return String.valueOf(sum);
    }
}

CustomAvgMetric

Abstract base class for metrics that compute averages across tasks:

package org.apache.spark.sql.connector.metric;
import java.text.DecimalFormat;

public abstract class CustomAvgMetric implements CustomMetric {
    @Override
    public String aggregateTaskMetrics(long[] taskMetrics) {
        if (taskMetrics.length > 0) {
            long sum = 0L;
            for (long taskMetric : taskMetrics) {
                sum += taskMetric;
            }
            double average = ((double) sum) / taskMetrics.length;
            return new DecimalFormat("#0.000").format(average);
        } else {
            return "0";
        }
    }
}

Streaming Metrics Interfaces

ReportsSourceMetrics

Interface for streaming data sources to report metrics:

package org.apache.spark.sql.connector.read.streaming;
import java.util.Map;
import java.util.Optional;

public interface ReportsSourceMetrics extends SparkDataStream {
    /**
     * Returns the metrics reported by the streaming source with respect to
     * the latest consumed offset
     */
    Map<String, String> metrics(Optional<Offset> latestConsumedOffset);
}

ReportsSinkMetrics

Interface for streaming sinks to report metrics:

package org.apache.spark.sql.connector.read.streaming;
import java.util.Map;

public interface ReportsSinkMetrics {
    /**
     * Returns the metrics reported by the sink for this micro-batch
     */
    Map<String, String> metrics();
}

Integration with Data Source APIs

Scan Interface Integration

Data sources integrate metrics through the Scan interface:

package org.apache.spark.sql.connector.read;

public interface Scan {
    /**
     * Returns custom metrics that this scan supports
     */
    default CustomMetric[] supportedCustomMetrics() {
        return new CustomMetric[]{};
    }
    
    /**
     * Returns custom task metrics reported from driver side.
     * Note that these metrics must be included in the supported custom metrics
     * reported by supportedCustomMetrics.
     */
    default CustomTaskMetric[] reportDriverMetrics() {
        return new CustomTaskMetric[]{};
    }
}

PartitionReader Integration

Partition readers report task-level metrics:

package org.apache.spark.sql.connector.read;

public interface PartitionReader<T> extends Closeable {
    /**
     * Returns current custom task metric values
     */
    default CustomTaskMetric[] currentMetricsValues() {
        CustomTaskMetric[] NO_METRICS = {};
        return NO_METRICS;
    }
}

DataWriter Integration

Data writers can also report task-level metrics:

package org.apache.spark.sql.connector.write;

public interface DataWriter<T> {
    /**
     * Returns current custom task metric values
     */
    default CustomTaskMetric[] currentMetricsValues() {
        return new CustomTaskMetric[]{};
    }
}

Complete Implementation Examples

Custom Sum Metric Implementation

import org.apache.spark.sql.connector.metric.CustomSumMetric;

public class RecordsProcessedMetric extends CustomSumMetric {
    @Override
    public String name() {
        return "recordsProcessed";
    }
    
    @Override
    public String description() {
        return "Total number of records processed across all tasks";
    }
}

Custom Average Metric Implementation

import org.apache.spark.sql.connector.metric.CustomAvgMetric;

public class ProcessingTimeAvgMetric extends CustomAvgMetric {
    @Override
    public String name() {
        return "avgProcessingTime";
    }
    
    @Override
    public String description() {
        return "Average processing time per task in milliseconds";
    }
}

Task Metric Implementation

import org.apache.spark.sql.connector.metric.CustomTaskMetric;

public class TaskRecordsProcessed implements CustomTaskMetric {
    private final long recordCount;
    
    public TaskRecordsProcessed(long recordCount) {
        this.recordCount = recordCount;
    }
    
    @Override
    public String name() {
        return "recordsProcessed";
    }
    
    @Override
    public long value() {
        return recordCount;
    }
}

Complete Data Source with Metrics

import org.apache.spark.sql.connector.read.*;
import org.apache.spark.sql.connector.metric.*;
import org.apache.spark.sql.types.StructType;
import java.io.IOException;

public class MyDataSource implements Table, SupportsRead {
    private final StructType schema;
    
    public MyDataSource(StructType schema) {
        this.schema = schema;
    }
    
    @Override
    public String name() {
        return "my-data-source";
    }
    
    @Override
    public StructType schema() {
        return schema;
    }
    
    @Override
    public Set<TableCapability> capabilities() {
        return Set.of(TableCapability.BATCH_READ);
    }
    
    @Override
    public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
        return new MyScanBuilder(schema);
    }
    
    private static class MyScanBuilder implements ScanBuilder {
        private final StructType schema;
        
        public MyScanBuilder(StructType schema) {
            this.schema = schema;
        }
        
        @Override
        public Scan build() {
            return new MyScan(schema);
        }
    }
    
    private static class MyScan implements Scan {
        private final StructType schema;
        
        public MyScan(StructType schema) {
            this.schema = schema;
        }
        
        @Override
        public StructType readSchema() {
            return schema;
        }
        
        @Override
        public CustomMetric[] supportedCustomMetrics() {
            return new CustomMetric[]{
                new RecordsProcessedMetric(),
                new ProcessingTimeAvgMetric()
            };
        }
        
        @Override
        public Batch toBatch() {
            return new MyBatch(schema);
        }
    }
    
    private static class MyBatch implements Batch {
        private final StructType schema;
        
        public MyBatch(StructType schema) {
            this.schema = schema;
        }
        
        @Override
        public InputPartition[] planInputPartitions() {
            return new InputPartition[]{new MyInputPartition()};
        }
        
        @Override
        public PartitionReaderFactory createReaderFactory() {
            return new MyReaderFactory(schema);
        }
    }
    
    private static class MyReaderFactory implements PartitionReaderFactory {
        private final StructType schema;
        
        public MyReaderFactory(StructType schema) {
            this.schema = schema;
        }
        
        @Override
        public PartitionReader<InternalRow> createReader(InputPartition partition) {
            return new MyPartitionReader();
        }
    }
    
    private static class MyPartitionReader implements PartitionReader<InternalRow> {
        private long recordsProcessed = 0;
        private long startTime = System.currentTimeMillis();
        
        @Override
        public boolean next() throws IOException {
            // Read next record logic
            recordsProcessed++;
            return hasMoreRecords();
        }
        
        @Override
        public InternalRow get() {
            // Return current record
            return getCurrentRecord();
        }
        
        @Override
        public CustomTaskMetric[] currentMetricsValues() {
            long processingTime = System.currentTimeMillis() - startTime;
            return new CustomTaskMetric[]{
                new TaskRecordsProcessed(recordsProcessed),
                new TaskProcessingTime(processingTime)
            };
        }
        
        @Override
        public void close() throws IOException {
            // Cleanup resources
        }
        
        private boolean hasMoreRecords() {
            // Implementation specific logic
            return false;
        }
        
        private InternalRow getCurrentRecord() {
            // Implementation specific logic
            return null;
        }
    }
    
    private static class TaskProcessingTime implements CustomTaskMetric {
        private final long processingTime;
        
        public TaskProcessingTime(long processingTime) {
            this.processingTime = processingTime;
        }
        
        @Override
        public String name() {
            return "avgProcessingTime";
        }
        
        @Override
        public long value() {
            return processingTime;
        }
    }
    
    private static class MyInputPartition implements InputPartition {
        // Partition implementation
    }
}

Streaming Data Source with Metrics

import org.apache.spark.sql.connector.read.streaming.*;
import org.apache.spark.sql.connector.metric.*;
import java.util.Map;
import java.util.HashMap;
import java.util.Optional;

public class MyStreamingSource implements SparkDataStream, ReportsSourceMetrics {
    private long totalRecordsRead = 0;
    private long lastBatchRecords = 0;
    
    @Override
    public Map<String, String> metrics(Optional<Offset> latestConsumedOffset) {
        Map<String, String> metrics = new HashMap<>();
        metrics.put("totalRecordsRead", String.valueOf(totalRecordsRead));
        metrics.put("lastBatchRecords", String.valueOf(lastBatchRecords));
        metrics.put("avgRecordsPerBatch", calculateAverageRecordsPerBatch());
        return metrics;
    }
    
    private String calculateAverageRecordsPerBatch() {
        // Calculate average based on historical data
        return "1000";
    }
    
    // Other streaming methods...
}

Special Metric Names

The metrics API recognizes certain special metric names that integrate with Spark's built-in task metrics:

  • bytesWritten: Updates the corresponding task metric for bytes written
  • recordsWritten: Updates the corresponding task metric for records written

When data sources define custom metrics with these names, the values are automatically propagated to Spark's internal task metrics system.

Key Features

Automatic Aggregation

  • Spark automatically collects task metrics from all partitions
  • Driver-side aggregation using the aggregateTaskMetrics method
  • Built-in support for sum and average aggregations

Reflection-Based Instantiation

  • Custom metric classes must have a no-argument constructor
  • Spark uses reflection to instantiate metric classes during aggregation
  • Thread-safe aggregation across distributed tasks

UI Integration

  • Final aggregated metrics appear in the Spark UI
  • Integrated with data source scan operators
  • Streaming metrics available per micro-batch

Extensibility

  • Easy to create custom aggregation logic
  • Support for complex metric calculations
  • Integration with both batch and streaming workloads

Import Statements

To use the Metrics API in your data source implementation, include these imports:

// Core metric interfaces
import org.apache.spark.sql.connector.metric.CustomMetric;
import org.apache.spark.sql.connector.metric.CustomTaskMetric;

// Built-in metric implementations
import org.apache.spark.sql.connector.metric.CustomSumMetric;
import org.apache.spark.sql.connector.metric.CustomAvgMetric;

// Streaming metrics
import org.apache.spark.sql.connector.read.streaming.ReportsSourceMetrics;
import org.apache.spark.sql.connector.read.streaming.ReportsSinkMetrics;

// Reader integration
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.connector.read.Scan;

// Writer integration
import org.apache.spark.sql.connector.write.DataWriter;

// Utility imports
import java.util.Map;
import java.util.Optional;
import java.text.DecimalFormat;