Data Input and Output

Spark SQL provides its batch I/O capabilities through the DataFrameReader and DataFrameWriter interfaces, exposed as spark.read and df.write. They support reading from and writing to a variety of data sources, including files (JSON, Parquet, CSV, ORC, text) and databases over JDBC. Streaming sources and sinks use the analogous readStream and writeStream interfaces, covered in streaming.md.
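
A minimal read-transform-write round trip looks like this (paths and column names are placeholders; an existing SparkSession named spark is assumed):

import org.apache.spark.sql.functions.col

// Read JSON, filter, and write the result as Parquet
val events = spark.read
  .option("multiLine", "true")
  .json("events.json")

events
  .filter(col("status") === "active")
  .write
  .mode("overwrite")
  .parquet("events_parquet/")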

DataFrameReader

class DataFrameReader {
  def format(source: String): DataFrameReader
  def schema(schema: StructType): DataFrameReader
  def schema(schemaString: String): DataFrameReader
  def option(key: String, value: String): DataFrameReader
  def option(key: String, value: Boolean): DataFrameReader
  def option(key: String, value: Long): DataFrameReader
  def option(key: String, value: Double): DataFrameReader
  def options(options: scala.collection.Map[String, String]): DataFrameReader
  def options(options: java.util.Map[String, String]): DataFrameReader
  def load(): DataFrame
  def load(path: String): DataFrame
  def load(paths: String*): DataFrame
}
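
All builder methods return the reader, so they chain. Two variants worth noting are the schema overload that takes a DDL-formatted string and options supplied as a map (paths below are placeholders):

// Schema given as a DDL string instead of a StructType
val people = spark.read
  .schema("name STRING, age INT, city STRING")
  .json("people.json")

// Several options supplied at once as a Map
val csvData = spark.read
  .options(Map("header" -> "true", "inferSchema" -> "true"))
  .csv("people.csv")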

File Format Readers

JSON Files

class DataFrameReader {
  def json(path: String): DataFrame
  def json(paths: String*): DataFrame
  def json(jsonRDD: RDD[String]): DataFrame
  def json(jsonDataset: Dataset[String]): DataFrame
}

Usage Examples:

// Basic JSON reading
val df = spark.read.json("path/to/file.json")
val multipleFiles = spark.read.json("file1.json", "file2.json")

// With options
val jsonDF = spark.read
  .option("multiline", "true")
  .option("allowComments", "true")
  .option("allowUnquotedFieldNames", "true")
  .json("complex.json")

// With explicit schema for better performance
import org.apache.spark.sql.types._
val schema = StructType(Array(
  StructField("name", StringType, true),
  StructField("age", IntegerType, true),
  StructField("city", StringType, true)
))

val typedJson = spark.read
  .schema(schema)
  .json("people.json")

// From RDD or Dataset of strings
val jsonStrings = spark.sparkContext.parallelize(Seq(
  """{"name": "Alice", "age": 25}""",
  """{"name": "Bob", "age": 30}"""
))
val fromRDD = spark.read.json(jsonStrings)

Common JSON Options:

  • multiLine: Parse multi-line JSON objects (default: false)
  • allowComments: Allow JavaScript-style comments (default: false)
  • allowUnquotedFieldNames: Allow unquoted field names (default: false)
  • allowSingleQuotes: Allow single quotes (default: true)
  • primitivesAsString: Infer all primitive values as strings (default: false)
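
For example, primitivesAsString keeps every primitive value as a string, which helps when a field mixes types across records (file name is a placeholder):

val rawJson = spark.read
  .option("primitivesAsString", "true")
  .option("allowSingleQuotes", "true")
  .json("mixed_types.json")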

Parquet Files

class DataFrameReader {
  def parquet(paths: String*): DataFrame
}

Usage Examples:

// Read Parquet files
val parquetDF = spark.read.parquet("data.parquet")
val multipleParquet = spark.read.parquet("part1.parquet", "part2.parquet")

// With options
val parquetWithOptions = spark.read
  .option("mergeSchema", "true")
  .parquet("partitioned_data/")

// Read partitioned Parquet data
val partitioned = spark.read.parquet("data/year=2023/month=*/day=*")

Common Parquet Options:

  • mergeSchema: Merge schemas from multiple files (default: false)
  • recursiveFileLookup: Recursively search subdirectories (default: false)
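
recursiveFileLookup helps when Parquet files sit in arbitrary nested directories rather than Hive-style partition folders (path is a placeholder):

// Note: recursiveFileLookup disables partition discovery
val nestedParquet = spark.read
  .option("recursiveFileLookup", "true")
  .parquet("archive/")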

CSV Files

class DataFrameReader {
  def csv(paths: String*): DataFrame
}

Usage Examples:

// Basic CSV reading
val csvDF = spark.read.csv("data.csv")

// With options and header
val csvWithHeader = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .option("sep", ",")
  .csv("people.csv")

// With explicit schema
val csvSchema = StructType(Array(
  StructField("id", IntegerType, true),
  StructField("name", StringType, true),
  StructField("salary", DoubleType, true)
))

val typedCSV = spark.read
  .schema(csvSchema)
  .option("header", "true")
  .csv("employees.csv")

Common CSV Options:

  • header: Whether first line is header (default: false)
  • inferSchema: Automatically infer column types (default: false)
  • sep: Field separator character (default: ",")
  • quote: Quote character (default: the double-quote character ")
  • escape: Escape character (default: the backslash character \)
  • nullValue: String representing null values (default: "")
  • dateFormat: Date format string (default: "yyyy-MM-dd")
  • timestampFormat: Timestamp format string (default: "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
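
The null and date options matter most when loading files produced by other tools; for example (file name, formats, and schema are placeholders):

val ordersCSV = spark.read
  .option("header", "true")
  .option("nullValue", "NA")
  .option("dateFormat", "MM/dd/yyyy")
  .schema("order_id INT, customer STRING, order_date DATE")
  .csv("orders.csv")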

ORC Files

class DataFrameReader {
  def orc(paths: String*): DataFrame
}

Usage Examples:

// Read ORC files
val orcDF = spark.read.orc("data.orc")

// With options
val orcWithOptions = spark.read
  .option("mergeSchema", "true")
  .orc("orc_data/")

Text Files

class DataFrameReader {
  def text(paths: String*): DataFrame
  def textFile(paths: String*): Dataset[String]
}

Usage Examples:

// Read as DataFrame with single "value" column
val textDF = spark.read.text("log.txt")

// Read as Dataset[String]
val textDS = spark.read.textFile("documents/*.txt")

// With the wholetext option: read each file as a single row
val wholeFiles = spark.read
  .option("wholetext", "true")
  .text("data.txt")

Database Connectivity (JDBC)

class DataFrameReader {
  def jdbc(url: String, table: String, properties: Properties): DataFrame
  def jdbc(url: String, table: String, predicates: Array[String], connectionProperties: Properties): DataFrame
  def jdbc(url: String, table: String, columnName: String, lowerBound: Long, upperBound: Long, numPartitions: Int, connectionProperties: Properties): DataFrame
}

Usage Examples:

import java.util.Properties

// Basic JDBC reading
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
connectionProperties.put("driver", "com.mysql.jdbc.Driver")

val jdbcDF = spark.read
  .jdbc("jdbc:mysql://localhost:3306/mydb", "users", connectionProperties)

// With partitioning for performance
val partitionedJDBC = spark.read
  .jdbc(
    url = "jdbc:postgresql://localhost:5432/mydb",
    table = "large_table", 
    columnName = "id",
    lowerBound = 1,
    upperBound = 1000000,
    numPartitions = 10,
    connectionProperties = connectionProperties
  )

// With custom query
val customQuery = """
  (SELECT u.id, u.name, p.title 
   FROM users u JOIN profiles p ON u.id = p.user_id 
   WHERE u.active = true) as query
"""

val queryResult = spark.read
  .jdbc("jdbc:mysql://localhost:3306/mydb", customQuery, connectionProperties)

// With predicates for parallel reading
val predicates = Array(
  "age < 25",
  "age >= 25 AND age < 50", 
  "age >= 50"
)

val parallelJDBC = spark.read
  .jdbc("jdbc:mysql://localhost:3306/mydb", "users", predicates, connectionProperties)

Generic Data Sources

class DataFrameReader {
  def format(source: String): DataFrameReader
  def load(): DataFrame
  def load(path: String): DataFrame
}

Usage Examples:

// Delta Lake (third-party format)
val deltaDF = spark.read
  .format("delta")
  .load("path/to/delta/table")

// Avro files
val avroDF = spark.read
  .format("avro")
  .load("data.avro")

// Custom data source
val customDF = spark.read
  .format("com.example.CustomDataSource")
  .option("customOption", "value")
  .load("path/to/data")

DataFrameWriter

class DataFrameWriter[T] {
  def mode(saveMode: SaveMode): DataFrameWriter[T]
  def mode(saveMode: String): DataFrameWriter[T]
  def format(source: String): DataFrameWriter[T]
  def option(key: String, value: String): DataFrameWriter[T]
  def option(key: String, value: Boolean): DataFrameWriter[T]
  def option(key: String, value: Long): DataFrameWriter[T]
  def option(key: String, value: Double): DataFrameWriter[T]
  def options(options: scala.collection.Map[String, String]): DataFrameWriter[T]
  def options(options: java.util.Map[String, String]): DataFrameWriter[T]
  def partitionBy(colNames: String*): DataFrameWriter[T]
  def bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T]
  def sortBy(colName: String, colNames: String*): DataFrameWriter[T]
  def save(): Unit
  def save(path: String): Unit
}

// SaveMode is a Java enum: org.apache.spark.sql.SaveMode
enum SaveMode {
  Append, Overwrite, ErrorIfExists, Ignore
}
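
As with the reader, format, mode, and options chain before the terminal save call. The save mode can be given either as the enum value or as the equivalent string ("overwrite", "append", "ignore", "error"/"errorifexists"). A generic example (path is a placeholder):

import org.apache.spark.sql.SaveMode

df.write
  .format("parquet")
  .mode(SaveMode.Append)            // equivalent to .mode("append")
  .option("compression", "snappy")
  .save("output_path/")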

Writing to Files

Parquet Files

class DataFrameWriter[T] {
  def parquet(path: String): Unit
}

Usage Examples:

// Basic Parquet writing
df.write.parquet("output.parquet")

// With save mode
df.write
  .mode(SaveMode.Overwrite)
  .parquet("data/output.parquet")

// Partitioned writing
df.write
  .partitionBy("year", "month")
  .parquet("partitioned_data/")

// With options
df.write
  .option("compression", "snappy")
  .mode("overwrite")
  .parquet("compressed_output.parquet")

JSON Files

class DataFrameWriter[T] {
  def json(path: String): Unit
}

Usage Examples:

// Write JSON
df.write.json("output.json")

// With formatting options
df.write
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
  .option("dateFormat", "yyyy-MM-dd")
  .mode("overwrite")
  .json("formatted_output.json")

CSV Files

class DataFrameWriter[T] {
  def csv(path: String): Unit
}

Usage Examples:

// Write CSV
df.write.csv("output.csv")

// With options
df.write
  .option("header", "true")
  .option("sep", "|")
  .mode("overwrite")
  .csv("data_with_header.csv")

ORC Files

class DataFrameWriter[T] {
  def orc(path: String): Unit
}

Usage Examples:

// Write ORC
df.write.orc("output.orc")

// With compression
df.write
  .option("compression", "zlib")
  .mode("overwrite")
  .orc("compressed.orc")

Text Files

class DataFrameWriter[T] {
  def text(path: String): Unit
}

Usage Examples:

// Write text (requires single string column)
df.select("message").write.text("logs.txt")

// Combine columns into a single string column first
import org.apache.spark.sql.functions.{col, concat_ws}

df.select(concat_ws(",", col("name"), col("age")).alias("line"))
  .write
  .text("combined_output.txt")

Table Operations

class DataFrameWriter[T] {
  def saveAsTable(tableName: String): Unit
  def insertInto(tableName: String): Unit
}

Usage Examples:

// Save as managed table
df.write
  .mode("overwrite")
  .saveAsTable("my_database.my_table")

// Insert into existing table
df.write
  .mode("append")
  .insertInto("existing_table")

// Partitioned table
df.write
  .partitionBy("year", "month")
  .mode("overwrite")
  .saveAsTable("partitioned_table")

JDBC Writing

class DataFrameWriter[T] {
  def jdbc(url: String, table: String, connectionProperties: Properties): Unit
}

Usage Examples:

// Write to database
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password") 
connectionProperties.put("driver", "com.mysql.jdbc.Driver")

df.write
  .mode("overwrite")
  .jdbc("jdbc:mysql://localhost:3306/mydb", "users", connectionProperties)

// With additional options
df.write
  .option("batchsize", "10000")
  .option("truncate", "true")
  .mode("overwrite")
  .jdbc("jdbc:postgresql://localhost:5432/mydb", "large_table", connectionProperties)

Advanced I/O Patterns

Bucketing

// Write bucketed data for optimized joins
df.write
  .bucketBy(42, "user_id")
  .sortBy("timestamp")
  .mode("overwrite")
  .saveAsTable("bucketed_events")

Multi-format Writing

// Write same data to multiple formats
val writer = df.write.mode("overwrite")

writer.parquet("data.parquet")
writer.json("data.json")
writer.option("header", "true").csv("data.csv")

Conditional Writing

// Write different partitions to different locations
import org.apache.spark.sql.functions.col

df.filter(col("region") === "US")
  .write
  .mode("overwrite")
  .parquet("us_data/")

df.filter(col("region") === "EU")
  .write
  .mode("overwrite")
  .parquet("eu_data/")

Schema Evolution

// Handle schema changes in Parquet
df.write
  .option("mergeSchema", "true")
  .mode("append")
  .parquet("evolving_schema/")

Data Source Options Reference

Common Options (All Formats)

  • path: File system path
  • recursiveFileLookup: Recursively search subdirectories (default: false)
  • pathGlobFilter: Glob pattern for file filtering
  • modifiedBefore: Only files modified before timestamp
  • modifiedAfter: Only files modified after timestamp
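
For example, combining a glob filter with a modification-time cutoff (path, pattern, and timestamp are placeholders):

val recentLogs = spark.read
  .option("pathGlobFilter", "*.json")
  .option("modifiedAfter", "2023-06-01T00:00:00")
  .option("recursiveFileLookup", "true")
  .json("logs/")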

Compression Options

  • Parquet: none, snappy, gzip, lzo, brotli, lz4, zstd
  • JSON: none, bzip2, gzip, lz4, snappy, deflate
  • ORC: none, zlib, snappy, lzo, lz4, zstd
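
For example, writing gzip-compressed JSON output (path is a placeholder):

df.write
  .option("compression", "gzip")
  .mode("overwrite")
  .json("compressed_json/")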

Performance Tuning

  • Use appropriate file sizes (128MB-1GB for Parquet)
  • Partition by frequently filtered columns
  • Use columnar formats (Parquet, ORC) for analytical workloads
  • Enable predicate pushdown with appropriate schemas
  • Consider bucketing for frequently joined tables
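
A sketch of the first two points, assuming the partition count and column names fit your data (both are placeholders, not recommendations):

import org.apache.spark.sql.functions.col

// Repartition before writing to control the number and size of output files,
// then lay the files out by the columns queries usually filter on.
df.repartition(200, col("year"), col("month"))
  .write
  .partitionBy("year", "month")
  .mode("overwrite")
  .parquet("tuned_output/")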