Apache Spark SQL - Data Sources and I/O Operations

Capabilities

Data Loading and Reading Operations

  • Read data from various formats including Parquet, JSON, CSV, ORC, Avro, Delta, and text files
  • Connect to external systems like JDBC databases, Kafka, Cassandra, and cloud storage systems (Avro and Kafka reads are sketched just after this list)
  • Support for schema inference with configurable options and explicit schema specification
  • Handle compressed files and partitioned datasets with optimized reading strategies
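
The usage examples later in this document do not cover Avro or Kafka, so here is a minimal sketch, assuming an active SparkSession named spark, the external spark-avro and spark-sql-kafka-0-10 packages on the classpath, and placeholder paths, servers, and topic names:

// Avro requires the external org.apache.spark:spark-avro package
val avroDF = spark.read
  .format("avro")
  .load("/path/to/avro/files")

// Batch read from Kafka requires org.apache.spark:spark-sql-kafka-0-10;
// key and value arrive as binary columns and usually need casting
val kafkaDF = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")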

Data Writing and Persistence Operations

  • Write DataFrames to multiple output formats with configurable compression and partitioning
  • Support for different save modes including append, overwrite, error-if-exists, and ignore
  • Enable atomic writes and transactional operations for supported formats
  • Handle large-scale data exports with optimized parallel writing and bucketing strategies

Schema Management and Evolution

  • Infer schemas automatically from data sources with type promotion and null handling
  • Support schema evolution and merging for compatible schema changes
  • Validate schemas during read and write operations with comprehensive error reporting
  • Handle schema mismatches with configurable behavior for missing or extra columns

Advanced I/O Configuration and Optimization

  • Configure read and write operations with format-specific options for performance tuning
  • Support for predicate pushdown and column pruning for optimized query execution (see the sketch after this list)
  • Handle partitioning strategies including static and dynamic partitioning schemes
  • Enable compression algorithms and encoding schemes for storage optimization
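
As a rough illustration of pushdown and pruning (a sketch assuming an active SparkSession named spark and a placeholder path; the exact plan text varies by Spark version), reading Parquet and filtering on a column surfaces the pushed filters and the pruned read schema in the physical plan:

import org.apache.spark.sql.functions.col

// Only user_id and event_type are read from the Parquet files, and the
// event_type filter is pushed down to the scan where possible
val events = spark.read.parquet("/path/to/events")
events
  .select("user_id", "event_type")
  .filter(col("event_type") === "click")
  .explain() // physical plan shows PushedFilters and the pruned ReadSchema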

API Reference

DataFrameReader Class

class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
  // Format specification
  def format(source: String): DataFrameReader
  
  // Schema operations
  def schema(schema: StructType): DataFrameReader
  def schema(schemaString: String): DataFrameReader
  
  // Options configuration
  def option(key: String, value: String): DataFrameReader
  def option(key: String, value: Boolean): DataFrameReader
  def option(key: String, value: Long): DataFrameReader
  def option(key: String, value: Double): DataFrameReader
  def options(options: scala.collection.Map[String, String]): DataFrameReader
  def options(options: java.util.Map[String, String]): DataFrameReader
  
  // Load operations
  def load(): DataFrame
  def load(path: String): DataFrame
  def load(paths: String*): DataFrame
  
  // Format-specific loaders
  def json(path: String): DataFrame
  def json(paths: String*): DataFrame  
  def json(jsonDataset: Dataset[String]): DataFrame
  def csv(path: String): DataFrame
  def csv(paths: String*): DataFrame
  def parquet(path: String): DataFrame
  def parquet(paths: String*): DataFrame
  def orc(path: String): DataFrame
  def orc(paths: String*): DataFrame
  def text(path: String): DataFrame
  def text(paths: String*): DataFrame
  def textFile(path: String): Dataset[String]
  def textFile(paths: String*): Dataset[String]
  
  // Database and external sources
  def jdbc(url: String, table: String, properties: Properties): DataFrame
  def jdbc(url: String, table: String, predicates: Array[String], connectionProperties: Properties): DataFrame
  def jdbc(url: String, table: String, columnName: String, lowerBound: Long, upperBound: Long, numPartitions: Int, connectionProperties: Properties): DataFrame
  
  // Table operations
  def table(tableName: String): DataFrame
}

DataFrameWriter[T] Class

class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
  // Format specification
  def format(source: String): DataFrameWriter[T]
  
  // Save mode configuration
  def mode(saveMode: SaveMode): DataFrameWriter[T]
  def mode(saveMode: String): DataFrameWriter[T]
  
  // Options configuration
  def option(key: String, value: String): DataFrameWriter[T]
  def option(key: String, value: Boolean): DataFrameWriter[T] 
  def option(key: String, value: Long): DataFrameWriter[T]
  def option(key: String, value: Double): DataFrameWriter[T]
  def options(options: scala.collection.Map[String, String]): DataFrameWriter[T]
  def options(options: java.util.Map[String, String]): DataFrameWriter[T]
  
  // Partitioning and bucketing
  def partitionBy(colNames: String*): DataFrameWriter[T]
  def partitionBy(cols: Seq[String]): DataFrameWriter[T]
  def bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T]
  def sortBy(colName: String, colNames: String*): DataFrameWriter[T]
  
  // Save operations
  def save(): Unit
  def save(path: String): Unit
  
  // Format-specific writers
  def json(path: String): Unit
  def csv(path: String): Unit
  def parquet(path: String): Unit
  def orc(path: String): Unit
  def text(path: String): Unit
  
  // Database operations
  def jdbc(url: String, table: String, connectionProperties: Properties): Unit
  
  // Table operations
  def saveAsTable(tableName: String): Unit
  def insertInto(tableName: String): Unit
}

Save Modes

// org.apache.spark.sql.SaveMode is a Java enum with four values
public enum SaveMode {
  Append,         // append contents of the DataFrame to existing data
  Overwrite,      // overwrite existing data
  ErrorIfExists,  // throw an exception if data already exists (the default)
  Ignore          // silently skip the save if data already exists
}
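
A minimal sketch of how the modes are applied, assuming an existing DataFrame df and a placeholder output path; mode names can also be passed as case-insensitive strings:

df.write.mode(SaveMode.Append).parquet("/path/to/out")   // add new files alongside existing data
df.write.mode("overwrite").parquet("/path/to/out")       // replace existing data
df.write.mode("errorifexists").parquet("/path/to/out")   // fail if the path already exists (default)
df.write.mode("ignore").parquet("/path/to/out")          // silently skip the write if data exists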

DataFrameWriterV2[T] Class

class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T]) {
  // Write operations
  def append(): Unit
  def overwrite(): Unit
  def overwritePartitions(): Unit
  
  // Conditional operations
  def createOrReplace(): Unit
  def create(): Unit
  def replace(): Unit
  
  // Partitioning
  def partitionedBy(column: Column, columns: Column*): DataFrameWriterV2[T]
  def partitionedBy(transform: String, transforms: String*): DataFrameWriterV2[T]
  
  // Options
  def option(key: String, value: String): DataFrameWriterV2[T]
  def option(key: String, value: Boolean): DataFrameWriterV2[T]
  def option(key: String, value: Long): DataFrameWriterV2[T]
  def option(key: String, value: Double): DataFrameWriterV2[T]
  def options(options: Map[String, String]): DataFrameWriterV2[T]
  def options(options: java.util.Map[String, String]): DataFrameWriterV2[T]
  
  // Table properties
  def tableProperty(property: String, value: String): DataFrameWriterV2[T]
  
  // Using clause for advanced operations
  def using(provider: String): DataFrameWriterV2[T]
}
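
A brief sketch of the V2 writer against a catalog table, using the partition transform functions added to org.apache.spark.sql.functions in Spark 3.x; eventsDF, the catalog/table name, the provider, and the column names are placeholders:

import org.apache.spark.sql.functions.{days, bucket, col}

eventsDF.writeTo("catalog.db.events")
  .using("parquet")
  .partitionedBy(days(col("event_time")), bucket(16, col("id")))
  .tableProperty("comment", "daily partitions, bucketed by id")
  .createOrReplace()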

Format-Specific Options

JSON Options

// JSON Reader Options
class JSONOptions(
  val samplingRatio: Double = 1.0,
  val primitivesAsString: Boolean = false,
  val prefersDecimal: Boolean = false,
  val allowComments: Boolean = false,
  val allowUnquotedFieldNames: Boolean = false,
  val allowSingleQuotes: Boolean = true,
  val allowNumericLeadingZeros: Boolean = false,
  val allowBackslashEscapingAnyCharacter: Boolean = false,
  val allowUnquotedControlChars: Boolean = false,
  val mode: ParseMode = ParseMode.PERMISSIVE, // PERMISSIVE, DROPMALFORMED, FAILFAST
  val columnNameOfCorruptRecord: String = "_corrupt_record",
  val dateFormat: String = "yyyy-MM-dd",
  val timestampFormat: String = "yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]",
  val multiLine: Boolean = false,
  val lineSep: String = null,
  val encoding: String = null,
  val locale: Locale = Locale.US,
  val pathGlobFilter: String = null,
  val recursiveFileLookup: Boolean = false,
  val modifiedBefore: String = null,
  val modifiedAfter: String = null
)
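
Since all of these keys are plain string options, they can also be supplied in bulk through options(); a small sketch assuming an active SparkSession named spark and a placeholder path:

val jsonReadOptions = Map(
  "multiLine" -> "true",
  "allowComments" -> "true",
  "dateFormat" -> "yyyy-MM-dd",
  "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ssXXX"
)

val configuredJsonDF = spark.read
  .options(jsonReadOptions)
  .json("/path/to/json/files")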

CSV Options

// CSV Reader Options  
class CSVOptions(
  val delimiter: Char = ',',
  val quote: Char = '"',
  val escape: Char = '\\',
  val charToEscapeQuoteEscaping: Char = '\u0000',
  val comment: Char = '\u0000',
  val header: Boolean = false,
  val inferSchema: Boolean = false,
  val ignoreLeadingWhiteSpace: Boolean = false,
  val ignoreTrailingWhiteSpace: Boolean = false,
  val nullValue: String = "",
  val emptyValue: String = "",
  val nanValue: String = "NaN",
  val positiveInf: String = "Inf",
  val negativeInf: String = "-Inf",
  val dateFormat: String = "yyyy-MM-dd",
  val timestampFormat: String = "yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]",
  val maxColumns: Int = 20480,
  val maxCharsPerColumn: Int = -1,
  val mode: ParseMode = ParseMode.PERMISSIVE, // PERMISSIVE, DROPMALFORMED, FAILFAST
  val columnNameOfCorruptRecord: String = "_corrupt_record",
  val multiLine: Boolean = false,
  val locale: Locale = Locale.US,
  val lineSep: String = null,
  val pathGlobFilter: String = null,
  val recursiveFileLookup: Boolean = false,
  val modifiedBefore: String = null,
  val modifiedAfter: String = null,
  val unescapedQuoteHandling: UnescapedQuoteHandling.Value = UnescapedQuoteHandling.STOP_AT_DELIMITER
)

Parquet Options

// Parquet Options
class ParquetOptions(
  val compression: String = "snappy", // none, uncompressed, snappy, gzip, lzo, brotli, lz4, zstd
  val mergeSchema: Boolean = false,
  val pathGlobFilter: String = null,
  val recursiveFileLookup: Boolean = false,
  val modifiedBefore: String = null,
  val modifiedAfter: String = null,
  val datetimeRebaseMode: String = "EXCEPTION", // EXCEPTION, CORRECTED, LEGACY
  val int96RebaseMode: String = "EXCEPTION"
)
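
The rebase modes matter when reading Parquet written by Spark 2.x that contains dates before 1582 or INT96 timestamps. In recent Spark 3.x releases they can be set per read; a sketch with a placeholder path:

val legacyParquetDF = spark.read
  .option("mergeSchema", "true")
  .option("datetimeRebaseMode", "CORRECTED") // rewrite legacy values to the proleptic Gregorian calendar
  .option("int96RebaseMode", "CORRECTED")
  .parquet("/path/to/legacy/parquet")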

JDBC Options

// JDBC Options
class JDBCOptions(
  val url: String,
  val table: String,
  val driver: String = null,
  val partitionColumn: String = null,
  val lowerBound: String = null,
  val upperBound: String = null,
  val numPartitions: String = null,
  val queryTimeout: String = "0",
  val fetchsize: String = "0",
  val batchsize: String = "1000",
  val isolationLevel: String = "READ_UNCOMMITTED",
  val sessionInitStatement: String = null,
  val truncate: Boolean = false,
  val cascadeTruncate: Boolean = false,
  val createTableOptions: String = "",
  val createTableColumnTypes: String = null,
  val customSchema: String = null,
  val pushDownPredicate: Boolean = true,
  val pushDownAggregate: Boolean = false,
  val pushDownLimit: Boolean = false,
  val pushDownTableSample: Boolean = false,
  val keytab: String = null,
  val principal: String = null,
  val refreshKrb5Config: Boolean = false,
  val connectionProvider: String = null
)
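
These keys can be used directly as reader options instead of the jdbc() overloads shown in the usage examples below; a sketch assuming an active SparkSession named spark and placeholder connection details:

val partitionedJdbcRead = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/mydb")
  .option("dbtable", "large_table")
  .option("user", "username")
  .option("password", "password")
  .option("driver", "org.postgresql.Driver")
  .option("partitionColumn", "id") // must be a numeric, date, or timestamp column
  .option("lowerBound", "1")
  .option("upperBound", "1000000")
  .option("numPartitions", "10")
  .option("fetchsize", "5000")
  .load()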

Usage Examples

Basic File I/O Operations

import org.apache.spark.sql.{SparkSession, SaveMode}
import org.apache.spark.sql.types._

val spark = SparkSession.builder()
  .appName("Data Sources Demo")
  .getOrCreate()

// Reading JSON files
val jsonDF = spark.read
  .format("json")
  .option("multiLine", "true")
  .option("mode", "PERMISSIVE")
  .load("/path/to/json/files/*.json")

// Alternative JSON reading with schema
val jsonSchema = StructType(Array(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  StructField("email", StringType, nullable = true)
))

val typedJsonDF = spark.read
  .schema(jsonSchema)
  .json("/path/to/json/files")

// Reading CSV files
val csvDF = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("delimiter", ",")
  .option("quote", "\"")
  .option("escape", "\\")
  .option("nullValue", "NULL")
  .option("dateFormat", "yyyy-MM-dd")
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
  .load("/path/to/csv/files")

// Reading Parquet files
val parquetDF = spark.read
  .format("parquet")
  .option("mergeSchema", "true")
  .load("/path/to/parquet/files")

// Reading ORC files
val orcDF = spark.read
  .format("orc")
  .load("/path/to/orc/files")

// Reading text files
val textDF = spark.read
  .text("/path/to/text/files")

val textFileDS = spark.read
  .textFile("/path/to/text/files")

Writing Data to Different Formats

// Sample DataFrame for writing examples
import spark.implicits._ // needed for toDF and the $ column syntax

val sampleData = Seq(
  (1, "Alice", 25, "alice@example.com", "2023-01-15"),
  (2, "Bob", 30, "bob@example.com", "2023-02-20"),
  (3, "Charlie", 35, "charlie@example.com", "2023-03-10")
).toDF("id", "name", "age", "email", "join_date")

// Writing to JSON
sampleData.write
  .format("json")
  .mode(SaveMode.Overwrite)
  .option("compression", "gzip")
  .save("/path/to/output/json")

// Writing to CSV  
sampleData.write
  .format("csv")
  .mode(SaveMode.Overwrite)
  .option("header", "true")
  .option("delimiter", ",")
  .option("quote", "\"")
  .option("escape", "\\")
  .option("compression", "gzip")
  .save("/path/to/output/csv")

// Writing to Parquet with partitioning
sampleData.write
  .format("parquet")
  .mode(SaveMode.Overwrite)
  .option("compression", "snappy")
  .partitionBy("join_date")
  .save("/path/to/output/parquet")

// Writing to ORC
sampleData.write
  .format("orc")
  .mode(SaveMode.Overwrite)  
  .option("compression", "zlib")
  .save("/path/to/output/orc")

// Writing to text (single column)
sampleData.select("name").write
  .mode(SaveMode.Overwrite)
  .text("/path/to/output/text")

Advanced Partitioning and Bucketing

// Dynamic partitioning
val salesData = Seq(
  (1, "Product A", 100.0, "2023-01-15", "Electronics", "US"),
  (2, "Product B", 150.0, "2023-01-16", "Clothing", "US"),
  (3, "Product C", 200.0, "2023-01-15", "Electronics", "UK"),
  (4, "Product D", 120.0, "2023-01-17", "Books", "CA")
).toDF("id", "product_name", "price", "sale_date", "category", "country")

// Multi-level partitioning
salesData.write
  .format("parquet")
  .mode(SaveMode.Overwrite)
  .partitionBy("country", "category")
  .option("compression", "snappy")
  .save("/path/to/partitioned/sales")

// Bucketing for better join performance
salesData.write
  .format("parquet")
  .mode(SaveMode.Overwrite)
  .bucketBy(10, "id") // 10 buckets based on id column
  .sortBy("price") // Sort within buckets
  .option("path", "/path/to/bucketed/sales")
  .saveAsTable("bucketed_sales")

// Custom partitioning with transformation
import org.apache.spark.sql.functions._

val partitionedData = salesData.withColumn("year", year(to_date($"sale_date")))
  .withColumn("month", month(to_date($"sale_date")))

partitionedData.write
  .format("delta") // Using Delta format for ACID properties
  .mode(SaveMode.Overwrite)
  .partitionBy("year", "month", "country")
  .save("/path/to/delta/sales")

Database Connectivity (JDBC)

import java.util.Properties

// JDBC connection properties
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
connectionProperties.put("driver", "org.postgresql.Driver")

// Reading from database
val jdbcDF = spark.read
  .jdbc(
    url = "jdbc:postgresql://localhost:5432/mydb",
    table = "employees",
    properties = connectionProperties
  )

// Reading with SQL query
val queryDF = spark.read
  .jdbc(
    url = "jdbc:postgresql://localhost:5432/mydb",
    table = "(SELECT * FROM employees WHERE department = 'Engineering') AS emp",
    properties = connectionProperties
  )

// Reading with partitioning for large tables
val partitionedJdbcDF = spark.read
  .jdbc(
    url = "jdbc:postgresql://localhost:5432/mydb",
    table = "large_table",
    columnName = "id", // Partition column
    lowerBound = 1,
    upperBound = 1000000,
    numPartitions = 10,
    connectionProperties = connectionProperties
  )

// Reading with custom predicates
val predicates = Array(
  "department = 'Engineering'",
  "department = 'Sales'", 
  "department = 'Marketing'"
)

val predicateDF = spark.read
  .jdbc(
    url = "jdbc:postgresql://localhost:5432/mydb",
    table = "employees",
    predicates = predicates,
    connectionProperties = connectionProperties
  )

// Writing to database
sampleData.write
  .mode(SaveMode.Append)
  .jdbc(
    url = "jdbc:postgresql://localhost:5432/mydb",
    table = "new_employees",
    connectionProperties = connectionProperties
  )

// Writing with custom options
sampleData.write
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/mydb")
  .option("dbtable", "employees_backup")
  .option("user", "username")
  .option("password", "password")
  .option("driver", "org.postgresql.Driver")
  .option("batchsize", "10000")
  .option("isolationLevel", "READ_COMMITTED")
  .mode(SaveMode.Overwrite)
  .save()

Schema Handling and Evolution

// Schema inference with sampling
val inferredDF = spark.read
  .format("json")
  .option("samplingRatio", "0.1") // Sample 10% for schema inference
  .option("prefersDecimal", "true")
  .load("/path/to/large/json/files")

// Explicit schema definition
val explicitSchema = StructType(Array(
  StructField("user_id", StringType, nullable = false),
  StructField("event_type", StringType, nullable = false),
  StructField("timestamp", TimestampType, nullable = false),
  StructField("properties", MapType(StringType, StringType), nullable = true),
  StructField("user_properties", StructType(Array(
    StructField("age", IntegerType, nullable = true),
    StructField("country", StringType, nullable = true),
    StructField("premium", BooleanType, nullable = false)
  )), nullable = true)
))

val typedDF = spark.read
  .schema(explicitSchema)
  .format("json")
  .load("/path/to/json/events")

// Schema evolution with Parquet
val evolvedParquetDF = spark.read
  .format("parquet") 
  .option("mergeSchema", "true") // Merge schemas from different files
  .load("/path/to/evolved/parquet/files")

// Handle corrupt records
val corruptHandlingDF = spark.read
  .format("json")
  .option("mode", "PERMISSIVE") // PERMISSIVE, DROPMALFORMED, FAILFAST
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .schema(explicitSchema.add("_corrupt_record", StringType))
  .load("/path/to/potentially/corrupt/json")

// Schema compatibility checking
def validateSchema(df: DataFrame, expectedSchema: StructType): Boolean = {
  val actualSchema = df.schema
  expectedSchema.fields.forall { expectedField =>
    actualSchema.fields.exists { actualField =>
      actualField.name == expectedField.name &&
      actualField.dataType == expectedField.dataType
    }
  }
}

val isCompatible = validateSchema(typedDF, explicitSchema)
println(s"Schema compatible: $isCompatible")

Compressed File Handling

// Reading compressed files
// Text-based sources (JSON, CSV, text) decompress automatically based on the
// file extension, so no compression option is needed on the read side
val gzipJsonDF = spark.read
  .format("json")
  .load("/path/to/compressed/*.json.gz")

val bzip2CsvDF = spark.read
  .format("csv")
  .option("header", "true")
  .load("/path/to/compressed/*.csv.bz2")

// Writing with different compression algorithms
sampleData.write
  .format("parquet")
  .option("compression", "snappy") // snappy, gzip, lzo, brotli, lz4, zstd
  .mode(SaveMode.Overwrite)
  .save("/path/to/compressed/parquet")

sampleData.write
  .format("json")
  .option("compression", "gzip")
  .mode(SaveMode.Overwrite) 
  .save("/path/to/compressed/json")

sampleData.write
  .format("csv")
  .option("header", "true")
  .option("compression", "bzip2")
  .mode(SaveMode.Overwrite)
  .save("/path/to/compressed/csv")

// Optimal compression settings for different use cases
// For archival (maximize compression)
sampleData.write
  .format("parquet")
  .option("compression", "gzip")
  .option("parquet.block.size", "268435456") // 256MB blocks
  .save("/path/to/archive")

// For query performance (balance compression and speed)  
sampleData.write
  .format("parquet")
  .option("compression", "snappy")
  .option("parquet.page.size", "1048576") // 1MB pages
  .save("/path/to/queryable")

Advanced File Operations

// File globbing and filtering
val filteredDF = spark.read
  .format("json")
  .option("pathGlobFilter", "*.json") // Only JSON files
  .option("recursiveFileLookup", "true") // Recursive directory traversal
  .option("modifiedAfter", "2023-01-01T00:00:00") // Files modified after date
  .option("modifiedBefore", "2023-12-31T23:59:59") // Files modified before date
  .load("/path/to/data")

// Multi-path loading
val multiPathDF = spark.read
  .format("parquet")
  .load(
    "/path/to/data/2023/01/*",
    "/path/to/data/2023/02/*", 
    "/path/to/data/2023/03/*"
  )

// Incremental data loading
import org.apache.spark.sql.functions._

def loadIncrementalData(basePath: String, lastProcessedTime: String): DataFrame = {
  spark.read
    .format("parquet")
    .option("modifiedAfter", lastProcessedTime)
    .load(basePath)
    .filter($"event_time" > lit(lastProcessedTime))
}

val incrementalDF = loadIncrementalData("/path/to/events", "2023-12-01T00:00:00")

// Data source V2 operations
val v2Writer = sampleData.writeTo("catalog.db.table")
  .option("write.format.default", "parquet")
  .option("write.parquet.compression-codec", "snappy")

// Conditional writes
v2Writer.createOrReplace() // Create table or replace if exists
// v2Writer.create() // Create table, fail if exists
// v2Writer.append() // Append to existing table
// v2Writer.overwrite() // Overwrite entire table
// v2Writer.overwritePartitions() // Overwrite specific partitions

Error Handling and Data Quality

import org.apache.spark.sql.functions._

// Robust CSV reading with error handling
val robustCsvDF = spark.read
  .format("csv")
  .option("header", "true")
  .option("mode", "PERMISSIVE") // Continue processing despite errors
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .option("maxMalformedLogPerPartition", "10") // Log up to 10 malformed records per partition
  .schema(
    StructType(Array(
      StructField("id", IntegerType, nullable = true),
      StructField("name", StringType, nullable = true), 
      StructField("age", IntegerType, nullable = true),
      StructField("_corrupt_record", StringType, nullable = true)
    ))
  )
  .load("/path/to/potentially/malformed/csv")

// Analyze data quality
val qualityReport = robustCsvDF.agg(
  count("*").as("total_records"),
  sum(when($"_corrupt_record".isNull, 1).otherwise(0)).as("valid_records"),
  sum(when($"_corrupt_record".isNotNull, 1).otherwise(0)).as("corrupt_records"),
  sum(when($"id".isNull, 1).otherwise(0)).as("missing_ids"),
  sum(when($"name".isNull || $"name" === "", 1).otherwise(0)).as("missing_names")
)

qualityReport.show()

// Data validation and cleansing
val cleanedDF = robustCsvDF
  .filter($"_corrupt_record".isNull) // Remove corrupt records
  .filter($"id".isNotNull && $"id" > 0) // Valid IDs only
  .filter($"name".isNotNull && length(trim($"name")) > 0) // Non-empty names
  .filter($"age".isNull || ($"age" >= 0 && $"age" <= 150)) // Reasonable ages
  .drop("_corrupt_record")

// Write validation results as a single-row DataFrame
// (createDataFrame cannot consume a Map of mixed value types directly)
val totalInput = robustCsvDF.count()
val validOutput = cleanedDF.count()

val validationSummary = Seq((
  "/path/to/potentially/malformed/csv",
  java.time.Instant.now().toString,
  totalInput,
  validOutput,
  validOutput.toDouble / totalInput * 100
)).toDF("source_path", "processing_time", "total_input_records", "valid_output_records", "data_quality_score")

// Save validation report
validationSummary.write
  .format("json")
  .mode(SaveMode.Append)
  .save("/path/to/validation/reports")