This section covers the utility classes, helper functions, and support APIs that complement the core Catalyst functionality. These utilities provide essential building blocks for configuration management, data processing, error handling, and performance optimization.
A case-insensitive map implementation for configuration options:
```java
package org.apache.spark.sql.util;

public class CaseInsensitiveStringMap implements Map<String, String> {

    /** Get string value by key. */
    public String get(String key);

    /** Get boolean value with default. */
    public boolean getBoolean(String key, boolean defaultValue);

    /** Get integer value with default. */
    public int getInt(String key, int defaultValue);

    /** Get long value with default. */
    public long getLong(String key, long defaultValue);

    /** Get double value with default. */
    public double getDouble(String key, double defaultValue);

    // Standard Map interface methods
    public boolean containsKey(Object key);
    public Set<String> keySet();
    public Collection<String> values();
    public Set<Map.Entry<String, String>> entrySet();
}
```

Usage Examples:
```java
// Creating configuration maps
Map<String, String> options = Map.of(
    "Format", "parquet",
    "COMPRESSION", "snappy",
    "merge.Schema", "true"
);
CaseInsensitiveStringMap config = new CaseInsensitiveStringMap(options);

// Case-insensitive access
String format = config.get("format");                           // "parquet"
String compression = config.get("compression");                 // "snappy"
boolean mergeSchema = config.getBoolean("merge.schema", false); // true

// Type conversion with defaults
int batchSize = config.getInt("batch.size", 1000);
long maxFileSize = config.getLong("max.file.size", 134217728L); // 128 MB default
double samplingRatio = config.getDouble("sampling.ratio", 0.1);
```

A small builder makes constructing these maps more fluent:

```java
public class ConfigurationBuilder {
    private final Map<String, String> options = new HashMap<>();

    public ConfigurationBuilder set(String key, String value) {
        options.put(key, value);
        return this;
    }

    public ConfigurationBuilder set(String key, boolean value) {
        options.put(key, String.valueOf(value));
        return this;
    }

    public ConfigurationBuilder set(String key, int value) {
        options.put(key, String.valueOf(value));
        return this;
    }

    public ConfigurationBuilder set(String key, long value) {
        options.put(key, String.valueOf(value));
        return this;
    }

    public ConfigurationBuilder set(String key, double value) {
        options.put(key, String.valueOf(value));
        return this;
    }

    public CaseInsensitiveStringMap build() {
        return new CaseInsensitiveStringMap(options);
    }

    public static ConfigurationBuilder create() {
        return new ConfigurationBuilder();
    }
}

// Usage
CaseInsensitiveStringMap config = ConfigurationBuilder.create()
    .set("format", "delta")
    .set("merge.schema", true)
    .set("batch.size", 5000)
    .set("max.file.size", 268435456L) // 256 MB
    .set("compression.ratio", 0.8)
    .build();
```

Configuration management for SQL-related settings:
```scala
package org.apache.spark.sql.internal

class SQLConf extends Serializable {
  // Dynamic configuration that can be changed at runtime
  def getConf[T](entry: ConfigEntry[T]): T
  def setConf[T](entry: ConfigEntry[T], value: T): Unit
  def unsetConf(key: String): Unit
  def getAllConfs: Map[String, String]
}

object StaticSQLConf {
  // Static configuration entries that cannot be changed at runtime
  val WAREHOUSE_PATH: ConfigEntry[String]
  val CATALOG_IMPLEMENTATION: ConfigEntry[String]
  val GLOBAL_TEMP_DATABASE: ConfigEntry[String]
}
```

Usage Examples:
```scala
import org.apache.spark.sql.internal.SQLConf

// Access the current SQL configuration
val sqlConf = SQLConf.get

// Get configuration values
val adaptiveEnabled = sqlConf.adaptiveExecutionEnabled
val codegenEnabled = sqlConf.wholeStageEnabled
val broadcastThreshold = sqlConf.autoBroadcastJoinThreshold

// Set configuration (dynamic entries only)
sqlConf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED, true)
sqlConf.setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, 20971520L) // 20 MB
```
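In application code it is usually more convenient to go through the SparkSession's RuntimeConfig, which routes dynamic settings to SQLConf and rejects attempts to change static ones at runtime. A minimal sketch: the option keys are standard Spark settings, while the local-mode session bootstrap and warehouse path are purely illustrative.

```java
import org.apache.spark.sql.SparkSession;

public final class ConfExample {
    public static void main(String[] args) {
        // Static settings must be fixed when the session is built
        SparkSession spark = SparkSession.builder()
            .master("local[*]")
            .config("spark.sql.warehouse.dir", "/tmp/warehouse")
            .getOrCreate();

        // Dynamic settings can be changed per session at runtime
        spark.conf().set("spark.sql.adaptive.enabled", "true");
        spark.conf().set("spark.sql.autoBroadcastJoinThreshold", "20971520"); // 20 MB

        System.out.println(spark.conf().get("spark.sql.adaptive.enabled")); // "true"
        spark.stop();
    }
}
```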
A helper class for mapping type-name strings to Catalyst data types and answering common type questions:

```java
public class DataTypeUtils {

    public static DataType fromString(String typeString) {
        switch (typeString.toLowerCase()) {
            case "boolean": return DataTypes.BooleanType;
            case "byte": return DataTypes.ByteType;
            case "short": return DataTypes.ShortType;
            case "int": case "integer": return DataTypes.IntegerType;
            case "long": case "bigint": return DataTypes.LongType;
            case "float": return DataTypes.FloatType;
            case "double": return DataTypes.DoubleType;
            case "decimal": return DataTypes.createDecimalType();
            case "string": return DataTypes.StringType;
            case "binary": return DataTypes.BinaryType;
            case "date": return DataTypes.DateType;
            case "timestamp": return DataTypes.TimestampType;
            default:
                throw new IllegalArgumentException("Unknown data type: " + typeString);
        }
    }

    public static boolean isNumericType(DataType dataType) {
        return dataType instanceof NumericType;
    }

    public static boolean isStringType(DataType dataType) {
        return dataType instanceof StringType;
    }

    public static boolean isComplexType(DataType dataType) {
        return dataType instanceof ArrayType ||
               dataType instanceof MapType ||
               dataType instanceof StructType;
    }

    public static int sizeOf(DataType dataType) {
        if (dataType instanceof BooleanType) return 1;
        if (dataType instanceof ByteType) return 1;
        if (dataType instanceof ShortType) return 2;
        if (dataType instanceof IntegerType) return 4;
        if (dataType instanceof LongType) return 8;
        if (dataType instanceof FloatType) return 4;
        if (dataType instanceof DoubleType) return 8;
        if (dataType instanceof DateType) return 4;
        if (dataType instanceof TimestampType) return 8;
        return 8; // Default size for variable-length and complex types
    }

    public static Object getDefaultValue(DataType dataType) {
        if (dataType instanceof BooleanType) return false;
        if (dataType instanceof ByteType) return (byte) 0;
        if (dataType instanceof ShortType) return (short) 0;
        if (dataType instanceof IntegerType) return 0;
        if (dataType instanceof LongType) return 0L;
        if (dataType instanceof FloatType) return 0.0f;
        if (dataType instanceof DoubleType) return 0.0;
        if (dataType instanceof StringType) return "";
        if (dataType instanceof BinaryType) return new byte[0];
        return null; // For nullable or complex types
    }
}
```
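A quick sketch of the helper above in use, parsing a lightweight schema description. The `name:type` column-spec format is invented for this example; only the DataTypeUtils calls come from the class just shown.

```java
import org.apache.spark.sql.types.DataType;

// Parse hypothetical "name:type" column specs with DataTypeUtils
String[] columnSpecs = {"id:long", "name:string", "score:double"};
for (String spec : columnSpecs) {
    String[] parts = spec.split(":");
    DataType type = DataTypeUtils.fromString(parts[1]);
    System.out.printf("%s -> %s (numeric=%b, fixed size=%d bytes)%n",
        parts[0], type, DataTypeUtils.isNumericType(type), DataTypeUtils.sizeOf(type));
}
```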
Utilities for converting between StructType schemas and column definitions, projecting schemas, and checking compatibility:

```java
public class SchemaUtils {

    public static Column[] toColumns(StructType schema) {
        return Arrays.stream(schema.fields())
            .map(SchemaUtils::toColumn)
            .toArray(Column[]::new);
    }

    public static Column toColumn(StructField field) {
        return new Column() {
            @Override
            public String name() {
                return field.name();
            }

            @Override
            public DataType dataType() {
                return field.dataType();
            }

            @Override
            public boolean nullable() {
                return field.nullable();
            }

            @Override
            public String comment() {
                return field.getComment().orElse(null);
            }

            @Override
            public ColumnDefaultValue defaultValue() {
                return null; // V1 schemas don't support default values
            }

            @Override
            public MetadataColumn metadataColumn() {
                return null;
            }
        };
    }

    public static StructType fromColumns(Column[] columns) {
        StructField[] fields = Arrays.stream(columns)
            .map(col -> StructField.apply(
                col.name(),
                col.dataType(),
                col.nullable(),
                Metadata.empty()
            ))
            .toArray(StructField[]::new);
        return StructType.apply(fields);
    }

    public static StructType projectSchema(StructType schema, String[] requiredColumns) {
        List<StructField> projectedFields = new ArrayList<>();
        for (String columnName : requiredColumns) {
            try {
                StructField field = schema.apply(columnName);
                projectedFields.add(field);
            } catch (IllegalArgumentException e) {
                throw new IllegalArgumentException("Column not found: " + columnName);
            }
        }
        return StructType.apply(projectedFields.toArray(new StructField[0]));
    }

    public static boolean isCompatible(StructType source, StructType target) {
        if (source.length() != target.length()) {
            return false;
        }
        for (int i = 0; i < source.length(); i++) {
            StructField sourceField = source.fields()[i];
            StructField targetField = target.fields()[i];
            if (!sourceField.name().equals(targetField.name()) ||
                !isTypeCompatible(sourceField.dataType(), targetField.dataType())) {
                return false;
            }
        }
        return true;
    }

    private static boolean isTypeCompatible(DataType source, DataType target) {
        // Exact match
        if (source.equals(target)) {
            return true;
        }
        // Numeric type promotions
        if (source instanceof IntegerType && target instanceof LongType) {
            return true;
        }
        if (source instanceof FloatType && target instanceof DoubleType) {
            return true;
        }
        if (source instanceof IntegerType && target instanceof DoubleType) {
            return true;
        }
        return false;
    }
}
```
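A minimal sketch of schema projection and compatibility checking with the helper above; the example schemas are illustrative, and only standard Spark StructType/StructField constructors are used.

```java
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

StructType schema = new StructType(new StructField[] {
    new StructField("id", DataTypes.LongType, false, Metadata.empty()),
    new StructField("name", DataTypes.StringType, true, Metadata.empty()),
    new StructField("score", DataTypes.DoubleType, true, Metadata.empty())
});

// Keep only the columns a scan actually needs
StructType projected = SchemaUtils.projectSchema(schema, new String[] {"id", "score"});
System.out.println(projected.simpleString()); // struct<id:bigint,score:double>

// isCompatible permits safe numeric widening, e.g. int -> long
StructType intSchema = new StructType(new StructField[] {
    new StructField("id", DataTypes.IntegerType, false, Metadata.empty())
});
StructType longSchema = new StructType(new StructField[] {
    new StructField("id", DataTypes.LongType, false, Metadata.empty())
});
boolean widens = SchemaUtils.isCompatible(intSchema, longSchema); // true
```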
A histogram implementation for numeric data analysis:

```java
public class NumericHistogram {

    /** Create histogram with a specified maximum number of buckets. */
    public NumericHistogram(int maxBuckets);

    /** Add a value to the histogram. */
    public void add(double value);

    /** Add a value with a frequency. */
    public void add(double value, long frequency);

    /** Get the value at a given quantile. */
    public double quantile(double quantile);

    /** Merge with another histogram. */
    public void merge(NumericHistogram other);

    /** Get the number of buckets. */
    public int getNumBuckets();

    /** Get the total count. */
    public long getTotalCount();
}
```

Usage Examples:
```java
// Create a histogram for data analysis
NumericHistogram histogram = new NumericHistogram(100);

// Add data points
double[] data = {1.0, 2.5, 3.7, 4.2, 5.1, 6.8, 7.3, 8.9, 9.4, 10.0};
for (double value : data) {
    histogram.add(value);
}

// Calculate statistics
double median = histogram.quantile(0.5);
double p95 = histogram.quantile(0.95);
double p99 = histogram.quantile(0.99);
System.out.printf("Median: %.2f, P95: %.2f, P99: %.2f%n", median, p95, p99);
```
Basic statistical helpers for arrays of doubles:

```java
public class StatisticalUtils {

    public static double mean(double[] values) {
        return Arrays.stream(values).average().orElse(0.0);
    }

    public static double standardDeviation(double[] values) {
        double mean = mean(values);
        double variance = Arrays.stream(values)
            .map(x -> Math.pow(x - mean, 2))
            .average()
            .orElse(0.0);
        return Math.sqrt(variance);
    }

    public static double[] percentiles(double[] values, double[] percentiles) {
        double[] sorted = Arrays.copyOf(values, values.length);
        Arrays.sort(sorted);
        return Arrays.stream(percentiles)
            .map(p -> calculatePercentile(sorted, p))
            .toArray();
    }

    private static double calculatePercentile(double[] sortedValues, double percentile) {
        if (sortedValues.length == 0) return 0.0;
        if (percentile <= 0) return sortedValues[0];
        if (percentile >= 100) return sortedValues[sortedValues.length - 1];
        // Linear interpolation between the two nearest ranks
        double index = (percentile / 100.0) * (sortedValues.length - 1);
        int lowerIndex = (int) Math.floor(index);
        int upperIndex = (int) Math.ceil(index);
        if (lowerIndex == upperIndex) {
            return sortedValues[lowerIndex];
        }
        double weight = index - lowerIndex;
        return sortedValues[lowerIndex] * (1 - weight) + sortedValues[upperIndex] * weight;
    }
}
```
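A short usage sketch of the helpers above. Note that, unlike the histogram's quantile(0.95), these percentiles are given on a 0-100 scale, matching calculatePercentile; the latency figures are made up.

```java
double[] latenciesMs = {12.0, 15.5, 11.2, 48.0, 13.7, 14.1, 95.3, 12.9};

double mean = StatisticalUtils.mean(latenciesMs);
double stddev = StatisticalUtils.standardDeviation(latenciesMs);
double[] p = StatisticalUtils.percentiles(latenciesMs, new double[] {50, 95, 99});

System.out.printf("mean=%.1f ms, stddev=%.1f ms, p50=%.1f, p95=%.1f, p99=%.1f%n",
    mean, stddev, p[0], p[1], p[2]);
```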
A fast hash function for data processing:

```java
public class XXH64 {

    /** Hash a byte array with the default seed. */
    public static long hashBytes(byte[] input);

    /** Hash a byte array with a custom seed. */
    public static long hashBytes(byte[] input, long seed);

    /** Hash a string with the default seed. */
    public static long hashString(String input);

    /** Hash a long value. */
    public static long hashLong(long input);

    /** Hash an integer value. */
    public static long hashInt(int input);
}
```

Usage Examples:
```java
// Hash various data types
long stringHash = XXH64.hashString("hello world");
long longHash = XXH64.hashLong(12345L);
long intHash = XXH64.hashInt(42);

// Hash with a custom seed for consistent partitioning
long seed = 42L;
byte[] data = "test data".getBytes(StandardCharsets.UTF_8);
long customHash = XXH64.hashBytes(data, seed);

// Use for data partitioning; floorMod avoids the negative result that
// Math.abs(hash) % n would produce when hash is Long.MIN_VALUE
public int getPartition(String key, int numPartitions) {
    long hash = XXH64.hashString(key);
    return (int) Math.floorMod(hash, (long) numPartitions);
}
```

Utilities for CHAR/VARCHAR type handling:
```java
public class CharVarcharCodegenUtils {

    /** Read a string with proper CHAR/VARCHAR semantics. */
    public static UTF8String readSidePadding(UTF8String input, int length);

    /** Write a string with proper CHAR/VARCHAR semantics. */
    public static UTF8String writeSidePadding(UTF8String input, int length);

    /** Validate a string's length for CHAR/VARCHAR. */
    public static boolean validateLength(UTF8String input, int maxLength);
}
```
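A short sketch of the read-side behavior: CHAR(n) values shorter than n are padded with trailing spaces when read. This uses only the methods declared in the summary above plus UTF8String.fromString, Spark's standard factory; the expected outputs in the comments follow from those semantics.

```java
import org.apache.spark.unsafe.types.UTF8String;

// CHAR(5) read-side semantics: "ab" is padded to "ab   " on read
UTF8String raw = UTF8String.fromString("ab");
UTF8String padded = CharVarcharCodegenUtils.readSidePadding(raw, 5);
System.out.println("[" + padded + "]"); // [ab   ]

// VARCHAR(5): values longer than the declared length fail validation
boolean fits = CharVarcharCodegenUtils.validateLength(
    UTF8String.fromString("abcdef"), 5); // false
```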
Helpers for rough memory estimation and human-readable size formatting:

```java
public class MemoryUtils {

    public static long estimateObjectSize(Object obj) {
        if (obj == null) return 8; // Reference size
        if (obj instanceof String) {
            return 24 + ((String) obj).length() * 2; // Object header + char array
        }
        if (obj instanceof byte[]) {
            return 24 + ((byte[]) obj).length; // Object header + array
        }
        if (obj instanceof int[]) {
            return 24 + ((int[]) obj).length * 4;
        }
        if (obj instanceof long[]) {
            return 24 + ((long[]) obj).length * 8;
        }
        return 24; // Default object header size
    }

    public static String formatBytes(long bytes) {
        if (bytes < 1024) return bytes + " B";
        if (bytes < 1024 * 1024) return String.format("%.1f KB", bytes / 1024.0);
        if (bytes < 1024 * 1024 * 1024) return String.format("%.1f MB", bytes / (1024.0 * 1024));
        return String.format("%.1f GB", bytes / (1024.0 * 1024 * 1024));
    }

    public static void printMemoryUsage(String prefix) {
        Runtime runtime = Runtime.getRuntime();
        long totalMemory = runtime.totalMemory();
        long freeMemory = runtime.freeMemory();
        long usedMemory = totalMemory - freeMemory;
        long maxMemory = runtime.maxMemory();
        System.out.printf("%s Memory: Used=%s, Free=%s, Total=%s, Max=%s%n",
            prefix,
            formatBytes(usedMemory),
            formatBytes(freeMemory),
            formatBytes(totalMemory),
            formatBytes(maxMemory));
    }
}
```
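A quick sketch of the helpers above in action; the 64 MB buffer is an arbitrary example allocation.

```java
byte[] buffer = new byte[64 * 1024 * 1024];

// Estimate and pretty-print sizes
System.out.println(MemoryUtils.formatBytes(MemoryUtils.estimateObjectSize(buffer))); // 64.0 MB
System.out.println(MemoryUtils.formatBytes(1536));        // 1.5 KB
System.out.println(MemoryUtils.formatBytes(268435456L));  // 256.0 MB

// Snapshot JVM heap usage at an interesting point
MemoryUtils.printMemoryUsage("[after allocation]");
```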
A simple accumulating timer for measuring named operations:

```java
public class PerformanceTimer {
    private final Map<String, Long> startTimes = new ConcurrentHashMap<>();
    private final Map<String, Long> durations = new ConcurrentHashMap<>();
    private final Map<String, Long> counts = new ConcurrentHashMap<>();

    public void start(String operationName) {
        startTimes.put(operationName, System.nanoTime());
    }

    public long stop(String operationName) {
        Long startTime = startTimes.remove(operationName);
        if (startTime == null) {
            throw new IllegalStateException("No start time for operation: " + operationName);
        }
        long duration = System.nanoTime() - startTime;
        durations.merge(operationName, duration, Long::sum);
        counts.merge(operationName, 1L, Long::sum);
        return duration;
    }

    public void time(String operationName, Runnable operation) {
        start(operationName);
        try {
            operation.run();
        } finally {
            stop(operationName);
        }
    }

    public <T> T time(String operationName, Supplier<T> operation) {
        start(operationName);
        try {
            return operation.get();
        } finally {
            stop(operationName);
        }
    }

    public void printStatistics() {
        System.out.println("Performance Statistics:");
        System.out.println("=====================");
        for (String operation : durations.keySet()) {
            long totalDuration = durations.get(operation);
            long count = counts.get(operation);
            long avgDuration = totalDuration / count;
            System.out.printf("%-30s: Count=%6d, Total=%8.2fms, Avg=%8.2fms%n",
                operation, count,
                totalDuration / 1_000_000.0,
                avgDuration / 1_000_000.0);
        }
    }

    public void reset() {
        startTimes.clear();
        durations.clear();
        counts.clear();
    }
}
```
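A short usage sketch of the timer above; the operation names are arbitrary labels chosen for this example.

```java
PerformanceTimer timer = new PerformanceTimer();

// Time a void block of work (Runnable overload)
timer.time("parse", () -> {
    // ... parsing work ...
});

// Time a value-producing computation (Supplier overload)
String plan = timer.time("analyze", () -> "analyzed plan");

// Or drive start/stop manually around a section of code
timer.start("optimize");
// ... optimization work ...
long nanos = timer.stop("optimize");

timer.printStatistics();
timer.reset();
```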
Structured exceptions that carry a machine-readable error class alongside the message:

```java
public class CatalystException extends RuntimeException {
    private final String errorClass;
    private final Map<String, String> messageParameters;

    public CatalystException(String errorClass, String message) {
        super(message);
        this.errorClass = errorClass;
        this.messageParameters = Collections.emptyMap();
    }

    public CatalystException(String errorClass, String message, Throwable cause) {
        super(message, cause);
        this.errorClass = errorClass;
        this.messageParameters = Collections.emptyMap();
    }

    public CatalystException(String errorClass, String message,
                             Map<String, String> messageParameters) {
        super(message);
        this.errorClass = errorClass;
        this.messageParameters = Collections.unmodifiableMap(messageParameters);
    }

    public String getErrorClass() {
        return errorClass;
    }

    public Map<String, String> getMessageParameters() {
        return messageParameters;
    }
}

public class QueryExecutionException extends CatalystException {
    public QueryExecutionException(String message) {
        super("QUERY_EXECUTION_ERROR", message);
    }

    public QueryExecutionException(String message, Throwable cause) {
        super("QUERY_EXECUTION_ERROR", message, cause);
    }
}
```
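A sketch of throwing and handling these exceptions. The error class gives callers a stable key to match on, independent of the human-readable message; the COLUMN_NOT_FOUND class and columnName parameter here are invented for illustration.

```java
try {
    throw new CatalystException(
        "COLUMN_NOT_FOUND",
        "Column 'user_id' does not exist",
        Map.of("columnName", "user_id"));
} catch (CatalystException e) {
    // Branch on the stable error class rather than parsing the message text
    if ("COLUMN_NOT_FOUND".equals(e.getErrorClass())) {
        System.err.println("Missing column: " + e.getMessageParameters().get("columnName"));
    } else {
        throw e;
    }
}
```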
A thin wrapper over SLF4J that adds printf-style formatting and operation timing:

```java
public class CatalystLogger {
    private static final Logger logger = LoggerFactory.getLogger(CatalystLogger.class);

    public static void logInfo(String message, Object... args) {
        if (logger.isInfoEnabled()) {
            logger.info(String.format(message, args));
        }
    }

    public static void logWarning(String message, Object... args) {
        if (logger.isWarnEnabled()) {
            logger.warn(String.format(message, args));
        }
    }

    public static void logError(String message, Throwable throwable, Object... args) {
        if (logger.isErrorEnabled()) {
            logger.error(String.format(message, args), throwable);
        }
    }

    public static void logDebug(String message, Object... args) {
        if (logger.isDebugEnabled()) {
            logger.debug(String.format(message, args));
        }
    }

    public static <T> T logTiming(String operation, Supplier<T> supplier) {
        long startTime = System.currentTimeMillis();
        try {
            T result = supplier.get();
            long duration = System.currentTimeMillis() - startTime;
            logInfo("Operation %s completed in %d ms", operation, duration);
            return result;
        } catch (Exception e) {
            long duration = System.currentTimeMillis() - startTime;
            logError("Operation %s failed after %d ms", e, operation, duration);
            throw e;
        }
    }
}
```

These utilities and helpers provide essential building blocks for working with Apache Spark Catalyst, offering practical solutions for common tasks such as configuration management, data type handling, performance monitoring, and error handling.