# Logging Infrastructure

SLF4J-based logging infrastructure with lazy evaluation, level checking, and Spark-specific configuration management for distributed computing environments.

## Capabilities

### Logging Trait

Core logging functionality that provides SLF4J-based logging with performance optimizations for distributed computing.

```scala { .api }
/**
 * Utility trait for classes that want to log data.
 * Creates an SLF4J logger for the class and allows logging messages at different levels
 * using methods that only evaluate parameters lazily if the log level is enabled.
 */
trait Logging {

  /** Protected method to get the logger name for this object */
  protected def logName: String

  /** Protected method to get or create the logger for this object */
  protected def log: Logger

  /** Log info message with lazy evaluation */
  protected def logInfo(msg: => String): Unit

  /** Log debug message with lazy evaluation */
  protected def logDebug(msg: => String): Unit

  /** Log trace message with lazy evaluation */
  protected def logTrace(msg: => String): Unit

  /** Log warning message with lazy evaluation */
  protected def logWarning(msg: => String): Unit

  /** Log error message with lazy evaluation */
  protected def logError(msg: => String): Unit

  /** Log info message with throwable */
  protected def logInfo(msg: => String, throwable: Throwable): Unit

  /** Log debug message with throwable */
  protected def logDebug(msg: => String, throwable: Throwable): Unit

  /** Log trace message with throwable */
  protected def logTrace(msg: => String, throwable: Throwable): Unit

  /** Log warning message with throwable */
  protected def logWarning(msg: => String, throwable: Throwable): Unit

  /** Log error message with throwable */
  protected def logError(msg: => String, throwable: Throwable): Unit

  /** Check if trace logging is enabled */
  protected def isTraceEnabled(): Boolean

  /** Initialize logging if necessary */
  protected def initializeLogIfNecessary(isInterpreter: Boolean): Unit

  /** Initialize logging if necessary with silence option */
  protected def initializeLogIfNecessary(isInterpreter: Boolean, silent: Boolean): Boolean
}
```

**Usage Examples:**

```scala
import org.apache.spark.internal.Logging

// Extend Logging trait in your class
class DataProcessor extends Logging {

  def processData(data: Seq[String]): Seq[String] = {
    logInfo(s"Starting to process ${data.size} records")

    if (log.isDebugEnabled) {
      logDebug(s"Input data: ${data.take(5).mkString(", ")}...")
    }

    try {
      val result = data.map(_.toUpperCase)
      logInfo(s"Successfully processed ${result.size} records")
      result
    } catch {
      case e: Exception =>
        logError("Failed to process data", e)
        throw e
    }
  }

  def validateData(data: String): Boolean = {
    logTrace(s"Validating data: $data")

    val isValid = data.nonEmpty && data.length < 1000

    if (!isValid) {
      logWarning(s"Data validation failed for: ${data.take(50)}...")
    }

    isValid
  }
}

// Usage in Spark components
class SparkTaskProcessor extends Logging {
  def executeTask(taskId: String): Unit = {
    logInfo(s"Executing task: $taskId")

    // Expensive logging computation only happens if debug is enabled
    logDebug(s"Task details: ${computeExpensiveTaskInfo()}")

    try {
      // Task execution logic
      logInfo(s"Task $taskId completed successfully")
    } catch {
      case e: Exception =>
        logError(s"Task $taskId failed", e)
        throw e
    }
  }

  private def computeExpensiveTaskInfo(): String = {
    // This expensive computation only runs if debug logging is enabled
    // due to the lazy evaluation of the message parameter
    Thread.sleep(100) // Simulate expensive computation
    "Detailed task information"
  }
}
```

### Logging Companion Object

Configuration and management utilities for the Spark logging system.

```scala { .api }
private[spark] object Logging {
  /** Lock for synchronizing logging initialization */
  val initLock: Object

  /** Reset the logging system to its initial state */
  def uninitialize(): Unit

  /** Internal logging filter for Spark shell */
  private[spark] class SparkShellLoggingFilter extends AbstractFilter {
    override def filter(logEvent: LogEvent): Filter.Result
    override def getState: LifeCycle.State
    override def initialize(): Unit
    override def start(): Unit
    override def stop(): Unit
    override def isStarted: Boolean
    override def isStopped: Boolean
  }
}
```

**Configuration Examples:**

```scala
import org.apache.spark.internal.Logging

// Manual logging system management (advanced usage)
object LoggingManager {
  def resetLogging(): Unit = {
    Logging.uninitialize()
  }

  def initializeForInterpreter(): Unit = {
    // This would typically be handled automatically by Spark
    val logger = new Logging {}
    logger.initializeLogIfNecessary(isInterpreter = true, silent = false)
  }
}

// Custom logging configuration
class CustomSparkComponent extends Logging {
  // Force logging initialization (for testing)
  initializeForcefully(isInterpreter = false, silent = true)

  def performOperation(): Unit = {
    if (isTraceEnabled()) {
      logTrace("Entering performOperation")
    }

    logInfo("Performing custom Spark operation")

    // Operation logic here

    logInfo("Custom operation completed")
  }
}
```

## Logging Best Practices

### Lazy Evaluation

The Logging trait uses lazy evaluation (call-by-name parameters) to avoid expensive string operations when logging is disabled.

```scala
class EfficientLogger extends Logging {

  def processLargeDataset(data: Seq[String]): Unit = {
    // ✅ Good: Expensive operation only runs if debug is enabled
    logDebug(s"Processing dataset with items: ${data.map(_.toUpperCase).mkString(", ")}")

    // ❌ Avoid: String concatenation always happens
    // logDebug("Processing dataset with items: " + data.map(_.toUpperCase).mkString(", "))

    // ✅ Good: Check level first for very expensive operations
    if (log.isDebugEnabled) {
      val summary = generateExpensiveDataSummary(data)
      logDebug(s"Dataset summary: $summary")
    }
  }

  private def generateExpensiveDataSummary(data: Seq[String]): String = {
    // Expensive analysis
    data.groupBy(_.length).mapValues(_.size).toString
  }
}
```

### Log Level Usage Guidelines

```scala
class ComponentLogger extends Logging {

  def demonstrateLogLevels(): Unit = {
    // ERROR: For errors that prevent operation from completing
    logError("Failed to connect to database", databaseException)

    // WARN: For recoverable issues or deprecated usage
    logWarning("Using deprecated configuration, please update")
    logWarning("Retrying failed operation", retryException)

    // INFO: For high-level operation status
    logInfo("Starting data processing job")
    logInfo(s"Processed ${recordCount} records in ${duration}ms")

    // DEBUG: For detailed diagnostic information
    logDebug(s"Using connection pool with ${poolSize} connections")
    logDebug(s"Configuration: ${config.toDebugString}")

    // TRACE: For very detailed execution flow
    logTrace("Entering method processRecord")
    logTrace(s"Processing record with ID: ${record.id}")
  }
}
```

### Exception Logging Patterns

```scala
class ErrorHandlingLogger extends Logging {

  def handleOperationWithRetry(): Unit = {
    var attempts = 0
    val maxAttempts = 3

    while (attempts < maxAttempts) {
      try {
        performOperation()
        return
      } catch {
        case e: RetryableException =>
          attempts += 1
          if (attempts < maxAttempts) {
            logWarning(s"Operation failed, retrying (attempt $attempts/$maxAttempts)", e)
          } else {
            logError(s"Operation failed after $maxAttempts attempts", e)
            throw e
          }

        case e: FatalException =>
          logError("Fatal error occurred, not retrying", e)
          throw e
      }
    }
  }

  def processWithValidation(input: String): String = {
    try {
      validate(input)
      transform(input)
    } catch {
      case e: ValidationException =>
        logWarning(s"Input validation failed: ${e.getMessage}")
        throw e

      case e: TransformException =>
        logError("Transformation failed unexpectedly", e)
        throw e
    }
  }
}
```

### Performance Considerations

```scala
class PerformanceAwareLogger extends Logging {

  def processHighVolumeData(items: Seq[DataItem]): Unit = {
    logInfo(s"Processing ${items.size} items")

    var processed = 0
    val reportInterval = 10000

    for (item <- items) {
      // Process item
      processed += 1

      // Avoid logging every item - use sampling
      if (processed % reportInterval == 0) {
        logInfo(s"Processed $processed items")
      }

      // Use trace level for detailed item logging (disabled in production)
      logTrace(s"Processing item: ${item.id}")
    }

    logInfo(s"Completed processing ${processed} items")
  }

  def logLargeObject(obj: LargeObject): Unit = {
    // Check level before expensive serialization
    if (log.isDebugEnabled) {
      logDebug(s"Object state: ${obj.toDebugString}")
    }

    // For very large objects, log summary instead
    logInfo(s"Processing object of type ${obj.getClass.getSimpleName} " +
      s"with ${obj.getElementCount} elements")
  }
}
```

### Integration with Spark Components

```scala
class SparkTaskLogger extends Logging {

  def executeSparkTask(taskId: String, partitionId: Int): Unit = {
    // Include context information in logs
    logInfo(s"[Task $taskId] [Partition $partitionId] Starting execution")

    try {
      val startTime = System.currentTimeMillis()

      // Task execution
      val result = performTaskWork()

      val duration = System.currentTimeMillis() - startTime
      logInfo(s"[Task $taskId] [Partition $partitionId] Completed in ${duration}ms")

    } catch {
      case e: Exception =>
        logError(s"[Task $taskId] [Partition $partitionId] Failed", e)
        throw e
    }
  }

  private def performTaskWork(): TaskResult = {
    logDebug("Performing task-specific work")
    // Implementation details
    TaskResult.success()
  }
}
```

## Log4j Integration

The Logging trait integrates with Log4j 2 and provides Spark-specific configuration:

- **Default Configuration**: Automatic loading of `log4j2-defaults.properties`
- **Shell Integration**: Special handling for Spark shell environments
- **Level Management**: Dynamic log level adjustment
- **Filter Support**: Custom filtering for different execution contexts
- **Bridge Support**: Integration with SLF4J bridge for Java Util Logging