or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

catalog-apis.md, data-source-v2-apis.md, distributions-api.md, expression-apis.md, index.md, legacy-data-source-v1.md, metrics-api.md, streaming-apis.md, utilities-helpers.md, vectorized-processing.md

docs/utilities-helpers.md

# Utilities and Helpers

This section covers the utility classes, helper functions, and support APIs that complement the core Catalyst functionality. These utilities provide essential building blocks for configuration management, data processing, error handling, and performance optimization.

## Configuration Management

### CaseInsensitiveStringMap

A case-insensitive map implementation for configuration options:

```java { .api }
package org.apache.spark.sql.util;

public class CaseInsensitiveStringMap implements Map<String, String> {
    /** Get string value by key */
    public String get(String key);

    /** Get boolean value with default */
    public boolean getBoolean(String key, boolean defaultValue);

    /** Get integer value with default */
    public int getInt(String key, int defaultValue);

    /** Get long value with default */
    public long getLong(String key, long defaultValue);

    /** Get double value with default */
    public double getDouble(String key, double defaultValue);

    // Standard Map interface methods
    public boolean containsKey(Object key);
    public Set<String> keySet();
    public Collection<String> values();
    public Set<Map.Entry<String, String>> entrySet();
}
```

**Usage Examples:**

```java
// Creating configuration maps
Map<String, String> options = Map.of(
    "Format", "parquet",
    "COMPRESSION", "snappy",
    "merge.Schema", "true"
);

CaseInsensitiveStringMap config = new CaseInsensitiveStringMap(options);

// Case-insensitive access
String format = config.get("format"); // "parquet"
String compression = config.get("compression"); // "snappy"
boolean mergeSchema = config.getBoolean("merge.schema", false); // true

// Type conversion with defaults
int batchSize = config.getInt("batch.size", 1000);
long maxFileSize = config.getLong("max.file.size", 134217728L); // 128MB default
double samplingRatio = config.getDouble("sampling.ratio", 0.1);
```

### Configuration Builder Utility

```java

public class ConfigurationBuilder {

74

private final Map<String, String> options = new HashMap<>();

75

76

public ConfigurationBuilder set(String key, String value) {

77

options.put(key, value);

78

return this;

79

}

80

81

public ConfigurationBuilder set(String key, boolean value) {

82

options.put(key, String.valueOf(value));

83

return this;

84

}

85

86

public ConfigurationBuilder set(String key, int value) {

87

options.put(key, String.valueOf(value));

88

return this;

89

}

90

91

public ConfigurationBuilder set(String key, long value) {

92

options.put(key, String.valueOf(value));

93

return this;

94

}

95

96

public ConfigurationBuilder set(String key, double value) {

97

options.put(key, String.valueOf(value));

98

return this;

99

}

100

101

public CaseInsensitiveStringMap build() {

102

return new CaseInsensitiveStringMap(options);

103

}

104

105

public static ConfigurationBuilder create() {

106

return new ConfigurationBuilder();

107

}

108

}

// Usage
CaseInsensitiveStringMap config = ConfigurationBuilder.create()
    .set("format", "delta")
    .set("merge.schema", true)
    .set("batch.size", 5000)
    .set("max.file.size", 268435456L) // 256MB
    .set("compression.ratio", 0.8)
    .build();
```

## Internal Configuration APIs

### SQLConf and StaticSQLConf

Configuration management for SQL-related settings:

```scala { .api }
package org.apache.spark.sql.internal

class SQLConf extends Serializable {
  // Dynamic configuration that can be changed at runtime
  def getConf[T](entry: ConfigEntry[T]): T
  def setConf[T](entry: ConfigEntry[T], value: T): Unit
  def unsetConf(key: String): Unit
  def getAllConfs: Map[String, String]
}

object StaticSQLConf {
  // Static configuration entries that cannot be changed at runtime
  val WAREHOUSE_PATH: ConfigEntry[String]
  val CATALOG_IMPLEMENTATION: ConfigEntry[String]
  val GLOBAL_TEMP_DATABASE: ConfigEntry[String]
}
```

**Usage Examples:**

```scala
import org.apache.spark.sql.internal.SQLConf

// Access current SQL configuration
val sqlConf = SQLConf.get

// Get configuration values
val adaptiveEnabled = sqlConf.adaptiveExecutionEnabled
val codegenEnabled = sqlConf.wholeStageCodegenEnabled
val broadcastThreshold = sqlConf.autoBroadcastJoinThreshold

// Set configuration (if mutable)
sqlConf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED, true)
sqlConf.setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, 20971520L) // 20MB
```

## Data Type Utilities

### Data Type Conversion Helpers

```java

public class DataTypeUtils {

169

170

public static DataType fromString(String typeString) {

171

switch (typeString.toLowerCase()) {

172

case "boolean": return DataTypes.BooleanType;

173

case "byte": return DataTypes.ByteType;

174

case "short": return DataTypes.ShortType;

175

case "int": case "integer": return DataTypes.IntegerType;

176

case "long": case "bigint": return DataTypes.LongType;

177

case "float": return DataTypes.FloatType;

178

case "double": return DataTypes.DoubleType;

179

case "decimal": return DataTypes.createDecimalType();

180

case "string": return DataTypes.StringType;

181

case "binary": return DataTypes.BinaryType;

182

case "date": return DataTypes.DateType;

183

case "timestamp": return DataTypes.TimestampType;

184

default:

185

throw new IllegalArgumentException("Unknown data type: " + typeString);

186

}

187

}

188

189

public static boolean isNumericType(DataType dataType) {

190

return dataType instanceof NumericType;

191

}

192

193

public static boolean isStringType(DataType dataType) {

194

return dataType instanceof StringType;

195

}

196

197

public static boolean isComplexType(DataType dataType) {

198

return dataType instanceof ArrayType ||

199

dataType instanceof MapType ||

200

dataType instanceof StructType;

201

}

202

203

public static int sizeOf(DataType dataType) {

204

if (dataType instanceof BooleanType) return 1;

205

if (dataType instanceof ByteType) return 1;

206

if (dataType instanceof ShortType) return 2;

207

if (dataType instanceof IntegerType) return 4;

208

if (dataType instanceof LongType) return 8;

209

if (dataType instanceof FloatType) return 4;

210

if (dataType instanceof DoubleType) return 8;

211

if (dataType instanceof DateType) return 4;

212

if (dataType instanceof TimestampType) return 8;

213

return 8; // Default size for complex types

214

}

215

216

public static Object getDefaultValue(DataType dataType) {

217

if (dataType instanceof BooleanType) return false;

218

if (dataType instanceof ByteType) return (byte) 0;

219

if (dataType instanceof ShortType) return (short) 0;

220

if (dataType instanceof IntegerType) return 0;

221

if (dataType instanceof LongType) return 0L;

222

if (dataType instanceof FloatType) return 0.0f;

223

if (dataType instanceof DoubleType) return 0.0;

224

if (dataType instanceof StringType) return "";

225

if (dataType instanceof BinaryType) return new byte[0];

226

return null; // For nullable or complex types

227

}

228

}

```

### Schema Utilities

```java

public class SchemaUtils {

235

236

public static Column[] toColumns(StructType schema) {

237

return Arrays.stream(schema.fields())

238

.map(SchemaUtils::toColumn)

239

.toArray(Column[]::new);

240

}

241

242

public static Column toColumn(StructField field) {

243

return new Column() {

244

@Override

245

public String name() {

246

return field.name();

247

}

248

249

@Override

250

public DataType dataType() {

251

return field.dataType();

252

}

253

254

@Override

255

public boolean nullable() {

256

return field.nullable();

257

}

258

259

@Override

260

public String comment() {

261

return field.getComment().orElse(null);

262

}

263

264

@Override

265

public ColumnDefaultValue defaultValue() {

266

return null; // V1 schemas don't support default values

267

}

268

269

@Override

270

public MetadataColumn metadataColumn() {

271

return null;

272

}

273

};

274

}

275

276

public static StructType fromColumns(Column[] columns) {

277

StructField[] fields = Arrays.stream(columns)

278

.map(col -> StructField.apply(

279

col.name(),

280

col.dataType(),

281

col.nullable(),

282

Metadata.empty()

283

))

284

.toArray(StructField[]::new);

285

return StructType.apply(fields);

286

}

287

288

public static StructType projectSchema(StructType schema, String[] requiredColumns) {

289

List<StructField> projectedFields = new ArrayList<>();

290

291

for (String columnName : requiredColumns) {

292

try {

293

StructField field = schema.apply(columnName);

294

projectedFields.add(field);

295

} catch (IllegalArgumentException e) {

296

throw new IllegalArgumentException("Column not found: " + columnName);

297

}

298

}

299

300

return StructType.apply(projectedFields.toArray(new StructField[0]));

301

}

302

303

public static boolean isCompatible(StructType source, StructType target) {

304

if (source.length() != target.length()) {

305

return false;

306

}

307

308

for (int i = 0; i < source.length(); i++) {

309

StructField sourceField = source.fields()[i];

310

StructField targetField = target.fields()[i];

311

312

if (!sourceField.name().equals(targetField.name()) ||

313

!isTypeCompatible(sourceField.dataType(), targetField.dataType())) {

314

return false;

315

}

316

}

317

318

return true;

319

}

320

321

private static boolean isTypeCompatible(DataType source, DataType target) {

322

// Exact match

323

if (source.equals(target)) {

324

return true;

325

}

326

327

// Numeric type promotions

328

if (source instanceof IntegerType && target instanceof LongType) {

329

return true;

330

}

331

if (source instanceof FloatType && target instanceof DoubleType) {

332

return true;

333

}

334

if (source instanceof IntegerType && target instanceof DoubleType) {

335

return true;

336

}

337

338

return false;

339

}

340

}

```

## Numeric and Statistical Utilities

### NumericHistogram

Histogram implementation for numeric data analysis:

```java { .api }
public class NumericHistogram {
    /** Create histogram with specified number of buckets */
    public NumericHistogram(int maxBuckets);

    /** Add value to histogram */
    public void add(double value);

    /** Add value with frequency */
    public void add(double value, long frequency);

    /** Get quantile value */
    public double quantile(double quantile);

    /** Merge with another histogram */
    public void merge(NumericHistogram other);

    /** Get number of buckets */
    public int getNumBuckets();

    /** Get total count */
    public long getTotalCount();
}
```

**Usage Examples:**

```java
// Create histogram for data analysis
NumericHistogram histogram = new NumericHistogram(100);

// Add data points
double[] data = {1.0, 2.5, 3.7, 4.2, 5.1, 6.8, 7.3, 8.9, 9.4, 10.0};
for (double value : data) {
    histogram.add(value);
}

// Calculate statistics
double median = histogram.quantile(0.5);
double p95 = histogram.quantile(0.95);
double p99 = histogram.quantile(0.99);

System.out.printf("Median: %.2f, P95: %.2f, P99: %.2f%n", median, p95, p99);
```

### Statistical Functions

```java

/**
 * Basic descriptive statistics over {@code double} arrays.
 */
public class StatisticalUtils {

    /** Arithmetic mean; returns 0.0 for an empty array. */
    public static double mean(double[] values) {
        if (values.length == 0) {
            return 0.0;
        }
        double total = 0.0;
        for (double value : values) {
            total += value;
        }
        return total / values.length;
    }

    /** Population standard deviation (divides by n); 0.0 for an empty array. */
    public static double standardDeviation(double[] values) {
        if (values.length == 0) {
            return 0.0;
        }
        double avg = mean(values);
        double squaredError = 0.0;
        for (double value : values) {
            double delta = value - avg;
            squaredError += delta * delta;
        }
        return Math.sqrt(squaredError / values.length);
    }

    /**
     * Evaluates several percentiles (0-100 scale) against a sorted copy of
     * {@code values}; the input array is left untouched.
     */
    public static double[] percentiles(double[] values, double[] percentiles) {
        double[] sorted = values.clone();
        Arrays.sort(sorted);

        double[] results = new double[percentiles.length];
        for (int i = 0; i < percentiles.length; i++) {
            results[i] = calculatePercentile(sorted, percentiles[i]);
        }
        return results;
    }

    /** Linear interpolation between the two closest ranks of a sorted array. */
    private static double calculatePercentile(double[] sortedValues, double percentile) {
        if (sortedValues.length == 0) return 0.0;
        if (percentile <= 0) return sortedValues[0];
        if (percentile >= 100) return sortedValues[sortedValues.length - 1];

        double rank = (percentile / 100.0) * (sortedValues.length - 1);
        int below = (int) Math.floor(rank);
        int above = (int) Math.ceil(rank);

        if (below == above) {
            return sortedValues[below];
        }

        double fraction = rank - below;
        return sortedValues[below] * (1 - fraction) + sortedValues[above] * fraction;
    }
}

```

## Hash and Encoding Utilities

### XXH64 Hash Implementation

Fast hash function for data processing:

```java { .api }
public class XXH64 {
    /** Hash byte array with default seed */
    public static long hashBytes(byte[] input);

    /** Hash byte array with custom seed */
    public static long hashBytes(byte[] input, long seed);

    /** Hash string with default seed */
    public static long hashString(String input);

    /** Hash long value */
    public static long hashLong(long input);

    /** Hash integer value */
    public static long hashInt(int input);
}
```

**Usage Examples:**

```java
// Hash various data types
long stringHash = XXH64.hashString("hello world");
long longHash = XXH64.hashLong(12345L);
long intHash = XXH64.hashInt(42);

// Hash with custom seed for consistent partitioning
long seed = 42L;
byte[] data = "test data".getBytes();
long customHash = XXH64.hashBytes(data, seed);

// Use for data partitioning
public int getPartition(String key, int numPartitions) {
    long hash = XXH64.hashString(key);
    return (int) (Math.abs(hash) % numPartitions);
}
```

### CharVarchar Utilities

Utilities for CHAR/VARCHAR type handling:

```java { .api }
public class CharVarcharCodegenUtils {
    /** Read string with proper CHAR/VARCHAR semantics */
    public static UTF8String readSidePadding(UTF8String input, int length);

    /** Write string with proper CHAR/VARCHAR semantics */
    public static UTF8String writeSidePadding(UTF8String input, int length);

    /** Validate string length for CHAR/VARCHAR */
    public static boolean validateLength(UTF8String input, int maxLength);
}
```

## Memory and Performance Utilities

### Memory Management Helpers

```java

/**
 * Rough memory-accounting and human-readable reporting helpers.
 */
public class MemoryUtils {

    /**
     * Crude size-in-bytes estimate for common object kinds.
     *
     * <p>The constants assume a typical 64-bit JVM layout (object header plus
     * padding, 2-byte chars); they are heuristics, not exact measurements,
     * and actual sizes vary by JVM and settings.
     */
    public static long estimateObjectSize(Object obj) {
        if (obj == null) return 8; // Reference size

        if (obj instanceof String) {
            return 24 + ((String) obj).length() * 2; // Object header + char array
        }

        if (obj instanceof byte[]) {
            return 24 + ((byte[]) obj).length; // Object header + array
        }

        if (obj instanceof int[]) {
            return 24 + ((int[]) obj).length * 4;
        }

        if (obj instanceof long[]) {
            return 24 + ((long[]) obj).length * 8;
        }

        return 24; // Default object header size
    }

    /**
     * Human-readable byte count, e.g. 1536 -> "1.5 KB".
     *
     * <p>Formats with {@code Locale.ROOT} so the decimal separator is always
     * '.', independent of the JVM's default locale.
     */
    public static String formatBytes(long bytes) {
        if (bytes < 1024) return bytes + " B";
        if (bytes < 1024 * 1024) {
            return String.format(java.util.Locale.ROOT, "%.1f KB", bytes / 1024.0);
        }
        if (bytes < 1024 * 1024 * 1024) {
            return String.format(java.util.Locale.ROOT, "%.1f MB", bytes / (1024.0 * 1024));
        }
        return String.format(java.util.Locale.ROOT, "%.1f GB", bytes / (1024.0 * 1024 * 1024));
    }

    /** Prints current JVM heap usage (used/free/total/max), prefixed by {@code prefix}. */
    public static void printMemoryUsage(String prefix) {
        Runtime runtime = Runtime.getRuntime();
        long totalMemory = runtime.totalMemory();
        long freeMemory = runtime.freeMemory();
        long usedMemory = totalMemory - freeMemory;
        long maxMemory = runtime.maxMemory();

        System.out.printf(java.util.Locale.ROOT, "%s Memory: Used=%s, Free=%s, Total=%s, Max=%s%n",
            prefix,
            formatBytes(usedMemory),
            formatBytes(freeMemory),
            formatBytes(totalMemory),
            formatBytes(maxMemory));
    }
}

```

### Performance Measurement

```java

/**
 * Accumulates wall-clock timings (nanoseconds) per named operation and can
 * print aggregate statistics.
 *
 * <p>Backed by concurrent maps; a given operation name should only be
 * started from one thread at a time, since a second {@link #start} call
 * overwrites the pending start timestamp.
 */
public class PerformanceTimer {
    private final Map<String, Long> pendingStarts = new ConcurrentHashMap<>();
    private final Map<String, Long> accumulatedNanos = new ConcurrentHashMap<>();
    private final Map<String, Long> completions = new ConcurrentHashMap<>();

    /** Marks the beginning of the named operation. */
    public void start(String operationName) {
        pendingStarts.put(operationName, System.nanoTime());
    }

    /**
     * Marks the end of the named operation and folds its duration into the
     * running totals.
     *
     * @return the elapsed time in nanoseconds
     * @throws IllegalStateException if the operation was never started
     */
    public long stop(String operationName) {
        Long begun = pendingStarts.remove(operationName);
        if (begun == null) {
            throw new IllegalStateException("No start time for operation: " + operationName);
        }

        long elapsed = System.nanoTime() - begun;
        accumulatedNanos.merge(operationName, elapsed, Long::sum);
        completions.merge(operationName, 1L, Long::sum);
        return elapsed;
    }

    /** Times a void action, recording its duration even if it throws. */
    public void time(String operationName, Runnable operation) {
        start(operationName);
        try {
            operation.run();
        } finally {
            stop(operationName);
        }
    }

    /** Times a value-producing action, recording its duration even if it throws. */
    public <T> T time(String operationName, Supplier<T> operation) {
        start(operationName);
        try {
            return operation.get();
        } finally {
            stop(operationName);
        }
    }

    /** Dumps per-operation count, total, and mean duration to stdout. */
    public void printStatistics() {
        System.out.println("Performance Statistics:");
        System.out.println("=====================");

        for (Map.Entry<String, Long> entry : accumulatedNanos.entrySet()) {
            String operation = entry.getKey();
            long totalNanos = entry.getValue();
            long count = completions.get(operation);
            long meanNanos = totalNanos / count;

            System.out.printf("%-30s: Count=%6d, Total=%8.2fms, Avg=%8.2fms%n",
                operation, count,
                totalNanos / 1_000_000.0,
                meanNanos / 1_000_000.0);
        }
    }

    /** Discards all recorded timings, including unfinished starts. */
    public void reset() {
        pendingStarts.clear();
        accumulatedNanos.clear();
        completions.clear();
    }
}

```

## Error Handling and Logging

### Custom Exceptions

```java

/**
 * Base runtime exception for Catalyst errors, carrying a stable error-class
 * identifier plus optional structured message parameters.
 */
public class CatalystException extends RuntimeException {
    private final String errorClass;
    private final Map<String, String> messageParameters;

    /** Creates an exception with an error class and message, no parameters. */
    public CatalystException(String errorClass, String message) {
        super(message);
        this.errorClass = errorClass;
        this.messageParameters = Collections.emptyMap();
    }

    /** Creates an exception with an underlying cause and no parameters. */
    public CatalystException(String errorClass, String message, Throwable cause) {
        super(message, cause);
        this.errorClass = errorClass;
        this.messageParameters = Collections.emptyMap();
    }

    /**
     * Creates an exception with structured message parameters; the map is
     * exposed read-only through {@link #getMessageParameters}.
     */
    public CatalystException(String errorClass, String message,
                             Map<String, String> messageParameters) {
        super(message);
        this.errorClass = errorClass;
        this.messageParameters = Collections.unmodifiableMap(messageParameters);
    }

    /** Machine-readable error identifier, e.g. "QUERY_EXECUTION_ERROR". */
    public String getErrorClass() {
        return errorClass;
    }

    /** Read-only parameter map; empty unless supplied at construction. */
    public Map<String, String> getMessageParameters() {
        return messageParameters;
    }
}


public class QueryExecutionException extends CatalystException {

690

public QueryExecutionException(String message) {

691

super("QUERY_EXECUTION_ERROR", message);

692

}

693

694

public QueryExecutionException(String message, Throwable cause) {

695

super("QUERY_EXECUTION_ERROR", message, cause);

696

}

697

}

```

### Utility Logger

```java

public class CatalystLogger {

704

private static final Logger logger = LoggerFactory.getLogger(CatalystLogger.class);

705

706

public static void logInfo(String message, Object... args) {

707

if (logger.isInfoEnabled()) {

708

logger.info(String.format(message, args));

709

}

710

}

711

712

public static void logWarning(String message, Object... args) {

713

if (logger.isWarnEnabled()) {

714

logger.warn(String.format(message, args));

715

}

716

}

717

718

public static void logError(String message, Throwable throwable, Object... args) {

719

if (logger.isErrorEnabled()) {

720

logger.error(String.format(message, args), throwable);

721

}

722

}

723

724

public static void logDebug(String message, Object... args) {

725

if (logger.isDebugEnabled()) {

726

logger.debug(String.format(message, args));

727

}

728

}

729

730

public static <T> T logTiming(String operation, Supplier<T> supplier) {

731

long startTime = System.currentTimeMillis();

732

try {

733

T result = supplier.get();

734

long duration = System.currentTimeMillis() - startTime;

735

logInfo("Operation %s completed in %d ms", operation, duration);

736

return result;

737

} catch (Exception e) {

738

long duration = System.currentTimeMillis() - startTime;

739

logError("Operation %s failed after %d ms", e, operation, duration);

740

throw e;

741

}

742

}

743

}

```


These utilities and helpers provide essential building blocks for working with Apache Spark Catalyst, offering practical solutions for common tasks like configuration management, data type handling, performance monitoring, and error handling.