Spark SQL provides comprehensive I/O capabilities through the DataFrameReader and DataFrameWriter interfaces. They support reading from and writing to a variety of data sources, including files (JSON, Parquet, CSV, ORC, text), JDBC databases, and catalog tables; streaming sources are handled by the analogous DataStreamReader and DataStreamWriter interfaces.
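
As a quick orientation, a minimal read-transform-write round trip might look like the following sketch (the paths, column name, and filter are hypothetical):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("io-example").getOrCreate()

// Read a CSV file with a header row, letting Spark infer the column types
val orders = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("orders.csv")

// Write a filtered subset back out as Parquet, overwriting any previous output
orders.filter(col("amount") > 100)
  .write
  .mode("overwrite")
  .parquet("large_orders/")
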
class DataFrameReader {
  def format(source: String): DataFrameReader
  def schema(schema: StructType): DataFrameReader
  def schema(schemaString: String): DataFrameReader
  def option(key: String, value: String): DataFrameReader
  def option(key: String, value: Boolean): DataFrameReader
  def option(key: String, value: Long): DataFrameReader
  def option(key: String, value: Double): DataFrameReader
  def options(options: scala.collection.Map[String, String]): DataFrameReader
  def options(options: java.util.Map[String, String]): DataFrameReader
  def load(): DataFrame
  def load(path: String): DataFrame
  def load(paths: String*): DataFrame
}
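
The schema overloads also accept a DDL-formatted string, and options can be supplied as a Map; a brief sketch (the path, schema, and option values are hypothetical):

// Generic reader: explicit format, DDL-string schema, and a map of options
val events = spark.read
  .format("csv")
  .schema("id INT, name STRING, ts TIMESTAMP")
  .options(Map("header" -> "true", "sep" -> "\t"))
  .load("events.tsv")
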

class DataFrameReader {
  def json(path: String): DataFrame
  def json(paths: String*): DataFrame
  def json(jsonRDD: RDD[String]): DataFrame          // deprecated; prefer the Dataset[String] overload
  def json(jsonDataset: Dataset[String]): DataFrame
}

Usage Examples:
// Basic JSON reading
val df = spark.read.json("path/to/file.json")
val multipleFiles = spark.read.json("file1.json", "file2.json")

// With options
val jsonDF = spark.read
  .option("multiline", "true")
  .option("allowComments", "true")
  .option("allowUnquotedFieldNames", "true")
  .json("complex.json")

// With explicit schema for better performance
import org.apache.spark.sql.types._
val schema = StructType(Array(
  StructField("name", StringType, true),
  StructField("age", IntegerType, true),
  StructField("city", StringType, true)
))
val typedJson = spark.read
  .schema(schema)
  .json("people.json")

// From RDD or Dataset of strings
val jsonStrings = spark.sparkContext.parallelize(Seq(
  """{"name": "Alice", "age": 25}""",
  """{"name": "Bob", "age": 30}"""
))
val fromRDD = spark.read.json(jsonStrings)

Common JSON Options:
multiline: Parse multi-line JSON objects (default: false)
allowComments: Allow JavaScript-style comments (default: false)
allowUnquotedFieldNames: Allow unquoted field names (default: false)
allowSingleQuotes: Allow single quotes (default: true)
primitivesAsString: Infer all primitive values as strings (default: false)
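
For example, primitivesAsString turns off numeric type inference; a small sketch (reusing the hypothetical people.json from above):

// Every primitive JSON value is read as a string (age becomes StringType)
val stringlyTyped = spark.read
  .option("primitivesAsString", "true")
  .json("people.json")
stringlyTyped.printSchema()
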

class DataFrameReader {
  def parquet(paths: String*): DataFrame
}

Usage Examples:
// Read Parquet files
val parquetDF = spark.read.parquet("data.parquet")
val multipleParquet = spark.read.parquet("part1.parquet", "part2.parquet")

// With options
val parquetWithOptions = spark.read
  .option("mergeSchema", "true")
  .parquet("partitioned_data/")

// Read partitioned Parquet data
val partitioned = spark.read.parquet("data/year=2023/month=*/day=*")

Common Parquet Options:
mergeSchema: Merge schemas from multiple files (default: false)
recursiveFileLookup: Recursively search subdirectories (default: false)

class DataFrameReader {
  def csv(paths: String*): DataFrame
}

Usage Examples:
// Basic CSV reading
val csvDF = spark.read.csv("data.csv")

// With options and header
val csvWithHeader = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .option("sep", ",")
  .csv("people.csv")

// With explicit schema
val csvSchema = StructType(Array(
  StructField("id", IntegerType, true),
  StructField("name", StringType, true),
  StructField("salary", DoubleType, true)
))
val typedCSV = spark.read
  .schema(csvSchema)
  .option("header", "true")
  .csv("employees.csv")

Common CSV Options:
header: Whether first line is a header (default: false)
inferSchema: Automatically infer column types (default: false)
sep: Field separator character (default: ",")
quote: Quote character (default: double quote ")
escape: Escape character (default: backslash \)
nullValue: String representing null values (default: "")
dateFormat: Date format string (default: "yyyy-MM-dd")
timestampFormat: Timestamp format string (default: "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
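
A short sketch combining the null-handling and date options above (the file path, schema, and formats are hypothetical):

// Treat "NA" as null and parse dates written as day/month/year
val sales = spark.read
  .option("header", "true")
  .option("nullValue", "NA")
  .option("dateFormat", "dd/MM/yyyy")
  .schema("id INT, sale_date DATE, amount DOUBLE")
  .csv("sales.csv")
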

class DataFrameReader {
  def orc(paths: String*): DataFrame
}

Usage Examples:
// Read ORC files
val orcDF = spark.read.orc("data.orc")

// With options
val orcWithOptions = spark.read
  .option("mergeSchema", "true")
  .orc("orc_data/")

class DataFrameReader {
  def text(paths: String*): DataFrame
  def textFile(paths: String*): Dataset[String]
}

Usage Examples:
// Read as a DataFrame with a single "value" column
val textDF = spark.read.text("log.txt")

// Read as a Dataset[String]
val textDS = spark.read.textFile("documents/*.txt")

// With a custom line separator
val customSep = spark.read
  .option("lineSep", "\n")
  .text("data.txt")

class DataFrameReader {
  def jdbc(url: String, table: String, properties: Properties): DataFrame
  def jdbc(url: String, table: String, predicates: Array[String], connectionProperties: Properties): DataFrame
  def jdbc(url: String, table: String, columnName: String, lowerBound: Long, upperBound: Long, numPartitions: Int, connectionProperties: Properties): DataFrame
}

Usage Examples:
import java.util.Properties

// Basic JDBC reading
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
connectionProperties.put("driver", "com.mysql.jdbc.Driver")

val jdbcDF = spark.read
  .jdbc("jdbc:mysql://localhost:3306/mydb", "users", connectionProperties)

// With partitioning for performance
val partitionedJDBC = spark.read
  .jdbc(
    url = "jdbc:postgresql://localhost:5432/mydb",
    table = "large_table",
    columnName = "id",
    lowerBound = 1L,
    upperBound = 1000000L,
    numPartitions = 10,
    connectionProperties = connectionProperties
  )
// With a custom query (passed as a parenthesized subquery with an alias in the table parameter)
val customQuery = """
  (SELECT u.id, u.name, p.title
   FROM users u JOIN profiles p ON u.id = p.user_id
   WHERE u.active = true) as query
"""
val queryResult = spark.read
  .jdbc("jdbc:mysql://localhost:3306/mydb", customQuery, connectionProperties)
// With predicates for parallel reading
val predicates = Array(
  "age < 25",
  "age >= 25 AND age < 50",
  "age >= 50"
)
val parallelJDBC = spark.read
  .jdbc("jdbc:mysql://localhost:3306/mydb", "users", predicates, connectionProperties)

class DataFrameReader {
  def format(source: String): DataFrameReader
  def load(): DataFrame
  def load(path: String): DataFrame
}

Usage Examples:
// Delta Lake (third-party format; requires the Delta Lake library)
val deltaDF = spark.read
  .format("delta")
  .load("path/to/delta/table")

// Avro files (requires the external spark-avro module)
val avroDF = spark.read
  .format("avro")
  .load("data.avro")

// Custom data source
val customDF = spark.read
  .format("com.example.CustomDataSource")
  .option("customOption", "value")
  .load("path/to/data")

class DataFrameWriter[T] {
  def mode(saveMode: SaveMode): DataFrameWriter[T]
  def mode(saveMode: String): DataFrameWriter[T]
  def format(source: String): DataFrameWriter[T]
  def option(key: String, value: String): DataFrameWriter[T]
  def option(key: String, value: Boolean): DataFrameWriter[T]
  def option(key: String, value: Long): DataFrameWriter[T]
  def option(key: String, value: Double): DataFrameWriter[T]
  def options(options: scala.collection.Map[String, String]): DataFrameWriter[T]
  def options(options: java.util.Map[String, String]): DataFrameWriter[T]
  def partitionBy(colNames: String*): DataFrameWriter[T]
  def bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T]
  def sortBy(colName: String, colNames: String*): DataFrameWriter[T]
  def save(): Unit
  def save(path: String): Unit
}

// SaveMode is a Java enum in org.apache.spark.sql with values:
// Append, Overwrite, ErrorIfExists (the default), Ignore
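
A small sketch of the generic writer path with an explicit SaveMode (the output path is hypothetical):

import org.apache.spark.sql.SaveMode

// Ignore silently skips the write if the target already exists;
// ErrorIfExists (the default) would throw instead
df.write
  .format("parquet")
  .mode(SaveMode.Ignore)
  .save("maybe_existing_output/")
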

class DataFrameWriter[T] {
  def parquet(path: String): Unit
}

Usage Examples:
// Basic Parquet writing
df.write.parquet("output.parquet")

// With save mode
df.write
  .mode(SaveMode.Overwrite)
  .parquet("data/output.parquet")

// Partitioned writing
df.write
  .partitionBy("year", "month")
  .parquet("partitioned_data/")

// With options
df.write
  .option("compression", "snappy")
  .mode("overwrite")
  .parquet("compressed_output.parquet")

class DataFrameWriter[T] {
  def json(path: String): Unit
}

Usage Examples:
// Write JSON
df.write.json("output.json")

// With formatting options
df.write
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
  .option("dateFormat", "yyyy-MM-dd")
  .mode("overwrite")
  .json("formatted_output.json")

class DataFrameWriter[T] {
  def csv(path: String): Unit
}

Usage Examples:
// Write CSV
df.write.csv("output.csv")

// With options
df.write
  .option("header", "true")
  .option("sep", "|")
  .mode("overwrite")
  .csv("data_with_header.csv")

class DataFrameWriter[T] {
  def orc(path: String): Unit
}

Usage Examples:
// Write ORC
df.write.orc("output.orc")

// With compression
df.write
  .option("compression", "zlib")
  .mode("overwrite")
  .orc("compressed.orc")

class DataFrameWriter[T] {
  def text(path: String): Unit
}

Usage Examples:
// Write text (requires a single string column)
df.select("message").write.text("logs.txt")

// Combine columns first
import org.apache.spark.sql.functions.{col, concat_ws}
df.select(concat_ws(",", col("name"), col("age")).alias("line"))
  .write
  .text("combined_output.txt")

class DataFrameWriter[T] {
  def saveAsTable(tableName: String): Unit
  def insertInto(tableName: String): Unit
}

Usage Examples:
// Save as a managed table
df.write
  .mode("overwrite")
  .saveAsTable("my_database.my_table")

// Insert into an existing table
// (insertInto matches columns by position, not by name)
df.write
  .mode("append")
  .insertInto("existing_table")

// Partitioned table
df.write
  .partitionBy("year", "month")
  .mode("overwrite")
  .saveAsTable("partitioned_table")

class DataFrameWriter[T] {
  def jdbc(url: String, table: String, connectionProperties: Properties): Unit
}

Usage Examples:
// Write to database
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
connectionProperties.put("driver", "com.mysql.jdbc.Driver")

df.write
  .mode("overwrite")
  .jdbc("jdbc:mysql://localhost:3306/mydb", "users", connectionProperties)

// With additional options
df.write
  .option("batchsize", "10000")
  .option("truncate", "true")
  .mode("overwrite")
  .jdbc("jdbc:postgresql://localhost:5432/mydb", "large_table", connectionProperties)

// Write bucketed data for optimized joins
// (bucketBy and sortBy only apply to table writes via saveAsTable)
df.write
  .bucketBy(42, "user_id")
  .sortBy("timestamp")
  .mode("overwrite")
  .saveAsTable("bucketed_events")

// Write same data to multiple formats
val writer = df.write.mode("overwrite")
writer.parquet("data.parquet")
writer.json("data.json")
writer.option("header", "true").csv("data.csv")

// Write different partitions to different locations
df.filter(col("region") === "US")
.write
.mode("overwrite")
.parquet("us_data/")
df.filter(col("region") === "EU")
.write
.mode("overwrite")
.parquet("eu_data/")// Handle schema changes in Parquet
// (mergeSchema is a read option: append the new data, then merge schemas when reading)
df.write
  .mode("append")
  .parquet("evolving_schema/")
val merged = spark.read
  .option("mergeSchema", "true")
  .parquet("evolving_schema/")

Common File Source Options:
path: File system path
recursiveFileLookup: Recursively search subdirectories (default: false)
pathGlobFilter: Glob pattern for file filtering
modifiedBefore: Only include files modified before the given timestamp
modifiedAfter: Only include files modified after the given timestamp

Supported compression codecs:
Parquet: none, snappy, gzip, lzo, brotli, lz4, zstd
JSON/CSV/text: none, bzip2, gzip, lz4, snappy, deflate
ORC: none, zlib, snappy, lzo, lz4, zstd