class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
// Format specification
def format(source: String): DataFrameReader
// Schema operations
def schema(schema: StructType): DataFrameReader
def schema(schemaString: String): DataFrameReader
// Options configuration
def option(key: String, value: String): DataFrameReader
def option(key: String, value: Boolean): DataFrameReader
def option(key: String, value: Long): DataFrameReader
def option(key: String, value: Double): DataFrameReader
def options(options: scala.collection.Map[String, String]): DataFrameReader
def options(options: java.util.Map[String, String]): DataFrameReader
// Load operations
def load(): DataFrame
def load(path: String): DataFrame
def load(paths: String*): DataFrame
// Format-specific loaders
def json(path: String): DataFrame
def json(paths: String*): DataFrame
def json(jsonDataset: Dataset[String]): DataFrame
def csv(path: String): DataFrame
def csv(paths: String*): DataFrame
def parquet(path: String): DataFrame
def parquet(paths: String*): DataFrame
def orc(path: String): DataFrame
def orc(paths: String*): DataFrame
def text(path: String): DataFrame
def text(paths: String*): DataFrame
def textFile(path: String): Dataset[String]
def textFile(paths: String*): Dataset[String]
// Database and external sources
def jdbc(url: String, table: String, properties: Properties): DataFrame
def jdbc(url: String, table: String, predicates: Array[String], connectionProperties: Properties): DataFrame
def jdbc(url: String, table: String, columnName: String, lowerBound: Long, upperBound: Long, numPartitions: Int, connectionProperties: Properties): DataFrame
// Table operations
def table(tableName: String): DataFrame
}

class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
// Format specification
def format(source: String): DataFrameWriter[T]
// Save mode configuration
def mode(saveMode: SaveMode): DataFrameWriter[T]
def mode(saveMode: String): DataFrameWriter[T]
// Options configuration
def option(key: String, value: String): DataFrameWriter[T]
def option(key: String, value: Boolean): DataFrameWriter[T]
def option(key: String, value: Long): DataFrameWriter[T]
def option(key: String, value: Double): DataFrameWriter[T]
def options(options: scala.collection.Map[String, String]): DataFrameWriter[T]
def options(options: java.util.Map[String, String]): DataFrameWriter[T]
// Partitioning and bucketing
def partitionBy(colNames: String*): DataFrameWriter[T]
def bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T]
def sortBy(colName: String, colNames: String*): DataFrameWriter[T]
// Save operations
def save(): Unit
def save(path: String): Unit
// Format-specific writers
def json(path: String): Unit
def csv(path: String): Unit
def parquet(path: String): Unit
def orc(path: String): Unit
def text(path: String): Unit
// Database operations
def jdbc(url: String, table: String, connectionProperties: Properties): Unit
// Table operations
def saveAsTable(tableName: String): Unit
def insertInto(tableName: String): Unit
}

// SaveMode (a Java enum in org.apache.spark.sql, summarized here in Scala-style shorthand)
sealed abstract class SaveMode
object SaveMode {
case object Append extends SaveMode
case object Overwrite extends SaveMode
case object ErrorIfExists extends SaveMode
case object Ignore extends SaveMode
def valueOf(modeName: String): SaveMode
}

class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T]) {
// Write operations
def append(): Unit
def overwrite(condition: Column): Unit
def overwritePartitions(): Unit
// Conditional operations
def createOrReplace(): Unit
def create(): Unit
def replace(): Unit
// Partitioning (partition transforms such as years(), months(), days(), bucket() are passed as Columns)
def partitionedBy(column: Column, columns: Column*): DataFrameWriterV2[T]
// Options
def option(key: String, value: String): DataFrameWriterV2[T]
def option(key: String, value: Boolean): DataFrameWriterV2[T]
def option(key: String, value: Long): DataFrameWriterV2[T]
def option(key: String, value: Double): DataFrameWriterV2[T]
def options(options: Map[String, String]): DataFrameWriterV2[T]
def options(options: java.util.Map[String, String]): DataFrameWriterV2[T]
// Table properties
def tableProperty(property: String, value: String): DataFrameWriterV2[T]
// Table provider used by create/replace operations
def using(provider: String): DataFrameWriterV2[T]
}

// JSON Reader Options
class JSONOptions(
val samplingRatio: Double = 1.0,
val primitivesAsString: Boolean = false,
val prefersDecimal: Boolean = false,
val allowComments: Boolean = false,
val allowUnquotedFieldNames: Boolean = false,
val allowSingleQuotes: Boolean = true,
val allowNumericLeadingZeros: Boolean = false,
val allowBackslashEscapingAnyCharacter: Boolean = false,
val allowUnquotedControlChars: Boolean = false,
val mode: ParseMode = ParseMode.PERMISSIVE,
val columnNameOfCorruptRecord: String = "_corrupt_record",
val dateFormat: String = "yyyy-MM-dd",
val timestampFormat: String = "yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]",
val multiLine: Boolean = false,
val lineSep: String = null,
val encoding: String = null,
val locale: Locale = Locale.US,
val pathGlobFilter: String = null,
val recursiveFileLookup: Boolean = false,
val modifiedBefore: String = null,
val modifiedAfter: String = null
)

// CSV Reader Options
class CSVOptions(
val delimiter: Char = ',',
val quote: Char = '"',
val escape: Char = '\\',
val charToEscapeQuoteEscaping: Char = '\u0000',
val comment: Char = '\u0000',
val header: Boolean = false,
val inferSchema: Boolean = false,
val ignoreLeadingWhiteSpace: Boolean = false,
val ignoreTrailingWhiteSpace: Boolean = false,
val nullValue: String = "",
val emptyValue: String = "",
val nanValue: String = "NaN",
val positiveInf: String = "Inf",
val negativeInf: String = "-Inf",
val dateFormat: String = "yyyy-MM-dd",
val timestampFormat: String = "yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]",
val maxColumns: Int = 20480,
val maxCharsPerColumn: Int = -1,
val mode: ParseMode = ParseMode.PERMISSIVE,
val columnNameOfCorruptRecord: String = "_corrupt_record",
val multiLine: Boolean = false,
val locale: Locale = Locale.US,
val lineSep: String = null,
val pathGlobFilter: String = null,
val recursiveFileLookup: Boolean = false,
val modifiedBefore: String = null,
val modifiedAfter: String = null,
val unescapedQuoteHandling: UnescapedQuoteHandling.Value = UnescapedQuoteHandling.STOP_AT_DELIMITER
)

// Parquet Options
class ParquetOptions(
val compression: String = "snappy", // none, uncompressed, snappy, gzip, lzo, brotli, lz4, zstd
val mergeSchema: Boolean = false,
val pathGlobFilter: String = null,
val recursiveFileLookup: Boolean = false,
val modifiedBefore: String = null,
val modifiedAfter: String = null,
val datetimeRebaseMode: String = "EXCEPTION", // EXCEPTION, CORRECTED, LEGACY
val int96RebaseMode: String = "EXCEPTION"
)

// JDBC Options
class JDBCOptions(
val url: String,
val table: String,
val driver: String = null,
val partitionColumn: String = null,
val lowerBound: String = null,
val upperBound: String = null,
val numPartitions: String = null,
val queryTimeout: String = "0",
val fetchsize: String = "0",
val batchsize: String = "1000",
val isolationLevel: String = "READ_UNCOMMITTED",
val sessionInitStatement: String = null,
val truncate: Boolean = false,
val cascadeTruncate: Boolean = false,
val createTableOptions: String = "",
val createTableColumnTypes: String = null,
val customSchema: String = null,
val pushDownPredicate: Boolean = true,
val pushDownAggregate: Boolean = false,
val pushDownLimit: Boolean = false,
val pushDownTableSample: Boolean = false,
val keytab: String = null,
val principal: String = null,
val refreshKrb5Config: Boolean = false,
val connectionProvider: String = null
)

import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}
import org.apache.spark.sql.types._
val spark = SparkSession.builder()
.appName("Data Sources Demo")
.getOrCreate()
import spark.implicits._ // enables toDF, toDS and the $-column syntax used below
// Reading JSON files
val jsonDF = spark.read
.format("json")
.option("multiLine", "true")
.option("mode", "PERMISSIVE")
.load("/path/to/json/files/*.json")
// Alternative JSON reading with schema
val jsonSchema = StructType(Array(
StructField("id", IntegerType, nullable = false),
StructField("name", StringType, nullable = true),
StructField("age", IntegerType, nullable = true),
StructField("email", StringType, nullable = true)
))
val typedJsonDF = spark.read
.schema(jsonSchema)
.json("/path/to/json/files")
// Reading CSV files
val csvDF = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.option("delimiter", ",")
.option("quote", "\"")
.option("escape", "\\")
.option("nullValue", "NULL")
.option("dateFormat", "yyyy-MM-dd")
.option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
.load("/path/to/csv/files")
// Reading Parquet files
val parquetDF = spark.read
.format("parquet")
.option("mergeSchema", "true")
.load("/path/to/parquet/files")
// Reading ORC files
val orcDF = spark.read
.format("orc")
.load("/path/to/orc/files")
// Reading text files
val textDF = spark.read
.text("/path/to/text/files")
val textFileDS = spark.read
.textFile("/path/to/text/files")// Sample DataFrame for writing examples
val sampleData = Seq(
(1, "Alice", 25, "alice@example.com", "2023-01-15"),
(2, "Bob", 30, "bob@example.com", "2023-02-20"),
(3, "Charlie", 35, "charlie@example.com", "2023-03-10")
).toDF("id", "name", "age", "email", "join_date")
// Writing to JSON
sampleData.write
.format("json")
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.save("/path/to/output/json")
// Writing to CSV
sampleData.write
.format("csv")
.mode(SaveMode.Overwrite)
.option("header", "true")
.option("delimiter", ",")
.option("quote", "\"")
.option("escape", "\\")
.option("compression", "gzip")
.save("/path/to/output/csv")
// Writing to Parquet with partitioning
sampleData.write
.format("parquet")
.mode(SaveMode.Overwrite)
.option("compression", "snappy")
.partitionBy("join_date")
.save("/path/to/output/parquet")
// Writing to ORC
sampleData.write
.format("orc")
.mode(SaveMode.Overwrite)
.option("compression", "zlib")
.save("/path/to/output/orc")
// Writing to text (single column)
sampleData.select("name").write
.mode(SaveMode.Overwrite)
.text("/path/to/output/text")// Dynamic partitioning
val salesData = Seq(
(1, "Product A", 100.0, "2023-01-15", "Electronics", "US"),
(2, "Product B", 150.0, "2023-01-16", "Clothing", "US"),
(3, "Product C", 200.0, "2023-01-15", "Electronics", "UK"),
(4, "Product D", 120.0, "2023-01-17", "Books", "CA")
).toDF("id", "product_name", "price", "sale_date", "category", "country")
// Multi-level partitioning
salesData.write
.format("parquet")
.mode(SaveMode.Overwrite)
.partitionBy("country", "category")
.option("compression", "snappy")
.save("/path/to/partitioned/sales")
// Bucketing for better join performance
salesData.write
.format("parquet")
.mode(SaveMode.Overwrite)
.bucketBy(10, "id") // 10 buckets based on id column
.sortBy("price") // Sort within buckets
.option("path", "/path/to/bucketed/sales")
.saveAsTable("bucketed_sales")
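// Reading a saved table back (DataFrameReader.table) and appending with insertInto;
// a sketch assuming the target tables already exist in the session catalog
// ("sales_archive" is a hypothetical, non-bucketed table)
val bucketedSales = spark.read.table("bucketed_sales")
bucketedSales.printSchema()
// insertInto resolves columns by position, so the DataFrame column order must match the table definition
salesData.write
.mode(SaveMode.Append)
.insertInto("sales_archive")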
// Custom partitioning with transformation
import org.apache.spark.sql.functions._
val partitionedData = salesData.withColumn("year", year(to_date($"sale_date")))
.withColumn("month", month(to_date($"sale_date")))
partitionedData.write
.format("delta") // Using Delta format for ACID properties
.mode(SaveMode.Overwrite)
.partitionBy("year", "month", "country")
.save("/path/to/delta/sales")import java.util.Properties
// JDBC connection properties
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
connectionProperties.put("driver", "org.postgresql.Driver")
// Reading from database
val jdbcDF = spark.read
.jdbc(
url = "jdbc:postgresql://localhost:5432/mydb",
table = "employees",
properties = connectionProperties
)
// Reading with SQL query
val queryDF = spark.read
.jdbc(
url = "jdbc:postgresql://localhost:5432/mydb",
table = "(SELECT * FROM employees WHERE department = 'Engineering') AS emp",
properties = connectionProperties
)
// Reading with partitioning for large tables
val partitionedJdbcDF = spark.read
.jdbc(
url = "jdbc:postgresql://localhost:5432/mydb",
table = "large_table",
columnName = "id", // Partition column
lowerBound = 1,
upperBound = 1000000,
numPartitions = 10,
connectionProperties = connectionProperties
)
// Reading with custom predicates
val predicates = Array(
"department = 'Engineering'",
"department = 'Sales'",
"department = 'Marketing'"
)
val predicateDF = spark.read
.jdbc(
url = "jdbc:postgresql://localhost:5432/mydb",
table = "employees",
predicates = predicates,
connectionProperties = connectionProperties
)
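// Reading over JDBC with the generic option-based API instead of Properties;
// a sketch combining the JDBCOptions listed earlier (credentials and bounds are placeholders)
val optionsJdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql://localhost:5432/mydb")
.option("dbtable", "employees")
.option("user", "username")
.option("password", "password")
.option("driver", "org.postgresql.Driver")
.option("fetchsize", "1000") // rows fetched per round trip
.option("partitionColumn", "id")
.option("lowerBound", "1")
.option("upperBound", "1000000")
.option("numPartitions", "10")
.load()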
// Writing to database
sampleData.write
.mode(SaveMode.Append)
.jdbc(
url = "jdbc:postgresql://localhost:5432/mydb",
table = "new_employees",
connectionProperties = connectionProperties
)
// Writing with custom options
sampleData.write
.format("jdbc")
.option("url", "jdbc:postgresql://localhost:5432/mydb")
.option("dbtable", "employees_backup")
.option("user", "username")
.option("password", "password")
.option("driver", "org.postgresql.Driver")
.option("batchsize", "10000")
.option("isolationLevel", "READ_COMMITTED")
.mode(SaveMode.Overwrite)
.save()

// Schema inference with sampling
val inferredDF = spark.read
.format("json")
.option("samplingRatio", "0.1") // Sample 10% for schema inference
.option("prefersDecimal", "true")
.load("/path/to/large/json/files")
// Explicit schema definition
val explicitSchema = StructType(Array(
StructField("user_id", StringType, nullable = false),
StructField("event_type", StringType, nullable = false),
StructField("timestamp", TimestampType, nullable = false),
StructField("properties", MapType(StringType, StringType), nullable = true),
StructField("user_properties", StructType(Array(
StructField("age", IntegerType, nullable = true),
StructField("country", StringType, nullable = true),
StructField("premium", BooleanType, nullable = false)
)), nullable = true)
))
val typedDF = spark.read
.schema(explicitSchema)
.format("json")
.load("/path/to/json/events")
// Schema evolution with Parquet
val evolvedParquetDF = spark.read
.format("parquet")
.option("mergeSchema", "true") // Merge schemas from different files
.load("/path/to/evolved/parquet/files")
// Handle corrupt records
val corruptHandlingDF = spark.read
.format("json")
.option("mode", "PERMISSIVE") // PERMISSIVE, DROPMALFORMED, FAILFAST
.option("columnNameOfCorruptRecord", "_corrupt_record")
.schema(explicitSchema.add("_corrupt_record", StringType))
.load("/path/to/potentially/corrupt/json")
// Schema compatibility checking
def validateSchema(df: DataFrame, expectedSchema: StructType): Boolean = {
val actualSchema = df.schema
expectedSchema.fields.forall { expectedField =>
actualSchema.fields.exists { actualField =>
actualField.name == expectedField.name &&
actualField.dataType == expectedField.dataType
}
}
}
val isCompatible = validateSchema(typedDF, explicitSchema)
println(s"Schema compatible: $isCompatible")// Reading compressed files
val gzipJsonDF = spark.read
.format("json")
.option("compression", "gzip")
.load("/path/to/compressed/*.json.gz")
val bzip2CsvDF = spark.read
.format("csv")
.option("header", "true")
.option("compression", "bzip2")
.load("/path/to/compressed/*.csv.bz2")
// Writing with different compression algorithms
sampleData.write
.format("parquet")
.option("compression", "snappy") // snappy, gzip, lzo, brotli, lz4, zstd
.mode(SaveMode.Overwrite)
.save("/path/to/compressed/parquet")
sampleData.write
.format("json")
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.save("/path/to/compressed/json")
sampleData.write
.format("csv")
.option("header", "true")
.option("compression", "bzip2")
.mode(SaveMode.Overwrite)
.save("/path/to/compressed/csv")
// Optimal compression settings for different use cases
// For archival (maximize compression)
sampleData.write
.format("parquet")
.option("compression", "gzip")
.option("parquet.block.size", "268435456") // 256MB blocks
.save("/path/to/archive")
// For query performance (balance compression and speed)
sampleData.write
.format("parquet")
.option("compression", "snappy")
.option("parquet.page.size", "1048576") // 1MB pages
.save("/path/to/queryable")// File globbing and filtering
val filteredDF = spark.read
.format("json")
.option("pathGlobFilter", "*.json") // Only JSON files
.option("recursiveFileLookup", "true") // Recursive directory traversal
.option("modifiedAfter", "2023-01-01T00:00:00") // Files modified after date
.option("modifiedBefore", "2023-12-31T23:59:59") // Files modified before date
.load("/path/to/data")
// Multi-path loading
val multiPathDF = spark.read
.format("parquet")
.load(
"/path/to/data/2023/01/*",
"/path/to/data/2023/02/*",
"/path/to/data/2023/03/*"
)
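// Partition-aware multi-path loading: with a Hive-style key=value layout such as the
// /path/to/partitioned/sales output written above, the basePath option tells Spark where
// partition discovery starts, so partition columns like country are kept even when only
// some subdirectories are loaded (a sketch; only country=US and country=UK are assumed to exist)
val partitionAwareDF = spark.read
.format("parquet")
.option("basePath", "/path/to/partitioned/sales")
.load(
"/path/to/partitioned/sales/country=US",
"/path/to/partitioned/sales/country=UK"
)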
// Incremental data loading
import org.apache.spark.sql.functions._
def loadIncrementalData(basePath: String, lastProcessedTime: String): DataFrame = {
spark.read
.format("parquet")
.option("modifiedAfter", lastProcessedTime)
.load(basePath)
.filter($"event_time" > lit(lastProcessedTime))
}
val incrementalDF = loadIncrementalData("/path/to/events", "2023-12-01T00:00:00")
// Data source V2 operations
val v2Writer = sampleData.writeTo("catalog.db.table")
.option("write.format.default", "parquet")
.option("write.parquet.compression-codec", "snappy")
// Conditional writes
v2Writer.createOrReplace() // Create table or replace if exists
// v2Writer.create() // Create table, fail if exists
// v2Writer.append() // Append to existing table
// v2Writer.overwrite(lit(true)) // Overwrite rows matching a condition (lit(true) replaces the whole table)
// v2Writer.overwritePartitions() // Dynamically overwrite the partitions touched by the new data

import org.apache.spark.sql.functions._
// Robust CSV reading with error handling
val robustCsvDF = spark.read
.format("csv")
.option("header", "true")
.option("mode", "PERMISSIVE") // Continue processing despite errors
.option("columnNameOfCorruptRecord", "_corrupt_record")
.option("maxMalformedLogPerPartition", "10") // Log up to 10 malformed records per partition
.schema(
StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("name", StringType, nullable = true),
StructField("age", IntegerType, nullable = true),
StructField("_corrupt_record", StringType, nullable = true)
))
)
.load("/path/to/potentially/malformed/csv")
// Analyze data quality
val qualityReport = robustCsvDF.agg(
count("*").as("total_records"),
sum(when($"_corrupt_record".isNull, 1).otherwise(0)).as("valid_records"),
sum(when($"_corrupt_record".isNotNull, 1).otherwise(0)).as("corrupt_records"),
sum(when($"id".isNull, 1).otherwise(0)).as("missing_ids"),
sum(when($"name".isNull || $"name" === "", 1).otherwise(0)).as("missing_names")
)
qualityReport.show()
// Data validation and cleansing
val cleanedDF = robustCsvDF
.filter($"_corrupt_record".isNull) // Remove corrupt records
.filter($"id".isNotNull && $"id" > 0) // Valid IDs only
.filter($"name".isNotNull && length(trim($"name")) > 0) // Non-empty names
.filter($"age".isNull || ($"age" >= 0 && $"age" <= 150)) // Reasonable ages
.drop("_corrupt_record")
// Write validation results as a single-row DataFrame with explicit columns
// (a Map[String, Any] has no Dataset encoder, so it cannot be passed to createDataFrame directly)
val totalInputRecords = robustCsvDF.count()
val validOutputRecords = cleanedDF.count()
val validationSummary = Seq((
"/path/to/potentially/malformed/csv",
java.time.Instant.now().toString,
totalInputRecords,
validOutputRecords,
validOutputRecords.toDouble / totalInputRecords * 100
)).toDF("source_path", "processing_time", "total_input_records", "valid_output_records", "data_quality_score")
// Save validation report
validationSummary.write
.format("json")
.mode(SaveMode.Append)
.save("/path/to/validation/reports")