The Apache Spark Catalyst Metrics API provides a framework for collecting custom metrics from data sources. It enables a data source to define, collect, and aggregate its own metrics during query execution, giving insight into the source's performance and behavior.
The base interface for defining custom metrics that aggregate task-level metrics at the driver:
package org.apache.spark.sql.connector.metric;
public interface CustomMetric {
/**
* Returns the name of custom metric
*/
String name();
/**
* Returns the description of custom metric
*/
String description();
/**
* The initial value of this metric
*/
long initialValue = 0L;
/**
* Given an array of task metric values, returns aggregated final metric value
*/
String aggregateTaskMetrics(long[] taskMetrics);
}

Task-level metric representation collected at the executor side:
package org.apache.spark.sql.connector.metric;
public interface CustomTaskMetric {
/**
* Returns the name of custom task metric
*/
String name();
/**
* Returns the long value of custom task metric
*/
long value();
}

Abstract base class for metrics that sum up values across tasks:
package org.apache.spark.sql.connector.metric;
public abstract class CustomSumMetric implements CustomMetric {
@Override
public String aggregateTaskMetrics(long[] taskMetrics) {
long sum = 0L;
for (long taskMetric : taskMetrics) {
sum += taskMetric;
}
return String.valueOf(sum);
}
}

Abstract base class for metrics that compute averages across tasks:
package org.apache.spark.sql.connector.metric;
import java.text.DecimalFormat;
public abstract class CustomAvgMetric implements CustomMetric {
@Override
public String aggregateTaskMetrics(long[] taskMetrics) {
if (taskMetrics.length > 0) {
long sum = 0L;
for (long taskMetric : taskMetrics) {
sum += taskMetric;
}
double average = ((double) sum) / taskMetrics.length;
return new DecimalFormat("#0.000").format(average);
} else {
return "0";
}
}
}

Interface for streaming data sources to report metrics:
package org.apache.spark.sql.connector.read.streaming;
import java.util.Map;
import java.util.Optional;
public interface ReportsSourceMetrics extends SparkDataStream {
/**
* Returns the metrics reported by the streaming source with respect to
* the latest consumed offset
*/
Map<String, String> metrics(Optional<Offset> latestConsumedOffset);
}

Interface for streaming sinks to report metrics:
package org.apache.spark.sql.connector.read.streaming;
import java.util.Map;
public interface ReportsSinkMetrics {
/**
* Returns the metrics reported by the sink for this micro-batch
*/
Map<String, String> metrics();
}

Data sources integrate metrics through the Scan interface:
package org.apache.spark.sql.connector.read;
public interface Scan {
/**
* Returns custom metrics that this scan supports
*/
default CustomMetric[] supportedCustomMetrics() {
return new CustomMetric[]{};
}
/**
* Returns custom task metrics reported from driver side.
* Note that these metrics must be included in the supported custom metrics
* reported by supportedCustomMetrics.
*/
default CustomTaskMetric[] reportDriverMetrics() {
return new CustomTaskMetric[]{};
}
}
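The reportDriverMetrics hook covers values that are only known on the driver, for example statistics computed while planning the scan. A minimal sketch under that assumption follows; MyDriverSideScan, PlannedFilesMetric, and the plannedFiles metric name are illustrative and not part of the Spark API:
import org.apache.spark.sql.connector.metric.CustomMetric;
import org.apache.spark.sql.connector.metric.CustomSumMetric;
import org.apache.spark.sql.connector.metric.CustomTaskMetric;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.types.StructType;

// Hypothetical scan that reports a driver-side metric in addition to
// whatever task metrics its partition readers produce.
public class MyDriverSideScan implements Scan {
  private final StructType schema;
  private final long plannedFileCount; // computed on the driver during planning

  public MyDriverSideScan(StructType schema, long plannedFileCount) {
    this.schema = schema;
    this.plannedFileCount = plannedFileCount;
  }

  @Override
  public StructType readSchema() {
    return schema;
  }

  @Override
  public CustomMetric[] supportedCustomMetrics() {
    // Driver-reported metrics must also be declared here.
    return new CustomMetric[]{new PlannedFilesMetric()};
  }

  @Override
  public CustomTaskMetric[] reportDriverMetrics() {
    return new CustomTaskMetric[]{new CustomTaskMetric() {
      @Override public String name() { return "plannedFiles"; }
      @Override public long value() { return plannedFileCount; }
    }};
  }

  private static class PlannedFilesMetric extends CustomSumMetric {
    @Override public String name() { return "plannedFiles"; }
    @Override public String description() { return "Number of files planned for the scan"; }
  }
}
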
Partition readers report task-level metrics:
package org.apache.spark.sql.connector.read;
public interface PartitionReader<T> extends Closeable {
/**
* Returns current custom task metric values
*/
default CustomTaskMetric[] currentMetricsValues() {
CustomTaskMetric[] NO_METRICS = {};
return NO_METRICS;
}
}

Data writers can also report task-level metrics:
package org.apache.spark.sql.connector.write;
public interface DataWriter<T> {
/**
* Returns current custom task metric values
*/
default CustomTaskMetric[] currentMetricsValues() {
return new CustomTaskMetric[]{};
}
}

An example custom metric that sums record counts across all tasks:
import org.apache.spark.sql.connector.metric.CustomSumMetric;
public class RecordsProcessedMetric extends CustomSumMetric {
@Override
public String name() {
return "recordsProcessed";
}
@Override
public String description() {
return "Total number of records processed across all tasks";
}
}

An example custom metric that averages per-task values:
import org.apache.spark.sql.connector.metric.CustomAvgMetric;
public class ProcessingTimeAvgMetric extends CustomAvgMetric {
@Override
public String name() {
return "avgProcessingTime";
}
@Override
public String description() {
return "Average processing time per task in milliseconds";
}
}

A task-level metric carrying each task's record count:
import org.apache.spark.sql.connector.metric.CustomTaskMetric;
public class TaskRecordsProcessed implements CustomTaskMetric {
private final long recordCount;
public TaskRecordsProcessed(long recordCount) {
this.recordCount = recordCount;
}
@Override
public String name() {
return "recordsProcessed";
}
@Override
public long value() {
return recordCount;
}
}

A complete batch data source that wires these metrics together:
import org.apache.spark.sql.connector.catalog.SupportsRead;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.read.*;
import org.apache.spark.sql.connector.metric.*;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import java.io.IOException;
import java.util.Set;
public class MyDataSource implements Table, SupportsRead {
private final StructType schema;
public MyDataSource(StructType schema) {
this.schema = schema;
}
@Override
public String name() {
return "my-data-source";
}
@Override
public StructType schema() {
return schema;
}
@Override
public Set<TableCapability> capabilities() {
return Set.of(TableCapability.BATCH_READ);
}
@Override
public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
return new MyScanBuilder(schema);
}
private static class MyScanBuilder implements ScanBuilder {
private final StructType schema;
public MyScanBuilder(StructType schema) {
this.schema = schema;
}
@Override
public Scan build() {
return new MyScan(schema);
}
}
private static class MyScan implements Scan {
private final StructType schema;
public MyScan(StructType schema) {
this.schema = schema;
}
@Override
public StructType readSchema() {
return schema;
}
@Override
public CustomMetric[] supportedCustomMetrics() {
return new CustomMetric[]{
new RecordsProcessedMetric(),
new ProcessingTimeAvgMetric()
};
}
@Override
public Batch toBatch() {
return new MyBatch(schema);
}
}
private static class MyBatch implements Batch {
private final StructType schema;
public MyBatch(StructType schema) {
this.schema = schema;
}
@Override
public InputPartition[] planInputPartitions() {
return new InputPartition[]{new MyInputPartition()};
}
@Override
public PartitionReaderFactory createReaderFactory() {
return new MyReaderFactory(schema);
}
}
private static class MyReaderFactory implements PartitionReaderFactory {
private final StructType schema;
public MyReaderFactory(StructType schema) {
this.schema = schema;
}
@Override
public PartitionReader<InternalRow> createReader(InputPartition partition) {
return new MyPartitionReader();
}
}
private static class MyPartitionReader implements PartitionReader<InternalRow> {
private long recordsProcessed = 0;
private long startTime = System.currentTimeMillis();
@Override
public boolean next() throws IOException {
  // Only count a record after confirming one is available
  if (!hasMoreRecords()) {
    return false;
  }
  recordsProcessed++;
  return true;
}
@Override
public InternalRow get() {
// Return current record
return getCurrentRecord();
}
@Override
public CustomTaskMetric[] currentMetricsValues() {
long processingTime = System.currentTimeMillis() - startTime;
return new CustomTaskMetric[]{
new TaskRecordsProcessed(recordsProcessed),
new TaskProcessingTime(processingTime)
};
}
@Override
public void close() throws IOException {
// Cleanup resources
}
private boolean hasMoreRecords() {
// Implementation specific logic
return false;
}
private InternalRow getCurrentRecord() {
// Implementation specific logic
return null;
}
}
private static class TaskProcessingTime implements CustomTaskMetric {
private final long processingTime;
public TaskProcessingTime(long processingTime) {
this.processingTime = processingTime;
}
@Override
public String name() {
return "avgProcessingTime";
}
@Override
public long value() {
return processingTime;
}
}
private static class MyInputPartition implements InputPartition {
// Partition implementation
}
}

A streaming source implementing ReportsSourceMetrics reports metrics for the latest consumed offset:
import org.apache.spark.sql.connector.read.streaming.*;
import org.apache.spark.sql.connector.metric.*;
import java.util.Map;
import java.util.HashMap;
import java.util.Optional;
public class MyStreamingSource implements SparkDataStream, ReportsSourceMetrics {
private long totalRecordsRead = 0;
private long lastBatchRecords = 0;
@Override
public Map<String, String> metrics(Optional<Offset> latestConsumedOffset) {
Map<String, String> metrics = new HashMap<>();
metrics.put("totalRecordsRead", String.valueOf(totalRecordsRead));
metrics.put("lastBatchRecords", String.valueOf(lastBatchRecords));
metrics.put("avgRecordsPerBatch", calculateAverageRecordsPerBatch());
return metrics;
}
private String calculateAverageRecordsPerBatch() {
// Calculate average based on historical data
return "1000";
}
// Other streaming methods...
}
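A streaming sink can surface per-micro-batch metrics in a similar way by implementing ReportsSinkMetrics, typically on the sink's table or write implementation. The sketch below assumes the interface exactly as declared earlier; MySinkMetricsReporter and its recordBatch helper are illustrative names rather than Spark API:
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.sql.connector.read.streaming.ReportsSinkMetrics;

// Hypothetical sink-side component that tracks write counters and
// exposes them through ReportsSinkMetrics after each micro-batch.
public class MySinkMetricsReporter implements ReportsSinkMetrics {
  private long totalRecordsWritten = 0;
  private long lastBatchRecordsWritten = 0;

  // Called by the sink's own write path once a micro-batch completes.
  public void recordBatch(long recordsWritten) {
    lastBatchRecordsWritten = recordsWritten;
    totalRecordsWritten += recordsWritten;
  }

  @Override
  public Map<String, String> metrics() {
    Map<String, String> metrics = new HashMap<>();
    metrics.put("totalRecordsWritten", String.valueOf(totalRecordsWritten));
    metrics.put("lastBatchRecordsWritten", String.valueOf(lastBatchRecordsWritten));
    return metrics;
  }
}
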
The metrics API recognizes certain special metric names that integrate with Spark's built-in task metrics:

- bytesWritten: updates the corresponding task metric for bytes written
- recordsWritten: updates the corresponding task metric for records written

When data sources define custom metrics with these names, the values are automatically propagated to Spark's internal task metrics system.
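As an illustration, a DataWriter that reports its output counters under these reserved names allows Spark to fold the values into the task's write metrics. The sketch below is hypothetical; MyMetricsAwareWriter, taskMetric, and estimateSize are invented helpers, not Spark API:
import java.io.IOException;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.metric.CustomTaskMetric;
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.WriterCommitMessage;

// Hypothetical writer that tracks output counters and reports them
// under the reserved metric names recognized by Spark.
public class MyMetricsAwareWriter implements DataWriter<InternalRow> {
  private long bytesWritten = 0;
  private long recordsWritten = 0;

  @Override
  public void write(InternalRow record) throws IOException {
    // ... write the record to the external system ...
    recordsWritten++;
    bytesWritten += estimateSize(record); // illustrative size accounting
  }

  @Override
  public CustomTaskMetric[] currentMetricsValues() {
    return new CustomTaskMetric[]{
      taskMetric("bytesWritten", bytesWritten),
      taskMetric("recordsWritten", recordsWritten)
    };
  }

  @Override
  public WriterCommitMessage commit() throws IOException {
    return null; // a real writer returns its commit message here
  }

  @Override
  public void abort() throws IOException {
    // Clean up partially written output
  }

  @Override
  public void close() throws IOException {
    // Release resources
  }

  private static CustomTaskMetric taskMetric(String name, long value) {
    return new CustomTaskMetric() {
      @Override public String name() { return name; }
      @Override public long value() { return value; }
    };
  }

  private long estimateSize(InternalRow record) {
    return 0L; // implementation specific
  }
}
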
To use the Metrics API in your data source implementation, include these imports:
// Core metric interfaces
import org.apache.spark.sql.connector.metric.CustomMetric;
import org.apache.spark.sql.connector.metric.CustomTaskMetric;
// Built-in metric implementations
import org.apache.spark.sql.connector.metric.CustomSumMetric;
import org.apache.spark.sql.connector.metric.CustomAvgMetric;
// Streaming metrics
import org.apache.spark.sql.connector.read.streaming.ReportsSourceMetrics;
import org.apache.spark.sql.connector.read.streaming.ReportsSinkMetrics;
// Reader integration
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.connector.read.Scan;
// Writer integration
import org.apache.spark.sql.connector.write.DataWriter;
// Utility imports
import java.util.Map;
import java.util.Optional;
import java.text.DecimalFormat;
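
To sanity-check how the driver will render a metric, the aggregation methods can be exercised directly. This hypothetical snippet reuses the RecordsProcessedMetric and ProcessingTimeAvgMetric classes defined earlier:
// Hypothetical check of driver-side aggregation behavior.
public class MetricAggregationDemo {
  public static void main(String[] args) {
    long[] taskValues = {100L, 250L, 400L};

    // CustomSumMetric simply sums the per-task values.
    String total = new RecordsProcessedMetric().aggregateTaskMetrics(taskValues);
    System.out.println("recordsProcessed = " + total); // prints 750

    // CustomAvgMetric formats the mean with DecimalFormat("#0.000").
    String avg = new ProcessingTimeAvgMetric().aggregateTaskMetrics(taskValues);
    System.out.println("avgProcessingTime = " + avg); // prints 250.000
  }
}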