Reading data from and writing data to a variety of sources and formats. Supports batch and streaming data with extensive configuration options, built-in format support, and custom data source integration.
Interface for reading data from external storage systems into DataFrames.
/**
* Interface for reading data from external storage systems
*/
class DataFrameReader {
/** Specify data source format */
def format(source: String): DataFrameReader
/** Set schema for the data */
def schema(schema: StructType): DataFrameReader
def schema(schemaString: String): DataFrameReader
/** Set options for the data source */
def option(key: String, value: String): DataFrameReader
def option(key: String, value: Boolean): DataFrameReader
def option(key: String, value: Long): DataFrameReader
def option(key: String, value: Double): DataFrameReader
def options(options: scala.collection.Map[String, String]): DataFrameReader
def options(options: java.util.Map[String, String]): DataFrameReader
/** Load data using generic interface */
def load(): DataFrame
def load(path: String): DataFrame
def load(paths: String*): DataFrame
/** Built-in format readers */
def json(path: String): DataFrame
def json(paths: String*): DataFrame
def json(jsonRDD: RDD[String]): DataFrame
def json(jsonDataset: Dataset[String]): DataFrame
def parquet(paths: String*): DataFrame
def orc(paths: String*): DataFrame
def text(paths: String*): DataFrame
def textFile(paths: String*): Dataset[String]
def csv(paths: String*): DataFrame
def table(tableName: String): DataFrame
/** JDBC data source */
def jdbc(url: String, table: String, properties: java.util.Properties): DataFrame
def jdbc(url: String, table: String, predicates: Array[String],
connectionProperties: java.util.Properties): DataFrame
def jdbc(url: String, table: String, columnName: String,
lowerBound: Long, upperBound: Long, numPartitions: Int,
connectionProperties: java.util.Properties): DataFrame
}

Usage Examples:
// JSON with schema inference
val df1 = spark.read
.option("multiline", "true")
.json("path/to/file.json")
// CSV with custom options
val df2 = spark.read
.option("header", "true")
.option("inferSchema", "true")
.option("delimiter", ",")
.csv("path/to/file.csv")
// Parquet (schema preserved)
val df3 = spark.read.parquet("path/to/*.parquet")
// With explicit schema
import org.apache.spark.sql.types._
val schema = StructType(Seq(
StructField("name", StringType, nullable = true),
StructField("age", IntegerType, nullable = true),
StructField("salary", DoubleType, nullable = true)
))
val df4 = spark.read
.schema(schema)
.option("header", "true")
.csv("employees.csv")
// JDBC connection
val df5 = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql://localhost:5432/mydb")
.option("dbtable", "employees")
.option("user", "username")
.option("password", "password")
.load()
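// A sketch of the text readers: text() returns a DataFrame with a single
// "value" column, textFile() returns a Dataset[String] (path is illustrative)
val textDf = spark.read.text("path/to/logs")
val textDs = spark.read.textFile("path/to/logs")
// Options can also be supplied as a Map
val optsDf = spark.read
  .options(Map("header" -> "true", "inferSchema" -> "true"))
  .csv("path/to/file.csv")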
// Custom data source
val df6 = spark.read
.format("org.apache.spark.sql.cassandra")
.option("keyspace", "mykeyspace")
.option("table", "mytable")
.load()

Interface for writing Dataset to external storage systems.
/**
* Interface for writing Dataset to external storage systems
* @tparam T Type of the Dataset
*/
class DataFrameWriter[T] {
/** Specify output format */
def format(source: String): DataFrameWriter[T]
/** Set save mode */
def mode(saveMode: SaveMode): DataFrameWriter[T]
def mode(saveMode: String): DataFrameWriter[T]
/** Set options for the data source */
def option(key: String, value: String): DataFrameWriter[T]
def option(key: String, value: Boolean): DataFrameWriter[T]
def option(key: String, value: Long): DataFrameWriter[T]
def option(key: String, value: Double): DataFrameWriter[T]
def options(options: scala.collection.Map[String, String]): DataFrameWriter[T]
def options(options: java.util.Map[String, String]): DataFrameWriter[T]
/** Partition output by columns */
def partitionBy(colNames: String*): DataFrameWriter[T]
/** Bucket output by columns */
def bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T]
def sortBy(colName: String, colNames: String*): DataFrameWriter[T]
/** Save using generic interface */
def save(): Unit
def save(path: String): Unit
/** Built-in format writers */
def json(path: String): Unit
def parquet(path: String): Unit
def orc(path: String): Unit
def text(path: String): Unit
def csv(path: String): Unit
/** Save as table */
def saveAsTable(tableName: String): Unit
def insertInto(tableName: String): Unit
/** JDBC output */
def jdbc(url: String, table: String, connectionProperties: java.util.Properties): Unit
}
/**
* Save modes for writing data (org.apache.spark.sql.SaveMode, a Java enum)
* Values: Append, Overwrite, ErrorIfExists, Ignore
*/

Usage Examples:
import org.apache.spark.sql.SaveMode

val df = spark.table("employees")
// Basic save operations
df.write
.mode(SaveMode.Overwrite)
.parquet("output/employees.parquet")
df.write
.mode("append")
.option("header", "true")
.csv("output/employees.csv")
// Partitioned output
df.write
.mode(SaveMode.Overwrite)
.partitionBy("department", "year")
.parquet("output/employees_partitioned")
// Bucketed output
df.write
.mode(SaveMode.Overwrite)
.bucketBy(10, "employee_id")
.sortBy("salary")
.saveAsTable("bucketed_employees")
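// insertInto appends to an existing table and matches columns by position,
// not by name ("existing_employees" is an illustrative table)
df.write
  .mode(SaveMode.Append)
  .insertInto("existing_employees")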
// JDBC output
df.write
.mode(SaveMode.Overwrite)
.format("jdbc")
.option("url", "jdbc:postgresql://localhost:5432/mydb")
.option("dbtable", "employees")
.option("user", "username")
.option("password", "password")
.save()
// Custom format with options
df.write
.mode(SaveMode.Append)
.format("delta")
.option("mergeSchema", "true")
.save("path/to/delta-table")Configuration options for built-in data sources.
// CSV Options
object CsvOptions {
val DELIMITER = "delimiter" // Field delimiter (default: ",")
val QUOTE = "quote" // Quote character (default: "\"")
val ESCAPE = "escape" // Escape character (default: "\")
val HEADER = "header" // First line is header (default: "false")
val INFER_SCHEMA = "inferSchema" // Infer schema from data (default: "false")
val NULL_VALUE = "nullValue" // String representation of null (default: "")
val DATE_FORMAT = "dateFormat" // Date format (default: "yyyy-MM-dd")
val TIMESTAMP_FORMAT = "timestampFormat" // Timestamp format
val MAX_COLUMNS = "maxColumns" // Maximum number of columns
val MAX_CHARS_PER_COLUMN = "maxCharsPerColumn" // Max chars per column
val ENCODING = "encoding" // Character encoding (default: "UTF-8")
val COMMENT = "comment" // Comment character
val MODE = "mode" // Parse mode: PERMISSIVE, DROPMALFORMED, FAILFAST
}
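// A sketch combining several of the CSV options above (file path is illustrative)
val csvWithOptions = spark.read
  .option("header", "true")
  .option("delimiter", "|")
  .option("nullValue", "NA")
  .option("dateFormat", "yyyy-MM-dd")
  .option("mode", "DROPMALFORMED")
  .csv("data/records.csv")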
// JSON Options
object JsonOptions {
val ALLOW_COMMENTS = "allowComments" // Allow comments (default: "false")
val ALLOW_UNQUOTED_FIELD_NAMES = "allowUnquotedFieldNames" // Allow unquoted field names
val ALLOW_SINGLE_QUOTES = "allowSingleQuotes" // Allow single quotes
val ALLOW_NUMERIC_LEADING_ZEROS = "allowNumericLeadingZeros" // Allow leading zeros
val ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER = "allowBackslashEscapingAnyCharacter"
val MULTILINE = "multiline" // Parse multiline JSON (default: "false")
val DATE_FORMAT = "dateFormat" // Date format
val TIMESTAMP_FORMAT = "timestampFormat" // Timestamp format
val PRIMITIVE_AS_STRING = "primitivesAsString" // Parse primitives as strings
}
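// A sketch of lenient JSON parsing using the options above (path is illustrative)
val lenientJson = spark.read
  .option("multiline", "true")
  .option("allowComments", "true")
  .option("allowSingleQuotes", "true")
  .option("primitivesAsString", "true")
  .json("data/events.json")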
// Parquet Options
object ParquetOptions {
val MERGE_SCHEMA = "mergeSchema" // Merge schemas from multiple files
val COMPRESSION = "compression" // Compression codec: none, snappy, gzip, lzo
val DICTIONARY_ENCODING = "dictionaryEncoding" // Use dictionary encoding
}
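// A sketch of the Parquet options above: schema merging on read,
// explicit compression codec on write
val mergedDf = spark.read
  .option("mergeSchema", "true")
  .parquet("data/parquet")
mergedDf.write
  .mode("overwrite")
  .option("compression", "snappy")
  .parquet("output/parquet_snappy")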
// JDBC Options
object JdbcOptions {
val DRIVER = "driver" // JDBC driver class name
val USER = "user" // Username
val PASSWORD = "password" // Password
val FETCH_SIZE = "fetchsize" // JDBC fetch size
val BATCH_SIZE = "batchsize" // JDBC batch size for inserts
val ISOLATION_LEVEL = "isolationLevel" // Transaction isolation level
val NUM_PARTITIONS = "numPartitions" // Number of partitions for parallel reads
val PARTITION_COLUMN = "partitionColumn" // Column for partitioning
val LOWER_BOUND = "lowerBound" // Lower bound for partitioning
val UPPER_BOUND = "upperBound" // Upper bound for partitioning
val QUERY_TIMEOUT = "queryTimeout" // Query timeout in seconds
val CREATE_TABLE_OPTIONS = "createTableOptions" // Options for CREATE TABLE
val CREATE_TABLE_COLUMN_TYPES = "createTableColumnTypes" // Column types for CREATE TABLE
val CUSTOM_SCHEMA = "customSchema" // Custom schema for reading
}
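// A sketch of a parallel JDBC read using the partitioning options above
// (connection details are illustrative)
val jdbcParallel = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/mydb")
  .option("dbtable", "employees")
  .option("user", "username")
  .option("password", "password")
  .option("partitionColumn", "employee_id")
  .option("lowerBound", "1")
  .option("upperBound", "1000000")
  .option("numPartitions", "8")
  .option("fetchsize", "1000")
  .load()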
Support for various data formats and storage systems.

// File-based sources
val CSV_SOURCE = "csv"
val JSON_SOURCE = "json"
val PARQUET_SOURCE = "parquet"
val ORC_SOURCE = "orc"
val TEXT_SOURCE = "text"
val AVRO_SOURCE = "avro" // Requires spark-avro package
// Database sources
val JDBC_SOURCE = "jdbc"
// Big data sources
val HIVE_SOURCE = "hive"
val DELTA_SOURCE = "delta" // Requires Delta Lake
val ICEBERG_SOURCE = "iceberg" // Requires Apache Iceberg
// Cloud object stores are addressed through filesystem URI schemes in the path,
// not through format(); each requires the matching Hadoop connector on the classpath
val S3_PATH = "s3a://bucket/path" // Amazon S3 (hadoop-aws)
val AZURE_PATH = "abfss://container@account.dfs.core.windows.net/path" // Azure Data Lake Storage Gen2 (hadoop-azure)
val GCS_PATH = "gs://bucket/path" // Google Cloud Storage (gcs-connector)
// Streaming sources
val KAFKA_SOURCE = "kafka"
val SOCKET_SOURCE = "socket"
val RATE_SOURCE = "rate" // For testing
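// A sketch of a streaming read: streaming sources go through spark.readStream,
// and the rate source is convenient for local testing
val rateStream = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()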
Common patterns for data ingestion and output.

Multi-format reading:
// Read from multiple formats
val jsonDf = spark.read.json("data/*.json")
val csvDf = spark.read.option("header", "true").csv("data/*.csv")
val combined = jsonDf.union(csvDf) // union matches columns by position; use unionByName when column order differs
// Schema evolution with Parquet: mergeSchema reconciles differing file schemas,
// basePath keeps the partition column in the result
val merged = spark.read
  .option("mergeSchema", "true")
  .option("basePath", "data")
  .parquet("data/year=2021", "data/year=2022")

Optimized writes:
// Repartition to control the number and size of output files
df.repartition(200).write
.mode(SaveMode.Overwrite)
.parquet("output/data")
// Coalesce for fewer files
df.coalesce(10).write
.mode(SaveMode.Overwrite)
.json("output/data")
// Dynamic partitioning
df.write
.mode(SaveMode.Overwrite)
.partitionBy("year", "month")
.option("maxRecordsPerFile", "100000")
.parquet("output/partitioned_data")Error handling:
// Handle malformed records: PERMISSIVE keeps bad rows in a corrupt-record column,
// DROPMALFORMED drops them, FAILFAST fails on the first bad record
import org.apache.spark.sql.functions.col
val df = spark.read
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json("data/potentially_bad.json")
  .cache() // caching avoids Spark's restriction on queries that reference only the corrupt-record column
// Validate data after read
val validDf = df.filter(col("_corrupt_record").isNull)
val corruptDf = df.filter(col("_corrupt_record").isNotNull)

Custom data sources:
// Register custom format
spark.sql("CREATE TABLE custom_table USING org.example.CustomDataSource OPTIONS (path 'data/custom')")
// Use programmatically
val df = spark.read
.format("org.example.CustomDataSource")
.option("customOption", "value")
.load("data/custom")