or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

catalog.md, data-sources.md, data-types.md, dataframe-dataset.md, index.md, session-management.md, sql-functions.md, streaming.md, udfs.md

docs/data-sources.md

# Apache Spark SQL - Data Sources and I/O Operations

## Capabilities

### Data Loading and Reading Operations

- Read data from built-in formats (Parquet, JSON, CSV, ORC, and text files) and, with the corresponding connector packages, from formats such as Avro and Delta Lake
- Connect to external systems such as JDBC databases and, through connector packages, Kafka, Cassandra, and cloud storage systems
- Infer schemas with configurable options, or specify an explicit schema
- Handle compressed files and partitioned datasets with optimized reading strategies such as partition pruning

### Data Writing and Persistence Operations

- Write DataFrames to multiple output formats with configurable compression and partitioning
- Choose a save mode: append, overwrite, error-if-exists (the default), or ignore
- Get atomic, transactional writes with table formats that support them (for example, Delta Lake)
- Handle large-scale data exports with parallel writing, partitioning, and bucketing

### Schema Management and Evolution

- Infer schemas automatically from data sources, with type promotion and null handling
- Support schema evolution by merging compatible schemas (for example, the Parquet `mergeSchema` option)
- Validate schemas during read and write operations and report mismatches as errors
- Handle schema mismatches with configurable behavior for missing or extra columns

### Advanced I/O Configuration and Optimization

- Tune read and write operations with format-specific options
- Use predicate pushdown and column pruning to reduce the data scanned by queries
- Apply static or dynamic partitioning schemes
- Use compression codecs and encoding schemes to optimize storage

## API Reference

### DataFrameReader Class

```scala { .api }

/**
 * Loads a DataFrame from files, catalog tables, or external systems. Obtained via `spark.read`.
 *
 * Configuration methods (`format`, `schema`, `option`, `options`) return the reader so calls
 * can be chained; `load`, the format-specific loaders, `jdbc`, and `table` build the result.
 */
class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {

  // Format specification, e.g. "json", "csv", "parquet", "orc", "text", "jdbc"
  def format(source: String): DataFrameReader

  // Schema operations. Supplying a schema avoids a schema-inference pass over the data.
  def schema(schema: StructType): DataFrameReader
  def schema(schemaString: String): DataFrameReader // DDL string, e.g. "id INT, name STRING"

  // Options configuration; Boolean/Long/Double values are passed on as their string form.
  def option(key: String, value: String): DataFrameReader
  def option(key: String, value: Boolean): DataFrameReader
  def option(key: String, value: Long): DataFrameReader
  def option(key: String, value: Double): DataFrameReader
  def options(options: scala.collection.Map[String, String]): DataFrameReader
  def options(options: java.util.Map[String, String]): DataFrameReader

  // Load operations using the configured format
  def load(): DataFrame
  def load(path: String): DataFrame
  def load(paths: String*): DataFrame

  // Format-specific loaders (shorthand for format(...).load(...))
  def json(path: String): DataFrame
  def json(paths: String*): DataFrame
  def json(jsonDataset: Dataset[String]): DataFrame // each element holds one JSON record
  def csv(path: String): DataFrame
  def csv(paths: String*): DataFrame
  def csv(csvDataset: Dataset[String]): DataFrame   // each element holds one CSV row
  def parquet(path: String): DataFrame
  def parquet(paths: String*): DataFrame
  def orc(path: String): DataFrame
  def orc(paths: String*): DataFrame
  def text(path: String): DataFrame                 // one row per line, string column "value"
  def text(paths: String*): DataFrame
  def textFile(path: String): Dataset[String]       // like text, but typed as Dataset[String]
  def textFile(paths: String*): Dataset[String]

  // Database and external sources
  def jdbc(url: String, table: String, properties: Properties): DataFrame
  // One partition per predicate; each predicate is used as that partition's WHERE clause.
  def jdbc(url: String, table: String, predicates: Array[String], connectionProperties: Properties): DataFrame
  // Splits the read on `columnName` into `numPartitions` ranges. lowerBound/upperBound only
  // decide the partition stride; rows outside the bounds are still read, not filtered out.
  def jdbc(
      url: String,
      table: String,
      columnName: String,
      lowerBound: Long,
      upperBound: Long,
      numPartitions: Int,
      connectionProperties: Properties): DataFrame

  // Table operations: reads a table or view registered in the catalog
  def table(tableName: String): DataFrame
}

```

### DataFrameWriter[T] Class

```scala { .api }

/**
 * Writes a Dataset to external storage. Obtained via `ds.write`.
 *
 * Configuration methods return the writer so calls can be chained; `save`, the
 * format-specific writers, `jdbc`, `saveAsTable`, and `insertInto` perform the write.
 */
class DataFrameWriter[T] private[sql](ds: Dataset[T]) {

  // Format specification, e.g. "json", "csv", "parquet", "orc", "text", "jdbc"
  def format(source: String): DataFrameWriter[T]

  // Save mode configuration (the default is SaveMode.ErrorIfExists)
  def mode(saveMode: SaveMode): DataFrameWriter[T]
  // Case-insensitive: "append", "overwrite", "ignore", "error" / "errorifexists"
  def mode(saveMode: String): DataFrameWriter[T]

  // Options configuration; Boolean/Long/Double values are passed on as their string form.
  def option(key: String, value: String): DataFrameWriter[T]
  def option(key: String, value: Boolean): DataFrameWriter[T]
  def option(key: String, value: Long): DataFrameWriter[T]
  def option(key: String, value: Double): DataFrameWriter[T]
  def options(options: scala.collection.Map[String, String]): DataFrameWriter[T]
  def options(options: java.util.Map[String, String]): DataFrameWriter[T]

  // Partitioning and bucketing.
  // There is no Seq[String] overload; expand a sequence as varargs: partitionBy(cols: _*)
  def partitionBy(colNames: String*): DataFrameWriter[T]
  // Bucketing is only supported when writing with saveAsTable.
  def bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T]
  // Sorts rows within each bucket; must be used together with bucketBy.
  def sortBy(colName: String, colNames: String*): DataFrameWriter[T]

  // Save operations
  def save(): Unit
  def save(path: String): Unit

  // Format-specific writers (shorthand for format(...).save(path))
  def json(path: String): Unit
  def csv(path: String): Unit
  def parquet(path: String): Unit
  def orc(path: String): Unit
  def text(path: String): Unit // the Dataset must have a single string column

  // Database operations
  def jdbc(url: String, table: String, connectionProperties: Properties): Unit

  // Table operations
  def saveAsTable(tableName: String): Unit // matches columns to an existing table by name
  def insertInto(tableName: String): Unit  // table must exist; matches columns by position
}

```

### Save Modes

```scala { .api }

// Spark declares SaveMode as a Java enum (org.apache.spark.sql.SaveMode); it is sketched
// here in Scala form. DataFrameWriter uses ErrorIfExists unless another mode is set.
sealed abstract class SaveMode

object SaveMode {
  case object Append extends SaveMode        // add the new data to any existing data
  case object Overwrite extends SaveMode     // replace existing data
  case object ErrorIfExists extends SaveMode // fail if data already exists (the default)
  case object Ignore extends SaveMode        // silently skip the write if data already exists

  // Enum lookup by exact constant name (e.g. "Append"). For case-insensitive names such as
  // "append" or "overwrite", use DataFrameWriter.mode(String) instead.
  def valueOf(modeName: String): SaveMode
}

```

### DataFrameWriterV2[T] Class

```scala { .api }

/**
 * Writer for the DataSource V2 table API, obtained via `ds.writeTo("catalog.db.table")`.
 * It targets a named catalog table, and every write states its intent explicitly:
 * append, overwrite, create, or replace.
 */
class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T]) {

  // Write operations on an existing table
  def append(): Unit
  // Replaces rows matching `condition`; pass lit(true) to replace every row.
  def overwrite(condition: Column): Unit
  // Replaces every partition for which the Dataset contains at least one row.
  def overwritePartitions(): Unit

  // Create / replace operations
  def createOrReplace(): Unit // create the table, or replace it if it already exists
  def create(): Unit          // fails if the table already exists
  def replace(): Unit         // fails if the table does not exist

  // Partitioning for create/replace: plain columns or partition transforms from
  // org.apache.spark.sql.functions, e.g. years($"ts"), days($"ts"), bucket(16, $"id").
  // (There is no String-based overload.)
  // NOTE: in Spark, partitionedBy, tableProperty and using return CreateTableWriter[T],
  // which only offers create(), replace() and createOrReplace().
  def partitionedBy(column: Column, columns: Column*): DataFrameWriterV2[T]

  // Write options
  def option(key: String, value: String): DataFrameWriterV2[T]
  def option(key: String, value: Boolean): DataFrameWriterV2[T]
  def option(key: String, value: Long): DataFrameWriterV2[T]
  def option(key: String, value: Double): DataFrameWriterV2[T]
  def options(options: scala.collection.Map[String, String]): DataFrameWriterV2[T]
  def options(options: java.util.Map[String, String]): DataFrameWriterV2[T]

  // Table properties, applied when the table is created or replaced
  def tableProperty(property: String, value: String): DataFrameWriterV2[T]

  // Table provider used for create/replace, e.g. "parquet", "iceberg", "delta"
  def using(provider: String): DataFrameWriterV2[T]
}

```

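### Format-Specific Options

The classes below summarize each data source's options and their defaults. They are a conceptual view: in practice every option is passed by name as a string, for example `.option("multiLine", "true")`.

#### JSON Options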

```scala { .api }

// Conceptual view of the JSON data source options and their defaults. In practice each
// option is supplied by name as a string, e.g. .option("multiLine", "true").
class JSONOptions(
    val samplingRatio: Double = 1.0,               // fraction of input used for schema inference
    val primitivesAsString: Boolean = false,
    val prefersDecimal: Boolean = false,
    val allowComments: Boolean = false,
    val allowUnquotedFieldNames: Boolean = false,
    val allowSingleQuotes: Boolean = true,
    val allowNumericLeadingZeros: Boolean = false,
    val allowBackslashEscapingAnyCharacter: Boolean = false,
    val allowUnquotedControlChars: Boolean = false,
    val mode: ParseMode = ParseMode.PERMISSIVE,    // PERMISSIVE (default), DROPMALFORMED or FAILFAST
    val columnNameOfCorruptRecord: String = "_corrupt_record",
    val dateFormat: String = "yyyy-MM-dd",
    val timestampFormat: String = "yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]",
    val multiLine: Boolean = false,                // true: a single record may span several lines
    val lineSep: String = null,                    // null: \r, \r\n and \n are all accepted
    val encoding: String = null,
    val locale: Locale = Locale.US,
    val pathGlobFilter: String = null,
    val recursiveFileLookup: Boolean = false,
    val modifiedBefore: String = null,
    val modifiedAfter: String = null
)

```

#### CSV Options

```scala { .api }

// Conceptual view of the CSV data source options and their defaults. In practice each
// option is supplied by name as a string, e.g. .option("header", "true").
class CSVOptions(
    val delimiter: Char = ',',                     // option key "sep" or "delimiter"
    val quote: Char = '"',
    val escape: Char = '\\',
    val charToEscapeQuoteEscaping: Char = '\u0000',
    val comment: Char = '\u0000',                  // '\u0000' disables comment skipping
    val header: Boolean = false,
    val inferSchema: Boolean = false,
    val ignoreLeadingWhiteSpace: Boolean = false,  // default when reading (true when writing)
    val ignoreTrailingWhiteSpace: Boolean = false, // default when reading (true when writing)
    val nullValue: String = "",
    val emptyValue: String = "",
    val nanValue: String = "NaN",
    val positiveInf: String = "Inf",
    val negativeInf: String = "-Inf",
    val dateFormat: String = "yyyy-MM-dd",
    val timestampFormat: String = "yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]",
    val maxColumns: Int = 20480,
    val maxCharsPerColumn: Int = -1,               // -1: unlimited
    val mode: ParseMode = ParseMode.PERMISSIVE,    // PERMISSIVE (default), DROPMALFORMED or FAILFAST
    val columnNameOfCorruptRecord: String = "_corrupt_record",
    val multiLine: Boolean = false,
    val locale: Locale = Locale.US,
    val lineSep: String = null,
    val pathGlobFilter: String = null,
    val recursiveFileLookup: Boolean = false,
    val modifiedBefore: String = null,
    val modifiedAfter: String = null,
    val unescapedQuoteHandling: UnescapedQuoteHandling.Value = UnescapedQuoteHandling.STOP_AT_DELIMITER
)

```

#### Parquet Options

```scala { .api }

// Conceptual view of the Parquet data source options and their defaults. In practice each
// option is supplied by name as a string, e.g. .option("mergeSchema", "true").
class ParquetOptions(
    // Write codec: none, uncompressed, snappy, gzip, lzo, brotli, lz4, zstd.
    // The default comes from spark.sql.parquet.compression.codec.
    val compression: String = "snappy",
    // Read: merge the schemas of all files (default from spark.sql.parquet.mergeSchema).
    // Merging requires reading every file footer, so it is off by default.
    val mergeSchema: Boolean = false,
    val pathGlobFilter: String = null,
    val recursiveFileLookup: Boolean = false,
    val modifiedBefore: String = null,
    val modifiedAfter: String = null,
    // Read: how dates/timestamps written with the legacy hybrid (Julian + Gregorian)
    // calendar are rebased: EXCEPTION, CORRECTED or LEGACY.
    // NOTE(review): both rebase defaults come from SQL configs whose defaults have
    // changed between Spark releases — verify for your version.
    val datetimeRebaseMode: String = "EXCEPTION",
    val int96RebaseMode: String = "EXCEPTION" // same values, applied to INT96 timestamps
)

```

#### JDBC Options

```scala { .api }

// Conceptual view of the JDBC data source options and their defaults. In practice each
// option is supplied by name as a string, e.g. .option("fetchsize", "1000").
// NOTE(review): the pushDown* defaults have changed across Spark releases — verify
// against the documentation for your version.
class JDBCOptions(
    val url: String,
    val table: String,                               // option key "dbtable" (or use "query" instead)
    val driver: String = null,                       // JDBC driver class name
    // Partitioned reads: all four must be set together. lowerBound/upperBound only set
    // the partition stride; they do not filter rows.
    val partitionColumn: String = null,
    val lowerBound: String = null,
    val upperBound: String = null,
    val numPartitions: String = null,                // also caps concurrent JDBC connections
    val queryTimeout: String = "0",                  // seconds; 0 means no limit
    val fetchsize: String = "0",                     // read: rows per round trip; 0 = driver default
    val batchsize: String = "1000",                  // write: rows per insert batch
    val isolationLevel: String = "READ_UNCOMMITTED", // write: transaction isolation level
    val sessionInitStatement: String = null,         // read: SQL run after each session opens
    val truncate: Boolean = false,                   // Overwrite: TRUNCATE instead of DROP + CREATE
    val cascadeTruncate: Boolean = false,
    val createTableOptions: String = "",             // write: appended to the CREATE TABLE statement
    val createTableColumnTypes: String = null,
    val customSchema: String = null,                 // read: override inferred column types
    val pushDownPredicate: Boolean = true,
    val pushDownAggregate: Boolean = false,
    val pushDownLimit: Boolean = false,
    val pushDownTableSample: Boolean = false,
    val keytab: String = null,
    val principal: String = null,
    val refreshKrb5Config: Boolean = false,
    val connectionProvider: String = null
)

```

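## Usage Examples

The examples build on one another: later snippets reuse values such as `spark` and `sampleData` that are defined in earlier ones.

### Basic File I/O Operations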

```scala

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.types._

val spark = SparkSession.builder()
  .appName("Data Sources Demo")
  .getOrCreate()

// Brings Seq#toDF and the $"column" syntax (used by the later examples) into scope.
import spark.implicits._

// Reading JSON files; multiLine lets a single JSON record span several lines
val jsonDF = spark.read
  .format("json")
  .option("multiLine", "true")
  .option("mode", "PERMISSIVE")
  .load("/path/to/json/files/*.json")

// Alternative JSON reading with an explicit schema (no inference pass).
// File-based sources read every field as nullable, so `nullable = false` documents
// intent but is not enforced.
val jsonSchema = StructType(Array(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  StructField("email", StringType, nullable = true)
))

val typedJsonDF = spark.read
  .schema(jsonSchema)
  .json("/path/to/json/files")

// Reading CSV files (delimiter, quote and escape below are the defaults, shown for clarity)
val csvDF = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("delimiter", ",")
  .option("quote", "\"")
  .option("escape", "\\")
  .option("nullValue", "NULL")
  .option("dateFormat", "yyyy-MM-dd")
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
  .load("/path/to/csv/files")

// Reading Parquet files, merging schemas that differ between files
val parquetDF = spark.read
  .format("parquet")
  .option("mergeSchema", "true")
  .load("/path/to/parquet/files")

// Reading ORC files
val orcDF = spark.read
  .format("orc")
  .load("/path/to/orc/files")

// Reading text files: one row per line in a single string column named "value"
val textDF = spark.read
  .text("/path/to/text/files")

// The same data as a typed Dataset[String]
val textFileDS = spark.read
  .textFile("/path/to/text/files")

```

### Writing Data to Different Formats

```scala

// Sample DataFrame for the writing examples (Seq#toDF requires `import spark.implicits._`)
val sampleData = Seq(
  (1, "Alice", 25, "alice@example.com", "2023-01-15"),
  (2, "Bob", 30, "bob@example.com", "2023-02-20"),
  (3, "Charlie", 35, "charlie@example.com", "2023-03-10")
).toDF("id", "name", "age", "email", "join_date")

// Writing to JSON (gzip-compressed part files)
sampleData.write
  .format("json")
  .mode(SaveMode.Overwrite)
  .option("compression", "gzip")
  .save("/path/to/output/json")

// Writing to CSV (delimiter, quote and escape are the defaults, shown for clarity)
sampleData.write
  .format("csv")
  .mode(SaveMode.Overwrite)
  .option("header", "true")
  .option("delimiter", ",")
  .option("quote", "\"")
  .option("escape", "\\")
  .option("compression", "gzip")
  .save("/path/to/output/csv")

// Writing to Parquet with partitioning: one sub-directory per join_date value
sampleData.write
  .format("parquet")
  .mode(SaveMode.Overwrite)
  .option("compression", "snappy")
  .partitionBy("join_date")
  .save("/path/to/output/parquet")

// Writing to ORC
sampleData.write
  .format("orc")
  .mode(SaveMode.Overwrite)
  .option("compression", "zlib")
  .save("/path/to/output/orc")

// Writing to text: the text format requires exactly one string column
sampleData.select("name").write
  .mode(SaveMode.Overwrite)
  .text("/path/to/output/text")

```

### Advanced Partitioning and Bucketing

```scala

// Sample sales data for the partitioning examples
val salesData = Seq(
  (1, "Product A", 100.0, "2023-01-15", "Electronics", "US"),
  (2, "Product B", 150.0, "2023-01-16", "Clothing", "US"),
  (3, "Product C", 200.0, "2023-01-15", "Electronics", "UK"),
  (4, "Product D", 120.0, "2023-01-17", "Books", "CA")
).toDF("id", "product_name", "price", "sale_date", "category", "country")

// Multi-level partitioning: one directory per value pair, e.g. country=US/category=Books/.
// With the default spark.sql.sources.partitionOverwriteMode=static, Overwrite replaces the
// whole output path. Add .option("partitionOverwriteMode", "dynamic") to replace only the
// partitions that salesData contains.
salesData.write
  .format("parquet")
  .mode(SaveMode.Overwrite)
  .partitionBy("country", "category")
  .option("compression", "snappy")
  .save("/path/to/partitioned/sales")

// Bucketing for better join performance. Bucketing is only supported with saveAsTable;
// the "path" option makes the table external, stored at that location.
salesData.write
  .format("parquet")
  .mode(SaveMode.Overwrite)
  .bucketBy(10, "id") // 10 buckets based on the id column
  .sortBy("price")    // Sort rows within each bucket
  .option("path", "/path/to/bucketed/sales")
  .saveAsTable("bucketed_sales")

// Custom partitioning on derived columns
import org.apache.spark.sql.functions._

val partitionedData = salesData
  .withColumn("year", year(to_date($"sale_date")))
  .withColumn("month", month(to_date($"sale_date")))

// Delta Lake provides ACID writes. It is not bundled with Spark: it needs the Delta package
// (delta-spark, formerly delta-core) on the classpath and its session extensions configured.
partitionedData.write
  .format("delta")
  .mode(SaveMode.Overwrite)
  .partitionBy("year", "month", "country")
  .save("/path/to/delta/sales")

```

### Database Connectivity (JDBC)

```scala

import java.util.Properties

// JDBC connection properties. setProperty keeps keys and values typed as String
// (the Properties Javadoc discourages put). Do not hard-code real credentials;
// load them from a secret store or the environment.
val connectionProperties = new Properties()
connectionProperties.setProperty("user", "username")
connectionProperties.setProperty("password", "password")
connectionProperties.setProperty("driver", "org.postgresql.Driver")

// Reading a whole table (single partition, single connection)
val jdbcDF = spark.read
  .jdbc(
    url = "jdbc:postgresql://localhost:5432/mydb",
    table = "employees",
    properties = connectionProperties
  )

// Reading with a SQL query: a parenthesised subquery with an alias can stand in for the table
val queryDF = spark.read
  .jdbc(
    url = "jdbc:postgresql://localhost:5432/mydb",
    table = "(SELECT * FROM employees WHERE department = 'Engineering') AS emp",
    properties = connectionProperties
  )

// Reading with partitioning for large tables: Spark issues numPartitions queries, each
// covering one stride of `id`. lowerBound/upperBound only set the stride; rows outside
// the range go to the first/last partition instead of being filtered out.
val partitionedJdbcDF = spark.read
  .jdbc(
    url = "jdbc:postgresql://localhost:5432/mydb",
    table = "large_table",
    columnName = "id", // Partition column
    lowerBound = 1,
    upperBound = 1000000,
    numPartitions = 10,
    connectionProperties = connectionProperties
  )

// Reading with custom predicates: one partition per predicate. Rows matching none of
// the predicates are not read.
val predicates = Array(
  "department = 'Engineering'",
  "department = 'Sales'",
  "department = 'Marketing'"
)

val predicateDF = spark.read
  .jdbc(
    url = "jdbc:postgresql://localhost:5432/mydb",
    table = "employees",
    predicates = predicates,
    connectionProperties = connectionProperties
  )

// Writing to database
sampleData.write
  .mode(SaveMode.Append)
  .jdbc(
    url = "jdbc:postgresql://localhost:5432/mydb",
    table = "new_employees",
    connectionProperties = connectionProperties
  )

// Writing with custom options. Overwrite drops and recreates the table unless
// .option("truncate", "true") is set, in which case it is truncated instead.
sampleData.write
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/mydb")
  .option("dbtable", "employees_backup")
  .option("user", "username")
  .option("password", "password")
  .option("driver", "org.postgresql.Driver")
  .option("batchsize", "10000")
  .option("isolationLevel", "READ_COMMITTED")
  .mode(SaveMode.Overwrite)
  .save()

```

### Schema Handling and Evolution

```scala

// Schema inference with sampling
val inferredDF = spark.read
  .format("json")
  .option("samplingRatio", "0.1") // Use 10% of the input for schema inference
  .option("prefersDecimal", "true")
  .load("/path/to/large/json/files")

// Explicit schema definition
val explicitSchema = StructType(Array(
  StructField("user_id", StringType, nullable = false),
  StructField("event_type", StringType, nullable = false),
  StructField("timestamp", TimestampType, nullable = false),
  StructField("properties", MapType(StringType, StringType), nullable = true),
  StructField("user_properties", StructType(Array(
    StructField("age", IntegerType, nullable = true),
    StructField("country", StringType, nullable = true),
    StructField("premium", BooleanType, nullable = false)
  )), nullable = true)
))

val typedDF = spark.read
  .schema(explicitSchema)
  .format("json")
  .load("/path/to/json/events")

// Schema evolution with Parquet
val evolvedParquetDF = spark.read
  .format("parquet")
  .option("mergeSchema", "true") // Merge schemas from different files
  .load("/path/to/evolved/parquet/files")

// Handle corrupt records: the corrupt-record column must be declared as a nullable string
val corruptHandlingDF = spark.read
  .format("json")
  .option("mode", "PERMISSIVE") // PERMISSIVE, DROPMALFORMED, FAILFAST
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .schema(explicitSchema.add("_corrupt_record", StringType)) // add returns a new StructType
  .load("/path/to/potentially/corrupt/json")

// Schema compatibility checking.
// File-based reads mark every field as nullable, nested fields included. So in typedDF,
// `user_properties.premium` is nullable even though explicitSchema declares it non-nullable,
// and comparing DataTypes directly would report a mismatch. Nullability flags are
// therefore normalised before comparing.
def withoutNullability(dataType: DataType): DataType = dataType match {
  case StructType(fields) =>
    StructType(fields.map(f => f.copy(dataType = withoutNullability(f.dataType), nullable = true)))
  case ArrayType(elementType, _) =>
    ArrayType(withoutNullability(elementType), containsNull = true)
  case MapType(keyType, valueType, _) =>
    MapType(withoutNullability(keyType), withoutNullability(valueType), valueContainsNull = true)
  case other => other
}

/** True when every field of `expectedSchema` exists in `df` with the same name and type, ignoring nullability. */
def validateSchema(df: DataFrame, expectedSchema: StructType): Boolean = {
  val actualFields = df.schema.fields
  expectedSchema.fields.forall { expectedField =>
    val expectedType = withoutNullability(expectedField.dataType)
    actualFields.exists { actualField =>
      actualField.name == expectedField.name &&
        withoutNullability(actualField.dataType) == expectedType
    }
  }
}

val isCompatible = validateSchema(typedDF, explicitSchema)
println(s"Schema compatible: $isCompatible")

```

### Compressed File Handling

```scala

// Reading compressed files: the codec is detected from the file extension (.gz, .bz2, ...),
// so no option is needed. "compression" is a write-side option for JSON and CSV.
// gzip files are not splittable, so each one is read by a single task; bzip2 files are splittable.
val gzipJsonDF = spark.read
  .format("json")
  .load("/path/to/compressed/*.json.gz")

val bzip2CsvDF = spark.read
  .format("csv")
  .option("header", "true")
  .load("/path/to/compressed/*.csv.bz2")

// Writing with different compression algorithms
sampleData.write
  .format("parquet")
  .option("compression", "snappy") // snappy, gzip, lzo, brotli, lz4, zstd
  .mode(SaveMode.Overwrite)
  .save("/path/to/compressed/parquet")

sampleData.write
  .format("json")
  .option("compression", "gzip")
  .mode(SaveMode.Overwrite)
  .save("/path/to/compressed/json")

sampleData.write
  .format("csv")
  .option("header", "true")
  .option("compression", "bzip2")
  .mode(SaveMode.Overwrite)
  .save("/path/to/compressed/csv")

// Compression settings for different use cases. These two writes use the default save
// mode (ErrorIfExists), so they fail if the target path already exists.

// For archival: favour compression ratio over speed
sampleData.write
  .format("parquet")
  .option("compression", "gzip")
  .option("parquet.block.size", "268435456") // 256 MB row groups
  .save("/path/to/archive")

// For query performance: balance compression and speed
sampleData.write
  .format("parquet")
  .option("compression", "snappy")
  .option("parquet.page.size", "1048576") // 1 MB pages
  .save("/path/to/queryable")

```

### Advanced File Operations

```scala

// File globbing and filtering
val filteredDF = spark.read
  .format("json")
  .option("pathGlobFilter", "*.json")            // Only file names matching the glob
  .option("recursiveFileLookup", "true")         // Recurse into sub-directories (disables partition discovery)
  .option("modifiedAfter", "2023-01-01T00:00:00")  // Files modified after this time (session time zone)
  .option("modifiedBefore", "2023-12-31T23:59:59") // Files modified before this time
  .load("/path/to/data")

// Multi-path loading
val multiPathDF = spark.read
  .format("parquet")
  .load(
    "/path/to/data/2023/01/*",
    "/path/to/data/2023/02/*",
    "/path/to/data/2023/03/*"
  )

// Incremental data loading
import org.apache.spark.sql.functions._

/**
 * Reads Parquet files under `basePath` modified after `lastProcessedTime`, then keeps
 * only rows whose `event_time` is later than that time.
 * NOTE(review): assumes `event_time` is a timestamp column, so the string literal is
 * cast to a timestamp for the comparison — confirm against the data.
 */
def loadIncrementalData(basePath: String, lastProcessedTime: String): DataFrame = {
  spark.read
    .format("parquet")
    .option("modifiedAfter", lastProcessedTime)
    .load(basePath)
    .filter($"event_time" > lit(lastProcessedTime))
}

val incrementalDF = loadIncrementalData("/path/to/events", "2023-12-01T00:00:00")

// Data source V2 operations.
// The keys below are Apache Iceberg table properties, so they are set with tableProperty,
// which applies them when the table is created or replaced. option() only sets per-write
// options, which are not table properties.
val v2Table = "catalog.db.table"

// Conditional writes
sampleData.writeTo(v2Table)
  .tableProperty("write.format.default", "parquet")
  .tableProperty("write.parquet.compression-codec", "snappy")
  .createOrReplace() // Create table or replace if exists
// sampleData.writeTo(v2Table).create()              // Create table, fail if exists
// sampleData.writeTo(v2Table).append()              // Append to existing table
// sampleData.writeTo(v2Table).overwrite(lit(true))  // Overwrite all rows (takes a filter condition)
// sampleData.writeTo(v2Table).overwritePartitions() // Overwrite only partitions present in sampleData

```

### Error Handling and Data Quality

```scala

import org.apache.spark.sql.functions._

val malformedCsvPath = "/path/to/potentially/malformed/csv"

// Robust CSV reading with error handling. In PERMISSIVE mode, rows that fail to parse are
// kept, with their raw text in the column named by columnNameOfCorruptRecord. That column
// must be declared in the schema as a nullable string.
val robustCsvDF = spark.read
  .format("csv")
  .option("header", "true")
  .option("mode", "PERMISSIVE") // Continue processing despite errors
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  // NOTE(review): maxMalformedLogPerPartition is not among the documented CSV options in
  // Spark 3.x and is likely ignored — verify for your version.
  .option("maxMalformedLogPerPartition", "10")
  .schema(
    StructType(Array(
      StructField("id", IntegerType, nullable = true),
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true),
      StructField("_corrupt_record", StringType, nullable = true)
    ))
  )
  .load(malformedCsvPath)
  // Cached because several actions below reuse the parsed rows. Spark also recommends
  // caching parsed results before querying the corrupt-record column.
  .cache()

// Analyze data quality
val qualityReport = robustCsvDF.agg(
  count("*").as("total_records"),
  sum(when($"_corrupt_record".isNull, 1).otherwise(0)).as("valid_records"),
  sum(when($"_corrupt_record".isNotNull, 1).otherwise(0)).as("corrupt_records"),
  sum(when($"id".isNull, 1).otherwise(0)).as("missing_ids"),
  sum(when($"name".isNull || $"name" === "", 1).otherwise(0)).as("missing_names")
)

qualityReport.show()

// Data validation and cleansing
val cleanedDF = robustCsvDF
  .filter($"_corrupt_record".isNull)                        // Remove corrupt records
  .filter($"id".isNotNull && $"id" > 0)                     // Valid IDs only
  .filter($"name".isNotNull && length(trim($"name")) > 0)   // Non-empty names
  .filter($"age".isNull || ($"age" >= 0 && $"age" <= 150))  // Reasonable ages
  .drop("_corrupt_record")

// Each count is an action, so compute each one once and reuse it.
val totalInputRecords = robustCsvDF.count()
val validOutputRecords = cleanedDF.count()
// An empty input scores 0 instead of producing NaN from 0 / 0.
val dataQualityScore =
  if (totalInputRecords == 0) 0.0
  else validOutputRecords.toDouble / totalInputRecords * 100

// Write validation results. createDataFrame needs a Product such as a tuple; a
// Map[String, Any] has no encoder and does not compile here.
val validationSummary = spark.createDataFrame(Seq((
  malformedCsvPath,
  java.time.Instant.now().toString,
  totalInputRecords,
  validOutputRecords,
  dataQualityScore
))).toDF("source_path", "processing_time", "total_input_records", "valid_output_records", "data_quality_score")

// Save validation report
validationSummary.write
  .format("json")
  .mode(SaveMode.Append)
  .save("/path/to/validation/reports")

```