# Logging Framework
Structured logging infrastructure with Mapped Diagnostic Context (MDC) support and standardized logging keys for consistent log formatting across Spark.
## Logging Trait
The `Logging` trait provides the foundation for structured logging in Spark applications.
```scala { .api }
import org.apache.spark.internal.Logging

trait Logging {
  // String-based logging methods
  protected def logInfo(msg: => String): Unit
  protected def logDebug(msg: => String): Unit
  protected def logTrace(msg: => String): Unit
  protected def logWarning(msg: => String): Unit
  protected def logError(msg: => String): Unit

  // Structured logging with LogEntry
  protected def logInfo(entry: LogEntry): Unit
  protected def logDebug(entry: LogEntry): Unit
  protected def logTrace(entry: LogEntry): Unit
  protected def logWarning(entry: LogEntry): Unit
  protected def logError(entry: LogEntry): Unit

  // Logging with throwables
  protected def logInfo(msg: => String, throwable: Throwable): Unit
  protected def logDebug(msg: => String, throwable: Throwable): Unit
  protected def logTrace(msg: => String, throwable: Throwable): Unit
  protected def logWarning(msg: => String, throwable: Throwable): Unit
  protected def logError(msg: => String, throwable: Throwable): Unit

  protected def logInfo(entry: LogEntry, throwable: Throwable): Unit
  protected def logDebug(entry: LogEntry, throwable: Throwable): Unit
  protected def logTrace(entry: LogEntry, throwable: Throwable): Unit
  protected def logWarning(entry: LogEntry, throwable: Throwable): Unit
  protected def logError(entry: LogEntry, throwable: Throwable): Unit

  // Utility methods
  protected def isTraceEnabled(): Boolean
  protected def withLogContext(context: java.util.Map[String, String])(body: => Unit): Unit
}
```
### Basic Usage
```scala { .api }
import org.apache.spark.internal.Logging

class DataProcessor extends Logging {
  def processData(inputPath: String): Unit = {
    logInfo(s"Starting data processing from path: $inputPath")

    try {
      // Processing logic
      logDebug("Data validation completed successfully")
    } catch {
      case ex: Exception =>
        logError("Data processing failed", ex)
        throw ex
    }

    logInfo("Data processing completed successfully")
  }

  def debugProcessing(): Unit = {
    if (isTraceEnabled()) {
      logTrace("Detailed processing steps enabled")
    }
  }
}
```
## Mapped Diagnostic Context (MDC)
### MDC Class
The `MDC` case class provides structured context for log messages.
```scala { .api }
import org.apache.spark.internal.{MDC, LogKey}

case class MDC(key: LogKey, value: Any)

object MDC {
  def of(key: LogKey, value: Any): MDC = MDC(key, value)
}
```
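For illustration, here is a minimal sketch of how `MDC` values are embedded in log messages via the `log` interpolator available to classes that mix in `Logging` (the `TaskReporter` class and its method are hypothetical; the keys come from the `LogKeys` listings below):

```scala
import org.apache.spark.internal.{Logging, LogKeys, MDC}

class TaskReporter extends Logging {
  def reportLaunch(taskId: Long, host: String): Unit = {
    // Each MDC pairs a standardized LogKey with a value; the interpolator keeps
    // the pair as structured context alongside the rendered message text.
    logInfo(log"Launching task ${MDC(LogKeys.TASK_ID, taskId)} " +
      log"on host ${MDC(LogKeys.HOST, host)}")

    // MDC.of is equivalent to the case-class constructor (per the definition above).
    val hostContext = MDC.of(LogKeys.HOST, host)
    logDebug(log"Resolved executor host ${hostContext}")
  }
}
```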
### MessageWithContext
`MessageWithContext` pairs a rendered log message with its structured context. Instances are typically produced by the `log` string interpolator and can be combined with `+`.
```scala { .api }
import org.apache.spark.internal.MessageWithContext

case class MessageWithContext(message: String, context: Map[LogKey, Any]) {
  def +(mdc: MessageWithContext): MessageWithContext
  def stripMargin: MessageWithContext
}
```
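As a rough sketch of how these values compose (the `CheckpointReporter` class is hypothetical): fragments produced by the `log` interpolator are concatenated with `+`, and `stripMargin` mirrors `String.stripMargin` for multi-line messages.

```scala
import org.apache.spark.internal.{Logging, LogKeys, MDC}

class CheckpointReporter extends Logging {
  def reportCheckpoint(appId: String, path: String): Unit = {
    // Fragments are merged with `+`, combining both the message text
    // and the accumulated structured context.
    logInfo(log"Checkpointing app ${MDC(LogKeys.APP_ID, appId)} " +
      log"to ${MDC(LogKeys.OUTPUT_PATH, path)}")

    // stripMargin behaves like String.stripMargin for multi-line messages.
    logWarning(
      log"""Checkpoint directory ${MDC(LogKeys.OUTPUT_PATH, path)}
           |already exists and will be overwritten""".stripMargin)
  }
}
```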
### Structured Logging Usage
```scala { .api }
import org.apache.spark.internal.{Logging, LogKeys, MDC}

class SparkJobManager extends Logging {
  def executeJob(jobId: String, appId: String, userId: String): Unit = {
    // Structured logging with context
    logInfo(log"Starting job execution for " +
      log"job ${MDC(LogKeys.JOB_ID, jobId)} " +
      log"in app ${MDC(LogKeys.APP_ID, appId)} " +
      log"by user ${MDC(LogKeys.USER_ID, userId)}")

    val startTime = System.currentTimeMillis()

    try {
      executeJobSteps(jobId)

      val duration = System.currentTimeMillis() - startTime
      logInfo(log"Job completed successfully in ${MDC(LogKeys.DURATION, duration)}ms")
    } catch {
      case ex: Exception =>
        logError(log"Job execution failed for " +
          log"job ${MDC(LogKeys.JOB_ID, jobId)}: " +
          log"${MDC(LogKeys.ERROR_CLASS, ex.getClass.getSimpleName)}", ex)
        throw ex
    }
  }

  private def executeJobSteps(jobId: String): Unit = {
    val stages = getJobStages(jobId)

    stages.zipWithIndex.foreach { case (stage, index) =>
      logDebug(log"Executing stage ${MDC(LogKeys.STAGE_ID, stage.id)} " +
        log"(${MDC(LogKeys.STAGE_INDEX, index + 1)} of ${MDC(LogKeys.TOTAL_STAGES, stages.length)})")

      stage.execute()
    }
  }
}
```
## Log Keys
The `LogKeys` object provides over 900 standardized keys for consistent logging across Spark.
### Core Identifiers
```scala { .api }
import org.apache.spark.internal.LogKeys

// Application and job identifiers
LogKeys.APP_ID        // Application ID
LogKeys.APP_NAME      // Application name
LogKeys.JOB_ID        // Job ID
LogKeys.STAGE_ID      // Stage ID
LogKeys.TASK_ID       // Task ID
LogKeys.EXECUTOR_ID   // Executor ID
LogKeys.DRIVER_ID     // Driver ID

// User and session information
LogKeys.USER_ID       // User identifier
LogKeys.SESSION_ID    // Session identifier
```
### Data and Processing
```scala { .api }
// Data identifiers
LogKeys.TABLE_NAME    // Table name
LogKeys.COLUMN_NAME   // Column name
LogKeys.PARTITION_ID  // Partition ID
LogKeys.DATABASE_NAME // Database name
LogKeys.SCHEMA_NAME   // Schema name

// File and path information
LogKeys.FILE_NAME     // File name
LogKeys.FILE_PATH     // File path
LogKeys.FILE_SIZE     // File size
LogKeys.INPUT_PATH    // Input path
LogKeys.OUTPUT_PATH   // Output path
```
### System and Performance
```scala { .api }
// Performance metrics
LogKeys.DURATION      // Operation duration
LogKeys.RECORD_COUNT  // Number of records
LogKeys.BATCH_SIZE    // Batch size
LogKeys.MEMORY_SIZE   // Memory usage
LogKeys.DISK_SIZE     // Disk usage

// Network and connectivity
LogKeys.HOST          // Host name/address
LogKeys.PORT          // Port number
LogKeys.URL           // URL
LogKeys.CONNECTION_ID // Connection identifier
```
### Error and Status
```scala { .api }
// Error information
LogKeys.ERROR_CLASS    // Error class name
LogKeys.ERROR_CODE     // Error code
LogKeys.ERROR_MESSAGE  // Error message
LogKeys.EXCEPTION_TYPE // Exception type

// Status and state
LogKeys.STATUS         // Operation status
LogKeys.STATE          // Current state
LogKeys.RESULT         // Operation result
LogKeys.SUCCESS        // Success flag
```
### Configuration and Parameters
```scala { .api }
// Configuration
LogKeys.CONFIG_KEY      // Configuration key
LogKeys.CONFIG_VALUE    // Configuration value
LogKeys.PARAMETER_NAME  // Parameter name
LogKeys.PARAMETER_VALUE // Parameter value

// Versions and builds
LogKeys.VERSION         // Version information
LogKeys.BUILD_VERSION   // Build version
LogKeys.SPARK_VERSION   // Spark version
```
## Advanced Logging Patterns
### Context Management
```scala { .api }
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import java.util.{Map => JMap, HashMap => JHashMap}

class ContextualProcessor extends Logging {
  def processWithContext(appId: String, userId: String): Unit = {
    val context: JMap[String, String] = new JHashMap()
    context.put(LogKeys.APP_ID.name, appId)
    context.put(LogKeys.USER_ID.name, userId)

    withLogContext(context) {
      logInfo("Starting contextual processing")

      // All log messages in this block will include the context
      processSteps()

      logInfo("Contextual processing completed")
    }
  }

  private def processSteps(): Unit = {
    logDebug("Executing step 1")
    logDebug("Executing step 2")
    logDebug("Executing step 3")
  }
}
```
### Hierarchical Logging
```scala { .api }
import org.apache.spark.internal.{Logging, LogKeys, MDC}

class HierarchicalProcessor extends Logging {
  def processDataPipeline(pipelineId: String): Unit = {
    logInfo(log"Starting pipeline ${MDC(LogKeys.PIPELINE_ID, pipelineId)}")

    val jobs = getPipelineJobs(pipelineId)
    jobs.foreach(processJob)

    logInfo(log"Pipeline ${MDC(LogKeys.PIPELINE_ID, pipelineId)} completed")
  }

  private def processJob(job: Job): Unit = {
    logInfo(log"Starting job ${MDC(LogKeys.JOB_ID, job.id)} " +
      log"in pipeline ${MDC(LogKeys.PIPELINE_ID, job.pipelineId)}")

    val tasks = job.getTasks()
    tasks.foreach(processTask(job, _))

    logInfo(log"Job ${MDC(LogKeys.JOB_ID, job.id)} completed")
  }

  private def processTask(job: Job, task: Task): Unit = {
    logDebug(log"Executing task ${MDC(LogKeys.TASK_ID, task.id)} " +
      log"in job ${MDC(LogKeys.JOB_ID, job.id)}")

    val startTime = System.currentTimeMillis()

    try {
      task.execute()
      val duration = System.currentTimeMillis() - startTime

      logDebug(log"Task ${MDC(LogKeys.TASK_ID, task.id)} completed " +
        log"in ${MDC(LogKeys.DURATION, duration)}ms")
    } catch {
      case ex: Exception =>
        logError(log"Task ${MDC(LogKeys.TASK_ID, task.id)} failed: " +
          log"${MDC(LogKeys.ERROR_CLASS, ex.getClass.getSimpleName)}", ex)
        throw ex
    }
  }
}
```
### Performance Logging
```scala { .api }
import org.apache.spark.internal.{Logging, LogKeys, MDC}

class PerformanceAwareProcessor extends Logging {
  def processLargeDataset(inputPath: String, recordCount: Long): Unit = {
    val startTime = System.currentTimeMillis()

    logInfo(log"Processing dataset from ${MDC(LogKeys.INPUT_PATH, inputPath)} " +
      log"with ${MDC(LogKeys.RECORD_COUNT, recordCount)} records")

    val batchSize = 10000
    val batches = (recordCount + batchSize - 1) / batchSize

    logInfo(log"Processing in ${MDC(LogKeys.BATCH_COUNT, batches)} batches " +
      log"of ${MDC(LogKeys.BATCH_SIZE, batchSize)} records")

    (0 until batches.toInt).foreach { batchIndex =>
      val batchStartTime = System.currentTimeMillis()

      logDebug(log"Processing batch ${MDC(LogKeys.BATCH_INDEX, batchIndex + 1)} " +
        log"of ${MDC(LogKeys.BATCH_COUNT, batches)}")

      processBatch(batchIndex, batchSize)

      val batchDuration = System.currentTimeMillis() - batchStartTime
      val recordsPerSecond = (batchSize * 1000.0 / batchDuration).toInt

      logDebug(log"Batch ${MDC(LogKeys.BATCH_INDEX, batchIndex + 1)} completed " +
        log"in ${MDC(LogKeys.DURATION, batchDuration)}ms " +
        log"(${MDC(LogKeys.THROUGHPUT, recordsPerSecond)} records/sec)")
    }

    val totalDuration = System.currentTimeMillis() - startTime
    val overallThroughput = (recordCount * 1000.0 / totalDuration).toInt

    logInfo(log"Dataset processing completed in ${MDC(LogKeys.DURATION, totalDuration)}ms " +
      log"(${MDC(LogKeys.THROUGHPUT, overallThroughput)} records/sec)")
  }
}
```
## LogUtils Developer API
`LogUtils` exposes the schema of Spark's structured logs so they can be loaded and queried with Spark SQL.
```scala { .api }
import org.apache.spark.util.LogUtils

object LogUtils {
  // Schema for structured Spark logs
  val SPARK_LOG_SCHEMA: String
}
```
### Usage with Spark SQL
```scala { .api }
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.LogUtils

val spark = SparkSession.builder()
  .appName("LogAnalysis")
  .getOrCreate()

// Read structured logs using the schema
val logsDF = spark.read
  .schema(LogUtils.SPARK_LOG_SCHEMA)
  .json("hdfs://logs/spark-structured-logs")

// Query logs for specific patterns
logsDF.createOrReplaceTempView("spark_logs")

val errorLogs = spark.sql("""
  SELECT timestamp, level, logger, message, mdc
  FROM spark_logs
  WHERE level = 'ERROR'
    AND mdc.app_id IS NOT NULL
  ORDER BY timestamp DESC
""")

errorLogs.show(50)
```
## Best Practices
### Choosing Log Levels
```scala { .api }
import org.apache.spark.internal.Logging

class BestPracticesExample extends Logging {
  def demonstrateLogLevels(): Unit = {
    // ERROR: for serious problems that prevent normal operation
    logError("Database connection failed - application cannot continue")

    // WARNING: for potential problems that do not prevent operation
    logWarning("Deprecated configuration option detected, will be removed in a future version")

    // INFO: for general information about application flow
    logInfo("Starting data processing job with 1000 input files")

    // DEBUG: for detailed diagnostic information
    logDebug("Applying transformation: filter -> map -> reduce")

    // TRACE: for very detailed diagnostic information
    logTrace("Processing record with ID: 12345, timestamp: 2023-01-01T00:00:00Z")
  }
}
```
### Structured Logging Guidelines
1. **Use MDC consistently**: Always include relevant context
2. **Standardize keys**: Use `LogKeys` constants for consistency
3. **Include performance metrics**: Add timing and throughput information
4. **Structure error information**: Include error classes and relevant parameters

```scala { .api }
// Good: Structured logging with consistent context
logInfo(log"Job ${MDC(LogKeys.JOB_ID, jobId)} completed " +
  log"in ${MDC(LogKeys.DURATION, duration)}ms " +
  log"processing ${MDC(LogKeys.RECORD_COUNT, recordCount)} records")

// Avoid: Unstructured string concatenation
logInfo(s"Job $jobId completed in ${duration}ms processing $recordCount records")
```
### Performance Considerations
```scala { .api }
import org.apache.spark.internal.{Logging, LogKeys, MDC}

class PerformantLogging extends Logging {
  def efficientLogging(): Unit = {
    // Use lazy evaluation for expensive operations (the message is passed by name)
    logDebug {
      val expensiveComputation = computeStatistics()
      s"Statistics: $expensiveComputation"
    }

    // Check the log level before expensive context creation
    if (isTraceEnabled()) {
      val detailedContext = buildDetailedContext()
      logTrace(log"Detailed context: ${MDC(LogKeys.CONTEXT, detailedContext)}")
    }

    // Prefer structured logging over ad-hoc string formatting
    logInfo(log"Processing ${MDC(LogKeys.RECORD_COUNT, records.size)} records")
  }
}
```
460
461
### Integration with Exception Handling
462
463
```scala { .api }
import org.apache.spark.SparkException
import org.apache.spark.internal.{Logging, LogKeys, MDC}

class IntegratedLogging extends Logging {
  def processWithLogging(): Unit = {
    try {
      logInfo("Starting critical operation")
      performCriticalOperation()
      logInfo("Critical operation completed successfully")
    } catch {
      case ex: SparkException =>
        logError(log"Spark operation failed: " +
          log"${MDC(LogKeys.ERROR_CLASS, ex.getCondition())} " +
          log"${MDC(LogKeys.ERROR_MESSAGE, ex.getMessage)}", ex)
        throw ex

      case ex: Exception =>
        logError(log"Unexpected error in critical operation: " +
          log"${MDC(LogKeys.ERROR_CLASS, ex.getClass.getSimpleName)}", ex)
        throw new SparkException("CRITICAL_OPERATION_FAILED", Map.empty, ex)
    }
  }
}
```
```