Data I/O Operations

Reading and writing data in a variety of formats from external storage systems. Supports both batch and streaming workloads, with extensive configuration options, built-in format support, and custom data source integration.

Capabilities

DataFrameReader

Interface for reading data from external storage systems into DataFrames.

/**
 * Interface for reading data from external storage systems
 */
class DataFrameReader {
  /** Specify data source format */
  def format(source: String): DataFrameReader
  
  /** Set schema for the data */
  def schema(schema: StructType): DataFrameReader
  def schema(schemaString: String): DataFrameReader
  
  /** Set options for the data source */
  def option(key: String, value: String): DataFrameReader
  def option(key: String, value: Boolean): DataFrameReader
  def option(key: String, value: Long): DataFrameReader
  def option(key: String, value: Double): DataFrameReader
  def options(options: scala.collection.Map[String, String]): DataFrameReader
  def options(options: java.util.Map[String, String]): DataFrameReader
  
  /** Load data using generic interface */
  def load(): DataFrame
  def load(path: String): DataFrame  
  def load(paths: String*): DataFrame
  
  /** Built-in format readers */
  def json(path: String): DataFrame
  def json(paths: String*): DataFrame
  def json(jsonRDD: RDD[String]): DataFrame
  def json(jsonDataset: Dataset[String]): DataFrame
  
  def parquet(paths: String*): DataFrame
  
  def orc(paths: String*): DataFrame
  
  def text(paths: String*): DataFrame
  def textFile(paths: String*): Dataset[String]
  
  def csv(paths: String*): DataFrame
  
  def table(tableName: String): DataFrame
  
  /** JDBC data source */
  def jdbc(url: String, table: String, properties: java.util.Properties): DataFrame
  def jdbc(url: String, table: String, predicates: Array[String], 
           connectionProperties: java.util.Properties): DataFrame
  def jdbc(url: String, table: String, columnName: String, 
           lowerBound: Long, upperBound: Long, numPartitions: Int,
           connectionProperties: java.util.Properties): DataFrame
}

Usage Examples:

// JSON with inferred schema (multiline records)
val df1 = spark.read
  .option("multiline", "true")
  .json("path/to/file.json")

// CSV with custom options
val df2 = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .option("delimiter", ",")
  .csv("path/to/file.csv")

// Parquet (schema preserved)
val df3 = spark.read.parquet("path/to/*.parquet")

// With explicit schema
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  StructField("salary", DoubleType, nullable = true)
))

val df4 = spark.read
  .schema(schema)
  .option("header", "true")
  .csv("employees.csv")

// JDBC connection
val df5 = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/mydb")
  .option("dbtable", "employees")
  .option("user", "username")
  .option("password", "password")
  .load()

// Custom data source
val df6 = spark.read
  .format("org.apache.spark.sql.cassandra")
  .option("keyspace", "mykeyspace")
  .option("table", "mytable")
  .load()
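
The partitioned jdbc overload above splits a read into parallel range queries over a numeric column. A minimal sketch, assuming a numeric id column and treating the table name and bounds as placeholders:

// Parallel JDBC read: numPartitions range queries over the partition column
import java.util.Properties

val props = new Properties()
props.setProperty("user", "username")
props.setProperty("password", "password")

val df7 = spark.read.jdbc(
  "jdbc:postgresql://localhost:5432/mydb", // url
  "employees",                             // table
  "id",                                    // numeric partition column (placeholder)
  1L,                                      // lowerBound of the partition column
  1000000L,                                // upperBound of the partition column
  8,                                       // numPartitions: number of parallel queries
  props                                    // connection properties (user, password, driver, ...)
)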

DataFrameWriter

Interface for writing Dataset to external storage systems.

/**
 * Interface for writing Dataset to external storage systems
 * @tparam T Type of the Dataset
 */
class DataFrameWriter[T] {
  /** Specify output format */
  def format(source: String): DataFrameWriter[T]
  
  /** Set save mode */
  def mode(saveMode: SaveMode): DataFrameWriter[T]
  def mode(saveMode: String): DataFrameWriter[T]
  
  /** Set options for the data source */
  def option(key: String, value: String): DataFrameWriter[T]
  def option(key: String, value: Boolean): DataFrameWriter[T]
  def option(key: String, value: Long): DataFrameWriter[T]
  def option(key: String, value: Double): DataFrameWriter[T]
  def options(options: scala.collection.Map[String, String]): DataFrameWriter[T]
  def options(options: java.util.Map[String, String]): DataFrameWriter[T]
  
  /** Partition output by columns */
  def partitionBy(colNames: String*): DataFrameWriter[T]
  
  /** Bucket output by columns */
  def bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T]
  def sortBy(colName: String, colNames: String*): DataFrameWriter[T]
  
  /** Save using generic interface */
  def save(): Unit
  def save(path: String): Unit
  
  /** Built-in format writers */
  def json(path: String): Unit
  def parquet(path: String): Unit
  def orc(path: String): Unit
  def text(path: String): Unit
  def csv(path: String): Unit
  
  /** Save as table */
  def saveAsTable(tableName: String): Unit
  def insertInto(tableName: String): Unit
  
  /** JDBC output */
  def jdbc(url: String, table: String, connectionProperties: java.util.Properties): Unit
}

/**
 * Save modes for writing data (org.apache.spark.sql.SaveMode, a Java enum)
 */
enum SaveMode {
  Append,         // Append contents to any existing data
  Overwrite,      // Replace existing data
  ErrorIfExists,  // Fail if data already exists (the default)
  Ignore          // Silently skip the write if data already exists
}

Usage Examples:

import org.apache.spark.sql.SaveMode

val df = spark.table("employees")

// Basic save operations
df.write
  .mode(SaveMode.Overwrite)
  .parquet("output/employees.parquet")

df.write
  .mode("append")
  .option("header", "true")
  .csv("output/employees.csv")

// Partitioned output
df.write
  .mode(SaveMode.Overwrite)
  .partitionBy("department", "year")
  .parquet("output/employees_partitioned")

// Bucketed output
df.write
  .mode(SaveMode.Overwrite)
  .bucketBy(10, "employee_id")
  .sortBy("salary")
  .saveAsTable("bucketed_employees")

// JDBC output
df.write
  .mode(SaveMode.Overwrite)
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/mydb")
  .option("dbtable", "employees")
  .option("user", "username")
  .option("password", "password")
  .save()

// Custom format with options
df.write
  .mode(SaveMode.Append)
  .format("delta")
  .option("mergeSchema", "true")
  .save("path/to/delta-table")

Common Data Source Options

Configuration options for built-in data sources.

// CSV Options
object CsvOptions {
  val DELIMITER = "delimiter"          // Field delimiter (default: ",")
  val QUOTE = "quote"                  // Quote character (default: "\"")
  val ESCAPE = "escape"                // Escape character (default: "\")
  val HEADER = "header"                // First line is header (default: "false")
  val INFER_SCHEMA = "inferSchema"     // Infer schema from data (default: "false")
  val NULL_VALUE = "nullValue"         // String representation of null (default: "")
  val DATE_FORMAT = "dateFormat"       // Date format (default: "yyyy-MM-dd")
  val TIMESTAMP_FORMAT = "timestampFormat" // Timestamp format
  val MAX_COLUMNS = "maxColumns"       // Maximum number of columns
  val MAX_CHARS_PER_COLUMN = "maxCharsPerColumn" // Max chars per column
  val ENCODING = "encoding"            // Character encoding (default: "UTF-8")
  val COMMENT = "comment"              // Comment character
  val MODE = "mode"                    // Parse mode: PERMISSIVE, DROPMALFORMED, FAILFAST
}

// JSON Options  
object JsonOptions {
  val ALLOW_COMMENTS = "allowComments"         // Allow comments (default: "false")
  val ALLOW_UNQUOTED_FIELD_NAMES = "allowUnquotedFieldNames" // Allow unquoted field names
  val ALLOW_SINGLE_QUOTES = "allowSingleQuotes" // Allow single quotes
  val ALLOW_NUMERIC_LEADING_ZEROS = "allowNumericLeadingZeros" // Allow leading zeros
  val ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER = "allowBackslashEscapingAnyCharacter"
  val MULTILINE = "multiline"                  // Parse multiline JSON (default: "false")
  val DATE_FORMAT = "dateFormat"               // Date format
  val TIMESTAMP_FORMAT = "timestampFormat"     // Timestamp format
  val PRIMITIVE_AS_STRING = "primitivesAsString" // Parse primitives as strings
}

// Parquet Options
object ParquetOptions {
  val MERGE_SCHEMA = "mergeSchema"             // Merge schemas from multiple files
  val COMPRESSION = "compression"              // Compression codec: none, snappy, gzip, lzo
  val DICTIONARY_ENCODING = "dictionaryEncoding" // Use dictionary encoding
}

// JDBC Options
object JdbcOptions {
  val DRIVER = "driver"                        // JDBC driver class name
  val USER = "user"                           // Username
  val PASSWORD = "password"                   // Password
  val FETCH_SIZE = "fetchsize"                // JDBC fetch size
  val BATCH_SIZE = "batchsize"                // JDBC batch size for inserts
  val ISOLATION_LEVEL = "isolationLevel"      // Transaction isolation level
  val NUM_PARTITIONS = "numPartitions"        // Number of partitions for parallel reads
  val PARTITION_COLUMN = "partitionColumn"    // Column for partitioning
  val LOWER_BOUND = "lowerBound"              // Lower bound for partitioning
  val UPPER_BOUND = "upperBound"              // Upper bound for partitioning
  val QUERY_TIMEOUT = "queryTimeout"          // Query timeout in seconds
  val CREATE_TABLE_OPTIONS = "createTableOptions" // Options for CREATE TABLE
  val CREATE_TABLE_COLUMN_TYPES = "createTableColumnTypes" // Column types for CREATE TABLE
  val CUSTOM_SCHEMA = "customSchema"          // Custom schema for reading
}
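
These keys are plain strings, so several can be supplied at once through the options(Map[...]) overload instead of chained option calls. A small sketch using a few of the CSV keys above (the file path is a placeholder):

// Pass several data source options at once via a Map
val salesDf = spark.read
  .options(Map(
    "header"     -> "true",
    "delimiter"  -> ";",
    "nullValue"  -> "NA",
    "dateFormat" -> "yyyy-MM-dd",
    "mode"       -> "DROPMALFORMED"
  ))
  .csv("data/sales.csv")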

Built-in Data Sources

Support for various data formats and storage systems.

// File-based sources
val CSV_SOURCE = "csv"
val JSON_SOURCE = "json"  
val PARQUET_SOURCE = "parquet"
val ORC_SOURCE = "orc"
val TEXT_SOURCE = "text"
val AVRO_SOURCE = "avro"          // Requires spark-avro package

// Database sources
val JDBC_SOURCE = "jdbc"

// Big data sources
val HIVE_SOURCE = "hive"
val DELTA_SOURCE = "delta"        // Requires Delta Lake
val ICEBERG_SOURCE = "iceberg"    // Requires Apache Iceberg

// Cloud object stores are not separate formats: they are reached through
// filesystem URI schemes (e.g. "s3a://", "wasbs://"/"abfss://", "gs://")
// combined with the file-based formats above, provided the matching
// Hadoop connector dependencies are on the classpath

// Streaming sources
val KAFKA_SOURCE = "kafka"
val SOCKET_SOURCE = "socket"
val RATE_SOURCE = "rate"          // For testing
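
Any of these names can be passed to format() on a reader or writer. A brief sketch, assuming the spark-avro package is on the classpath and the paths are placeholders; the rate source is a streaming source, so it is read through readStream:

// Batch read/write with an external format name (needs the spark-avro package)
val avroDf = spark.read.format("avro").load("data/events.avro")
avroDf.write.format("avro").mode("overwrite").save("output/events_avro")

// Streaming read from the "rate" test source (generates rows at a fixed rate)
val rateStream = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "5")
  .load()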

Advanced I/O Patterns

Common patterns for data ingestion and output.

Multi-format reading:

// Read from multiple formats (unionByName matches columns by name; schemas must be compatible)
val jsonDf = spark.read.json("data/*.json")
val csvDf = spark.read.option("header", "true").csv("data/*.csv")
val combined = jsonDf.unionByName(csvDf)

// Schema evolution with Parquet: merge differing column sets across directories
val merged = spark.read
  .option("mergeSchema", "true")
  .parquet("data/year=2021", "data/year=2022")

Optimized writes:

// Optimize partition size
df.repartition(200).write
  .mode(SaveMode.Overwrite)
  .parquet("output/data")

// Coalesce for fewer files
df.coalesce(10).write
  .mode(SaveMode.Overwrite)
  .json("output/data")

// Dynamic partitioning
df.write
  .mode(SaveMode.Overwrite)
  .partitionBy("year", "month")
  .option("maxRecordsPerFile", "100000")
  .parquet("output/partitioned_data")

Error handling:

// Handle malformed records: PERMISSIVE keeps bad rows and stores the raw text
// in the corrupt-record column; DROPMALFORMED drops them; FAILFAST throws
val df = spark.read
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json("data/potentially_bad.json")

// Validate data after read
import org.apache.spark.sql.functions.col

val validDf = df.filter(col("_corrupt_record").isNull)
val corruptDf = df.filter(col("_corrupt_record").isNotNull)

Custom data sources:

// Register custom format
spark.sql("CREATE TABLE custom_table USING org.example.CustomDataSource OPTIONS (path 'data/custom')")

// Use programmatically
val df = spark.read
  .format("org.example.CustomDataSource")
  .option("customOption", "value")
  .load("data/custom")