File Format Support

Apache Spark's Hive integration provides comprehensive support for a range of file formats, with native, optimized readers and writers for ORC and Parquet, as well as compatibility with traditional Hive SerDe-based file formats.

ORC File Format Support

Spark provides native ORC support with advanced optimizations including vectorized reading, predicate pushdown, and column pruning.

OrcFileFormat

class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable {
  // DataSourceRegister interface
  def shortName(): String  // Returns "orc"
  
  // FileFormat interface
  def inferSchema(
    sparkSession: SparkSession,
    options: Map[String, String], 
    files: Seq[FileStatus]
  ): Option[StructType]
  
  def prepareWrite(
    sparkSession: SparkSession,
    job: Job,
    options: Map[String, String],
    dataSchema: StructType
  ): OutputWriterFactory
  
  def buildReader(
    sparkSession: SparkSession,
    dataSchema: StructType,
    partitionSchema: StructType,
    requiredSchema: StructType,
    filters: Seq[Filter],
    options: Map[String, String],
    hadoopConf: Configuration
  ): PartitionedFile => Iterator[InternalRow]
}

Using ORC Format

Reading ORC Files:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()

// Read ORC files directly
val df = spark.read.format("orc").load("/path/to/orc/files")

// Read ORC table through Hive metastore
val table = spark.table("my_orc_table")

// Read with options
val dfWithOptions = spark.read
  .format("orc")
  .option("mergeSchema", "true")
  .load("/path/to/orc/files")

Writing ORC Files:

// Write DataFrame as ORC
df.write
  .format("orc")
  .option("compression", "snappy")
  .save("/path/to/output/orc")

// Write to Hive table
df.write
  .format("orc")
  .mode("overwrite")
  .saveAsTable("my_new_orc_table")

// Partitioned write
df.write
  .format("orc")
  .partitionBy("year", "month")
  .save("/path/to/partitioned/orc")

ORC Configuration Options

class OrcOptions(parameters: CaseInsensitiveMap[String]) {
  def compressionCodec: String
  def enableVectorizedReader: Boolean
  def mergeSchema: Boolean
}

Available ORC Options:

  • compression: Compression codec ("none", "snappy", "zlib", "lzo", "lz4", "zstd")
  • enableVectorizedReader: Enable vectorized ORC reader (default: true)
  • mergeSchema: Merge schemas when reading multiple files (default: false)
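
Each of the options above also has a session-level counterpart in the SQL configuration; the config names below assume Spark 3.x and are shown as a sketch:

// Session-level counterparts of the per-operation options (config names as of Spark 3.x)
spark.conf.set("spark.sql.orc.compression.codec", "zstd")
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")
spark.conf.set("spark.sql.orc.mergeSchema", "false")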

ORC File Operations

object OrcFileOperator extends Logging {
  def readSchema(files: Seq[String], conf: Option[Configuration]): Option[StructType]
  def listOrcFiles(path: String, hadoopConf: Configuration): Seq[String]
  def getRowCount(file: String, conf: Configuration): Long
}

Usage Example:

import org.apache.spark.sql.hive.orc.OrcFileOperator

// Read schema from ORC files
val schema = OrcFileOperator.readSchema(Seq("/path/to/file.orc"), None)
println(s"Schema: ${schema.get.treeString}")

// Get row count
val conf = spark.sparkContext.hadoopConfiguration
val rowCount = OrcFileOperator.getRowCount("/path/to/file.orc", conf)
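
The listOrcFiles helper can be combined with readSchema to inspect a directory of ORC files; a minimal sketch, assuming the signatures listed above:

// Enumerate ORC files under a directory, then inspect the schema of the first one
val orcFiles = OrcFileOperator.listOrcFiles("/path/to/orc/dir", conf)
val firstSchema = OrcFileOperator.readSchema(orcFiles.take(1), Some(conf))
firstSchema.foreach(s => println(s.treeString))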

ORC Filter Pushdown

object OrcFilters extends Logging {
  def createFilter(filters: Seq[Filter]): Option[SearchArgument]
  def buildSearchArgument(
    dataTypeMap: Map[String, DataType],
    filters: Seq[Filter]
  ): Option[SearchArgument]
}

ORC supports predicate pushdown for:

  • Equality filters (=)
  • Comparison filters (<, <=, >, >=)
  • IN predicates
  • IS NULL / IS NOT NULL
  • Logical combinations (AND, OR, NOT)
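
Whether a filter was actually pushed down can be checked in the physical plan. A minimal sketch (the path is a placeholder; spark.sql.orc.filterPushdown is enabled by default in recent Spark versions):

import org.apache.spark.sql.functions.col

spark.conf.set("spark.sql.orc.filterPushdown", "true")

val filtered = spark.read
  .format("orc")
  .load("/path/to/orc/files")
  .filter(col("id") > 100 && col("name").isNotNull)

// Pushed predicates appear in the plan, e.g. PushedFilters: [IsNotNull(name), GreaterThan(id,100)]
filtered.explain()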

Hive File Format Support

Support for traditional Hive file formats using Hive SerDes and input/output formats.

HiveFileFormat

class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat {
  def prepareWrite(
    sparkSession: SparkSession,
    job: Job,
    options: Map[String, String],
    dataSchema: StructType
  ): OutputWriterFactory
  
  def buildReader(
    sparkSession: SparkSession,
    dataSchema: StructType,
    partitionSchema: StructType,
    requiredSchema: StructType,
    filters: Seq[Filter],
    options: Map[String, String],
    hadoopConf: Configuration
  ): PartitionedFile => Iterator[InternalRow]
}
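
HiveFileFormat is used internally on the write path for tables backed by Hive SerDes, for example when inserting into a TEXTFILE table. A minimal sketch (the table name is illustrative):

spark.sql("""
CREATE TABLE IF NOT EXISTS text_events (
  id INT,
  name STRING
) STORED AS TEXTFILE
""")

// Writes go through the SerDe and output format recorded in the metastore for the table
df.write.mode("append").insertInto("text_events")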

HiveOptions

Configuration for Hive-compatible file formats and SerDes.

class HiveOptions(parameters: CaseInsensitiveMap[String]) {
  def fileFormat: String
  def inputFormat: String  
  def outputFormat: String
  def serde: String
  def serdeProperties: Map[String, String]
}
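
These options map to the OPTIONS clause of a Hive-format table. Following the syntax from Spark's Hive tables documentation, a sketch (the table name is illustrative):

// fileFormat bundles a default SerDe, input format, and output format
spark.sql("""
CREATE TABLE hive_orc_src (id INT, name STRING)
USING HIVE
OPTIONS(fileFormat 'orc')
""")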

Supported Hive File Formats

Text Files:

// Create table with text file format
spark.sql("""
CREATE TABLE text_table (
  id INT,
  name STRING
) USING HIVE
STORED AS TEXTFILE
""")

Sequence Files:

spark.sql("""
CREATE TABLE seq_table (
  id INT,
  name STRING  
) USING HIVE
STORED AS SEQUENCEFILE
""")

Avro Files:

spark.sql("""
CREATE TABLE avro_table (
  id INT,
  name STRING
) USING HIVE
STORED AS AVRO
""")

Custom SerDe:

spark.sql("""
CREATE TABLE custom_table (
  id INT,
  name STRING
) USING HIVE
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
  'field.delim' = '\t',
  'line.delim' = '\n'
)
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
""")

Parquet Integration

While Parquet support is primarily handled by Spark's native Parquet reader, the Hive integration provides compatibility for Hive-created Parquet tables.

Parquet Configuration

// Enable native Parquet reader for Hive tables
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")

// Configure Parquet options
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")
spark.conf.set("spark.sql.parquet.mergeSchema", "false")

Reading Hive Parquet Tables

// Read Hive Parquet table with native reader
val df = spark.table("my_parquet_table")

// Force use of Hive SerDe (not recommended)
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
val dfWithSerde = spark.table("my_parquet_table")
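
Which read path is in use can be confirmed from the physical plan: with conversion enabled the table is scanned by the native Parquet file source, while disabling it falls back to a scan through the Hive SerDe. A quick check:

// Compare the physical plans with conversion enabled vs. disabled
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")
spark.catalog.refreshTable("my_parquet_table")
spark.table("my_parquet_table").explain()   // scanned by the native Parquet file source

spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
spark.catalog.refreshTable("my_parquet_table")
spark.table("my_parquet_table").explain()   // scanned through the Hive SerDe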

Advanced File Format Features

Schema Evolution

Support for schema evolution in ORC and Parquet formats:

// Enable schema merging for ORC
val df = spark.read
  .format("orc")
  .option("mergeSchema", "true")
  .load("/path/to/evolved/schema")

// Columns missing from older files come back as null once schema merging is enabled
spark.conf.set("spark.sql.orc.mergeSchema", "true")
val dfWithDefaults = spark.read
  .format("orc")
  .load("/path/to/files")

Compression Support

ORC Compression Options:

  • NONE
  • ZLIB (the ORC library default; Spark's spark.sql.orc.compression.codec defaults to SNAPPY)
  • SNAPPY
  • LZO
  • LZ4
  • ZSTD

Setting Compression:

// For writes
df.write
  .format("orc")
  .option("compression", "snappy")
  .save("/path/to/output")

// Global setting
spark.conf.set("spark.sql.orc.compression.codec", "snappy")

Partition Support

File formats support both static and dynamic partitioning:

// Static partitioning
df.write
  .format("orc")
  .partitionBy("year", "month")
  .save("/partitioned/data")

// Dynamic partitioning in Hive tables
spark.sql("SET hive.exec.dynamic.partition = true")
spark.sql("SET hive.exec.dynamic.partition.mode = nonstrict")

spark.sql("""
INSERT INTO TABLE partitioned_table 
PARTITION(year, month)
SELECT id, name, year, month FROM source_table
""")

Bucketing

Support for bucketed tables for improved join performance:

// Create bucketed table
spark.sql("""
CREATE TABLE bucketed_table (
  id INT,
  name STRING,
  department STRING
) USING HIVE
CLUSTERED BY (id) INTO 4 BUCKETS
STORED AS ORC
""")

// Alternatively, create a bucketed table directly with the DataFrame writer
df.write
  .format("orc")
  .bucketBy(4, "id")
  .saveAsTable("bucketed_table_df")
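
The benefit shows up in joins: when two tables are bucketed by the same column into the same number of buckets, the shuffle on the bucketed sides of a sort-merge join can be avoided. A sketch, where bucketed_orders is a hypothetical second table bucketed by id into 4 buckets:

// With matching bucket columns and counts, the plan should show no Exchange on the bucketed sides
val joined = spark.table("bucketed_table")
  .join(spark.table("bucketed_orders"), "id")
joined.explain()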

Error Handling

Common file format errors and solutions:

Unsupported File Format

// Error: Unsupported file format
// Solution: Ensure format is supported or use appropriate SerDe
spark.sql("""
CREATE TABLE custom_format_table (...)
STORED AS INPUTFORMAT 'custom.input.format'
OUTPUTFORMAT 'custom.output.format'
""")

Schema Mismatch

// Error: Schema mismatch between file and table
// Solution: Enable schema evolution or fix schema
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

Compression Issues

// Error: Unsupported compression codec
// Solution: Use supported codec or install required libraries
df.write.format("orc").option("compression", "snappy").save(path)

Performance Tuning

ORC Optimization

// Enable vectorized reading
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")

// Configure split size
spark.conf.set("spark.sql.files.maxPartitionBytes", "134217728") // 128MB

// Enable bloom filters
spark.sql("""
CREATE TABLE optimized_table (...)
USING HIVE
STORED AS ORC
TBLPROPERTIES (
  'orc.bloom.filter.columns'='id,name',
  'orc.create.index'='true'
)
""")

File Size Optimization

// Control file size during writes
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "67108864") // 64MB

// Coalesce small files
df.coalesce(1).write.format("orc").save(path)

Types

// File format interface
trait FileFormat {
  def inferSchema(
    sparkSession: SparkSession,
    options: Map[String, String],
    files: Seq[FileStatus]
  ): Option[StructType]
  
  def prepareWrite(
    sparkSession: SparkSession,
    job: Job, 
    options: Map[String, String],
    dataSchema: StructType
  ): OutputWriterFactory
}

// Data source registration
trait DataSourceRegister {
  def shortName(): String
}

// Hive file sink configuration
case class FileSinkDesc(
  dirName: String,
  tableInfo: TableDesc,
  compressed: Boolean,
  destTableId: Int,
  compressCodec: String
)

// Table description for Hive
case class TableDesc(
  inputFormat: Class[_ <: InputFormat[_, _]],
  outputFormat: Class[_ <: OutputFormat[_, _]],
  properties: Properties
)

// ORC search argument for predicate pushdown
trait SearchArgument {
  def getLeaves(): java.util.List[PredicateLeaf]
  def getExpression(): ExpressionTree
}

// File status information
case class FileStatus(
  path: String,
  length: Long,
  isDirectory: Boolean,
  blockReplication: Short,
  blockSize: Long,
  modificationTime: Long,
  accessTime: Long
)