or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core.md, exceptions.md, graphx.md, index.md, logging.md, mllib.md, sql.md, storage.md, streaming.md, utils.md

docs/logging.md

0

# Logging Framework

1

2

Structured logging infrastructure with Mapped Diagnostic Context (MDC) support and standardized logging keys for consistent log formatting across Spark.

3

4

## Logging Trait

5

6

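The `Logging` trait provides string-based and structured logging methods to any class that mixes it in. It lives in the `org.apache.spark.internal` package, so treat it as an internal API that may change between Spark releases.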

7

8

```scala { .api }

9

import org.apache.spark.internal.Logging

10

11

/**
 * Mixin that gives a class logging methods at TRACE, DEBUG, INFO, WARN and ERROR levels.
 *
 * Each level has four overloads:
 *  - `msg: => String`: a plain message, passed by name so it is only built if it is used;
 *  - `entry: LogEntry`: a structured message (the examples in this document pass
 *    `log"..."` messages with `MDC(...)` arguments here);
 *  - each of the above with an extra `throwable`, which is handed to the logger together
 *    with the message.
 *
 * Classes that mix in this trait can use the `log"..."` interpolator directly. The examples
 * below import only `Logging`, `LogKeys` and `MDC`.
 */
trait Logging {
  // String-based logging methods (by-name: the message is only built when needed)
  protected def logInfo(msg: => String): Unit
  protected def logDebug(msg: => String): Unit
  protected def logTrace(msg: => String): Unit
  protected def logWarning(msg: => String): Unit
  protected def logError(msg: => String): Unit

  // Structured logging with LogEntry
  protected def logInfo(entry: LogEntry): Unit
  protected def logDebug(entry: LogEntry): Unit
  protected def logTrace(entry: LogEntry): Unit
  protected def logWarning(entry: LogEntry): Unit
  protected def logError(entry: LogEntry): Unit

  // Logging with throwables
  protected def logInfo(msg: => String, throwable: Throwable): Unit
  protected def logDebug(msg: => String, throwable: Throwable): Unit
  protected def logTrace(msg: => String, throwable: Throwable): Unit
  protected def logWarning(msg: => String, throwable: Throwable): Unit
  protected def logError(msg: => String, throwable: Throwable): Unit

  protected def logInfo(entry: LogEntry, throwable: Throwable): Unit
  protected def logDebug(entry: LogEntry, throwable: Throwable): Unit
  protected def logTrace(entry: LogEntry, throwable: Throwable): Unit
  protected def logWarning(entry: LogEntry, throwable: Throwable): Unit
  protected def logError(entry: LogEntry, throwable: Throwable): Unit

  // Utility methods

  /** Returns whether TRACE logging is enabled; use it to guard work only TRACE output needs. */
  protected def isTraceEnabled(): Boolean

  /**
   * Runs `body` with the key/value pairs in `context` attached to the logging context, so log
   * lines emitted inside `body` carry them as extra fields.
   * NOTE(review): Spark appears to attach them only when structured logging is enabled;
   * confirm for your Spark version.
   */
  protected def withLogContext(context: java.util.Map[String, String])(body: => Unit): Unit

  // The log"..." interpolator used throughout this document.
  // NOTE(review): in Spark it is declared inside this trait, roughly as:
  //   implicit class LogStringContext(val sc: StringContext) {
  //     def log(args: MDC*): MessageWithContext
  //   }
}

43

```

44

45

### Basic Usage

46

47

```scala { .api }

48

import org.apache.spark.internal.Logging

49

50

/**
 * Example component that logs the lifecycle of a processing run using the plain
 * string-based `Logging` methods.
 */
class DataProcessor extends Logging {

  /**
   * Processes the data at `inputPath`, logging start, progress and completion.
   * A failure is logged together with its exception and then rethrown to the caller.
   */
  def processData(inputPath: String): Unit = {
    logInfo(s"Starting data processing from path: $inputPath")
    try {
      // Processing logic
      logDebug("Data validation completed successfully")
    } catch {
      case e: Exception =>
        // Attach the exception so it is logged alongside the message, then propagate it.
        logError("Data processing failed", e)
        throw e
    }
    logInfo("Data processing completed successfully")
  }

  /** Emits a TRACE line, but only when TRACE is enabled for this logger. */
  def debugProcessing(): Unit =
    if (isTraceEnabled()) logTrace("Detailed processing steps enabled")
}

72

```

73

74

## Mapped Diagnostic Context (MDC)

75

76

### MDC Class

77

78

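The `MDC` case class pairs a `LogKey` with a value. Placing `MDC(key, value)` inside a `log"..."` message inlines the value into the message text and also records it as a structured context field under that key.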

79

80

```scala { .api }

81

import org.apache.spark.internal.{MDC, LogKey}

82

83

/**
 * One structured-context field: a standardized `LogKey` paired with its value.
 * Used as an argument of the `log"..."` interpolator,
 * e.g. `log"job ${MDC(LogKeys.JOB_ID, jobId)}"`.
 */
case class MDC(key: LogKey, value: Any)

84

85

/** Companion of `MDC`. */
object MDC {

86

// Factory equivalent to `MDC(key, value)` (presumably provided for Java callers).
def of(key: LogKey, value: Any): MDC = MDC(key, value)

87

}

88

```

89

90

### MessageWithContext

91

92

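`MessageWithContext` is the value produced by the `log"..."` interpolator. It holds the message text plus the context fields captured from its `MDC` arguments. Messages can be joined with `+`, as in the multi-line examples below, and support `stripMargin` for multi-line literals.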

93

94

```scala { .api }

95

import org.apache.spark.internal.MessageWithContext

96

97

/**
 * Result of the `log"..."` interpolator: the rendered message text together with the
 * structured context fields captured from its `MDC(...)` arguments.
 *
 * @param message the rendered message, with every MDC value inlined as text
 * @param context context fields keyed by `LogKey.name`, with values rendered as strings
 *                (not a Scala `Map[LogKey, Any]`)
 */
case class MessageWithContext(message: String, context: java.util.HashMap[String, String]) {

  /**
   * Joins two messages, combining their text and context fields; this is what allows
   * `log"a ..." + log"b ..."` across several lines.
   */
  def +(mdc: MessageWithContext): MessageWithContext

  /** Returns a copy whose message text has `stripMargin` applied (for multi-line literals). */
  def stripMargin: MessageWithContext
}

101

```

102

103

### Structured Logging Usage

104

105

```scala { .api }

106

import org.apache.spark.internal.{Logging, LogKeys, MDC}

107

108

import java.util.concurrent.TimeUnit

/**
 * Example job runner that emits structured log lines (MDC fields for job, application,
 * user and stage) for each phase of a job.
 */
class SparkJobManager extends Logging {

  /**
   * Runs `jobId`, logging its start and completion time at INFO, or its failure at ERROR.
   *
   * @param jobId  job identifier, recorded under `LogKeys.JOB_ID`
   * @param appId  application identifier, recorded under `LogKeys.APP_ID`
   * @param userId submitting user, recorded under `LogKeys.USER_ID`
   * @throws Exception whatever the job steps throw, rethrown unchanged after being logged
   */
  def executeJob(jobId: String, appId: String, userId: String): Unit = {
    // Structured logging with context: values are inlined in the text and captured as fields.
    logInfo(log"Starting job execution for " +
      log"job ${MDC(LogKeys.JOB_ID, jobId)} " +
      log"in app ${MDC(LogKeys.APP_ID, appId)} " +
      log"by user ${MDC(LogKeys.USER_ID, userId)}")

    // Use the monotonic clock for elapsed time: currentTimeMillis() follows the wall clock,
    // which can be adjusted (e.g. by NTP) mid-job and corrupt the measured duration.
    val startNs = System.nanoTime()

    try {
      executeJobSteps(jobId)

      val duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs)
      logInfo(log"Job completed successfully in ${MDC(LogKeys.DURATION, duration)}ms")
    } catch {
      case ex: Exception =>
        logError(log"Job execution failed for " +
          log"job ${MDC(LogKeys.JOB_ID, jobId)}: " +
          log"${MDC(LogKeys.ERROR_CLASS, ex.getClass.getSimpleName)}", ex)
        throw ex
    }
  }

  /** Executes the job's stages in order, logging each stage's 1-based position at DEBUG. */
  private def executeJobSteps(jobId: String): Unit = {
    val stages = getJobStages(jobId)
    val totalStages = stages.length

    stages.zipWithIndex.foreach { case (stage, index) =>
      logDebug(log"Executing stage ${MDC(LogKeys.STAGE_ID, stage.id)} " +
        log"(${MDC(LogKeys.STAGE_INDEX, index + 1)} of ${MDC(LogKeys.TOTAL_STAGES, totalStages)})")

      stage.execute()
    }
  }
}

144

```

145

146

## Log Keys

147

148

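The `LogKeys` object provides a large catalog (over 900) of standardized keys for consistent logging across Spark. The keys listed below are representative examples. Confirm exact names against `LogKeys` in the Spark version you build against.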

149

150

### Core Identifiers

151

152

```scala { .api }

153

import org.apache.spark.internal.LogKeys

154

155

// Representative keys only. NOTE(review): confirm each name exists in LogKeys for your
// Spark version before relying on it.
// Application and job identifiers

156

LogKeys.APP_ID // Application ID

157

LogKeys.APP_NAME // Application name

158

LogKeys.JOB_ID // Job ID

159

LogKeys.STAGE_ID // Stage ID

160

LogKeys.TASK_ID // Task ID

161

LogKeys.EXECUTOR_ID // Executor ID

162

LogKeys.DRIVER_ID // Driver ID

163

164

// User and session information

165

LogKeys.USER_ID // User identifier

166

LogKeys.SESSION_ID // Session identifier

167

```

168

169

### Data and Processing

170

171

```scala { .api }

172

// Representative keys only. NOTE(review): confirm each name exists in LogKeys for your
// Spark version before relying on it.
// Data identifiers

173

LogKeys.TABLE_NAME // Table name

174

LogKeys.COLUMN_NAME // Column name

175

LogKeys.PARTITION_ID // Partition ID

176

LogKeys.DATABASE_NAME // Database name

177

LogKeys.SCHEMA_NAME // Schema name

178

179

// File and path information

180

LogKeys.FILE_NAME // File name

181

LogKeys.FILE_PATH // File path

182

LogKeys.FILE_SIZE // File size

183

LogKeys.INPUT_PATH // Input path

184

LogKeys.OUTPUT_PATH // Output path

185

```

186

187

### System and Performance

188

189

```scala { .api }

190

// Representative keys only. NOTE(review): confirm each name exists in LogKeys for your
// Spark version before relying on it.
// Performance metrics

191

LogKeys.DURATION // Operation duration (the examples in these docs log milliseconds)

192

LogKeys.RECORD_COUNT // Number of records

193

LogKeys.BATCH_SIZE // Batch size

194

LogKeys.MEMORY_SIZE // Memory usage

195

LogKeys.DISK_SIZE // Disk usage

196

197

// Network and connectivity

198

LogKeys.HOST // Host name/address

199

LogKeys.PORT // Port number

200

LogKeys.URL // URL

201

LogKeys.CONNECTION_ID // Connection identifier

202

```

203

204

### Error and Status

205

206

```scala { .api }

207

// Representative keys only. NOTE(review): confirm each name exists in LogKeys for your
// Spark version before relying on it.
// Error information

208

LogKeys.ERROR_CLASS // Error class: used in these docs for both an exception's class name and a SparkException's error condition

209

LogKeys.ERROR_CODE // Error code

210

LogKeys.ERROR_MESSAGE // Error message

211

LogKeys.EXCEPTION_TYPE // Exception type

212

213

// Status and state

214

LogKeys.STATUS // Operation status

215

LogKeys.STATE // Current state

216

LogKeys.RESULT // Operation result

217

LogKeys.SUCCESS // Success flag

218

```

219

220

### Configuration and Parameters

221

222

```scala { .api }

223

// Representative keys only. NOTE(review): confirm each name exists in LogKeys for your
// Spark version before relying on it.
// Configuration

224

LogKeys.CONFIG_KEY // Configuration key

225

LogKeys.CONFIG_VALUE // Configuration value

226

LogKeys.PARAMETER_NAME // Parameter name

227

LogKeys.PARAMETER_VALUE // Parameter value

228

229

// Versions and builds

230

LogKeys.VERSION // Version information

231

LogKeys.BUILD_VERSION // Build version

232

LogKeys.SPARK_VERSION // Spark version

233

```

234

235

## Advanced Logging Patterns

236

237

### Context Management

238

239

```scala { .api }

240

import org.apache.spark.internal.{Logging, LogKeys, MDC}

241

import java.util.{Map => JMap, HashMap => JHashMap}

242

243

/**
 * Example that attaches application and user identifiers to every log line emitted
 * while a unit of work runs, using `withLogContext`.
 */
class ContextualProcessor extends Logging {

  /**
   * Runs the processing steps with the given identifiers attached as logging context.
   *
   * @param appId  stored under the key `LogKeys.APP_ID.name`
   * @param userId stored under the key `LogKeys.USER_ID.name`
   */
  def processWithContext(appId: String, userId: String): Unit = {
    val fields: JMap[String, String] = new JHashMap[String, String]()
    Seq(
      LogKeys.APP_ID.name -> appId,
      LogKeys.USER_ID.name -> userId
    ).foreach { case (key, value) => fields.put(key, value) }

    withLogContext(fields) {
      logInfo("Starting contextual processing")
      // Everything logged inside this block, including the lines from processSteps(),
      // is emitted with the context fields above.
      processSteps()
      logInfo("Contextual processing completed")
    }
  }

  /** Logs the three processing steps, in order, at DEBUG. */
  private def processSteps(): Unit =
    (1 to 3).foreach(step => logDebug(s"Executing step $step"))
}

265

```

266

267

### Hierarchical Logging

268

269

```scala { .api }

270

import java.util.concurrent.TimeUnit

import org.apache.spark.internal.{Logging, LogKeys, MDC}

/**
 * Example that logs a pipeline, job and task hierarchy. Each child's log line repeats its
 * parent's identifier, so the levels can be correlated in structured output.
 */
class HierarchicalProcessor extends Logging {

  /** Runs every job of the pipeline, logging pipeline start and completion at INFO. */
  def processDataPipeline(pipelineId: String): Unit = {
    logInfo(log"Starting pipeline ${MDC(LogKeys.PIPELINE_ID, pipelineId)}")

    val jobs = getPipelineJobs(pipelineId)
    jobs.foreach(processJob)

    logInfo(log"Pipeline ${MDC(LogKeys.PIPELINE_ID, pipelineId)} completed")
  }

  /** Runs every task of `job`, logging job start (with its pipeline) and completion at INFO. */
  private def processJob(job: Job): Unit = {
    logInfo(log"Starting job ${MDC(LogKeys.JOB_ID, job.id)} " +
      log"in pipeline ${MDC(LogKeys.PIPELINE_ID, job.pipelineId)}")

    val tasks = job.getTasks()
    tasks.foreach(processTask(job, _))

    logInfo(log"Job ${MDC(LogKeys.JOB_ID, job.id)} completed")
  }

  /**
   * Executes one task and logs its duration at DEBUG. On failure, logs the exception at
   * ERROR and rethrows it unchanged.
   */
  private def processTask(job: Job, task: Task): Unit = {
    logDebug(log"Executing task ${MDC(LogKeys.TASK_ID, task.id)} " +
      log"in job ${MDC(LogKeys.JOB_ID, job.id)}")

    // Monotonic clock: unlike currentTimeMillis() it is unaffected by wall-clock
    // adjustments, so the measured duration cannot jump or go negative.
    val startNs = System.nanoTime()

    try {
      task.execute()
      val duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs)

      logDebug(log"Task ${MDC(LogKeys.TASK_ID, task.id)} completed " +
        log"in ${MDC(LogKeys.DURATION, duration)}ms")
    } catch {
      case ex: Exception =>
        logError(log"Task ${MDC(LogKeys.TASK_ID, task.id)} failed: " +
          log"${MDC(LogKeys.ERROR_CLASS, ex.getClass.getSimpleName)}", ex)
        throw ex
    }
  }
}

311

```

312

313

### Performance Logging

314

315

```scala { .api }

316

import java.util.concurrent.TimeUnit

import org.apache.spark.internal.{Logging, LogKeys, MDC}

/**
 * Example that processes a dataset in fixed-size batches and logs per-batch and overall
 * duration and throughput.
 */
class PerformanceAwareProcessor extends Logging {

  /**
   * Processes `recordCount` records from `inputPath` in batches of 10,000 records.
   *
   * @param inputPath   source location, recorded under `LogKeys.INPUT_PATH`
   * @param recordCount total number of records; zero or a negative value runs no batches
   */
  def processLargeDataset(inputPath: String, recordCount: Long): Unit = {
    // Monotonic clock for elapsed time (wall-clock time can be adjusted mid-run).
    val startNs = System.nanoTime()

    logInfo(log"Processing dataset from ${MDC(LogKeys.INPUT_PATH, inputPath)} " +
      log"with ${MDC(LogKeys.RECORD_COUNT, recordCount)} records")

    val batchSize = 10000
    // Ceiling division: a trailing partial batch still counts as a batch.
    val batches = (recordCount + batchSize - 1) / batchSize

    logInfo(log"Processing in ${MDC(LogKeys.BATCH_COUNT, batches)} batches " +
      log"of ${MDC(LogKeys.BATCH_SIZE, batchSize)} records")

    (0 until batches.toInt).foreach { batchIndex =>
      val batchStartNs = System.nanoTime()

      logDebug(log"Processing batch ${MDC(LogKeys.BATCH_INDEX, batchIndex + 1)} " +
        log"of ${MDC(LogKeys.BATCH_COUNT, batches)}")

      processBatch(batchIndex, batchSize)

      val batchElapsedNs = System.nanoTime() - batchStartNs
      // The last batch may be partial; credit it only with the records it actually holds.
      val recordsInBatch =
        math.min(batchSize.toLong, recordCount - batchIndex.toLong * batchSize)
      val batchDuration = TimeUnit.NANOSECONDS.toMillis(batchElapsedNs)

      logDebug(log"Batch ${MDC(LogKeys.BATCH_INDEX, batchIndex + 1)} completed " +
        log"in ${MDC(LogKeys.DURATION, batchDuration)}ms " +
        log"(${MDC(LogKeys.THROUGHPUT, recordsPerSecond(recordsInBatch, batchElapsedNs))} records/sec)")
    }

    val totalElapsedNs = System.nanoTime() - startNs
    val totalDuration = TimeUnit.NANOSECONDS.toMillis(totalElapsedNs)
    val overallThroughput = recordsPerSecond(recordCount, totalElapsedNs)

    logInfo(log"Dataset processing completed in ${MDC(LogKeys.DURATION, totalDuration)}ms " +
      log"(${MDC(LogKeys.THROUGHPUT, overallThroughput)} records/sec)")
  }

  /**
   * Records processed per second over `elapsedNs` nanoseconds. The elapsed time is clamped
   * to at least 1ns, so a very fast or empty run cannot divide by zero and report an
   * infinite or NaN rate.
   */
  private def recordsPerSecond(records: Long, elapsedNs: Long): Long =
    (records * 1e9 / math.max(elapsedNs, 1L)).toLong
}

352

```

353

354

## LogUtils Developer API

355

356

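`LogUtils` exposes the schema of Spark's JSON structured logs, so log files can be loaded as a DataFrame and queried with Spark SQL. The logs must have been written in the structured (JSON) format for this schema to apply.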

357

358

```scala { .api }

359

import org.apache.spark.util.LogUtils

360

361

/**
 * Developer API helpers for reading Spark's structured (JSON) logs back with Spark SQL.
 */
object LogUtils {

362

// Schema string describing one structured log record. Pass it to DataFrameReader.schema(...)
// before .json(...) to load log files with the correct column types.
// NOTE(review): in Spark 4.x its top-level columns are ts, level, msg, context (a string-to-
// string map holding the MDC fields), exception and logger. Verify against your version.

363

val SPARK_LOG_SCHEMA: String

364

}

365

```

366

367

### Usage with Spark SQL

368

369

```scala { .api }

370

import org.apache.spark.sql.SparkSession

371

import org.apache.spark.util.LogUtils

372

373

val spark = SparkSession.builder()
  .appName("LogAnalysis")
  .getOrCreate()

// Read structured (JSON) logs with the schema Spark uses to write them.
val logsDF = spark.read
  .schema(LogUtils.SPARK_LOG_SCHEMA)
  .json("hdfs://logs/spark-structured-logs")

// Query logs for specific patterns
logsDF.createOrReplaceTempView("spark_logs")

// Column names must match SPARK_LOG_SCHEMA (ts, level, msg, context, exception, logger).
// MDC fields live in the `context` map, keyed by the lower-case LogKey name (e.g. app_id).
val errorLogs = spark.sql("""
  SELECT ts, level, logger, msg, context
  FROM spark_logs
  WHERE level = 'ERROR'
    AND context['app_id'] IS NOT NULL
  ORDER BY ts DESC
""")

errorLogs.show(50)

394

```

395

396

## Best Practices

397

398

### Choosing Log Levels

399

400

```scala { .api }

401

/** Shows which log level suits which kind of message, from most to least severe. */
class BestPracticesExample extends Logging {

  def demonstrateLogLevels(): Unit = {
    // ERROR: something is broken and normal operation cannot continue.
    logError("Database connection failed - application cannot continue")

    // WARN: something looks wrong or outdated, but work can proceed.
    logWarning("Deprecated configuration option detected, will be removed in future version")

    // INFO: milestones in the normal flow of the application.
    logInfo("Starting data processing job with 1000 input files")

    // DEBUG: diagnostic detail that helps when investigating a problem.
    logDebug("Applying transformation: filter -> map -> reduce")

    // TRACE: the finest-grained detail, such as per-record information.
    logTrace("Processing record with ID: 12345, timestamp: 2023-01-01T00:00:00Z")
  }
}

419

```

420

421

### Structured Logging Guidelines

422

423

1. **Use MDC consistently**: Always include relevant context

424

2. **Standardize keys**: Use `LogKeys` constants for consistency

425

3. **Include performance metrics**: Add timing and throughput information

426

4. **Structure error information**: Include error classes and relevant parameters

427

428

```scala { .api }

429

// Good: Structured logging with consistent context. Each value appears in the message text
// and is also captured as its own field (JOB_ID, DURATION, RECORD_COUNT).

430

logInfo(log"Job ${MDC(LogKeys.JOB_ID, jobId)} completed " +

431

log"in ${MDC(LogKeys.DURATION, duration)}ms " +

432

log"processing ${MDC(LogKeys.RECORD_COUNT, recordCount)} records")

433

434

// Avoid: a plain s-interpolated string. It produces the same text, but the values exist only
// inside the message and are not captured as structured fields.

435

logInfo(s"Job $jobId completed in ${duration}ms processing $recordCount records")

436

```

437

438

### Performance Considerations

439

440

```scala { .api }

441

/** Techniques for keeping logging cheap when the target level is disabled. */
class PerformantLogging extends Logging {

  def efficientLogging(): Unit = {
    // The String overloads take the message by name, so computeStatistics() only runs
    // when the message is actually needed.
    logDebug(s"Statistics: ${computeStatistics()}")

    // Explicit guard: skip building the detailed context unless TRACE is enabled.
    if (isTraceEnabled()) {
      val traceContext = buildDetailedContext()
      logTrace(log"Detailed context: ${MDC(LogKeys.CONTEXT, traceContext)}")
    }

    // Prefer the structured form: the record count is captured as its own RECORD_COUNT
    // field instead of existing only inside the message text.
    logInfo(log"Processing ${MDC(LogKeys.RECORD_COUNT, records.size)} records")
  }
}

459

```

460

461

### Integration with Exception Handling

462

463

```scala { .api }

464

// Import selectors must be simple names, so `{SparkException, internal.Logging}` does not
// compile; LogKeys and MDC are also needed by the log"..." messages below.
import org.apache.spark.SparkException
import org.apache.spark.internal.{Logging, LogKeys, MDC}

/**
 * Example combining logging with exception handling.
 *
 * A SparkException is logged with its error condition and message, then rethrown unchanged.
 * Any other Exception is logged with its class name and rethrown wrapped in a SparkException.
 */
class IntegratedLogging extends Logging {

  def processWithLogging(): Unit = {
    try {
      logInfo("Starting critical operation")
      performCriticalOperation()
      logInfo("Critical operation completed successfully")
    } catch {
      case ex: SparkException =>
        logError(log"Spark operation failed: " +
          log"${MDC(LogKeys.ERROR_CLASS, ex.getCondition())} " +
          log"${MDC(LogKeys.ERROR_MESSAGE, ex.getMessage)}", ex)
        throw ex

      case ex: Exception =>
        logError(log"Unexpected error in critical operation: " +
          log"${MDC(LogKeys.ERROR_CLASS, ex.getClass.getSimpleName)}", ex)
        // NOTE(review): the condition name must be registered in Spark's error-conditions
        // definitions; constructing a SparkException with an unknown condition is expected
        // to fail. Confirm before reusing this pattern.
        throw new SparkException("CRITICAL_OPERATION_FAILED", Map.empty, ex)
    }
  }
}

487

```