# Utility Classes

Collection of utility classes providing common functionality for JSON processing, class loading, collections manipulation, serialization, file operations, and network utilities used throughout the Spark ecosystem.

## Capabilities

### JSON Utilities

JSON serialization and processing utilities using the Jackson library for consistent JSON handling across Spark components.

```scala { .api }
/**
 * JSON processing utilities using Jackson ObjectMapper
 * Provides standardized JSON serialization for Spark components
 */
private[spark] trait JsonUtils {
  /** Jackson ObjectMapper configured for Spark use */
  protected val mapper: ObjectMapper

  /**
   * Convert data to JSON string using a generator block
   * @param block Function that writes JSON using JsonGenerator
   * @return JSON string representation
   */
  def toJsonString(block: JsonGenerator => Unit): String
}

private[spark] object JsonUtils extends JsonUtils
```

**Usage Examples:**

```scala
import org.apache.spark.util.JsonUtils

// Simple JSON generation
val jsonString = JsonUtils.toJsonString { generator =>
  generator.writeStartObject()
  generator.writeStringField("name", "John Doe")
  generator.writeNumberField("age", 30)
  generator.writeBooleanField("active", true)
  generator.writeEndObject()
}
// Result: {"name":"John Doe","age":30,"active":true}

// Complex JSON structure
val configJson = JsonUtils.toJsonString { generator =>
  generator.writeStartObject()
  generator.writeStringField("appName", "MySparkApp")

  generator.writeArrayFieldStart("executors")
  for (i <- 1 to 3) {
    generator.writeStartObject()
    generator.writeStringField("id", s"executor-$i")
    generator.writeNumberField("cores", 4)
    generator.writeStringField("memory", "2g")
    generator.writeEndObject()
  }
  generator.writeEndArray()

  generator.writeEndObject()
}

// JSON for logging and debugging
case class TaskInfo(id: String, stage: Int, partition: Int)

def taskInfoToJson(task: TaskInfo): String = {
  JsonUtils.toJsonString { generator =>
    generator.writeStartObject()
    generator.writeStringField("taskId", task.id)
    generator.writeNumberField("stageId", task.stage)
    generator.writeNumberField("partitionId", task.partition)
    generator.writeEndObject()
  }
}
```

### Class Loading Utilities

Class loading and reflection utilities providing safe and consistent class loading across different environments and class loaders.

```scala { .api }
/**
 * Class loading and reflection utilities for Spark
 * Handles class loading in distributed environments with proper fallbacks
 */
private[spark] trait SparkClassUtils {
  /** Random instance for various utility operations */
  val random: Random

  /** Get the Spark class loader */
  def getSparkClassLoader: ClassLoader

  /** Get context class loader with Spark fallback */
  def getContextOrSparkClassLoader: ClassLoader

  /**
   * Load class by name with proper class loader handling
   * @param className Fully qualified class name
   * @param initialize Whether to initialize the class
   * @param noSparkClassLoader Whether to avoid Spark class loader
   * @return Loaded class
   */
  def classForName[C](
      className: String,
      initialize: Boolean = true,
      noSparkClassLoader: Boolean = false): Class[C]

  /**
   * Check if a class can be loaded
   * @param clazz Class name to check
   * @return true if class is loadable
   */
  def classIsLoadable(clazz: String): Boolean
}

private[spark] object SparkClassUtils extends SparkClassUtils
```

**Usage Examples:**

```scala
import org.apache.spark.util.SparkClassUtils

// Safe class loading
try {
  val clazz = SparkClassUtils.classForName[MyCustomSerializer](
    "com.example.MyCustomSerializer"
  )
  val instance = clazz.getDeclaredConstructor().newInstance()
} catch {
  case _: ClassNotFoundException =>
    logWarning("Custom serializer not found, using default")
    // Fallback logic
}

// Check class availability
if (SparkClassUtils.classIsLoadable("org.apache.hadoop.fs.FileSystem")) {
  logInfo("Hadoop FileSystem available")
  // Use Hadoop filesystem
} else {
  logWarning("Hadoop not in classpath, using local filesystem")
  // Fallback to local filesystem
}

// Class loader hierarchy inspection
val sparkClassLoader = SparkClassUtils.getSparkClassLoader
val contextClassLoader = SparkClassUtils.getContextOrSparkClassLoader

logDebug(s"Spark ClassLoader: $sparkClassLoader")
logDebug(s"Context ClassLoader: $contextClassLoader")

// Plugin loading pattern
def loadPlugin[T](pluginClassName: String, baseClass: Class[T]): Option[T] = {
  try {
    val pluginClass = SparkClassUtils.classForName[T](pluginClassName)
    if (baseClass.isAssignableFrom(pluginClass)) {
      Some(pluginClass.getDeclaredConstructor().newInstance())
    } else {
      logError(s"Plugin $pluginClassName does not extend ${baseClass.getName}")
      None
    }
  } catch {
    case e: Exception =>
      logError(s"Failed to load plugin $pluginClassName", e)
      None
  }
}
```

### Collection Utilities

Collection manipulation utilities providing performance-optimized operations for common data structure transformations.

```scala { .api }
/**
 * Collection utility methods for performance-optimized operations
 * Provides alternatives to standard library methods with better performance characteristics
 */
private[spark] trait SparkCollectionUtils {
  /**
   * Create indexed map from keys with better performance than zipWithIndex.toMap
   * @param keys Iterable of keys
   * @return Map from key to index
   */
  def toMapWithIndex[K](keys: Iterable[K]): Map[K, Int]
}

private[spark] object SparkCollectionUtils extends SparkCollectionUtils
```

**Usage Examples:**

```scala
import org.apache.spark.util.SparkCollectionUtils

// Efficient key indexing
val columnNames = Seq("id", "name", "age", "email", "department")
val columnIndexMap = SparkCollectionUtils.toMapWithIndex(columnNames)
// Result: Map("id" -> 0, "name" -> 1, "age" -> 2, "email" -> 3, "department" -> 4)

// Use in schema processing
case class Schema(fields: Seq[String]) {
  lazy val fieldIndexMap: Map[String, Int] =
    SparkCollectionUtils.toMapWithIndex(fields)

  def getFieldIndex(fieldName: String): Option[Int] =
    fieldIndexMap.get(fieldName)
}

val schema = Schema(Seq("user_id", "timestamp", "event_type", "properties"))
val timestampIndex = schema.getFieldIndex("timestamp") // Some(1)

// Performance comparison demonstration
def compareIndexingMethods[K](keys: Seq[K]): Unit = {
  val start1 = System.nanoTime()
  val map1 = keys.zipWithIndex.toMap
  val time1 = System.nanoTime() - start1

  val start2 = System.nanoTime()
  val map2 = SparkCollectionUtils.toMapWithIndex(keys)
  val time2 = System.nanoTime() - start2

  logInfo(s"zipWithIndex.toMap: ${time1}ns")
  logInfo(s"toMapWithIndex: ${time2}ns")
  logInfo(s"Performance improvement: ${time1.toDouble / time2}x")
}
```

### Network Utilities

Network-related utilities including byte unit conversions and Java utility methods for network operations.

```java { .api }
/**
 * Byte unit enumeration for size conversions
 * Provides binary unit conversions (powers of 2)
 */
public enum ByteUnit {
  BYTE(1),
  KiB(1L << 10),  // 1024 bytes
  MiB(1L << 20),  // 1024^2 bytes
  GiB(1L << 30),  // 1024^3 bytes
  TiB(1L << 40),  // 1024^4 bytes
  PiB(1L << 50);  // 1024^5 bytes

  /**
   * Convert from another unit to this unit
   * @param d Value in source unit
   * @param u Source unit
   * @return Value converted to this unit
   */
  public long convertFrom(long d, ByteUnit u);

  /**
   * Convert from this unit to another unit
   * @param d Value in this unit
   * @param u Target unit
   * @return Value converted to target unit
   */
  public long convertTo(long d, ByteUnit u);

  /** Convert to bytes */
  public long toBytes(long d);

  /** Convert to kibibytes */
  public long toKiB(long d);

  /** Convert to mebibytes */
  public long toMiB(long d);

  /** Convert to gibibytes */
  public long toGiB(long d);

  /** Convert to tebibytes */
  public long toTiB(long d);

  /** Convert to pebibytes */
  public long toPiB(long d);
}

/**
 * Java utility methods for network operations
 */
public class JavaUtils {
  // Network-related utility methods
}
```

**Usage Examples:**

```java
import org.apache.spark.network.util.ByteUnit;

// Memory size calculations
long memoryInBytes = 8L * 1024 * 1024 * 1024; // 8 GB
long memoryInGiB = ByteUnit.BYTE.toGiB(memoryInBytes); // 8

// Configuration parsing
String configValue = "512m";
long configBytes;
if (configValue.endsWith("k") || configValue.endsWith("K")) {
  long value = Long.parseLong(configValue.substring(0, configValue.length() - 1));
  configBytes = ByteUnit.KiB.toBytes(value);
} else if (configValue.endsWith("m") || configValue.endsWith("M")) {
  long value = Long.parseLong(configValue.substring(0, configValue.length() - 1));
  configBytes = ByteUnit.MiB.toBytes(value);
} else if (configValue.endsWith("g") || configValue.endsWith("G")) {
  long value = Long.parseLong(configValue.substring(0, configValue.length() - 1));
  configBytes = ByteUnit.GiB.toBytes(value);
} else {
  // No unit suffix: interpret the value as raw bytes so configBytes is always assigned
  configBytes = Long.parseLong(configValue);
}

// Buffer size optimization
public class BufferSizeCalculator {
  public static long calculateOptimalBufferSize(long dataSize) {
    // Use different buffer sizes based on data size
    if (dataSize < ByteUnit.MiB.toBytes(10)) {
      return ByteUnit.KiB.toBytes(64); // 64 KiB for small data
    } else if (dataSize < ByteUnit.GiB.toBytes(1)) {
      return ByteUnit.MiB.toBytes(1); // 1 MiB for medium data
    } else {
      return ByteUnit.MiB.toBytes(8); // 8 MiB for large data
    }
  }
}

// Memory usage reporting
public class MemoryReporter {
  public void reportMemoryUsage(long usedMemory, long totalMemory) {
    double usageRatio = (double) usedMemory / totalMemory;

    String usedFormatted = formatBytes(usedMemory);
    String totalFormatted = formatBytes(totalMemory);

    System.out.printf("Memory usage: %s / %s (%.1f%%)%n",
      usedFormatted, totalFormatted, usageRatio * 100);
  }

  private String formatBytes(long bytes) {
    // Divide into a double before formatting: the ByteUnit.toXiB methods return long,
    // and passing a long to a %f conversion throws IllegalFormatConversionException.
    if (bytes >= ByteUnit.PiB.toBytes(1)) {
      return String.format("%.1f PiB", bytes / (double) ByteUnit.PiB.toBytes(1));
    } else if (bytes >= ByteUnit.TiB.toBytes(1)) {
      return String.format("%.1f TiB", bytes / (double) ByteUnit.TiB.toBytes(1));
    } else if (bytes >= ByteUnit.GiB.toBytes(1)) {
      return String.format("%.1f GiB", bytes / (double) ByteUnit.GiB.toBytes(1));
    } else if (bytes >= ByteUnit.MiB.toBytes(1)) {
      return String.format("%.1f MiB", bytes / (double) ByteUnit.MiB.toBytes(1));
    } else if (bytes >= ByteUnit.KiB.toBytes(1)) {
      return String.format("%.1f KiB", bytes / (double) ByteUnit.KiB.toBytes(1));
    } else {
      return bytes + " bytes";
    }
  }
}
```

### Error Handling Utilities

Error handling and exception management utilities for consistent error processing across Spark components.

```scala { .api }
/**
 * Error handling utilities for Spark operations
 * Provides consistent error handling patterns and exception management
 */
private[spark] object SparkErrorUtils {
  /**
   * Execute operation with proper error handling for IO operations
   * @param block Operation to execute
   * @return Result of operation
   * @throws IOException If operation fails
   */
  def tryOrIOException[T](block: => T): T
}

/**
 * Fatal exception for non-recoverable errors
 * Used to indicate errors that should terminate the application
 */
private[spark] class SparkFatalException(message: String, cause: Throwable = null)
  extends RuntimeException(message, cause)
```

**Usage Examples:**

```scala
import org.apache.spark.util.{SparkErrorUtils, SparkFatalException}

// Safe IO operations
def readConfigFile(path: String): Properties = {
  SparkErrorUtils.tryOrIOException {
    val props = new Properties()
    val stream = new FileInputStream(path)
    try {
      props.load(stream)
      props
    } finally {
      stream.close()
    }
  }
}

// Fatal error handling
def validateCriticalConfiguration(config: Map[String, String]): Unit = {
  val requiredKeys = Set("spark.app.name", "spark.master")
  val missingKeys = requiredKeys -- config.keySet

  if (missingKeys.nonEmpty) {
    throw new SparkFatalException(
      s"Missing required configuration keys: ${missingKeys.mkString(", ")}"
    )
  }
}

// Resource management with error handling
class ResourceManager {
  def withResource[T, R](resource: T)(cleanup: T => Unit)(operation: T => R): R = {
    try {
      operation(resource)
    } catch {
      case e: Exception =>
        logError("Operation failed, cleaning up resource", e)
        throw e
    } finally {
      try {
        cleanup(resource)
      } catch {
        case e: Exception =>
          logError("Failed to cleanup resource", e)
          // Don't suppress original exception
      }
    }
  }
}
```

### Additional Utility Objects

Other specialized utility objects for specific domains within Spark.

```scala { .api }
/** File operation utilities */
private[spark] object SparkFileUtils {
  // File system operations and path handling utilities
}

/** Schema validation and utility methods */
private[spark] object SparkSchemaUtils {
  // Schema comparison, validation, and transformation utilities
}

/** Serialization/deserialization utilities */
private[spark] object SparkSerDeUtils {
  // Serialization utilities for distributed computing
}

/** Thread and executor utilities */
private[spark] object SparkThreadUtils {
  // Thread pool management and concurrent execution utilities
}

/** Array utilities for low-level operations (Java class) */
public class ByteArrayUtils {
  // Unsafe array operations for performance-critical code
}
```

## Integration Patterns

### Utility Composition

```scala
class SparkDataProcessor extends Logging {
  def processData(inputPath: String): Unit = {
    // Use multiple utilities together
    val config = SparkErrorUtils.tryOrIOException {
      loadConfiguration(inputPath)
    }

    val schema = parseSchema(config)
    val fieldIndexMap = SparkCollectionUtils.toMapWithIndex(schema.fields)

    logInfo(s"Processing data with schema: ${JsonUtils.toJsonString { gen =>
      gen.writeStartObject()
      gen.writeArrayFieldStart("fields")
      schema.fields.foreach(gen.writeString)
      gen.writeEndArray()
      gen.writeEndObject()
    }}")
  }
}
```

### Performance Optimization

```scala
class PerformanceOptimizedProcessor {
  // Use utility classes for optimal performance
  def optimizeCollectionOperations[K](keys: Seq[K]): Map[K, Int] = {
    // Prefer SparkCollectionUtils over standard library for large collections
    if (keys.size > 1000) {
      SparkCollectionUtils.toMapWithIndex(keys)
    } else {
      keys.zipWithIndex.toMap // Standard library is fine for small collections
    }
  }

  def optimizeClassLoading(className: String): Boolean = {
    // Cache class availability checks
    SparkClassUtils.classIsLoadable(className)
  }
}
```