or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

build-info.md, exception-handling.md, index.md, java-functions.md, logging.md, storage-management.md, utilities.md

docs/utilities.md

0

# Utility Classes

1

2

Collection of utility classes providing common functionality for JSON processing, class loading, collections manipulation, serialization, file operations, and network utilities used throughout the Spark ecosystem.

3

4

## Capabilities

5

6

### JSON Utilities

7

8

JSON serialization and processing utilities using Jackson library for consistent JSON handling across Spark components.

9

10

```scala { .api }

11

/**

12

* JSON processing utilities using Jackson ObjectMapper

13

* Provides standardized JSON serialization for Spark components

14

*/

15

private[spark] trait JsonUtils {

16

/** Jackson ObjectMapper configured for Spark use */

17

protected val mapper: ObjectMapper

18

19

/**

20

* Convert data to JSON string using a generator block

21

* @param block Function that writes JSON using JsonGenerator

22

* @return JSON string representation

23

*/

24

def toJsonString(block: JsonGenerator => Unit): String

25

}

26

27

private[spark] object JsonUtils extends JsonUtils

28

```

29

30

**Usage Examples:**

31

32

```scala

33

import org.apache.spark.util.JsonUtils

34

35

// Simple JSON generation

36

val jsonString = JsonUtils.toJsonString { generator =>

37

generator.writeStartObject()

38

generator.writeStringField("name", "John Doe")

39

generator.writeNumberField("age", 30)

40

generator.writeBooleanField("active", true)

41

generator.writeEndObject()

42

}

43

// Result: {"name":"John Doe","age":30,"active":true}

44

45

// Complex JSON structure

46

val configJson = JsonUtils.toJsonString { generator =>

47

generator.writeStartObject()

48

generator.writeStringField("appName", "MySparkApp")

49

50

generator.writeArrayFieldStart("executors")

51

for (i <- 1 to 3) {

52

generator.writeStartObject()

53

generator.writeStringField("id", s"executor-$i")

54

generator.writeNumberField("cores", 4)

55

generator.writeStringField("memory", "2g")

56

generator.writeEndObject()

57

}

58

generator.writeEndArray()

59

60

generator.writeEndObject()

61

}

62

63

// JSON for logging and debugging

64

case class TaskInfo(id: String, stage: Int, partition: Int)

65

66

def taskInfoToJson(task: TaskInfo): String = {

67

JsonUtils.toJsonString { generator =>

68

generator.writeStartObject()

69

generator.writeStringField("taskId", task.id)

70

generator.writeNumberField("stageId", task.stage)

71

generator.writeNumberField("partitionId", task.partition)

72

generator.writeEndObject()

73

}

74

}

75

```

76

77

### Class Loading Utilities

78

79

Class loading and reflection utilities providing safe and consistent class loading across different environments and class loaders.

80

81

```scala { .api }

82

/**

83

* Class loading and reflection utilities for Spark

84

* Handles class loading in distributed environments with proper fallbacks

85

*/

86

private[spark] trait SparkClassUtils {

87

/** Random instance for various utility operations */

88

val random: Random

89

90

/** Get the Spark class loader */

91

def getSparkClassLoader: ClassLoader

92

93

/** Get context class loader with Spark fallback */

94

def getContextOrSparkClassLoader: ClassLoader

95

96

/**

97

* Load class by name with proper class loader handling

98

* @param className Fully qualified class name

99

* @param initialize Whether to initialize the class

100

* @param noSparkClassLoader Whether to avoid Spark class loader

101

* @return Loaded class

102

*/

103

def classForName[C](

104

className: String,

105

initialize: Boolean = true,

106

noSparkClassLoader: Boolean = false

107

): Class[C]

108

109

/**

110

* Check if a class can be loaded

111

* @param clazz Class name to check

112

* @return true if class is loadable

113

*/

114

def classIsLoadable(clazz: String): Boolean

115

}

116

117

private[spark] object SparkClassUtils extends SparkClassUtils

118

```

119

120

**Usage Examples:**

121

122

```scala

123

import org.apache.spark.util.SparkClassUtils

124

125

// Safe class loading

126

try {

127

val clazz = SparkClassUtils.classForName[MyCustomSerializer](

128

"com.example.MyCustomSerializer"

129

)

130

val instance = clazz.getDeclaredConstructor().newInstance()

131

} catch {

132

case _: ClassNotFoundException =>

133

logWarning("Custom serializer not found, using default")

134

// Fallback logic

135

}

136

137

// Check class availability

138

if (SparkClassUtils.classIsLoadable("org.apache.hadoop.fs.FileSystem")) {

139

logInfo("Hadoop FileSystem available")

140

// Use Hadoop filesystem

141

} else {

142

logWarning("Hadoop not in classpath, using local filesystem")

143

// Fallback to local filesystem

144

}

145

146

// Class loader hierarchy inspection

147

val sparkClassLoader = SparkClassUtils.getSparkClassLoader

148

val contextClassLoader = SparkClassUtils.getContextOrSparkClassLoader

149

150

logDebug(s"Spark ClassLoader: $sparkClassLoader")

151

logDebug(s"Context ClassLoader: $contextClassLoader")

152

153

// Plugin loading pattern

154

def loadPlugin[T](pluginClassName: String, baseClass: Class[T]): Option[T] = {

155

try {

156

val pluginClass = SparkClassUtils.classForName[T](pluginClassName)

157

if (baseClass.isAssignableFrom(pluginClass)) {

158

Some(pluginClass.getDeclaredConstructor().newInstance())

159

} else {

160

logError(s"Plugin $pluginClassName does not extend ${baseClass.getName}")

161

None

162

}

163

} catch {

164

case e: Exception =>

165

logError(s"Failed to load plugin $pluginClassName", e)

166

None

167

}

168

}

169

```

170

171

### Collection Utilities

172

173

Collection manipulation utilities providing performance-optimized operations for common data structure transformations.

174

175

```scala { .api }

176

/**

177

* Collection utility methods for performance-optimized operations

178

* Provides alternatives to standard library methods with better performance characteristics

179

*/

180

private[spark] trait SparkCollectionUtils {

181

/**

182

* Create indexed map from keys with better performance than zipWithIndex.toMap

183

* @param keys Iterable of keys

184

* @return Map from key to index

185

*/

186

def toMapWithIndex[K](keys: Iterable[K]): Map[K, Int]

187

}

188

189

private[spark] object SparkCollectionUtils extends SparkCollectionUtils

190

```

191

192

**Usage Examples:**

193

194

```scala

195

import org.apache.spark.util.SparkCollectionUtils

196

197

// Efficient key indexing

198

val columnNames = Seq("id", "name", "age", "email", "department")

199

val columnIndexMap = SparkCollectionUtils.toMapWithIndex(columnNames)

200

// Result: Map("id" -> 0, "name" -> 1, "age" -> 2, "email" -> 3, "department" -> 4)

201

202

// Use in schema processing

203

case class Schema(fields: Seq[String]) {

204

lazy val fieldIndexMap: Map[String, Int] =

205

SparkCollectionUtils.toMapWithIndex(fields)

206

207

def getFieldIndex(fieldName: String): Option[Int] =

208

fieldIndexMap.get(fieldName)

209

}

210

211

val schema = Schema(Seq("user_id", "timestamp", "event_type", "properties"))

212

val timestampIndex = schema.getFieldIndex("timestamp") // Some(1)

213

214

// Performance comparison demonstration

215

def compareIndexingMethods[K](keys: Seq[K]): Unit = {

216

val start1 = System.nanoTime()

217

val map1 = keys.zipWithIndex.toMap

218

val time1 = System.nanoTime() - start1

219

220

val start2 = System.nanoTime()

221

val map2 = SparkCollectionUtils.toMapWithIndex(keys)

222

val time2 = System.nanoTime() - start2

223

224

logInfo(s"zipWithIndex.toMap: ${time1}ns")

225

logInfo(s"toMapWithIndex: ${time2}ns")

226

logInfo(s"Performance improvement: ${time1.toDouble / time2}x")

227

}

228

```

229

230

### Network Utilities

231

232

Network-related utilities including byte unit conversions and Java utility methods for network operations.

233

234

```java { .api }

235

/**

236

* Byte unit enumeration for size conversions

237

* Provides binary unit conversions (powers of 2)

238

*/

239

public enum ByteUnit {

240

BYTE(1),

241

KiB(1L << 10), // 1024 bytes

242

MiB(1L << 20), // 1024^2 bytes

243

GiB(1L << 30), // 1024^3 bytes

244

TiB(1L << 40), // 1024^4 bytes

245

PiB(1L << 50); // 1024^5 bytes

246

247

/**

248

* Convert from another unit to this unit

249

* @param d Value in source unit

250

* @param u Source unit

251

* @return Value converted to this unit

252

*/

253

public long convertFrom(long d, ByteUnit u);

254

255

/**

256

* Convert from this unit to another unit

257

* @param d Value in this unit

258

* @param u Target unit

259

* @return Value converted to target unit

260

*/

261

public long convertTo(long d, ByteUnit u);

262

263

/** Convert to bytes */

264

public long toBytes(long d);

265

266

/** Convert to kibibytes */

267

public long toKiB(long d);

268

269

/** Convert to mebibytes */

270

public long toMiB(long d);

271

272

/** Convert to gibibytes */

273

public long toGiB(long d);

274

275

/** Convert to tebibytes */

276

public long toTiB(long d);

277

278

/** Convert to pebibytes */

279

public long toPiB(long d);

280

}

281

282

/**

283

* Java utility methods for network operations

284

*/

285

public class JavaUtils {

286

// Network-related utility methods

287

}

288

```

289

290

**Usage Examples:**

291

292

```java

293

import org.apache.spark.network.util.ByteUnit;

294

295

// Memory size calculations

296

long memoryInBytes = 8L * 1024 * 1024 * 1024; // 8 GB

297

long memoryInGiB = ByteUnit.BYTE.toGiB(memoryInBytes); // 8

298

299

// Configuration parsing

300

String configValue = "512m";

301

long configBytes;

302

if (configValue.endsWith("k") || configValue.endsWith("K")) {

303

long value = Long.parseLong(configValue.substring(0, configValue.length() - 1));

304

configBytes = ByteUnit.KiB.toBytes(value);

305

} else if (configValue.endsWith("m") || configValue.endsWith("M")) {

306

long value = Long.parseLong(configValue.substring(0, configValue.length() - 1));

307

configBytes = ByteUnit.MiB.toBytes(value);

308

} else if (configValue.endsWith("g") || configValue.endsWith("G")) {

309

long value = Long.parseLong(configValue.substring(0, configValue.length() - 1));

310

configBytes = ByteUnit.GiB.toBytes(value);

311

} else {
  configBytes = Long.parseLong(configValue); // no unit suffix: value is already in bytes
}

312

313

// Buffer size optimization

314

public class BufferSizeCalculator {

315

public static long calculateOptimalBufferSize(long dataSize) {

316

// Use different buffer sizes based on data size

317

if (dataSize < ByteUnit.MiB.toBytes(10)) {

318

return ByteUnit.KiB.toBytes(64); // 64 KiB for small data

319

} else if (dataSize < ByteUnit.GiB.toBytes(1)) {

320

return ByteUnit.MiB.toBytes(1); // 1 MiB for medium data

321

} else {

322

return ByteUnit.MiB.toBytes(8); // 8 MiB for large data

323

}

324

}

325

}

326

327

// Memory usage reporting

328

public class MemoryReporter {

329

public void reportMemoryUsage(long usedMemory, long totalMemory) {

330

double usageRatio = (double) usedMemory / totalMemory;

331

332

String usedFormatted = formatBytes(usedMemory);

333

String totalFormatted = formatBytes(totalMemory);

334

335

System.out.printf("Memory usage: %s / %s (%.1f%%)%n",

336

usedFormatted, totalFormatted, usageRatio * 100);

337

}

338

339

private String formatBytes(long bytes) {

340

if (bytes >= ByteUnit.PiB.toBytes(1)) {

341

return String.format("%.1f PiB", (double) bytes / ByteUnit.PiB.toBytes(1)); // %.1f requires a floating-point argument

342

} else if (bytes >= ByteUnit.TiB.toBytes(1)) {

343

return String.format("%.1f TiB", (double) bytes / ByteUnit.TiB.toBytes(1));

344

} else if (bytes >= ByteUnit.GiB.toBytes(1)) {

345

return String.format("%.1f GiB", (double) bytes / ByteUnit.GiB.toBytes(1));

346

} else if (bytes >= ByteUnit.MiB.toBytes(1)) {

347

return String.format("%.1f MiB", (double) bytes / ByteUnit.MiB.toBytes(1));

348

} else if (bytes >= ByteUnit.KiB.toBytes(1)) {

349

return String.format("%.1f KiB", (double) bytes / ByteUnit.KiB.toBytes(1));

350

} else {

351

return bytes + " bytes";

352

}

353

}

354

}

355

```

356

357

### Error Handling Utilities

358

359

Error handling and exception management utilities for consistent error processing across Spark components.

360

361

```scala { .api }

362

/**

363

* Error handling utilities for Spark operations

364

* Provides consistent error handling patterns and exception management

365

*/

366

private[spark] object SparkErrorUtils {

367

/**

368

* Execute operation with proper error handling for IO operations

369

* @param block Operation to execute

370

* @return Result of operation

371

* @throws IOException If operation fails

372

*/

373

def tryOrIOException[T](block: => T): T

374

}

375

376

/**

377

* Fatal exception for non-recoverable errors

378

* Used to indicate errors that should terminate the application

379

*/

380

private[spark] class SparkFatalException(message: String, cause: Throwable = null)

381

extends RuntimeException(message, cause)

382

```

383

384

**Usage Examples:**

385

386

```scala

387

import org.apache.spark.util.{SparkErrorUtils, SparkFatalException}

388

389

// Safe IO operations

390

def readConfigFile(path: String): Properties = {

391

SparkErrorUtils.tryOrIOException {

392

val props = new Properties()

393

val stream = new FileInputStream(path)

394

try {

395

props.load(stream)

396

props

397

} finally {

398

stream.close()

399

}

400

}

401

}

402

403

// Fatal error handling

404

def validateCriticalConfiguration(config: Map[String, String]): Unit = {

405

val requiredKeys = Set("spark.app.name", "spark.master")

406

val missingKeys = requiredKeys -- config.keySet

407

408

if (missingKeys.nonEmpty) {

409

throw new SparkFatalException(

410

s"Missing required configuration keys: ${missingKeys.mkString(", ")}"

411

)

412

}

413

}

414

415

// Resource management with error handling

416

class ResourceManager {

417

def withResource[T, R](resource: T)(cleanup: T => Unit)(operation: T => R): R = {

418

try {

419

operation(resource)

420

} catch {

421

case e: Exception =>

422

logError("Operation failed, cleaning up resource", e)

423

throw e

424

} finally {

425

try {

426

cleanup(resource)

427

} catch {

428

case e: Exception =>

429

logError("Failed to cleanup resource", e)

430

// Don't suppress original exception

431

}

432

}

433

}

434

}

435

```

436

437

### Additional Utility Objects

438

439

Other specialized utility objects for specific domains within Spark.

440

441

```scala { .api }

442

/** File operation utilities */

443

private[spark] object SparkFileUtils {

444

// File system operations and path handling utilities

445

}

446

447

/** Schema validation and utility methods */

448

private[spark] object SparkSchemaUtils {

449

// Schema comparison, validation, and transformation utilities

450

}

451

452

/** Serialization/deserialization utilities */

453

private[spark] object SparkSerDeUtils {

454

// Serialization utilities for distributed computing

455

}

456

457

/** Thread and executor utilities */

458

private[spark] object SparkThreadUtils {

459

// Thread pool management and concurrent execution utilities

460

}

461

462

/** Array utilities for low-level operations (note: this is a Java class, listed among the Scala utilities for completeness) */

463

public class ByteArrayUtils {

464

// Unsafe array operations for performance-critical code

465

}

466

```

467

468

## Integration Patterns

469

470

### Utility Composition

471

472

```scala

473

class SparkDataProcessor extends Logging {

474

def processData(inputPath: String): Unit = {

475

// Use multiple utilities together

476

val config = SparkErrorUtils.tryOrIOException {

477

loadConfiguration(inputPath)

478

}

479

480

val schema = parseSchema(config)

481

val fieldIndexMap = SparkCollectionUtils.toMapWithIndex(schema.fields)

482

483

logInfo(s"Processing data with schema: ${JsonUtils.toJsonString { gen =>

484

gen.writeStartObject()

485

gen.writeArrayFieldStart("fields")

486

schema.fields.foreach(gen.writeString)

487

gen.writeEndArray()

488

gen.writeEndObject()

489

}}")

490

}

491

}

492

```

493

494

### Performance Optimization

495

496

```scala

497

class PerformanceOptimizedProcessor {

498

// Use utility classes for optimal performance

499

def optimizeCollectionOperations[K](keys: Seq[K]): Map[K, Int] = {

500

// Prefer SparkCollectionUtils over standard library for large collections

501

if (keys.size > 1000) {

502

SparkCollectionUtils.toMapWithIndex(keys)

503

} else {

504

keys.zipWithIndex.toMap // Standard library is fine for small collections

505

}

506

}

507

508

def optimizeClassLoading(className: String): Boolean = {

509

// Cache class availability checks

510

SparkClassUtils.classIsLoadable(className)

511

}

512

}