Apache Spark Common Utils library providing core utilities and shared functionality for distributed computing operations
```
npx @tessl/cli install tessl/maven-org-apache-spark--spark-common-utils_2-12@3.5.0
```

Apache Spark Common Utils is a core utility library providing essential infrastructure and shared functionality for Apache Spark's distributed computing ecosystem. It includes exception handling, storage management, Java functional interfaces, logging infrastructure, and various utility classes for collections, files, serialization, and schema operations.
```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-common-utils_2.12</artifactId>
  <version>3.5.6</version>
</dependency>
```

For Scala applications:
```scala
import org.apache.spark.{SparkException, SparkThrowable}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.internal.Logging
```

For Java applications:
```java
import org.apache.spark.SparkException;
import org.apache.spark.SparkThrowable;
import org.apache.spark.api.java.function.*;
import org.apache.spark.storage.StorageLevel;
```

```scala
import org.apache.spark.{SparkException, SparkThrowable}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.internal.Logging

// Exception handling with error classes
try {
  // Some Spark operation
} catch {
  case e: SparkException =>
    println(s"Spark error: ${e.getErrorClass}")
    println(s"Parameters: ${e.getMessageParameters}")
}

// Storage level configuration
val storageLevel = StorageLevel.MEMORY_AND_DISK_SER
println(s"Storage config: ${storageLevel.description}")

// Logging in Spark components
class MySparkComponent extends Logging {
  def processData(): Unit = {
    logInfo("Starting data processing")
    // Processing logic here
    logDebug("Data processing completed")
  }
}
```

Apache Spark Common Utils is organized around several key architectural components:
- SparkException and SparkThrowable interface providing error classes, message parameters, and query context
- StorageLevel class controlling RDD persistence strategies across memory, disk, and off-heap storage
- Logging trait providing SLF4J-based logging with lazy evaluation and level checking

Comprehensive exception handling system with standardized error classes and contextual information for debugging distributed operations.
```scala
class SparkException(
  message: String,
  cause: Throwable,
  errorClass: Option[String],
  messageParameters: Map[String, String],
  context: Array[QueryContext]
) extends Exception(message, cause) with SparkThrowable

object SparkException {
  def internalError(msg: String, context: Array[QueryContext], summary: String): SparkException
  def internalError(msg: String, context: Array[QueryContext], summary: String, category: Option[String]): SparkException
  def internalError(msg: String): SparkException
  def internalError(msg: String, category: String): SparkException
  def internalError(msg: String, cause: Throwable): SparkException
}
```
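The `internalError` factories attach Spark's internal error classification, which the `SparkThrowable` accessors can then read back. A minimal sketch (the failure path and message text are illustrative):

```scala
import org.apache.spark.SparkException

try {
  // Hypothetical failure path: wrap an unexpected condition as an
  // internal error rather than a bare RuntimeException.
  throw SparkException.internalError("unexpected state in task scheduler")
} catch {
  case e: SparkException =>
    println(s"error class: ${e.getErrorClass}")
    println(s"internal: ${e.isInternalError}")
}
```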
```java
interface SparkThrowable {
  String getErrorClass();

  default String getSqlState() {
    return SparkThrowableHelper.getSqlState(this.getErrorClass());
  }

  default boolean isInternalError() {
    return SparkThrowableHelper.isInternalError(this.getErrorClass());
  }

  default java.util.Map<String, String> getMessageParameters() {
    return new java.util.HashMap<>();
  }

  default QueryContext[] getQueryContext() {
    return new QueryContext[0];
  }
}
```
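Since many otherwise unrelated exception types implement this interface (see the typed subclasses below), error-reporting code can match on `SparkThrowable` itself rather than on concrete classes. A sketch of a generic formatter:

```scala
import org.apache.spark.SparkThrowable

// Works for SparkException, SparkArithmeticException, SparkSQLException,
// and any other implementor of SparkThrowable.
def describe(t: Throwable): String = t match {
  case st: SparkThrowable =>
    s"errorClass=${st.getErrorClass}, sqlState=${st.getSqlState}, " +
      s"params=${st.getMessageParameters}"
  case other =>
    s"non-Spark failure: ${other.getMessage}"
}
```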
RDD storage level management for controlling persistence strategies across different storage tiers with replication options.

```scala
class StorageLevel(
  useDisk: Boolean,
  useMemory: Boolean,
  useOffHeap: Boolean,
  deserialized: Boolean,
  replication: Int = 1
) extends Externalizable {
  def useDisk: Boolean
  def useMemory: Boolean
  def useOffHeap: Boolean
  def deserialized: Boolean
  def replication: Int
  def isValid: Boolean
  def toInt: Int
  def description: String
  def clone(): StorageLevel
  private[spark] def memoryMode: MemoryMode
}

object StorageLevel {
  val NONE: StorageLevel
  val DISK_ONLY: StorageLevel
  val DISK_ONLY_2: StorageLevel
  val DISK_ONLY_3: StorageLevel
  val MEMORY_ONLY: StorageLevel
  val MEMORY_ONLY_2: StorageLevel
  val MEMORY_ONLY_SER: StorageLevel
  val MEMORY_ONLY_SER_2: StorageLevel
  val MEMORY_AND_DISK: StorageLevel
  val MEMORY_AND_DISK_2: StorageLevel
  val MEMORY_AND_DISK_SER: StorageLevel
  val MEMORY_AND_DISK_SER_2: StorageLevel
  val OFF_HEAP: StorageLevel

  def apply(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean, deserialized: Boolean, replication: Int): StorageLevel
  def apply(useDisk: Boolean, useMemory: Boolean, deserialized: Boolean, replication: Int = 1): StorageLevel
  def apply(flags: Int, replication: Int): StorageLevel
  def fromString(s: String): StorageLevel
}
```
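The named constants cover the common cases; the `apply` overloads and `fromString` build everything else. A minimal sketch:

```scala
import org.apache.spark.storage.StorageLevel

// Same flags as MEMORY_AND_DISK_SER_2: serialized in memory, spilling
// to disk, replicated on two nodes.
val custom = StorageLevel(
  useDisk = true, useMemory = true, useOffHeap = false,
  deserialized = false, replication = 2)

val parsed = StorageLevel.fromString("MEMORY_ONLY_SER")
println(custom.description)  // human-readable summary of the flags
println(parsed.isValid)      // true for any level that stores data somewhere
```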
Comprehensive set of functional interfaces for Java integration with Spark's functional programming model, enabling lambda expressions and method references.

```java
@FunctionalInterface
interface Function<T1, R> extends Serializable {
  R call(T1 v1) throws Exception;
}

@FunctionalInterface
interface MapFunction<T, U> extends Serializable {
  U call(T value) throws Exception;
}

@FunctionalInterface
interface FilterFunction<T> extends Serializable {
  boolean call(T value) throws Exception;
}
```
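These are ordinary Java SAM interfaces, so Java callers pass lambdas or method references; from Scala they can be instantiated directly. A sketch:

```scala
import org.apache.spark.api.java.function.{FilterFunction, MapFunction}

// Scala sketch; in Java these would be the lambdas
// `v -> v.length()` and `v -> !v.isEmpty()`.
val length = new MapFunction[String, Int] {
  override def call(value: String): Int = value.length
}
val nonEmpty = new FilterFunction[String] {
  override def call(value: String): Boolean = value.nonEmpty
}
println(length.call("spark"))  // 5
println(nonEmpty.call(""))     // false
```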
SLF4J-based logging infrastructure with lazy evaluation, level checking, and Spark-specific configuration management.

```scala
trait Logging {
  protected def logInfo(msg: => String): Unit
  protected def logDebug(msg: => String): Unit
  protected def logWarning(msg: => String): Unit
  protected def logError(msg: => String): Unit
  protected def logError(msg: => String, throwable: Throwable): Unit
  protected def isTraceEnabled(): Boolean
}
```
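The `msg` parameters are by-name (`=> String`), so a message is only built when its level is enabled; `isTraceEnabled()` allows an explicit guard around diagnostics that are costly to assemble. A sketch (`PartitionAuditor` is a hypothetical component):

```scala
import org.apache.spark.internal.Logging

class PartitionAuditor extends Logging {
  def audit(partitions: Seq[String]): Unit = {
    // By-name argument: the interpolation runs only if INFO is enabled.
    logInfo(s"auditing ${partitions.size} partitions")
    if (isTraceEnabled()) {
      // Guard work that is expensive even before formatting.
      logDebug(s"partition dump: ${partitions.mkString(", ")}")
    }
  }
}
```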
Build metadata and version information for runtime introspection and compatibility checking.

```scala
private[spark] object SparkBuildInfo {
  val spark_version: String
  val spark_branch: String
  val spark_revision: String
  val spark_build_date: String
  val spark_build_user: String
  val spark_repo_url: String
  val spark_doc_root: String
}
```
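Because the object is `private[spark]`, it is only visible to code compiled inside the `org.apache.spark` package hierarchy. A sketch of a version gate under that assumption (`BuildCheck` is a hypothetical helper):

```scala
package org.apache.spark

// Hypothetical helper; must live under org.apache.spark because
// SparkBuildInfo is private[spark].
object BuildCheck {
  def requireBranch35(): Unit = {
    val v = SparkBuildInfo.spark_version
    require(v.startsWith("3.5"),
      s"expected a 3.5.x build, found $v (built ${SparkBuildInfo.spark_build_date})")
  }
}
```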
Collection of utility classes for common operations including JSON processing, class loading, collections manipulation, and network utilities.

```scala
private[spark] object JsonUtils extends JsonUtils {
  def toJsonString(block: JsonGenerator => Unit): String
}

private[spark] object SparkClassUtils extends SparkClassUtils {
  def getSparkClassLoader: ClassLoader
  def classForName[C](className: String, initialize: Boolean): Class[C]
}
```

```java
interface QueryContext {
  String objectType();
  String objectName();
  int startIndex();
  int stopIndex();
  String fragment();
}
```
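`QueryContext` is a plain interface, so callers can supply their own implementation when constructing a `SparkException` with positional context. A hypothetical sketch pointing at a fragment of SQL text:

```scala
import org.apache.spark.QueryContext

// Hypothetical implementation; method names follow the interface above.
class SqlFragmentContext(
    name: String, start: Int, stop: Int, text: String) extends QueryContext {
  override def objectType(): String = "VIEW"
  override def objectName(): String = name
  override def startIndex(): Int = start
  override def stopIndex(): Int = stop
  override def fragment(): String = text
}
```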
```scala
class SparkDriverExecutionException(cause: Throwable) extends SparkException
class SparkUserAppException(exitCode: Int) extends SparkException
class ExecutorDeadException(message: String) extends SparkException
class SparkUpgradeException(message: String, cause: Option[Throwable]) extends RuntimeException with SparkThrowable
class SparkArithmeticException(message: String) extends ArithmeticException with SparkThrowable
class SparkUnsupportedOperationException(message: String) extends UnsupportedOperationException with SparkThrowable
class SparkClassNotFoundException(errorClass: String, messageParameters: Map[String, String], cause: Throwable) extends ClassNotFoundException with SparkThrowable
class SparkConcurrentModificationException(errorClass: String, messageParameters: Map[String, String], cause: Throwable) extends ConcurrentModificationException with SparkThrowable
class SparkDateTimeException(message: String) extends DateTimeException with SparkThrowable
class SparkFileNotFoundException(errorClass: String, messageParameters: Map[String, String]) extends FileNotFoundException with SparkThrowable
class SparkNumberFormatException(message: String) extends NumberFormatException with SparkThrowable
class SparkIllegalArgumentException(message: String, cause: Option[Throwable]) extends IllegalArgumentException with SparkThrowable
class SparkRuntimeException(message: String, cause: Option[Throwable]) extends RuntimeException with SparkThrowable
class SparkNoSuchElementException(errorClass: String, messageParameters: Map[String, String]) extends NoSuchElementException with SparkThrowable
class SparkSecurityException(errorClass: String, messageParameters: Map[String, String]) extends SecurityException with SparkThrowable
class SparkArrayIndexOutOfBoundsException(message: String) extends ArrayIndexOutOfBoundsException with SparkThrowable
class SparkSQLException(errorClass: String, messageParameters: Map[String, String]) extends SQLException with SparkThrowable
class SparkSQLFeatureNotSupportedException(errorClass: String, messageParameters: Map[String, String]) extends SQLFeatureNotSupportedException with SparkThrowable
```
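Each subclass keeps its standard JVM supertype, so existing `catch` clauses still match while Spark's error metadata stays available through `SparkThrowable`. A sketch:

```scala
import org.apache.spark.SparkThrowable

def reportNumberError(e: NumberFormatException): Unit = e match {
  // SparkNumberFormatException is both a NumberFormatException and
  // a SparkThrowable, so both views are usable here.
  case st: SparkThrowable => println(s"error class: ${st.getErrorClass}")
  case _                  => println(s"plain JVM error: ${e.getMessage}")
}
```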
```java
enum MemoryMode {
  ON_HEAP,
  OFF_HEAP
}
```
```java
enum ByteUnit {
  BYTE(1),
  KiB(1L << 10),
  MiB(1L << 20),
  GiB(1L << 30),
  TiB(1L << 40),
  PiB(1L << 50);

  long convertFrom(long d, ByteUnit u);
  long convertTo(long d, ByteUnit u);
  long toBytes(long d);
  long toKiB(long d);
  long toMiB(long d);
  long toGiB(long d);
  long toTiB(long d);
  long toPiB(long d);
}
```
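A few conversions as a sketch, assuming the enum's usual location in the Spark source tree, `org.apache.spark.network.util.ByteUnit`:

```scala
import org.apache.spark.network.util.ByteUnit

println(ByteUnit.GiB.toMiB(2))                         // 2048
println(ByteUnit.GiB.toBytes(2))                       // 2147483648
// Generic form: interpret 4096 KiB as MiB.
println(ByteUnit.MiB.convertFrom(4096, ByteUnit.KiB))  // 4
```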