SLF4J-based logging infrastructure with lazy message evaluation, log-level checking, and Spark-specific configuration management for distributed computing environments.
Core logging functionality: the Logging trait wraps an SLF4J logger and evaluates log messages only when the corresponding log level is enabled.
/**
* Utility trait for classes that want to log data
* Creates an SLF4J logger for the class and allows logging messages at different levels
* using methods that only evaluate parameters lazily if the log level is enabled
*/
trait Logging {
/** Protected method to get the logger name for this object */
protected def logName: String
/** Protected method to get or create the logger for this object */
protected def log: Logger
/** Log info message with lazy evaluation */
protected def logInfo(msg: => String): Unit
/** Log debug message with lazy evaluation */
protected def logDebug(msg: => String): Unit
/** Log trace message with lazy evaluation */
protected def logTrace(msg: => String): Unit
/** Log warning message with lazy evaluation */
protected def logWarning(msg: => String): Unit
/** Log error message with lazy evaluation */
protected def logError(msg: => String): Unit
/** Log info message with throwable */
protected def logInfo(msg: => String, throwable: Throwable): Unit
/** Log debug message with throwable */
protected def logDebug(msg: => String, throwable: Throwable): Unit
/** Log trace message with throwable */
protected def logTrace(msg: => String, throwable: Throwable): Unit
/** Log warning message with throwable */
protected def logWarning(msg: => String, throwable: Throwable): Unit
/** Log error message with throwable */
protected def logError(msg: => String, throwable: Throwable): Unit
/** Check if trace logging is enabled */
protected def isTraceEnabled(): Boolean
/** Initialize logging if necessary */
protected def initializeLogIfNecessary(isInterpreter: Boolean): Unit
/** Initialize logging if necessary with silence option */
protected def initializeLogIfNecessary(isInterpreter: Boolean, silent: Boolean): Boolean
}

Usage Examples:
import org.apache.spark.internal.Logging
// Extend Logging trait in your class
class DataProcessor extends Logging {
def processData(data: Seq[String]): Seq[String] = {
logInfo(s"Starting to process ${data.size} records")
if (log.isDebugEnabled) {
logDebug(s"Input data: ${data.take(5).mkString(", ")}...")
}
try {
val result = data.map(_.toUpperCase)
logInfo(s"Successfully processed ${result.size} records")
result
} catch {
case e: Exception =>
logError("Failed to process data", e)
throw e
}
}
def validateData(data: String): Boolean = {
logTrace(s"Validating data: $data")
val isValid = data.nonEmpty && data.length < 1000
if (!isValid) {
logWarning(s"Data validation failed for: ${data.take(50)}...")
}
isValid
}
}
// Usage in Spark components
class SparkTaskProcessor extends Logging {
def executeTask(taskId: String): Unit = {
logInfo(s"Executing task: $taskId")
// Expensive logging computation only happens if debug is enabled
logDebug(s"Task details: ${computeExpensiveTaskInfo()}")
try {
// Task execution logic
logInfo(s"Task $taskId completed successfully")
} catch {
case e: Exception =>
logError(s"Task $taskId failed", e)
throw e
}
}
private def computeExpensiveTaskInfo(): String = {
// This expensive computation only runs if debug logging is enabled
// due to the lazy evaluation of the message parameter
Thread.sleep(100) // Simulate expensive computation
"Detailed task information"
}
}

Configuration and management utilities for the Spark logging system.
private[spark] object Logging {
/** Lock for synchronizing logging initialization */
val initLock: Object
/** Reset the logging system to its initial state */
def uninitialize(): Unit
/** Internal logging filter for Spark shell */
private[spark] class SparkShellLoggingFilter extends AbstractFilter {
override def filter(logEvent: LogEvent): Filter.Result
override def getState: LifeCycle.State
override def initialize(): Unit
override def start(): Unit
override def stop(): Unit
override def isStarted: Boolean
override def isStopped: Boolean
}
}

Configuration Examples:
import org.apache.spark.internal.Logging
// Manual logging system management (advanced usage).
// Note: object Logging is private[spark], so this code only compiles from
// within the org.apache.spark package.
object LoggingManager {
def resetLogging(): Unit = {
Logging.uninitialize()
}
def initializeForInterpreter(): Unit = {
// This would typically be handled automatically by Spark.
// initializeLogIfNecessary is protected, so it must be called from within
// a Logging subtype rather than on an instance from the outside.
new Logging {
initializeLogIfNecessary(isInterpreter = true, silent = false)
}
}
}
// Custom logging configuration
class CustomSparkComponent extends Logging {
// Force logging initialization (for testing)
initializeForcefully(isInterpreter = false, silent = true)
def performOperation(): Unit = {
if (isTraceEnabled()) {
logTrace("Entering performOperation")
}
logInfo("Performing custom Spark operation")
// Operation logic here
logInfo("Custom operation completed")
}
}

The Logging trait uses lazy evaluation (call-by-name parameters) to avoid expensive string operations when logging is disabled.
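Internally, each log method guards the underlying SLF4J call with a level check, so the by-name message parameter is never evaluated when that level is disabled. A minimal sketch of the pattern (simplified for illustration, not the exact Spark source; the trait name MinimalLogging is hypothetical):

import org.slf4j.{Logger, LoggerFactory}

trait MinimalLogging {
  // Create the logger lazily from the implementing class's name
  @transient private lazy val log: Logger =
    LoggerFactory.getLogger(getClass.getName.stripSuffix("$"))

  // `msg: => String` is a by-name parameter: the string is only built
  // if DEBUG is enabled on the underlying logger
  protected def logDebug(msg: => String): Unit = {
    if (log.isDebugEnabled) log.debug(msg)
  }

  protected def logInfo(msg: => String): Unit = {
    if (log.isInfoEnabled) log.info(msg)
  }
}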
class EfficientLogger extends Logging {
def processLargeDataset(data: Seq[String]): Unit = {
// ✅ Good: Expensive operation only runs if debug is enabled
logDebug(s"Processing dataset with items: ${data.map(_.toUpperCase).mkString(", ")}")
// ❌ Avoid: String concatenation always happens
// logDebug("Processing dataset with items: " + data.map(_.toUpperCase).mkString(", "))
// ✅ Good: Check level first for very expensive operations
if (log.isDebugEnabled) {
val summary = generateExpensiveDataSummary(data)
logDebug(s"Dataset summary: $summary")
}
}
private def generateExpensiveDataSummary(data: Seq[String]): String = {
// Expensive analysis
data.groupBy(_.length).map { case (len, group) => len -> group.size }.toString
}
}

Log level guidelines (values referenced below, such as databaseException and recordCount, are placeholders assumed to be in scope):
class ComponentLogger extends Logging {
def demonstrateLogLevels(): Unit = {
// ERROR: For errors that prevent operation from completing
logError("Failed to connect to database", databaseException)
// WARN: For recoverable issues or deprecated usage
logWarning("Using deprecated configuration, please update")
logWarning("Retrying failed operation", retryException)
// INFO: For high-level operation status
logInfo("Starting data processing job")
logInfo(s"Processed ${recordCount} records in ${duration}ms")
// DEBUG: For detailed diagnostic information
logDebug(s"Using connection pool with ${poolSize} connections")
logDebug(s"Configuration: ${config.toDebugString}")
// TRACE: For very detailed execution flow
logTrace("Entering method processRecord")
logTrace(s"Processing record with ID: ${record.id}")
}
}

Error handling and retry logging (the exception types and helper methods used here are placeholders):
class ErrorHandlingLogger extends Logging {
def handleOperationWithRetry(): Unit = {
var attempts = 0
val maxAttempts = 3
while (attempts < maxAttempts) {
try {
performOperation()
return
} catch {
case e: RetryableException =>
attempts += 1
if (attempts < maxAttempts) {
logWarning(s"Operation failed, retrying (attempt $attempts/$maxAttempts)", e)
} else {
logError(s"Operation failed after $maxAttempts attempts", e)
throw e
}
case e: FatalException =>
logError("Fatal error occurred, not retrying", e)
throw e
}
}
}
def processWithValidation(input: String): String = {
try {
validate(input)
transform(input)
} catch {
case e: ValidationException =>
logWarning(s"Input validation failed: ${e.getMessage}")
throw e
case e: TransformException =>
logError("Transformation failed unexpectedly", e)
throw e
}
}
}

Performance-aware logging for high-volume processing (DataItem and LargeObject are placeholder types):
class PerformanceAwareLogger extends Logging {
def processHighVolumeData(items: Seq[DataItem]): Unit = {
logInfo(s"Processing ${items.size} items")
var processed = 0
val reportInterval = 10000
for (item <- items) {
// Process item
processed += 1
// Avoid logging every item - use sampling
if (processed % reportInterval == 0) {
logInfo(s"Processed $processed items")
}
// Use trace level for detailed item logging (disabled in production)
logTrace(s"Processing item: ${item.id}")
}
logInfo(s"Completed processing ${processed} items")
}
def logLargeObject(obj: LargeObject): Unit = {
// Check level before expensive serialization
if (log.isDebugEnabled) {
logDebug(s"Object state: ${obj.toDebugString}")
}
// For very large objects, log summary instead
logInfo(s"Processing object of type ${obj.getClass.getSimpleName} " +
s"with ${obj.getElementCount} elements")
}
}

Including task context in log messages (TaskResult is a placeholder type):
class SparkTaskLogger extends Logging {
def executeSparkTask(taskId: String, partitionId: Int): Unit = {
// Include context information in logs
logInfo(s"[Task $taskId] [Partition $partitionId] Starting execution")
try {
val startTime = System.currentTimeMillis()
// Task execution
val result = performTaskWork()
val duration = System.currentTimeMillis() - startTime
logInfo(s"[Task $taskId] [Partition $partitionId] Completed in ${duration}ms")
} catch {
case e: Exception =>
logError(s"[Task $taskId] [Partition $partitionId] Failed", e)
throw e
}
}
private def performTaskWork(): TaskResult = {
logDebug("Performing task-specific work")
// Implementation details
TaskResult.success()
}
}

The Logging trait integrates with Log4j 2 and provides Spark-specific default configuration:
log4j2-defaults.properties
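For reference, a minimal Log4j 2 properties configuration in the spirit of Spark's console defaults might look like the following (illustrative only; the exact contents of the bundled file vary between Spark versions):

# Send everything at INFO level and above to the console (stderr)
rootLogger.level = info
rootLogger.appenderRef.stdout.ref = console

appender.console.type = Console
appender.console.name = console
appender.console.target = SYSTEM_ERR
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n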