# Apache Spark SQL - Data Sources and I/O Operations

## Capabilities

### Data Loading and Reading Operations
- Read data from various formats including Parquet, JSON, CSV, ORC, Avro, Delta, and text files
- Connect to external systems such as JDBC databases, Kafka, Cassandra, and cloud storage
- Support schema inference with configurable options as well as explicit schema specification
- Handle compressed files and partitioned datasets with optimized reading strategies

### Data Writing and Persistence Operations
- Write DataFrames to multiple output formats with configurable compression and partitioning
- Support the save modes append, overwrite, error-if-exists, and ignore
- Enable atomic writes and transactional operations for supported formats
- Handle large-scale data exports with optimized parallel writing and bucketing strategies

### Schema Management and Evolution
- Infer schemas automatically from data sources with type promotion and null handling
- Support schema evolution and merging for compatible schema changes
- Validate schemas during read and write operations with comprehensive error reporting
- Handle schema mismatches with configurable behavior for missing or extra columns

### Advanced I/O Configuration and Optimization
- Configure read and write operations with format-specific options for performance tuning
- Support predicate pushdown and column pruning for optimized query execution
- Handle partitioning strategies, including static and dynamic partitioning schemes
- Enable compression algorithms and encoding schemes for storage optimization

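The sections below document the reader and writer APIs in detail; as orientation, here is a minimal end-to-end sketch that exercises the capabilities above. Paths, the `country` column, and the option choices are illustrative assumptions, not part of any specific dataset.

```scala
import org.apache.spark.sql.{SparkSession, SaveMode}

val spark = SparkSession.builder().appName("IO Overview").getOrCreate()

// Read: CSV with a header row and schema inference
val orders = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/data/in/orders")          // hypothetical input path

// Write: snappy-compressed Parquet, partitioned by a column assumed to exist
orders.write
  .mode(SaveMode.Overwrite)
  .option("compression", "snappy")
  .partitionBy("country")          // hypothetical partition column
  .parquet("/data/out/orders")     // hypothetical output path
```
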
## API Reference

### DataFrameReader Class
```scala { .api }
class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
  // Format specification
  def format(source: String): DataFrameReader

  // Schema operations
  def schema(schema: StructType): DataFrameReader
  def schema(schemaString: String): DataFrameReader

  // Options configuration
  def option(key: String, value: String): DataFrameReader
  def option(key: String, value: Boolean): DataFrameReader
  def option(key: String, value: Long): DataFrameReader
  def option(key: String, value: Double): DataFrameReader
  def options(options: scala.collection.Map[String, String]): DataFrameReader
  def options(options: java.util.Map[String, String]): DataFrameReader

  // Load operations
  def load(): DataFrame
  def load(path: String): DataFrame
  def load(paths: String*): DataFrame

  // Format-specific loaders
  def json(path: String): DataFrame
  def json(paths: String*): DataFrame
  def json(jsonDataset: Dataset[String]): DataFrame
  def csv(path: String): DataFrame
  def csv(paths: String*): DataFrame
  def parquet(path: String): DataFrame
  def parquet(paths: String*): DataFrame
  def orc(path: String): DataFrame
  def orc(paths: String*): DataFrame
  def text(path: String): DataFrame
  def text(paths: String*): DataFrame
  def textFile(path: String): Dataset[String]
  def textFile(paths: String*): Dataset[String]

  // Database and external sources
  def jdbc(url: String, table: String, properties: Properties): DataFrame
  def jdbc(url: String, table: String, predicates: Array[String], connectionProperties: Properties): DataFrame
  def jdbc(url: String, table: String, columnName: String, lowerBound: Long, upperBound: Long, numPartitions: Int, connectionProperties: Properties): DataFrame

  // Table operations
  def table(tableName: String): DataFrame
}
```

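Two entry points in this class that the usage examples below do not cover are the DDL-string `schema` overload and the `table` loader; a brief sketch, assuming an active `SparkSession` named `spark` and hypothetical paths and table names:

```scala
// Explicit schema supplied as a DDL string rather than a StructType
val people = spark.read
  .schema("id INT, name STRING, age INT, email STRING")
  .json("/data/people")                              // hypothetical path

// Load a table already registered in the catalog/metastore
val events = spark.read.table("analytics.events")    // hypothetical table name
```
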
### DataFrameWriter[T] Class
```scala { .api }
class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
  // Format specification
  def format(source: String): DataFrameWriter[T]

  // Save mode configuration
  def mode(saveMode: SaveMode): DataFrameWriter[T]
  def mode(saveMode: String): DataFrameWriter[T]

  // Options configuration
  def option(key: String, value: String): DataFrameWriter[T]
  def option(key: String, value: Boolean): DataFrameWriter[T]
  def option(key: String, value: Long): DataFrameWriter[T]
  def option(key: String, value: Double): DataFrameWriter[T]
  def options(options: scala.collection.Map[String, String]): DataFrameWriter[T]
  def options(options: java.util.Map[String, String]): DataFrameWriter[T]

  // Partitioning and bucketing
  def partitionBy(colNames: String*): DataFrameWriter[T]
  def bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T]
  def sortBy(colName: String, colNames: String*): DataFrameWriter[T]

  // Save operations
  def save(): Unit
  def save(path: String): Unit

  // Format-specific writers
  def json(path: String): Unit
  def csv(path: String): Unit
  def parquet(path: String): Unit
  def orc(path: String): Unit
  def text(path: String): Unit

  // Database operations
  def jdbc(url: String, table: String, connectionProperties: Properties): Unit

  // Table operations
  def saveAsTable(tableName: String): Unit
  def insertInto(tableName: String): Unit
}
```

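`saveAsTable` and `insertInto` differ in a way the signatures alone do not show: `saveAsTable` creates (or, depending on the save mode, replaces) a catalog table using the DataFrame's schema, while `insertInto` writes into a table that must already exist and matches columns by position rather than by name. A minimal sketch, assuming a DataFrame `df` and a hypothetical table name:

```scala
import org.apache.spark.sql.SaveMode

// Create or overwrite a managed table from the DataFrame's schema
df.write
  .mode(SaveMode.Overwrite)
  .saveAsTable("reporting.daily_summary")   // hypothetical table

// Append into the existing table; columns are matched by position, not name
df.write
  .mode(SaveMode.Append)
  .insertInto("reporting.daily_summary")
```
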
### Save Modes
```scala { .api }
// SaveMode is a Java enum (org.apache.spark.sql.SaveMode), not a Scala sealed hierarchy.
// Values:
//   SaveMode.Append         // add rows to existing data
//   SaveMode.Overwrite      // replace existing data
//   SaveMode.ErrorIfExists  // throw an exception if data already exists (default)
//   SaveMode.Ignore         // silently skip the write if data already exists
//
// Standard java.lang.Enum methods such as SaveMode.valueOf(name: String) are available.
// DataFrameWriter.mode(String) also accepts the strings
// "append", "overwrite", "error" / "errorifexists", and "ignore".
```

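Either the enum or its string equivalent can be passed to `mode`; a short sketch, assuming a DataFrame `df` and an illustrative output path:

```scala
import org.apache.spark.sql.SaveMode

// Enum form
df.write.mode(SaveMode.Ignore).parquet("/data/out/snapshot")

// Equivalent string form
df.write.mode("ignore").parquet("/data/out/snapshot")
```
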
### DataFrameWriterV2[T] Class
```scala { .api }
class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T]) {
  // Write operations
  def append(): Unit
  def overwrite(condition: Column): Unit
  def overwritePartitions(): Unit

  // Table creation operations
  def create(): Unit
  def replace(): Unit
  def createOrReplace(): Unit

  // Partitioning (accepts columns or partition transforms such as years/months/days/bucket)
  def partitionedBy(column: Column, columns: Column*): DataFrameWriterV2[T]

  // Options
  def option(key: String, value: String): DataFrameWriterV2[T]
  def option(key: String, value: Boolean): DataFrameWriterV2[T]
  def option(key: String, value: Long): DataFrameWriterV2[T]
  def option(key: String, value: Double): DataFrameWriterV2[T]
  def options(options: Map[String, String]): DataFrameWriterV2[T]
  def options(options: java.util.Map[String, String]): DataFrameWriterV2[T]

  // Table properties
  def tableProperty(property: String, value: String): DataFrameWriterV2[T]

  // Provider (data source) used when creating the table
  def using(provider: String): DataFrameWriterV2[T]
}
```

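A brief sketch of the V2 writer, assuming a catalog-backed table name and a timestamp column `ts` that are purely illustrative; `days` and `col` come from `org.apache.spark.sql.functions` and express a partition transform:

```scala
import org.apache.spark.sql.functions.{col, days}

df.writeTo("catalog.db.events")                     // hypothetical catalog table
  .using("parquet")
  .partitionedBy(days(col("ts")), col("country"))   // partition by day of ts, then country
  .createOrReplace()
```
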
### Format-Specific Options

#### JSON Options
```scala { .api }
// JSON reader options (defaults shown; passed via .option(key, value))
class JSONOptions(
  val samplingRatio: Double = 1.0,
  val primitivesAsString: Boolean = false,
  val prefersDecimal: Boolean = false,
  val allowComments: Boolean = false,
  val allowUnquotedFieldNames: Boolean = false,
  val allowSingleQuotes: Boolean = true,
  val allowNumericLeadingZeros: Boolean = false,
  val allowBackslashEscapingAnyCharacter: Boolean = false,
  val allowUnquotedControlChars: Boolean = false,
  val mode: ParseMode = ParseMode.PERMISSIVE,
  val columnNameOfCorruptRecord: String = "_corrupt_record",
  val dateFormat: String = "yyyy-MM-dd",
  val timestampFormat: String = "yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]",
  val multiLine: Boolean = false,
  val lineSep: String = null,
  val encoding: String = null,
  val locale: Locale = Locale.US,
  val pathGlobFilter: String = null,
  val recursiveFileLookup: Boolean = false,
  val modifiedBefore: String = null,
  val modifiedAfter: String = null
)
```

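These keys map directly onto `.option(...)` calls on the reader; a short sketch, assuming an active `SparkSession` named `spark` and a hypothetical input path:

```scala
val logs = spark.read
  .option("multiLine", "true")           // a JSON document may span multiple lines
  .option("allowComments", "true")       // tolerate // and /* */ comments in the input
  .option("mode", "DROPMALFORMED")       // drop records that fail to parse
  .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ssXXX")
  .json("/data/logs/json")               // hypothetical path
```
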
#### CSV Options
```scala { .api }
// CSV reader options (defaults shown; passed via .option(key, value))
class CSVOptions(
  val delimiter: Char = ',',
  val quote: Char = '"',
  val escape: Char = '\\',
  val charToEscapeQuoteEscaping: Char = '\u0000',
  val comment: Char = '\u0000',
  val header: Boolean = false,
  val inferSchema: Boolean = false,
  val ignoreLeadingWhiteSpace: Boolean = false,
  val ignoreTrailingWhiteSpace: Boolean = false,
  val nullValue: String = "",
  val emptyValue: String = "",
  val nanValue: String = "NaN",
  val positiveInf: String = "Inf",
  val negativeInf: String = "-Inf",
  val dateFormat: String = "yyyy-MM-dd",
  val timestampFormat: String = "yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]",
  val maxColumns: Int = 20480,
  val maxCharsPerColumn: Int = -1,
  val mode: ParseMode = ParseMode.PERMISSIVE,
  val columnNameOfCorruptRecord: String = "_corrupt_record",
  val multiLine: Boolean = false,
  val locale: Locale = Locale.US,
  val lineSep: String = null,
  val pathGlobFilter: String = null,
  val recursiveFileLookup: Boolean = false,
  val modifiedBefore: String = null,
  val modifiedAfter: String = null,
  val unescapedQuoteHandling: UnescapedQuoteHandling.Value = UnescapedQuoteHandling.STOP_AT_DELIMITER
)
```

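As with JSON, these are plain reader options; a sketch showing some of the less common ones, with a hypothetical input path:

```scala
val readings = spark.read
  .option("header", "true")
  .option("delimiter", ";")        // non-default field separator
  .option("comment", "#")          // skip lines starting with '#'
  .option("nullValue", "NA")       // treat "NA" as null
  .option("inferSchema", "true")
  .csv("/data/sensor/readings")    // hypothetical path
```
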
#### Parquet Options
```scala { .api }
// Parquet Options
class ParquetOptions(
  val compression: String = "snappy", // none, uncompressed, snappy, gzip, lzo, brotli, lz4, zstd
  val mergeSchema: Boolean = false,
  val pathGlobFilter: String = null,
  val recursiveFileLookup: Boolean = false,
  val modifiedBefore: String = null,
  val modifiedAfter: String = null,
  val datetimeRebaseMode: String = "EXCEPTION", // EXCEPTION, CORRECTED, LEGACY
  val int96RebaseMode: String = "EXCEPTION"
)
```

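The compression codec can be chosen per write or as a session-wide default; a sketch, with the codec choice and paths as illustrative assumptions:

```scala
// Session-wide default codec for Parquet writes
spark.conf.set("spark.sql.parquet.compression.codec", "zstd")

// Per-write override, and schema merging when reading the result back
df.write.option("compression", "gzip").parquet("/data/out/archive")           // hypothetical path
val merged = spark.read.option("mergeSchema", "true").parquet("/data/out/archive")
```
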
#### JDBC Options
```scala { .api }
// JDBC Options
class JDBCOptions(
  val url: String,
  val table: String,
  val driver: String = null,
  val partitionColumn: String = null,
  val lowerBound: String = null,
  val upperBound: String = null,
  val numPartitions: String = null,
  val queryTimeout: String = "0",
  val fetchsize: String = "0",
  val batchsize: String = "1000",
  val isolationLevel: String = "READ_UNCOMMITTED",
  val sessionInitStatement: String = null,
  val truncate: Boolean = false,
  val cascadeTruncate: Boolean = false,
  val createTableOptions: String = "",
  val createTableColumnTypes: String = null,
  val customSchema: String = null,
  val pushDownPredicate: Boolean = true,
  val pushDownAggregate: Boolean = false,
  val pushDownLimit: Boolean = false,
  val pushDownTableSample: Boolean = false,
  val keytab: String = null,
  val principal: String = null,
  val refreshKrb5Config: Boolean = false,
  val connectionProvider: String = null
)
```

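The same keys can also be passed through the generic `format("jdbc")` path; a sketch of a partitioned read, with connection details and column bounds as illustrative assumptions:

```scala
val orders = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/mydb")  // illustrative URL
  .option("dbtable", "public.orders")                      // illustrative table
  .option("user", "username")
  .option("password", "password")
  .option("partitionColumn", "order_id")                   // numeric, date, or timestamp column
  .option("lowerBound", "1")
  .option("upperBound", "1000000")
  .option("numPartitions", "16")
  .option("fetchsize", "5000")
  .load()
```
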
## Usage Examples

### Basic File I/O Operations
```scala
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.types._

val spark = SparkSession.builder()
  .appName("Data Sources Demo")
  .getOrCreate()

import spark.implicits._ // enables $"col" syntax and Seq(...).toDF used in the examples below

// Reading JSON files
val jsonDF = spark.read
  .format("json")
  .option("multiLine", "true")
  .option("mode", "PERMISSIVE")
  .load("/path/to/json/files/*.json")

// Alternative JSON reading with schema
val jsonSchema = StructType(Array(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  StructField("email", StringType, nullable = true)
))

val typedJsonDF = spark.read
  .schema(jsonSchema)
  .json("/path/to/json/files")

// Reading CSV files
val csvDF = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("delimiter", ",")
  .option("quote", "\"")
  .option("escape", "\\")
  .option("nullValue", "NULL")
  .option("dateFormat", "yyyy-MM-dd")
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
  .load("/path/to/csv/files")

// Reading Parquet files
val parquetDF = spark.read
  .format("parquet")
  .option("mergeSchema", "true")
  .load("/path/to/parquet/files")

// Reading ORC files
val orcDF = spark.read
  .format("orc")
  .load("/path/to/orc/files")

// Reading text files
val textDF = spark.read
  .text("/path/to/text/files")

val textFileDS = spark.read
  .textFile("/path/to/text/files")
```

### Writing Data to Different Formats
```scala
// Sample DataFrame for writing examples
val sampleData = Seq(
  (1, "Alice", 25, "alice@example.com", "2023-01-15"),
  (2, "Bob", 30, "bob@example.com", "2023-02-20"),
  (3, "Charlie", 35, "charlie@example.com", "2023-03-10")
).toDF("id", "name", "age", "email", "join_date")

// Writing to JSON
sampleData.write
  .format("json")
  .mode(SaveMode.Overwrite)
  .option("compression", "gzip")
  .save("/path/to/output/json")

// Writing to CSV
sampleData.write
  .format("csv")
  .mode(SaveMode.Overwrite)
  .option("header", "true")
  .option("delimiter", ",")
  .option("quote", "\"")
  .option("escape", "\\")
  .option("compression", "gzip")
  .save("/path/to/output/csv")

// Writing to Parquet with partitioning
sampleData.write
  .format("parquet")
  .mode(SaveMode.Overwrite)
  .option("compression", "snappy")
  .partitionBy("join_date")
  .save("/path/to/output/parquet")

// Writing to ORC
sampleData.write
  .format("orc")
  .mode(SaveMode.Overwrite)
  .option("compression", "zlib")
  .save("/path/to/output/orc")

// Writing to text (single column)
sampleData.select("name").write
  .mode(SaveMode.Overwrite)
  .text("/path/to/output/text")
```

### Advanced Partitioning and Bucketing
```scala
// Dynamic partitioning
val salesData = Seq(
  (1, "Product A", 100.0, "2023-01-15", "Electronics", "US"),
  (2, "Product B", 150.0, "2023-01-16", "Clothing", "US"),
  (3, "Product C", 200.0, "2023-01-15", "Electronics", "UK"),
  (4, "Product D", 120.0, "2023-01-17", "Books", "CA")
).toDF("id", "product_name", "price", "sale_date", "category", "country")

// Multi-level partitioning
salesData.write
  .format("parquet")
  .mode(SaveMode.Overwrite)
  .partitionBy("country", "category")
  .option("compression", "snappy")
  .save("/path/to/partitioned/sales")

// Bucketing for better join performance (bucketing requires saveAsTable)
salesData.write
  .format("parquet")
  .mode(SaveMode.Overwrite)
  .bucketBy(10, "id") // 10 buckets based on the id column
  .sortBy("price")    // Sort within buckets
  .option("path", "/path/to/bucketed/sales")
  .saveAsTable("bucketed_sales")

// Custom partitioning with transformation
import org.apache.spark.sql.functions._

val partitionedData = salesData
  .withColumn("year", year(to_date($"sale_date")))
  .withColumn("month", month(to_date($"sale_date")))

partitionedData.write
  .format("delta") // Delta format for ACID properties; requires the Delta Lake connector on the classpath
  .mode(SaveMode.Overwrite)
  .partitionBy("year", "month", "country")
  .save("/path/to/delta/sales")
```

### Database Connectivity (JDBC)
```scala
import java.util.Properties

// JDBC connection properties
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
connectionProperties.put("driver", "org.postgresql.Driver")

// Reading from database
val jdbcDF = spark.read
  .jdbc(
    url = "jdbc:postgresql://localhost:5432/mydb",
    table = "employees",
    properties = connectionProperties
  )

// Reading with SQL query
val queryDF = spark.read
  .jdbc(
    url = "jdbc:postgresql://localhost:5432/mydb",
    table = "(SELECT * FROM employees WHERE department = 'Engineering') AS emp",
    properties = connectionProperties
  )

// Reading with partitioning for large tables
val partitionedJdbcDF = spark.read
  .jdbc(
    url = "jdbc:postgresql://localhost:5432/mydb",
    table = "large_table",
    columnName = "id", // Partition column
    lowerBound = 1,
    upperBound = 1000000,
    numPartitions = 10,
    connectionProperties = connectionProperties
  )

// Reading with custom predicates
val predicates = Array(
  "department = 'Engineering'",
  "department = 'Sales'",
  "department = 'Marketing'"
)

val predicateDF = spark.read
  .jdbc(
    url = "jdbc:postgresql://localhost:5432/mydb",
    table = "employees",
    predicates = predicates,
    connectionProperties = connectionProperties
  )

// Writing to database
sampleData.write
  .mode(SaveMode.Append)
  .jdbc(
    url = "jdbc:postgresql://localhost:5432/mydb",
    table = "new_employees",
    connectionProperties = connectionProperties
  )

// Writing with custom options
sampleData.write
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/mydb")
  .option("dbtable", "employees_backup")
  .option("user", "username")
  .option("password", "password")
  .option("driver", "org.postgresql.Driver")
  .option("batchsize", "10000")
  .option("isolationLevel", "READ_COMMITTED")
  .mode(SaveMode.Overwrite)
  .save()
```

### Schema Handling and Evolution
```scala
// Schema inference with sampling
val inferredDF = spark.read
  .format("json")
  .option("samplingRatio", "0.1") // Sample 10% of the input for schema inference
  .option("prefersDecimal", "true")
  .load("/path/to/large/json/files")

// Explicit schema definition
val explicitSchema = StructType(Array(
  StructField("user_id", StringType, nullable = false),
  StructField("event_type", StringType, nullable = false),
  StructField("timestamp", TimestampType, nullable = false),
  StructField("properties", MapType(StringType, StringType), nullable = true),
  StructField("user_properties", StructType(Array(
    StructField("age", IntegerType, nullable = true),
    StructField("country", StringType, nullable = true),
    StructField("premium", BooleanType, nullable = false)
  )), nullable = true)
))

val typedDF = spark.read
  .schema(explicitSchema)
  .format("json")
  .load("/path/to/json/events")

// Schema evolution with Parquet
val evolvedParquetDF = spark.read
  .format("parquet")
  .option("mergeSchema", "true") // Merge schemas from different files
  .load("/path/to/evolved/parquet/files")

// Handle corrupt records
val corruptHandlingDF = spark.read
  .format("json")
  .option("mode", "PERMISSIVE") // PERMISSIVE, DROPMALFORMED, FAILFAST
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .schema(explicitSchema.add("_corrupt_record", StringType))
  .load("/path/to/potentially/corrupt/json")

// Schema compatibility checking
def validateSchema(df: DataFrame, expectedSchema: StructType): Boolean = {
  val actualSchema = df.schema
  expectedSchema.fields.forall { expectedField =>
    actualSchema.fields.exists { actualField =>
      actualField.name == expectedField.name &&
        actualField.dataType == expectedField.dataType
    }
  }
}

val isCompatible = validateSchema(typedDF, explicitSchema)
println(s"Schema compatible: $isCompatible")
```

### Compressed File Handling
```scala
// Reading compressed files: the codec is detected automatically from the file extension,
// so no compression option is needed on the read side
val gzipJsonDF = spark.read
  .format("json")
  .load("/path/to/compressed/*.json.gz")

val bzip2CsvDF = spark.read
  .format("csv")
  .option("header", "true")
  .load("/path/to/compressed/*.csv.bz2")

// Writing with different compression algorithms
sampleData.write
  .format("parquet")
  .option("compression", "snappy") // snappy, gzip, lzo, brotli, lz4, zstd
  .mode(SaveMode.Overwrite)
  .save("/path/to/compressed/parquet")

sampleData.write
  .format("json")
  .option("compression", "gzip")
  .mode(SaveMode.Overwrite)
  .save("/path/to/compressed/json")

sampleData.write
  .format("csv")
  .option("header", "true")
  .option("compression", "bzip2")
  .mode(SaveMode.Overwrite)
  .save("/path/to/compressed/csv")

// Compression settings for different use cases
// For archival (maximize compression)
sampleData.write
  .format("parquet")
  .option("compression", "gzip")
  .option("parquet.block.size", "268435456") // 256MB row groups
  .save("/path/to/archive")

// For query performance (balance compression and speed)
sampleData.write
  .format("parquet")
  .option("compression", "snappy")
  .option("parquet.page.size", "1048576") // 1MB pages
  .save("/path/to/queryable")
```

### Advanced File Operations
```scala
// File globbing and filtering
val filteredDF = spark.read
  .format("json")
  .option("pathGlobFilter", "*.json")              // Only JSON files
  .option("recursiveFileLookup", "true")           // Recursive directory traversal
  .option("modifiedAfter", "2023-01-01T00:00:00")  // Files modified after this timestamp
  .option("modifiedBefore", "2023-12-31T23:59:59") // Files modified before this timestamp
  .load("/path/to/data")

// Multi-path loading
val multiPathDF = spark.read
  .format("parquet")
  .load(
    "/path/to/data/2023/01/*",
    "/path/to/data/2023/02/*",
    "/path/to/data/2023/03/*"
  )

// Incremental data loading
import org.apache.spark.sql.functions._

def loadIncrementalData(basePath: String, lastProcessedTime: String): DataFrame = {
  spark.read
    .format("parquet")
    .option("modifiedAfter", lastProcessedTime)
    .load(basePath)
    .filter($"event_time" > lit(lastProcessedTime))
}

val incrementalDF = loadIncrementalData("/path/to/events", "2023-12-01T00:00:00")

// Data source V2 operations (option keys are interpreted by the target catalog/data source)
val v2Writer = sampleData.writeTo("catalog.db.table")
  .option("write.format.default", "parquet")
  .option("write.parquet.compression-codec", "snappy")

// Conditional writes
v2Writer.createOrReplace()          // Create the table, or replace it if it exists
// v2Writer.create()                // Create the table, fail if it exists
// v2Writer.append()                // Append to an existing table
// v2Writer.overwrite(condition)    // Overwrite the rows matching a condition Column
// v2Writer.overwritePartitions()   // Dynamically overwrite the partitions touched by this write
```

### Error Handling and Data Quality
```scala
import org.apache.spark.sql.functions._

// Robust CSV reading with error handling
val robustCsvDF = spark.read
  .format("csv")
  .option("header", "true")
  .option("mode", "PERMISSIVE") // Continue processing despite malformed rows
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .schema(
    StructType(Array(
      StructField("id", IntegerType, nullable = true),
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true),
      StructField("_corrupt_record", StringType, nullable = true)
    ))
  )
  .load("/path/to/potentially/malformed/csv")

// Analyze data quality
val qualityReport = robustCsvDF.agg(
  count("*").as("total_records"),
  sum(when($"_corrupt_record".isNull, 1).otherwise(0)).as("valid_records"),
  sum(when($"_corrupt_record".isNotNull, 1).otherwise(0)).as("corrupt_records"),
  sum(when($"id".isNull, 1).otherwise(0)).as("missing_ids"),
  sum(when($"name".isNull || $"name" === "", 1).otherwise(0)).as("missing_names")
)

qualityReport.show()

// Data validation and cleansing
val cleanedDF = robustCsvDF
  .filter($"_corrupt_record".isNull)                       // Remove corrupt records
  .filter($"id".isNotNull && $"id" > 0)                    // Valid IDs only
  .filter($"name".isNotNull && length(trim($"name")) > 0)  // Non-empty names
  .filter($"age".isNull || ($"age" >= 0 && $"age" <= 150)) // Reasonable ages
  .drop("_corrupt_record")

// Build a validation report as a DataFrame of tuples
// (a Map[String, Any] cannot be turned into a DataFrame, since it has no encoder)
val totalInputRecords = robustCsvDF.count()
val validOutputRecords = cleanedDF.count()

val validationReport = Seq((
  "/path/to/potentially/malformed/csv",
  java.time.Instant.now().toString,
  totalInputRecords,
  validOutputRecords,
  validOutputRecords.toDouble / totalInputRecords * 100
)).toDF("source_path", "processing_time", "total_input_records", "valid_output_records", "data_quality_score")

// Save the validation report
validationReport.write
  .format("json")
  .mode(SaveMode.Append)
  .save("/path/to/validation/reports")
```