Core utility classes and functions for Apache Spark including exception handling, logging, storage configuration, and Java API integration
npx @tessl/cli install tessl/maven-spark-common-utils@3.5.6

Apache Spark Common Utils provides essential foundational components for the Apache Spark ecosystem. This library contains core utilities for exception handling, storage configuration, logging, Java API integration, and various utility functions that serve as building blocks across all Spark modules.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-common-utils_2.13</artifactId>
<version>3.5.6</version>
</dependency>

import org.apache.spark.{SparkException, SparkThrowable}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.internal.Logging
import org.apache.spark.api.java.function._

For Java users:
import org.apache.spark.SparkException;
import org.apache.spark.SparkThrowable;
import org.apache.spark.QueryContext;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.api.java.function.*;

import org.apache.spark.{SparkException, SparkThrowable}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.internal.Logging
// Exception handling
try {
// Some Spark operation
} catch {
case ex: SparkException =>
println(s"Error class: ${ex.getErrorClass}")
println(s"Parameters: ${ex.getMessageParameters}")
}
// Storage level configuration
val storageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
println(s"Uses disk: ${storageLevel.useDisk}")
println(s"Uses memory: ${storageLevel.useMemory}")
println(s"Replication: ${storageLevel.replication}")
// Logging in a class
class MySparkComponent extends Logging {
def processData(): Unit = {
logInfo("Starting data processing")
logWarning("This is a warning message")
}
}

Apache Spark Common Utils is structured around several key architectural components:
Comprehensive exception handling system with structured error reporting, error classes, and detailed context information.
class SparkException(
message: String,
cause: Throwable = null,
errorClass: Option[String] = None,
messageParameters: Map[String, String] = Map.empty,
context: Array[QueryContext] = Array.empty
) extends Exception(message, cause) with SparkThrowable
trait SparkThrowable {
def getErrorClass(): String
def getSqlState(): String
def isInternalError(): Boolean
def getMessageParameters(): java.util.Map[String, String]
def getQueryContext(): Array[QueryContext]
}
interface QueryContext {
String objectType();
String objectName();
int startIndex();
int stopIndex();
String fragment();
}

Storage level definitions for controlling RDD and Dataset persistence, including memory, disk, serialization, and replication options.
class StorageLevel(
useDisk: Boolean,
useMemory: Boolean,
useOffHeap: Boolean,
deserialized: Boolean,
replication: Int
) {
def isValid: Boolean
def clone(): StorageLevel
def description: String
}
object StorageLevel {
val NONE: StorageLevel
val DISK_ONLY: StorageLevel
val DISK_ONLY_2: StorageLevel
val MEMORY_ONLY: StorageLevel
val MEMORY_ONLY_2: StorageLevel
val MEMORY_ONLY_SER: StorageLevel
val MEMORY_AND_DISK: StorageLevel
val MEMORY_AND_DISK_2: StorageLevel
val MEMORY_AND_DISK_SER: StorageLevel
val MEMORY_AND_DISK_SER_2: StorageLevel
val OFF_HEAP: StorageLevel
}

SLF4J-based logging trait providing consistent logging methods across Spark components.
trait Logging {
protected def logInfo(msg: => String): Unit
protected def logDebug(msg: => String): Unit
protected def logTrace(msg: => String): Unit
protected def logWarning(msg: => String): Unit
protected def logError(msg: => String): Unit
protected def logWarning(msg: => String, throwable: Throwable): Unit
protected def logError(msg: => String, throwable: Throwable): Unit
protected def isTraceEnabled(): Boolean
protected def initializeLogIfNecessary(isInterpreter: Boolean): Unit
}

Comprehensive functional interfaces for Spark's Java API, enabling type-safe lambda expressions and functional programming patterns.
@FunctionalInterface
public interface Function<T1, R> extends Serializable {
R call(T1 v1) throws Exception;
}
@FunctionalInterface
public interface Function2<T1, T2, R> extends Serializable {
R call(T1 v1, T2 v2) throws Exception;
}
@FunctionalInterface
public interface VoidFunction<T> extends Serializable {
void call(T t) throws Exception;
}
@FunctionalInterface
public interface PairFunction<T, K, V> extends Serializable {
Tuple2<K, V> call(T t) throws Exception;
}
@FunctionalInterface
public interface FlatMapFunction<T, R> extends Serializable {
Iterator<R> call(T t) throws Exception;
}

Essential utilities for network operations and common Java tasks.
public class JavaUtils {
public static final long DEFAULT_DRIVER_MEM_MB = 1024;
public static void closeQuietly(Closeable closeable);
public static int nonNegativeHash(Object obj);
public static ByteBuffer stringToBytes(String s);
public static String bytesToString(ByteBuffer b);
public static void deleteRecursively(File file);
}
public enum MemoryMode {
ON_HEAP, OFF_HEAP
}

The exception system supports structured error reporting with error classes and parameterized messages:
class ErrorClassesJsonReader(jsonFileURLs: Seq[URL]) {
def getErrorMessage(errorClass: String, messageParameters: Map[String, String]): String
def getMessageTemplate(errorClass: String): String
def getSqlState(errorClass: String): String
}

// Storage level configuration
case class StorageLevel(
useDisk: Boolean,
useMemory: Boolean,
useOffHeap: Boolean,
deserialized: Boolean,
replication: Int
)
// Exception context information
trait QueryContext {
def objectType(): String
def objectName(): String
def startIndex(): Int
def stopIndex(): Int
def fragment(): String
}

// Memory allocation modes
public enum MemoryMode {
ON_HEAP,
OFF_HEAP
}
// Java functional interface base types
public interface Function<T1, R> extends Serializable {
R call(T1 v1) throws Exception;
}
public interface VoidFunction<T> extends Serializable {
void call(T t) throws Exception;
}