File Formats

Native support for Hive file formats, covering traditional Hive tables backed by SerDes as well as Hive-compatible ORC files. This enables reading and writing data in various Hive-compatible formats while leveraging Spark's performance optimizations.

Capabilities

HiveFileFormat

FileFormat implementation for traditional Hive tables supporting various SerDes and storage formats.

⚠️ Implementation Status: Currently write-only. Read operations are not implemented and will throw UnsupportedOperationException.

/**
 * FileFormat implementation for Hive tables
 * Currently supports WRITING data using Hive SerDes and OutputFormat classes
 * Reading is not implemented - use HiveTableScanExec for reading Hive tables
 */
class HiveFileFormat extends FileFormat with DataSourceRegister with Logging {
  
  /** Data source short name for SQL registration */
  override def shortName(): String = "hive"
  
  /**
   * Schema inference is not supported - throws UnsupportedOperationException
   * Schema must be provided from Hive metastore
   */
  override def inferSchema(
    sparkSession: SparkSession,
    options: Map[String, String], 
    files: Seq[FileStatus]
  ): Option[StructType]
  
  /**
   * Prepare write operations for Hive tables
   * @param sparkSession - Current SparkSession
   * @param job - Hadoop Job configuration
   * @param options - Write options including SerDe settings
   * @param dataSchema - Schema of data to write
   * @return OutputWriterFactory for creating individual file writers
   */
  override def prepareWrite(
    sparkSession: SparkSession,
    job: Job,
    options: Map[String, String],
    dataSchema: StructType
  ): OutputWriterFactory
  
  /**
   * Reading is not implemented - calling this throws UnsupportedOperationException
   * Hive tables are scanned via HiveTableScanExec instead
   * @param sparkSession - Current SparkSession
   * @param dataSchema - Schema of data to read
   * @param partitionSchema - Schema of partition columns
   * @param requiredSchema - Columns actually needed by query
   * @param filters - Pushdown predicates
   * @param options - Read options including SerDe settings
   * @param hadoopConf - Hadoop configuration
   * @return Never returns normally - reads are unsupported
   */
  override def buildReader(
    sparkSession: SparkSession,
    dataSchema: StructType,
    partitionSchema: StructType,
    requiredSchema: StructType,
    filters: Seq[Filter],
    options: Map[String, String],
    hadoopConf: Configuration
  ): PartitionedFile => Iterator[InternalRow]
}

Usage Examples:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()

// Create Hive table with specific file format
spark.sql("""
  CREATE TABLE text_table (
    id INT,
    name STRING,
    age INT
  )
  STORED AS TEXTFILE
  LOCATION '/user/data/text_table'
""")

// Read from Hive table (reads are planned as a Hive table scan, not through HiveFileFormat)
val df = spark.sql("SELECT * FROM text_table")
df.show()

// Define a Hive table with a custom SerDe (writes to it go through HiveFileFormat)
spark.sql("""
  CREATE TABLE custom_serde_table (
    id INT,
    data STRING
  )
  ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
  WITH SERDEPROPERTIES (
    "separatorChar" = ",",
    "quoteChar" = "\"",
    "escapeChar" = "\\"
  )
  STORED AS TEXTFILE
""")

OrcFileFormat

Hive-compatible ORC file format implementation with optimizations.

⚠️ Implementation Note: There are two ORC implementations in Spark:

  • Hive ORC (org.apache.spark.sql.hive.orc.OrcFileFormat) - Uses Hive ORC libraries for compatibility
  • Core ORC (org.apache.spark.sql.execution.datasources.orc.OrcFileFormat) - Uses Apache ORC directly for better performance

The Hive ORC implementation documented here provides better Hive compatibility but may have different performance characteristics.
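
To make the choice explicit, the implementation can be selected per session; a minimal configuration sketch (defaults vary by Spark version):

import org.apache.spark.sql.SparkSession

val orcSession = SparkSession.builder()
  .enableHiveSupport()
  // "native" uses Apache ORC directly; "hive" uses Hive's ORC libraries
  .config("spark.sql.orc.impl", "native")
  // When true, metastore ORC tables are read with Spark's native ORC data source
  .config("spark.sql.hive.convertMetastoreOrc", "true")
  .getOrCreate()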

/**
 * Hive-compatible ORC file format implementation
 * Provides native ORC reading/writing with Hive metastore integration
 */
class OrcFileFormat extends FileFormat with DataSourceRegister with Logging {
  
  /** Data source short name */
  override def shortName(): String = "orc"
  
  /**
   * Infer schema from ORC files
   * @param sparkSession - Current SparkSession
   * @param options - Read options
   * @param files - ORC files to analyze
   * @return Inferred schema or None if cannot infer
   */
  override def inferSchema(
    sparkSession: SparkSession,
    options: Map[String, String],
    files: Seq[FileStatus]
  ): Option[StructType]
  
  /**
   * Build optimized ORC reader with predicate pushdown
   * @param sparkSession - Current SparkSession
   * @param dataSchema - Schema of data in files
   * @param partitionSchema - Partition column schema
   * @param requiredSchema - Columns needed by query
   * @param filters - Pushdown predicates for ORC row groups
   * @param options - Read options
   * @param hadoopConf - Hadoop configuration
   * @return Reader function for ORC files
   */
  override def buildReader(
    sparkSession: SparkSession,
    dataSchema: StructType,
    partitionSchema: StructType, 
    requiredSchema: StructType,
    filters: Seq[Filter],
    options: Map[String, String],
    hadoopConf: Configuration
  ): PartitionedFile => Iterator[InternalRow]
  
  /**
   * Prepare ORC write operations
   * @param sparkSession - Current SparkSession
   * @param job - Hadoop Job for configuration
   * @param options - Write options including compression
   * @param dataSchema - Schema of data to write
   * @return OutputWriterFactory for creating ORC writers
   */
  override def prepareWrite(
    sparkSession: SparkSession,
    job: Job,
    options: Map[String, String],
    dataSchema: StructType
  ): OutputWriterFactory
  
  /**
   * Check if vectorized (columnar batch) reading can be used
   * @param sparkSession - Current SparkSession
   * @param dataSchema - Schema of data in files
   * @return true if vectorized reading can be used
   */
  override def supportBatch(sparkSession: SparkSession, dataSchema: StructType): Boolean
}

Usage Examples:

// Create ORC table  
spark.sql("""
  CREATE TABLE orc_table (
    id BIGINT,
    name STRING,
    age INT,
    salary DOUBLE
  )
  STORED AS ORC
  LOCATION '/user/data/orc_table'
  TBLPROPERTIES (
    'orc.compress' = 'SNAPPY',
    'orc.bloom.filter.columns' = 'id,name'
  )
""")

// Write data to ORC table with compression
df.write
  .mode("overwrite")
  .option("compression", "snappy")
  .format("orc") 
  .saveAsTable("orc_table")

// Read with predicate pushdown (automatically optimized)
val filtered = spark.sql("""
  SELECT name, salary 
  FROM orc_table 
  WHERE age > 25 AND salary > 50000
""")
filtered.explain(true) // Shows predicate pushdown

HiveOptions Configuration

Configuration class for Hive-specific format options.

/**
 * Configuration options for Hive data source operations
 */
class HiveOptions(parameters: Map[String, String]) {
  
  /** File format specification (e.g., "textfile", "sequencefile", "orc") */
  val fileFormat: Option[String] = parameters.get(HiveOptions.FILE_FORMAT)
  
  /** Input format class name */
  val inputFormat: Option[String] = parameters.get(HiveOptions.INPUT_FORMAT)
  
  /** Output format class name */
  val outputFormat: Option[String] = parameters.get(HiveOptions.OUTPUT_FORMAT)
  
  /** SerDe class name */
  val serde: Option[String] = parameters.get(HiveOptions.SERDE)
  
  /**
   * Check if input/output formats are explicitly specified
   * @return true if both input and output formats are provided
   */
  def hasInputOutputFormat: Boolean = inputFormat.isDefined && outputFormat.isDefined
  
  /**
   * Get SerDe properties from options
   * @return Map of SerDe-specific properties
   */
  def serdeProperties: Map[String, String] = {
    parameters.filterKeys(!HiveOptions.delimiterOptions.contains(_))
  }
}

object HiveOptions {
  // Option key constants
  val FILE_FORMAT = "fileFormat"
  val INPUT_FORMAT = "inputFormat"
  val OUTPUT_FORMAT = "outputFormat" 
  val SERDE = "serde"
  
  // Maps user-facing delimiter options to Hive SerDe property names
  val delimiterOptions: Map[String, String] = Map(
    "fieldDelim" -> "field.delim",
    "escapeDelim" -> "escape.delim",
    "collectionDelim" -> "collection.delim",
    "mapkeyDelim" -> "mapkey.delim",
    "lineDelim" -> "line.delim"
  )
  
  /**
   * Get compression configuration for Hive writes
   * @param sessionState - Hive SessionState
   * @param hadoopConf - Hadoop configuration
   * @param compressionCodec - Optional compression codec override
   * @return Compression codec to use or None
   */
  def getHiveWriteCompression(
    sessionState: SessionState,
    hadoopConf: Configuration,
    compressionCodec: Option[String]
  ): Option[String]
}
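
These keys surface through the DataFrame writer when the "hive" format is used. A short sketch, assuming df is an existing DataFrame (the table name is illustrative):

// Option keys mirror the HiveOptions constants above; fieldDelim is one of the
// delimiter shortcuts and is only valid with the "textfile" file format
df.write
  .format("hive")
  .option("fileFormat", "textfile")
  .option("fieldDelim", "|")
  .mode("overwrite")
  .saveAsTable("pipe_delimited_table")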

File Format Examples

Comprehensive examples for different Hive file formats.

TextFile Format:

// Create table with TextFile format and custom delimiters
spark.sql("""
  CREATE TABLE csv_data (
    id INT,
    name STRING,
    email STRING,
    age INT
  )
  ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
  WITH SERDEPROPERTIES (
    'field.delim' = ',',
    'line.delim' = '\n',
    'serialization.format' = ','
  )
  STORED AS TEXTFILE
  LOCATION '/data/csv'
""")

// Load CSV data
spark.sql("""
  LOAD DATA INPATH '/input/data.csv' 
  INTO TABLE csv_data
""")

SequenceFile Format:

// Create SequenceFile table
spark.sql("""
  CREATE TABLE sequence_data (
    key STRING,
    value STRING  
  )
  STORED AS SEQUENCEFILE
  LOCATION '/data/sequence'
""")

// Write data in SequenceFile format
df.write
  .format("hive")
  .option("fileFormat", "sequencefile")
  .mode("overwrite")
  .saveAsTable("sequence_data")

Avro Format:

// Create Avro table
spark.sql("""
  CREATE TABLE avro_data (
    id BIGINT,
    name STRING,
    metadata MAP<STRING,STRING>
  )
  STORED AS AVRO
  LOCATION '/data/avro'
  TBLPROPERTIES (
    'avro.schema.literal' = '{
      "type": "record",
      "name": "User", 
      "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"},
        {"name": "metadata", "type": {"type": "map", "values": "string"}}
      ]
    }'
  )
""")

Parquet Format (with Hive compatibility):

// Create Parquet table with Hive metastore
spark.sql("""
  CREATE TABLE parquet_data (
    id BIGINT,
    name STRING,
    created_date DATE
  )
  STORED AS PARQUET
  LOCATION '/data/parquet'
  TBLPROPERTIES (
    'parquet.compression' = 'SNAPPY'
  )
""")

// Automatic conversion to Spark's native Parquet reader
// (controlled by spark.sql.hive.convertMetastoreParquet)
val df = spark.sql("SELECT * FROM parquet_data")
df.explain() // Shows either HiveTableRelation or parquet scan
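
To see which path a given query takes, the conversion flag can be toggled at runtime; a quick sketch (the exact plan wording varies by Spark version):

// With conversion disabled, the plan shows a Hive table scan (HiveTableRelation)
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
spark.sql("SELECT * FROM parquet_data").explain()

// With conversion enabled (the default), the plan shows Spark's native Parquet scan
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")
spark.sql("SELECT * FROM parquet_data").explain()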

Advanced File Format Operations

Custom SerDe Integration:

// Register custom SerDe
spark.sql("ADD JAR /path/to/custom-serde.jar")

spark.sql("""
  CREATE TABLE json_data (
    id INT,
    data STRING
  )
  ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
  STORED AS TEXTFILE
  LOCATION '/data/json'
""")

// Use custom SerDe for complex data
val jsonDF = spark.sql("SELECT get_json_object(data, '$.user.name') as username FROM json_data")

Multi-Format Table Operations:

// Create a partitioned table; individual partitions can later be stored in different formats
spark.sql("""
  CREATE TABLE multi_format_data (
    id INT,
    name STRING,
    value DOUBLE
  )
  PARTITIONED BY (format_type STRING)
  STORED AS TEXTFILE
  LOCATION '/data/multi_format'
""")

// Register partitions at separate locations; each partition's storage format is tracked
// in the metastore (e.g. via Hive's ALTER TABLE ... PARTITION ... SET FILEFORMAT)
spark.sql("""
  ALTER TABLE multi_format_data 
  ADD PARTITION (format_type='text')
  LOCATION '/data/multi_format/text'
""")

spark.sql("""
  ALTER TABLE multi_format_data
  ADD PARTITION (format_type='orc') 
  LOCATION '/data/multi_format/orc'
""")

Compression Configuration:

// Configure compression for different formats
val spark = SparkSession.builder()
  .config("spark.sql.hive.convertMetastoreOrc", "true")
  .config("spark.sql.orc.compression.codec", "snappy")
  .config("spark.sql.parquet.compression.codec", "gzip")
  .enableHiveSupport()
  .getOrCreate()

// Write with specific compression
df.write
  .format("orc")
  .option("compression", "zlib")
  .mode("overwrite")
  .saveAsTable("compressed_table")

Performance Optimization

Vectorized Reading:

// Enable vectorized ORC reading (requires the native ORC implementation;
// the Hive ORC reader does not support vectorization)
spark.conf.set("spark.sql.orc.impl", "native")
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")

// Query benefits from vectorization
val result = spark.sql("""
  SELECT sum(salary), avg(age)
  FROM large_orc_table
  WHERE department = 'Engineering'
""")
result.explain() // Shows vectorized operations

Predicate Pushdown:

// ORC predicate pushdown automatically applied
val filtered = spark.sql("""
  SELECT name, salary
  FROM employee_orc
  WHERE hire_date >= '2020-01-01' 
    AND department IN ('Engineering', 'Sales')
    AND salary > 75000
""")

// Check pushdown in query plan
filtered.explain(true)
// Shows: PushedFilters: [IsNotNull(hire_date), GreaterThanOrEqual(hire_date,...)]

Schema Evolution:

// Handle schema evolution in ORC files
spark.conf.set("spark.sql.orc.mergeSchema", "true")

// Read tables with evolved schemas
val evolvedDF = spark.sql("SELECT * FROM evolved_orc_table")
evolvedDF.printSchema() // Shows merged schema from all files
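
As a self-contained illustration of merging, two compatible but different schemas can be written and read back together; a sketch assuming Spark 3.0+ (paths and values are illustrative):

import spark.implicits._

// Two ORC file sets written with different but compatible schemas
Seq((1, "a")).toDF("id", "name").write.mode("overwrite").orc("/tmp/orc_evolved/v1")
Seq((2, "b", 3.0)).toDF("id", "name", "score").write.mode("overwrite").orc("/tmp/orc_evolved/v2")

// With mergeSchema enabled, reading both paths yields the union of the columns
val merged = spark.read.option("mergeSchema", "true").orc("/tmp/orc_evolved/v1", "/tmp/orc_evolved/v2")
merged.printSchema() // id, name, score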

File Format Utilities

Format Detection and Conversion:

// Check table storage format
val tableInfo = spark.sql("DESCRIBE FORMATTED my_table").collect()
val storageFormat = tableInfo.find(_.getString(0) == "InputFormat").map(_.getString(1))

// Convert table format
spark.sql("""
  CREATE TABLE orc_converted
  STORED AS ORC
  AS SELECT * FROM textfile_table
""")

// Rewrite a table in place; data files are rewritten in the table's declared storage format
// (useful for compacting many small files)
spark.sql("""
  INSERT OVERWRITE TABLE existing_table
  SELECT * FROM existing_table
""") // Uses current table's optimal format

File Statistics and Metadata:

// Get file-level statistics for ORC
val stats = spark.sql("""
  SELECT 
    input_file_name() as filename,
    count(*) as row_count
  FROM orc_table
  GROUP BY input_file_name()
""")
stats.show()

// Analyze table statistics  
spark.sql("ANALYZE TABLE my_table COMPUTE STATISTICS FOR COLUMNS")
spark.sql("DESCRIBE EXTENDED my_table").show(100, false)

Error Handling

Common error patterns and solutions for file format operations:

import org.apache.spark.sql.AnalysisException

try {
  spark.sql("SELECT * FROM malformed_table")
} catch {
  case e: AnalysisException if e.getMessage.contains("SerDe") =>
    println("SerDe configuration error - check SerDe properties")
  case e: AnalysisException if e.getMessage.contains("InputFormat") =>
    println("InputFormat error - verify file format configuration")
  case e: Exception =>
    println(s"File format error: ${e.getMessage}")
}

// Handle missing files gracefully
val safeDF = try {
  spark.sql("SELECT * FROM potentially_missing_table")
} catch {
  case _: AnalysisException =>
    spark.emptyDataFrame
}