A collection of utility classes providing common functionality for JSON processing, class loading, collection manipulation, serialization, file operations, and network operations used throughout the Spark ecosystem.
JSON serialization and processing utilities built on the Jackson library for consistent JSON handling across Spark components.
/**
* JSON processing utilities using Jackson ObjectMapper
* Provides standardized JSON serialization for Spark components
*/
private[spark] trait JsonUtils {
/** Jackson ObjectMapper configured for Spark use */
protected val mapper: ObjectMapper
/**
* Convert data to JSON string using a generator block
* @param block Function that writes JSON using JsonGenerator
* @return JSON string representation
*/
def toJsonString(block: JsonGenerator => Unit): String
}
private[spark] object JsonUtils extends JsonUtils
Usage Examples:
import org.apache.spark.util.JsonUtils
// Simple JSON generation
val jsonString = JsonUtils.toJsonString { generator =>
generator.writeStartObject()
generator.writeStringField("name", "John Doe")
generator.writeNumberField("age", 30)
generator.writeBooleanField("active", true)
generator.writeEndObject()
}
// Result: {"name":"John Doe","age":30,"active":true}
// Complex JSON structure
val configJson = JsonUtils.toJsonString { generator =>
generator.writeStartObject()
generator.writeStringField("appName", "MySparkApp")
generator.writeArrayFieldStart("executors")
for (i <- 1 to 3) {
generator.writeStartObject()
generator.writeStringField("id", s"executor-$i")
generator.writeNumberField("cores", 4)
generator.writeStringField("memory", "2g")
generator.writeEndObject()
}
generator.writeEndArray()
generator.writeEndObject()
}
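// Result: {"appName":"MySparkApp","executors":[{"id":"executor-1","cores":4,"memory":"2g"},{"id":"executor-2","cores":4,"memory":"2g"},{"id":"executor-3","cores":4,"memory":"2g"}]}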
// JSON for logging and debugging
case class TaskInfo(id: String, stage: Int, partition: Int)
def taskInfoToJson(task: TaskInfo): String = {
JsonUtils.toJsonString { generator =>
generator.writeStartObject()
generator.writeStringField("taskId", task.id)
generator.writeNumberField("stageId", task.stage)
generator.writeNumberField("partitionId", task.partition)
generator.writeEndObject()
}
}
Class loading and reflection utilities providing safe and consistent class loading across different environments and class loaders.
/**
* Class loading and reflection utilities for Spark
* Handles class loading in distributed environments with proper fallbacks
*/
private[spark] trait SparkClassUtils {
/** Random instance for various utility operations */
val random: Random
/** Get the Spark class loader */
def getSparkClassLoader: ClassLoader
/** Get context class loader with Spark fallback */
def getContextOrSparkClassLoader: ClassLoader
/**
* Load class by name with proper class loader handling
* @param className Fully qualified class name
* @param initialize Whether to initialize the class
* @param noSparkClassLoader Whether to avoid Spark class loader
* @return Loaded class
*/
def classForName[C](
className: String,
initialize: Boolean = true,
noSparkClassLoader: Boolean = false
): Class[C]
/**
* Check if a class can be loaded
* @param clazz Class name to check
* @return true if class is loadable
*/
def classIsLoadable(clazz: String): Boolean
}
private[spark] object SparkClassUtils extends SparkClassUtils
Usage Examples:
import org.apache.spark.util.SparkClassUtils
// Safe class loading
try {
val clazz = SparkClassUtils.classForName[MyCustomSerializer](
"com.example.MyCustomSerializer"
)
val instance = clazz.getDeclaredConstructor().newInstance()
} catch {
case _: ClassNotFoundException =>
logWarning("Custom serializer not found, using default")
// Fallback logic
}
// Check class availability
if (SparkClassUtils.classIsLoadable("org.apache.hadoop.fs.FileSystem")) {
logInfo("Hadoop FileSystem available")
// Use Hadoop filesystem
} else {
logWarning("Hadoop not in classpath, using local filesystem")
// Fallback to local filesystem
}
// Class loader hierarchy inspection
val sparkClassLoader = SparkClassUtils.getSparkClassLoader
val contextClassLoader = SparkClassUtils.getContextOrSparkClassLoader
logDebug(s"Spark ClassLoader: $sparkClassLoader")
logDebug(s"Context ClassLoader: $contextClassLoader")
// Plugin loading pattern
def loadPlugin[T](pluginClassName: String, baseClass: Class[T]): Option[T] = {
try {
val pluginClass = SparkClassUtils.classForName[T](pluginClassName)
if (baseClass.isAssignableFrom(pluginClass)) {
Some(pluginClass.getDeclaredConstructor().newInstance())
} else {
logError(s"Plugin $pluginClassName does not extend ${baseClass.getName}")
None
}
} catch {
case e: Exception =>
logError(s"Failed to load plugin $pluginClassName", e)
None
}
}
Collection manipulation utilities providing performance-optimized operations for common data structure transformations.
/**
* Collection utility methods for performance-optimized operations
* Provides alternatives to standard library methods with better performance characteristics
*/
private[spark] trait SparkCollectionUtils {
/**
* Create indexed map from keys with better performance than zipWithIndex.toMap
* @param keys Iterable of keys
* @return Map from key to index
*/
def toMapWithIndex[K](keys: Iterable[K]): Map[K, Int]
}
private[spark] object SparkCollectionUtils extends SparkCollectionUtils
Usage Examples:
import org.apache.spark.util.SparkCollectionUtils
// Efficient key indexing
val columnNames = Seq("id", "name", "age", "email", "department")
val columnIndexMap = SparkCollectionUtils.toMapWithIndex(columnNames)
// Result: Map("id" -> 0, "name" -> 1, "age" -> 2, "email" -> 3, "department" -> 4)
// Use in schema processing
case class Schema(fields: Seq[String]) {
lazy val fieldIndexMap: Map[String, Int] =
SparkCollectionUtils.toMapWithIndex(fields)
def getFieldIndex(fieldName: String): Option[Int] =
fieldIndexMap.get(fieldName)
}
val schema = Schema(Seq("user_id", "timestamp", "event_type", "properties"))
val timestampIndex = schema.getFieldIndex("timestamp") // Some(1)
// Performance comparison demonstration
def compareIndexingMethods[K](keys: Seq[K]): Unit = {
val start1 = System.nanoTime()
val map1 = keys.zipWithIndex.toMap
val time1 = System.nanoTime() - start1
val start2 = System.nanoTime()
val map2 = SparkCollectionUtils.toMapWithIndex(keys)
val time2 = System.nanoTime() - start2
logInfo(s"zipWithIndex.toMap: ${time1}ns")
logInfo(s"toMapWithIndex: ${time2}ns")
logInfo(s"Performance improvement: ${time1.toDouble / time2}x")
}
Network-related utilities including byte unit conversions and Java utility methods for network operations.
/**
* Byte unit enumeration for size conversions
* Provides binary unit conversions (powers of 2)
*/
public enum ByteUnit {
BYTE(1),
KiB(1L << 10), // 1024 bytes
MiB(1L << 20), // 1024^2 bytes
GiB(1L << 30), // 1024^3 bytes
TiB(1L << 40), // 1024^4 bytes
PiB(1L << 50); // 1024^5 bytes
/**
* Convert from another unit to this unit
* @param d Value in source unit
* @param u Source unit
* @return Value converted to this unit
*/
public long convertFrom(long d, ByteUnit u);
/**
* Convert from this unit to another unit
* @param d Value in this unit
* @param u Target unit
* @return Value converted to target unit
*/
public long convertTo(long d, ByteUnit u);
/** Convert to bytes */
public long toBytes(long d);
/** Convert to kibibytes */
public long toKiB(long d);
/** Convert to mebibytes */
public long toMiB(long d);
/** Convert to gibibytes */
public long toGiB(long d);
/** Convert to tebibytes */
public long toTiB(long d);
/** Convert to pebibytes */
public long toPiB(long d);
}
/**
* Java utility methods for network operations
*/
public class JavaUtils {
// Network-related utility methods
}
Usage Examples:
import org.apache.spark.network.util.ByteUnit;
// Memory size calculations
long memoryInBytes = 8L * 1024 * 1024 * 1024; // 8 GiB
long memoryInGiB = ByteUnit.BYTE.toGiB(memoryInBytes); // 8
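The convertFrom and convertTo methods handle unit-to-unit conversions directly. A short Scala sketch following the conversion semantics documented above (reusing the ByteUnit import):
// 4 GiB expressed in MiB: convertFrom interprets its argument in the given source unit
val fourGiBInMiB = ByteUnit.MiB.convertFrom(4, ByteUnit.GiB) // 4096
// 10 TiB expressed in GiB: convertTo converts a value in the receiver's unit to the target unit
val tenTiBInGiB = ByteUnit.TiB.convertTo(10, ByteUnit.GiB) // 10240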
// Configuration parsing
String configValue = "512m";
long configBytes;
if (configValue.endsWith("k") || configValue.endsWith("K")) {
long value = Long.parseLong(configValue.substring(0, configValue.length() - 1));
configBytes = ByteUnit.KiB.toBytes(value);
} else if (configValue.endsWith("m") || configValue.endsWith("M")) {
long value = Long.parseLong(configValue.substring(0, configValue.length() - 1));
configBytes = ByteUnit.MiB.toBytes(value);
} else if (configValue.endsWith("g") || configValue.endsWith("G")) {
long value = Long.parseLong(configValue.substring(0, configValue.length() - 1));
configBytes = ByteUnit.GiB.toBytes(value);
}
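JavaUtils is typically the home for this kind of parsing. Assuming it exposes a byteStringAsBytes helper (an assumption about its API, since this page only stubs the class), the suffix handling above reduces to a single call from Scala:
import org.apache.spark.network.util.JavaUtils
// Assumed helper: parses size strings such as "64k", "512m", or "2g" into bytes
val parsedBytes = JavaUtils.byteStringAsBytes("512m") // 536870912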
// Buffer size optimization
public class BufferSizeCalculator {
public static long calculateOptimalBufferSize(long dataSize) {
// Use different buffer sizes based on data size
if (dataSize < ByteUnit.MiB.toBytes(10)) {
return ByteUnit.KiB.toBytes(64); // 64 KiB for small data
} else if (dataSize < ByteUnit.GiB.toBytes(1)) {
return ByteUnit.MiB.toBytes(1); // 1 MiB for medium data
} else {
return ByteUnit.MiB.toBytes(8); // 8 MiB for large data
}
}
}
// Memory usage reporting
public class MemoryReporter {
public void reportMemoryUsage(long usedMemory, long totalMemory) {
double usageRatio = (double) usedMemory / totalMemory;
String usedFormatted = formatBytes(usedMemory);
String totalFormatted = formatBytes(totalMemory);
System.out.printf("Memory usage: %s / %s (%.1f%%)%n",
usedFormatted, totalFormatted, usageRatio * 100);
}
private String formatBytes(long bytes) {
// Use floating-point division so fractional sizes format correctly with %.1f
if (bytes >= ByteUnit.PiB.toBytes(1)) {
return String.format("%.1f PiB", bytes / (double) ByteUnit.PiB.toBytes(1));
} else if (bytes >= ByteUnit.TiB.toBytes(1)) {
return String.format("%.1f TiB", bytes / (double) ByteUnit.TiB.toBytes(1));
} else if (bytes >= ByteUnit.GiB.toBytes(1)) {
return String.format("%.1f GiB", bytes / (double) ByteUnit.GiB.toBytes(1));
} else if (bytes >= ByteUnit.MiB.toBytes(1)) {
return String.format("%.1f MiB", bytes / (double) ByteUnit.MiB.toBytes(1));
} else if (bytes >= ByteUnit.KiB.toBytes(1)) {
return String.format("%.1f KiB", bytes / (double) ByteUnit.KiB.toBytes(1));
} else {
return bytes + " bytes";
}
}
}
Error handling and exception management utilities for consistent error processing across Spark components.
/**
* Error handling utilities for Spark operations
* Provides consistent error handling patterns and exception management
*/
private[spark] object SparkErrorUtils {
/**
* Execute operation with proper error handling for IO operations
* @param block Operation to execute
* @return Result of operation
* @throws IOException If operation fails
*/
def tryOrIOException[T](block: => T): T
}
/**
* Fatal exception for non-recoverable errors
* Used to indicate errors that should terminate the application
*/
private[spark] class SparkFatalException(message: String, cause: Throwable = null)
extends RuntimeException(message, cause)
Usage Examples:
import org.apache.spark.util.{SparkErrorUtils, SparkFatalException}
import java.io.FileInputStream
import java.util.Properties
// Safe IO operations
def readConfigFile(path: String): Properties = {
SparkErrorUtils.tryOrIOException {
val props = new Properties()
val stream = new FileInputStream(path)
try {
props.load(stream)
props
} finally {
stream.close()
}
}
}
// Fatal error handling
def validateCriticalConfiguration(config: Map[String, String]): Unit = {
val requiredKeys = Set("spark.app.name", "spark.master")
val missingKeys = requiredKeys -- config.keySet
if (missingKeys.nonEmpty) {
throw new SparkFatalException(
s"Missing required configuration keys: ${missingKeys.mkString(", ")}"
)
}
}
// Resource management with error handling
class ResourceManager {
def withResource[T, R](resource: T)(cleanup: T => Unit)(operation: T => R): R = {
try {
operation(resource)
} catch {
case e: Exception =>
logError("Operation failed, cleaning up resource", e)
throw e
} finally {
try {
cleanup(resource)
} catch {
case e: Exception =>
logError("Failed to cleanup resource", e)
// Don't suppress original exception
}
}
}
}
Other specialized utility objects for specific domains within Spark.
/** File operation utilities */
private[spark] object SparkFileUtils {
// File system operations and path handling utilities
}
/** Schema validation and utility methods */
private[spark] object SparkSchemaUtils {
// Schema comparison, validation, and transformation utilities
}
/** Serialization/deserialization utilities */
private[spark] object SparkSerDeUtils {
// Serialization utilities for distributed computing
}
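For SparkSerDeUtils specifically, a minimal sketch of the round-trip such helpers typically provide, assuming Java-serialization-backed serialize/deserialize methods (the names and backing are assumptions; this page only stubs the object):
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
// Hypothetical stand-ins for the helpers this object is assumed to expose
def serializeSketch[T](value: T): Array[Byte] = {
val buffer = new ByteArrayOutputStream()
val out = new ObjectOutputStream(buffer)
try out.writeObject(value) finally out.close()
buffer.toByteArray
}
def deserializeSketch[T](data: Array[Byte]): T = {
val in = new ObjectInputStream(new ByteArrayInputStream(data))
try in.readObject().asInstanceOf[T] finally in.close()
}
// Round-trip a small case class (case classes are Serializable by default)
case class Checkpoint(id: String, offset: Long)
val restored = deserializeSketch[Checkpoint](serializeSketch(Checkpoint("stream-1", 42L)))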
/** Thread and executor utilities */
private[spark] object SparkThreadUtils {
// Thread pool management and concurrent execution utilities
}
/** Array utilities for low-level operations */
public class ByteArrayUtils {
// Unsafe array operations for performance-critical code
}
Combined usage examples:
class SparkDataProcessor extends Logging {
def processData(inputPath: String): Unit = {
// Use multiple utilities together
val config = SparkErrorUtils.tryOrIOException {
loadConfiguration(inputPath)
}
val schema = parseSchema(config)
val fieldIndexMap = SparkCollectionUtils.toMapWithIndex(schema.fields)
logInfo(s"Processing data with schema: ${JsonUtils.toJsonString { gen =>
gen.writeStartObject()
gen.writeArrayFieldStart("fields")
schema.fields.foreach(gen.writeString)
gen.writeEndArray()
gen.writeEndObject()
}}")
}
}
class PerformanceOptimizedProcessor {
// Use utility classes for optimal performance
def optimizeCollectionOperations[K](keys: Seq[K]): Map[K, Int] = {
// Prefer SparkCollectionUtils over standard library for large collections
if (keys.size > 1000) {
SparkCollectionUtils.toMapWithIndex(keys)
} else {
keys.zipWithIndex.toMap // Standard library is fine for small collections
}
}
def optimizeClassLoading(className: String): Boolean = {
// Cache class availability checks
SparkClassUtils.classIsLoadable(className)
}
}