# Utilities and Helpers

This section covers the utility classes, helper functions, and support APIs that complement the core Catalyst functionality. These utilities provide essential building blocks for configuration management, data processing, error handling, and performance optimization.
3
4
## Configuration Management
5
6
### CaseInsensitiveStringMap
7
8
A case-insensitive map implementation for configuration options:
9
10
```java { .api }
11
package org.apache.spark.sql.util;
12
13
public class CaseInsensitiveStringMap implements Map<String, String> {
14
/**
15
* Get string value by key
16
*/
17
public String get(String key);
18
19
/**
20
* Get boolean value with default
21
*/
22
public boolean getBoolean(String key, boolean defaultValue);
23
24
/**
25
* Get integer value with default
26
*/
27
public int getInt(String key, int defaultValue);
28
29
/**
30
* Get long value with default
31
*/
32
public long getLong(String key, long defaultValue);
33
34
/**
35
* Get double value with default
36
*/
37
public double getDouble(String key, double defaultValue);
38
39
// Standard Map interface methods
40
public boolean containsKey(Object key);
41
public Set<String> keySet();
42
public Collection<String> values();
43
public Set<Map.Entry<String, String>> entrySet();
44
}
45
```
46
47
**Usage Examples:**
48
49
```java
50
// Creating configuration maps
51
Map<String, String> options = Map.of(
52
"Format", "parquet",
53
"COMPRESSION", "snappy",
54
"merge.Schema", "true"
55
);
56
57
CaseInsensitiveStringMap config = new CaseInsensitiveStringMap(options);
58
59
// Case-insensitive access
60
String format = config.get("format"); // "parquet"
61
String compression = config.get("compression"); // "snappy"
62
boolean mergeSchema = config.getBoolean("merge.schema", false); // true
63
64
// Type conversion with defaults
65
int batchSize = config.getInt("batch.size", 1000);
66
long maxFileSize = config.getLong("max.file.size", 134217728L); // 128MB default
67
double samplingRatio = config.getDouble("sampling.ratio", 0.1);
68
```
69
70
### Configuration Builder Utility
71
72
```java
73
public class ConfigurationBuilder {
74
private final Map<String, String> options = new HashMap<>();
75
76
public ConfigurationBuilder set(String key, String value) {
77
options.put(key, value);
78
return this;
79
}
80
81
public ConfigurationBuilder set(String key, boolean value) {
82
options.put(key, String.valueOf(value));
83
return this;
84
}
85
86
public ConfigurationBuilder set(String key, int value) {
87
options.put(key, String.valueOf(value));
88
return this;
89
}
90
91
public ConfigurationBuilder set(String key, long value) {
92
options.put(key, String.valueOf(value));
93
return this;
94
}
95
96
public ConfigurationBuilder set(String key, double value) {
97
options.put(key, String.valueOf(value));
98
return this;
99
}
100
101
public CaseInsensitiveStringMap build() {
102
return new CaseInsensitiveStringMap(options);
103
}
104
105
public static ConfigurationBuilder create() {
106
return new ConfigurationBuilder();
107
}
108
}
109
110
// Usage
111
CaseInsensitiveStringMap config = ConfigurationBuilder.create()
112
.set("format", "delta")
113
.set("merge.schema", true)
114
.set("batch.size", 5000)
115
.set("max.file.size", 268435456L) // 256MB
116
.set("compression.ratio", 0.8)
117
.build();
118
```
119
120
## Internal Configuration APIs
121
122
### SQLConf and StaticSQLConf
123
124
Configuration management for SQL-related settings:
125
126
```scala { .api }
127
package org.apache.spark.sql.internal
128
129
class SQLConf extends Serializable {
  // Runtime-mutable SQL configuration: entries can be read, set, and
  // removed while a session is running.
  def getConf[T](entry: ConfigEntry[T]): T
  def setConf[T](entry: ConfigEntry[T], value: T): Unit
  def unsetConf(key: String): Unit
  def getAllConfs: Map[String, String]
}
136
137
object StaticSQLConf {
  // Static configuration entries: fixed at startup, not changeable at runtime.
  val WAREHOUSE_PATH: ConfigEntry[String]
  val CATALOG_IMPLEMENTATION: ConfigEntry[String]
  val GLOBAL_TEMP_DATABASE: ConfigEntry[String]
}
143
```
144
145
**Usage Examples:**
146
147
```scala
148
import org.apache.spark.sql.internal.SQLConf
149
150
// Access current SQL configuration
151
val sqlConf = SQLConf.get
152
153
// Get configuration values
154
val adaptiveEnabled = sqlConf.adaptiveExecutionEnabled
155
val codegenEnabled = sqlConf.wholeStageCodegenEnabled
156
val broadcastThreshold = sqlConf.autoBroadcastJoinThreshold
157
158
// Set configuration (if mutable)
159
sqlConf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED, true)
160
sqlConf.setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, 20971520L) // 20MB
161
```
162
163
## Data Type Utilities
164
165
### Data Type Conversion Helpers
166
167
```java
168
public class DataTypeUtils {
169
170
public static DataType fromString(String typeString) {
171
switch (typeString.toLowerCase()) {
172
case "boolean": return DataTypes.BooleanType;
173
case "byte": return DataTypes.ByteType;
174
case "short": return DataTypes.ShortType;
175
case "int": case "integer": return DataTypes.IntegerType;
176
case "long": case "bigint": return DataTypes.LongType;
177
case "float": return DataTypes.FloatType;
178
case "double": return DataTypes.DoubleType;
179
case "decimal": return DataTypes.createDecimalType();
180
case "string": return DataTypes.StringType;
181
case "binary": return DataTypes.BinaryType;
182
case "date": return DataTypes.DateType;
183
case "timestamp": return DataTypes.TimestampType;
184
default:
185
throw new IllegalArgumentException("Unknown data type: " + typeString);
186
}
187
}
188
189
public static boolean isNumericType(DataType dataType) {
190
return dataType instanceof NumericType;
191
}
192
193
public static boolean isStringType(DataType dataType) {
194
return dataType instanceof StringType;
195
}
196
197
public static boolean isComplexType(DataType dataType) {
198
return dataType instanceof ArrayType ||
199
dataType instanceof MapType ||
200
dataType instanceof StructType;
201
}
202
203
public static int sizeOf(DataType dataType) {
204
if (dataType instanceof BooleanType) return 1;
205
if (dataType instanceof ByteType) return 1;
206
if (dataType instanceof ShortType) return 2;
207
if (dataType instanceof IntegerType) return 4;
208
if (dataType instanceof LongType) return 8;
209
if (dataType instanceof FloatType) return 4;
210
if (dataType instanceof DoubleType) return 8;
211
if (dataType instanceof DateType) return 4;
212
if (dataType instanceof TimestampType) return 8;
213
return 8; // Default size for complex types
214
}
215
216
public static Object getDefaultValue(DataType dataType) {
217
if (dataType instanceof BooleanType) return false;
218
if (dataType instanceof ByteType) return (byte) 0;
219
if (dataType instanceof ShortType) return (short) 0;
220
if (dataType instanceof IntegerType) return 0;
221
if (dataType instanceof LongType) return 0L;
222
if (dataType instanceof FloatType) return 0.0f;
223
if (dataType instanceof DoubleType) return 0.0;
224
if (dataType instanceof StringType) return "";
225
if (dataType instanceof BinaryType) return new byte[0];
226
return null; // For nullable or complex types
227
}
228
}
229
```
230
231
### Schema Utilities
232
233
```java
234
public class SchemaUtils {
235
236
public static Column[] toColumns(StructType schema) {
237
return Arrays.stream(schema.fields())
238
.map(SchemaUtils::toColumn)
239
.toArray(Column[]::new);
240
}
241
242
public static Column toColumn(StructField field) {
243
return new Column() {
244
@Override
245
public String name() {
246
return field.name();
247
}
248
249
@Override
250
public DataType dataType() {
251
return field.dataType();
252
}
253
254
@Override
255
public boolean nullable() {
256
return field.nullable();
257
}
258
259
@Override
260
public String comment() {
261
return field.getComment().orElse(null);
262
}
263
264
@Override
265
public ColumnDefaultValue defaultValue() {
266
return null; // V1 schemas don't support default values
267
}
268
269
@Override
270
public MetadataColumn metadataColumn() {
271
return null;
272
}
273
};
274
}
275
276
public static StructType fromColumns(Column[] columns) {
277
StructField[] fields = Arrays.stream(columns)
278
.map(col -> StructField.apply(
279
col.name(),
280
col.dataType(),
281
col.nullable(),
282
Metadata.empty()
283
))
284
.toArray(StructField[]::new);
285
return StructType.apply(fields);
286
}
287
288
public static StructType projectSchema(StructType schema, String[] requiredColumns) {
289
List<StructField> projectedFields = new ArrayList<>();
290
291
for (String columnName : requiredColumns) {
292
try {
293
StructField field = schema.apply(columnName);
294
projectedFields.add(field);
295
} catch (IllegalArgumentException e) {
296
throw new IllegalArgumentException("Column not found: " + columnName);
297
}
298
}
299
300
return StructType.apply(projectedFields.toArray(new StructField[0]));
301
}
302
303
public static boolean isCompatible(StructType source, StructType target) {
304
if (source.length() != target.length()) {
305
return false;
306
}
307
308
for (int i = 0; i < source.length(); i++) {
309
StructField sourceField = source.fields()[i];
310
StructField targetField = target.fields()[i];
311
312
if (!sourceField.name().equals(targetField.name()) ||
313
!isTypeCompatible(sourceField.dataType(), targetField.dataType())) {
314
return false;
315
}
316
}
317
318
return true;
319
}
320
321
private static boolean isTypeCompatible(DataType source, DataType target) {
322
// Exact match
323
if (source.equals(target)) {
324
return true;
325
}
326
327
// Numeric type promotions
328
if (source instanceof IntegerType && target instanceof LongType) {
329
return true;
330
}
331
if (source instanceof FloatType && target instanceof DoubleType) {
332
return true;
333
}
334
if (source instanceof IntegerType && target instanceof DoubleType) {
335
return true;
336
}
337
338
return false;
339
}
340
}
341
```
342
343
## Numeric and Statistical Utilities
344
345
### NumericHistogram
346
347
Histogram implementation for numeric data analysis:
348
349
```java { .api }
350
public class NumericHistogram {
351
/**
352
* Create histogram with specified number of buckets
353
*/
354
public NumericHistogram(int maxBuckets);
355
356
/**
357
* Add value to histogram
358
*/
359
public void add(double value);
360
361
/**
362
* Add value with frequency
363
*/
364
public void add(double value, long frequency);
365
366
/**
367
* Get quantile value
368
*/
369
public double quantile(double quantile);
370
371
/**
372
* Merge with another histogram
373
*/
374
public void merge(NumericHistogram other);
375
376
/**
377
* Get number of buckets
378
*/
379
public int getNumBuckets();
380
381
/**
382
* Get total count
383
*/
384
public long getTotalCount();
385
}
386
```
387
388
**Usage Examples:**
389
390
```java
391
// Create histogram for data analysis
392
NumericHistogram histogram = new NumericHistogram(100);
393
394
// Add data points
395
double[] data = {1.0, 2.5, 3.7, 4.2, 5.1, 6.8, 7.3, 8.9, 9.4, 10.0};
396
for (double value : data) {
397
histogram.add(value);
398
}
399
400
// Calculate statistics
401
double median = histogram.quantile(0.5);
402
double p95 = histogram.quantile(0.95);
403
double p99 = histogram.quantile(0.99);
404
405
System.out.printf("Median: %.2f, P95: %.2f, P99: %.2f%n", median, p95, p99);
406
```
407
408
### Statistical Functions
409
410
```java
411
/**
 * Small collection of descriptive-statistics helpers over double arrays.
 * Empty input yields 0.0 rather than an error.
 */
public class StatisticalUtils {

    /** Arithmetic mean of {@code values}; 0.0 for an empty array. */
    public static double mean(double[] values) {
        OptionalDouble average = Arrays.stream(values).average();
        return average.orElse(0.0);
    }

    /**
     * Population standard deviation (divides by N, not N-1);
     * 0.0 for an empty array.
     */
    public static double standardDeviation(double[] values) {
        final double center = mean(values);
        OptionalDouble variance = Arrays.stream(values)
            .map(v -> Math.pow(v - center, 2))
            .average();
        return Math.sqrt(variance.orElse(0.0));
    }

    /**
     * Evaluates each requested percentile (0-100 scale) over {@code values}
     * using linear interpolation between the two nearest ranks.
     */
    public static double[] percentiles(double[] values, double[] percentiles) {
        double[] ascending = Arrays.copyOf(values, values.length);
        Arrays.sort(ascending);

        double[] results = new double[percentiles.length];
        for (int i = 0; i < percentiles.length; i++) {
            results[i] = calculatePercentile(ascending, percentiles[i]);
        }
        return results;
    }

    /** Linear-interpolated percentile over an already-sorted array. */
    private static double calculatePercentile(double[] sortedValues, double percentile) {
        if (sortedValues.length == 0) return 0.0;
        if (percentile <= 0) return sortedValues[0];
        if (percentile >= 100) return sortedValues[sortedValues.length - 1];

        double position = (percentile / 100.0) * (sortedValues.length - 1);
        int below = (int) Math.floor(position);
        int above = (int) Math.ceil(position);

        if (below == above) {
            return sortedValues[below];
        }

        // Interpolate between the two neighbouring ranks.
        double fraction = position - below;
        return sortedValues[below] * (1 - fraction) + sortedValues[above] * fraction;
    }
}
452
```
453
454
## Hash and Encoding Utilities
455
456
### XXH64 Hash Implementation
457
458
Fast hash function for data processing:
459
460
```java { .api }
461
public class XXH64 {
462
/**
463
* Hash byte array with default seed
464
*/
465
public static long hashBytes(byte[] input);
466
467
/**
468
* Hash byte array with custom seed
469
*/
470
public static long hashBytes(byte[] input, long seed);
471
472
/**
473
* Hash string with default seed
474
*/
475
public static long hashString(String input);
476
477
/**
478
* Hash long value
479
*/
480
public static long hashLong(long input);
481
482
/**
483
* Hash integer value
484
*/
485
public static long hashInt(int input);
486
}
487
```
488
489
**Usage Examples:**
490
491
```java
492
// Hash various data types
493
long stringHash = XXH64.hashString("hello world");
494
long longHash = XXH64.hashLong(12345L);
495
long intHash = XXH64.hashInt(42);
496
497
// Hash with custom seed for consistent partitioning
498
long seed = 42L;
499
byte[] data = "test data".getBytes();
500
long customHash = XXH64.hashBytes(data, seed);
501
502
// Use for data partitioning
503
public int getPartition(String key, int numPartitions) {
504
long hash = XXH64.hashString(key);
505
return (int) (Math.abs(hash) % numPartitions);
506
}
507
```
508
509
### CharVarchar Utilities
510
511
Utilities for CHAR/VARCHAR type handling:
512
513
```java { .api }
514
public class CharVarcharCodegenUtils {
515
/**
516
* Read string with proper CHAR/VARCHAR semantics
517
*/
518
public static UTF8String readSidePadding(UTF8String input, int length);
519
520
/**
521
* Write string with proper CHAR/VARCHAR semantics
522
*/
523
public static UTF8String writeSidePadding(UTF8String input, int length);
524
525
/**
526
* Validate string length for CHAR/VARCHAR
527
*/
528
public static boolean validateLength(UTF8String input, int maxLength);
529
}
530
```
531
532
## Memory and Performance Utilities
533
534
### Memory Management Helpers
535
536
```java
537
/**
 * Heuristic memory-size estimation and human-readable byte formatting.
 */
public class MemoryUtils {

    /**
     * Rough heap-size estimate in bytes for a few common object kinds. Uses a
     * flat 24-byte per-object overhead constant and 2 bytes per char — these
     * are heuristics, not exact JVM layout sizes.
     *
     * @param obj object to size; null counts as a bare 8-byte reference
     * @return estimated size in bytes
     */
    public static long estimateObjectSize(Object obj) {
        if (obj == null) return 8; // Reference size

        if (obj instanceof String) {
            return 24 + ((String) obj).length() * 2; // Object header + char array
        }

        if (obj instanceof byte[]) {
            return 24 + ((byte[]) obj).length; // Object header + array
        }

        if (obj instanceof int[]) {
            return 24 + ((int[]) obj).length * 4;
        }

        if (obj instanceof long[]) {
            return 24 + ((long[]) obj).length * 8;
        }

        return 24; // Default object header size
    }

    /**
     * Formats a byte count as a human-readable string ("512 B", "1.5 KB", ...).
     *
     * Uses Locale.ROOT so the decimal separator is always '.', independent of
     * the JVM's default locale (the no-locale String.format overload would
     * emit "1,5 KB" under e.g. a German default locale).
     */
    public static String formatBytes(long bytes) {
        if (bytes < 1024) return bytes + " B";
        if (bytes < 1024 * 1024) return String.format(Locale.ROOT, "%.1f KB", bytes / 1024.0);
        if (bytes < 1024 * 1024 * 1024) return String.format(Locale.ROOT, "%.1f MB", bytes / (1024.0 * 1024));
        return String.format(Locale.ROOT, "%.1f GB", bytes / (1024.0 * 1024 * 1024));
    }

    /** Prints current JVM heap usage (used/free/total/max), prefixed by {@code prefix}. */
    public static void printMemoryUsage(String prefix) {
        Runtime runtime = Runtime.getRuntime();
        long totalMemory = runtime.totalMemory();
        long freeMemory = runtime.freeMemory();
        long usedMemory = totalMemory - freeMemory;
        long maxMemory = runtime.maxMemory();

        System.out.printf("%s Memory: Used=%s, Free=%s, Total=%s, Max=%s%n",
            prefix,
            formatBytes(usedMemory),
            formatBytes(freeMemory),
            formatBytes(totalMemory),
            formatBytes(maxMemory));
    }
}
583
```
584
585
### Performance Measurement
586
587
```java
588
/**
 * Accumulating wall-clock timer keyed by operation name. Thread-safe maps
 * back the counters; timings are recorded in nanoseconds.
 */
public class PerformanceTimer {

    // Nanosecond timestamps of operations currently in flight.
    private final Map<String, Long> startTimes = new ConcurrentHashMap<>();
    // Accumulated elapsed nanoseconds per operation name.
    private final Map<String, Long> durations = new ConcurrentHashMap<>();
    // Number of completed timings per operation name.
    private final Map<String, Long> counts = new ConcurrentHashMap<>();

    /** Marks the beginning of a timed operation. */
    public void start(String operationName) {
        startTimes.put(operationName, System.nanoTime());
    }

    /**
     * Marks the end of a timed operation and returns its elapsed nanoseconds.
     *
     * @throws IllegalStateException if no matching start(...) was recorded
     */
    public long stop(String operationName) {
        Long begun = startTimes.remove(operationName);
        if (begun == null) {
            throw new IllegalStateException("No start time for operation: " + operationName);
        }

        long elapsed = System.nanoTime() - begun;
        durations.merge(operationName, elapsed, Long::sum);
        counts.merge(operationName, 1L, Long::sum);

        return elapsed;
    }

    /** Times a void action; the stop is recorded even if the action throws. */
    public void time(String operationName, Runnable operation) {
        start(operationName);
        try {
            operation.run();
        } finally {
            stop(operationName);
        }
    }

    /** Times a value-producing action; the stop is recorded even on failure. */
    public <T> T time(String operationName, Supplier<T> operation) {
        start(operationName);
        try {
            return operation.get();
        } finally {
            stop(operationName);
        }
    }

    /** Dumps per-operation count, total, and average wall time to stdout. */
    public void printStatistics() {
        System.out.println("Performance Statistics:");
        System.out.println("=====================");

        for (Map.Entry<String, Long> entry : durations.entrySet()) {
            String operation = entry.getKey();
            long total = entry.getValue();
            long count = counts.get(operation);
            long average = total / count;

            System.out.printf("%-30s: Count=%6d, Total=%8.2fms, Avg=%8.2fms%n",
                operation, count,
                total / 1_000_000.0,
                average / 1_000_000.0);
        }
    }

    /** Discards all in-flight and accumulated timing data. */
    public void reset() {
        startTimes.clear();
        durations.clear();
        counts.clear();
    }
}
650
```
651
652
## Error Handling and Logging
653
654
### Custom Exceptions
655
656
```java
657
/**
 * Base runtime exception for Catalyst errors. Carries a stable error-class
 * identifier plus optional structured message parameters.
 */
public class CatalystException extends RuntimeException {

    private final String errorClass;
    private final Map<String, String> messageParameters;

    /** Creates an exception with an error class and message, no parameters. */
    public CatalystException(String errorClass, String message) {
        super(message);
        this.errorClass = errorClass;
        this.messageParameters = Collections.emptyMap();
    }

    /** Creates an exception with an underlying cause, no parameters. */
    public CatalystException(String errorClass, String message, Throwable cause) {
        super(message, cause);
        this.errorClass = errorClass;
        this.messageParameters = Collections.emptyMap();
    }

    /**
     * Creates an exception carrying structured message parameters. The map is
     * defensively copied so later mutation by the caller cannot change this
     * exception's state.
     */
    public CatalystException(String errorClass, String message,
                             Map<String, String> messageParameters) {
        super(message);
        this.errorClass = errorClass;
        // Defensive copy: unmodifiableMap alone is only a view over the
        // caller's map, which the caller could still mutate afterwards.
        this.messageParameters =
            Collections.unmodifiableMap(new HashMap<>(messageParameters));
    }

    /** Stable error-class identifier, e.g. "QUERY_EXECUTION_ERROR". */
    public String getErrorClass() {
        return errorClass;
    }

    /** Unmodifiable map of structured message parameters (possibly empty). */
    public Map<String, String> getMessageParameters() {
        return messageParameters;
    }
}
688
689
public class QueryExecutionException extends CatalystException {
690
public QueryExecutionException(String message) {
691
super("QUERY_EXECUTION_ERROR", message);
692
}
693
694
public QueryExecutionException(String message, Throwable cause) {
695
super("QUERY_EXECUTION_ERROR", message, cause);
696
}
697
}
698
```
699
700
### Utility Logger
701
702
```java
703
public class CatalystLogger {
704
private static final Logger logger = LoggerFactory.getLogger(CatalystLogger.class);
705
706
public static void logInfo(String message, Object... args) {
707
if (logger.isInfoEnabled()) {
708
logger.info(String.format(message, args));
709
}
710
}
711
712
public static void logWarning(String message, Object... args) {
713
if (logger.isWarnEnabled()) {
714
logger.warn(String.format(message, args));
715
}
716
}
717
718
public static void logError(String message, Throwable throwable, Object... args) {
719
if (logger.isErrorEnabled()) {
720
logger.error(String.format(message, args), throwable);
721
}
722
}
723
724
public static void logDebug(String message, Object... args) {
725
if (logger.isDebugEnabled()) {
726
logger.debug(String.format(message, args));
727
}
728
}
729
730
public static <T> T logTiming(String operation, Supplier<T> supplier) {
731
long startTime = System.currentTimeMillis();
732
try {
733
T result = supplier.get();
734
long duration = System.currentTimeMillis() - startTime;
735
logInfo("Operation %s completed in %d ms", operation, duration);
736
return result;
737
} catch (Exception e) {
738
long duration = System.currentTimeMillis() - startTime;
739
logError("Operation %s failed after %d ms", e, operation, duration);
740
throw e;
741
}
742
}
743
}
744
```
745
746
These utilities and helpers provide essential building blocks for working with Apache Spark Catalyst, offering practical solutions for common tasks like configuration management, data type handling, performance monitoring, and error handling.