Core utility classes and functions for Apache Spark including exception handling, logging, storage configuration, and Java API integration
---
SLF4J-based logging trait providing consistent logging methods and formatting across all Spark components, with support for lazy evaluation and configurable log levels.
Core logging functionality that can be mixed into any class to provide standardized logging methods with SLF4J integration.
/**
* Utility trait for classes that want to log data using SLF4J
* Provides standardized logging methods with lazy evaluation
*/
trait Logging {
/**
* Logs info level message with lazy evaluation
* @param msg - Message to log (evaluated only if info logging is enabled)
*/
protected def logInfo(msg: => String): Unit
/**
* Logs debug level message with lazy evaluation
* @param msg - Message to log (evaluated only if debug logging is enabled)
*/
protected def logDebug(msg: => String): Unit
/**
* Logs trace level message with lazy evaluation
* @param msg - Message to log (evaluated only if trace logging is enabled)
*/
protected def logTrace(msg: => String): Unit
/**
* Logs warning level message with lazy evaluation
* @param msg - Message to log (evaluated only if warn logging is enabled)
*/
protected def logWarning(msg: => String): Unit
/**
* Logs error level message with lazy evaluation
* @param msg - Message to log (evaluated only if error logging is enabled)
*/
protected def logError(msg: => String): Unit
/**
* Logs warning level message with exception
* @param msg - Message to log
* @param throwable - Exception to log with stack trace
*/
protected def logWarning(msg: => String, throwable: Throwable): Unit
/**
* Logs error level message with exception
* @param msg - Message to log
* @param throwable - Exception to log with stack trace
*/
protected def logError(msg: => String, throwable: Throwable): Unit
/**
* Checks if trace logging is enabled
* @return true if trace level logging is enabled
*/
protected def isTraceEnabled(): Boolean
/**
* Initializes logging if necessary
* @param isInterpreter - Whether running in interpreter mode
*/
protected def initializeLogIfNecessary(isInterpreter: Boolean): Unit
}

Usage Examples:
import org.apache.spark.internal.Logging
class MySparkComponent extends Logging {
def processData(data: List[String]): List[String] = {
logInfo(s"Starting to process ${data.length} items")
try {
val result = data.map(_.toUpperCase)
logDebug(s"Processed data: ${result.take(5)}")
result
} catch {
case ex: Exception =>
logError("Failed to process data", ex)
throw ex
}
}
def expensiveOperation(): Unit = {
// Lazy evaluation - only computed if debug is enabled
logDebug(s"Debug info: ${computeExpensiveDebugInfo()}")
if (isTraceEnabled()) {
logTrace("Detailed trace information")
}
}
private def computeExpensiveDebugInfo(): String = {
// This is only called if debug logging is enabled
Thread.sleep(100) // Simulate expensive computation
"Expensive debug computation result"
}
}

Core logging methods for different severity levels with lazy message evaluation.
// Info level logging for general information
protected def logInfo(msg: => String): Unit
// Debug level logging for detailed debugging information
protected def logDebug(msg: => String): Unit
// Trace level logging for very detailed execution traces
protected def logTrace(msg: => String): Unit
// Warning level logging for potentially problematic situations
protected def logWarning(msg: => String): Unit
// Error level logging for error conditions
protected def logError(msg: => String): UnitUsage Examples:
class DataProcessor extends Logging {
def run(): Unit = {
logInfo("DataProcessor starting up")
logDebug("Loading configuration from environment")
val config = loadConfig()
logTrace(s"Configuration details: $config")
if (config.isEmpty) {
logWarning("No configuration found, using defaults")
}
try {
process(config)
logInfo("DataProcessor completed successfully")
} catch {
case ex: Exception =>
logError("DataProcessor failed")
throw ex
}
}
}

Specialized logging methods for capturing exceptions with stack traces.
/**
* Logs warning with exception details
* @param msg - Warning message
* @param throwable - Exception to include in log
*/
protected def logWarning(msg: => String, throwable: Throwable): Unit
/**
* Logs error with exception details
* @param msg - Error message
* @param throwable - Exception to include in log with full stack trace
*/
protected def logError(msg: => String, throwable: Throwable): Unit

Usage Examples:
class NetworkClient extends Logging {
def connect(): Unit = {
try {
establishConnection()
} catch {
case ex: java.net.ConnectException =>
logWarning("Connection failed, will retry", ex)
scheduleRetry()
case ex: java.io.IOException =>
logError("Fatal I/O error during connection", ex)
throw ex
}
}
def processRequest(request: String): String = {
try {
sendRequest(request)
} catch {
case ex: Exception =>
logError(s"Failed to process request: $request", ex)
throw new RuntimeException("Request processing failed", ex)
}
}
}

Methods for controlling logging behavior and checking log level configuration.
/**
* Checks if trace logging is enabled to avoid expensive trace operations
* @return true if trace level logging is enabled
*/
protected def isTraceEnabled(): Boolean
/**
* Initializes logging configuration if necessary
* @param isInterpreter - Whether running in Spark shell/interpreter mode
*/
protected def initializeLogIfNecessary(isInterpreter: Boolean): Unit

Usage Examples:
class PerformanceMonitor extends Logging {
def monitorOperation[T](operation: => T): T = {
val startTime = System.currentTimeMillis()
// Only do expensive trace logging if enabled
if (isTraceEnabled()) {
logTrace(s"Starting operation at $startTime")
logTrace(s"Thread: ${Thread.currentThread().getName}")
logTrace(s"Memory: ${Runtime.getRuntime.freeMemory()}")
}
try {
val result = operation
val duration = System.currentTimeMillis() - startTime
logInfo(s"Operation completed in ${duration}ms")
if (isTraceEnabled()) {
logTrace(s"Result type: ${result.getClass.getSimpleName}")
}
result
} catch {
case ex: Exception =>
val duration = System.currentTimeMillis() - startTime
logError(s"Operation failed after ${duration}ms", ex)
throw ex
}
}
def initialize(): Unit = {
// Initialize logging for interpreter if needed
initializeLogIfNecessary(isInterpreter = false)
logInfo("PerformanceMonitor initialized")
}
}

Take advantage of lazy evaluation to avoid expensive string operations when logging is disabled:
class QueryEngine extends Logging {
def executeQuery(sql: String): DataFrame = {
// Good: Message only computed if debug is enabled
logDebug(s"Query plan: ${analyzeQuery(sql)}")
// Bad: analyzeQuery always called even if debug disabled
// logDebug("Query plan: " + analyzeQuery(sql))
// Good: Conditional expensive operations
if (isTraceEnabled()) {
logTrace(s"Detailed metrics: ${computeDetailedMetrics()}")
}
executeSQL(sql)
}
}

Use consistent log message formats for better parsing and monitoring:
class TaskManager extends Logging {
def submitTask(taskId: String, taskType: String): Unit = {
logInfo(s"TASK_SUBMIT task_id=$taskId task_type=$taskType")
try {
executeTask(taskId, taskType)
logInfo(s"TASK_COMPLETE task_id=$taskId duration=${getDuration()}ms")
} catch {
case ex: Exception =>
logError(s"TASK_FAILED task_id=$taskId error=${ex.getMessage}", ex)
throw ex
}
}
}

Choose appropriate log levels for different types of messages:
class DataLoader extends Logging {
def loadData(path: String): Dataset[Row] = {
// Info: High-level operations users should know about
logInfo(s"Loading data from $path")
// Debug: Detailed information useful for debugging
logDebug(s"Using schema inference: ${shouldInferSchema}")
logDebug(s"Partitions: ${getPartitionCount(path)}")
// Trace: Very detailed execution information
if (isTraceEnabled()) {
logTrace(s"File list: ${listFiles(path).take(10)}")
logTrace(s"Memory usage: ${getMemoryUsage()}")
}
// Warning: Potential issues that don't prevent operation
if (isDeprecatedFormat(path)) {
logWarning(s"Using deprecated file format for $path")
}
// Error: Actual problems that prevent operation
if (!exists(path)) {
logError(s"Data path does not exist: $path")
throw new FileNotFoundException(path)
}
readData(path)
}
}

The Logging trait integrates with SLF4J, allowing configuration through standard logging frameworks (example logback.xml):
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<!-- Spark-specific logging levels -->
<logger name="org.apache.spark" level="INFO"/>
<logger name="org.apache.spark.sql" level="DEBUG"/>
<root level="WARN">
<appender-ref ref="STDOUT"/>
</root>
</configuration>

Equivalent log4j.properties configuration:

# Root logger
log4j.rootLogger=WARN, console
# Console appender
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss} %-5p %c{1}:%L - %m%n
# Spark-specific levels
log4j.logger.org.apache.spark=INFO
log4j.logger.org.apache.spark.sql=DEBUG

API summary:

// Core logging trait for SLF4J integration
trait Logging {
// Basic logging methods with lazy evaluation
protected def logInfo(msg: => String): Unit
protected def logDebug(msg: => String): Unit
protected def logTrace(msg: => String): Unit
protected def logWarning(msg: => String): Unit
protected def logError(msg: => String): Unit
// Exception logging methods
protected def logWarning(msg: => String, throwable: Throwable): Unit
protected def logError(msg: => String, throwable: Throwable): Unit
// Logging control methods
protected def isTraceEnabled(): Boolean
protected def initializeLogIfNecessary(isInterpreter: Boolean): Unit
}

Install with Tessl CLI:
npx tessl i tessl/maven-spark-common-utils