File Format Support

Support for Hive-compatible file formats, particularly ORC files with Hive metadata integration.

Core Imports

```scala
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.hive.orc.OrcFileOperator
import org.apache.spark.sql.hive.execution.HiveFileFormat
import org.apache.spark.sql.sources.DataSourceRegister
```

Capabilities

ORC File Format

Primary file format implementation for reading and writing ORC files with full Hive compatibility.

```scala { .api }
class OrcFileFormat extends FileFormat with DataSourceRegister {
  
  /**
   * Infer schema from ORC files
   * @param sparkSession Active Spark session
   * @param options Format-specific options
   * @param files Sequence of file statuses to analyze
   * @return Inferred schema, or None if it cannot be inferred
   */
  def inferSchema(
    sparkSession: SparkSession,
    options: Map[String, String],
    files: Seq[FileStatus]
  ): Option[StructType]
  
  /**
   * Prepare write operations for ORC format
   * @param sparkSession Active Spark session
   * @param job Hadoop job configuration
   * @param options Write options and settings
   * @param dataSchema Schema of data to write
   * @return OutputWriterFactory for creating writers
   */
  def prepareWrite(
    sparkSession: SparkSession,
    job: Job,
    options: Map[String, String],
    dataSchema: StructType
  ): OutputWriterFactory
  
  /**
   * Build reader for scanning ORC files
   * @param sparkSession Active Spark session
   * @param dataSchema Schema of data in files
   * @param partitionSchema Schema of partition columns
   * @param requiredSchema Schema of required columns
   * @param filters Push-down filters for optimization
   * @param options Read options and settings
   * @param hadoopConf Hadoop configuration
   * @return Function to create PartitionedFile readers
   */
  def buildReader(
    sparkSession: SparkSession,
    dataSchema: StructType,
    partitionSchema: StructType,
    requiredSchema: StructType,
    filters: Seq[Filter],
    options: Map[String, String],
    hadoopConf: Configuration
  ): PartitionedFile => Iterator[InternalRow]
  
  /** Short name for this data source format */
  def shortName(): String = "orc"
}
```
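
In normal use this format is selected through the DataFrame reader/writer API (see Usage Examples below), but the implementation can also be exercised directly. A minimal sketch of driving schema inference through the class itself; the file-listing path is a placeholder:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// Collect FileStatus entries for the ORC files to analyze (placeholder path)
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val statuses = fs.listStatus(new Path("/path/to/orc/files")).toSeq

// Infer the schema through the format implementation itself
val format = new OrcFileFormat()
val inferred = format.inferSchema(spark, Map("mergeSchema" -> "true"), statuses)
inferred.foreach(schema => println(schema.treeString))
```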

ORC File Operations

Utility operations for ORC file handling and metadata.

```scala { .api }
object OrcFileOperator extends Logging {
  
  /**
   * Read schema from ORC file footer
   * @param file Path to ORC file
   * @param conf Hadoop configuration
   * @param ignoreCorruptFiles Whether to ignore corrupt files
   * @return Tuple of (schema, user metadata)
   */
  def readSchema(
    file: Path,
    conf: Configuration,
    ignoreCorruptFiles: Boolean
  ): Option[(StructType, Map[String, String])]
  
  /**
   * Read ORC file metadata including statistics
   * @param files Sequence of ORC files to read
   * @param conf Hadoop configuration
   * @return Aggregated file metadata
   */
  def readFileMetadata(
    files: Seq[Path],
    conf: Configuration
  ): Map[String, String]
}
```
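
When only footer metadata is needed, these operations can be called directly. A minimal sketch, assuming the readSchema signature as documented here (exact parameters vary across Spark versions); the file path is a placeholder:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.hive.orc.OrcFileOperator

// Read the footer of a single ORC file, skipping it if corrupt (placeholder path)
val hadoopConf = new Configuration()
OrcFileOperator.readSchema(new Path("/path/to/orc/files/part-00000.orc"), hadoopConf, ignoreCorruptFiles = true) match {
  case Some((schema, userMetadata)) =>
    println(schema.treeString)
    userMetadata.foreach { case (key, value) => println(s"$key = $value") }
  case None =>
    println("No readable ORC footer found")
}
```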

Hive File Format Integration

Integration layer for Hive-specific file format operations.

```scala { .api }
private[hive] class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat {
  
  /**
   * Prepare write using Hive OutputFormat
   * @param sparkSession Active Spark session
   * @param job Hadoop job for write operation
   * @param options Write configuration options
   * @param dataSchema Schema of data to be written
   * @return OutputWriterFactory using Hive serialization
   */
  def prepareWrite(
    sparkSession: SparkSession,
    job: Job,
    options: Map[String, String],
    dataSchema: StructType
  ): OutputWriterFactory
  
  /**
   * Build reader using Hive InputFormat and SerDe
   * @param sparkSession Active Spark session
   * @param dataSchema Full schema of data files
   * @param partitionSchema Schema of partition columns
   * @param requiredSchema Schema of columns to read
   * @param filters Filters to push down to storage
   * @param options Read configuration options
   * @param hadoopConf Hadoop configuration
   * @return Function to read a PartitionedFile
   */
  def buildReader(
    sparkSession: SparkSession,
    dataSchema: StructType,
    partitionSchema: StructType,
    requiredSchema: StructType,
    filters: Seq[Filter],
    options: Map[String, String],
    hadoopConf: Configuration
  ): PartitionedFile => Iterator[InternalRow]
}
```
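
HiveFileFormat is not constructed from user code; Spark selects it internally when a write goes through Hive's OutputFormat and SerDe rather than the native ORC writer. A sketch of exercising that path (table and column names are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// Force writes through the Hive SerDe path instead of the native ORC writer
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "false")
spark.sql("CREATE TABLE IF NOT EXISTS legacy_orc (id INT, name STRING) STORED AS ORC")
spark.sql("INSERT INTO legacy_orc VALUES (1, 'Alice')")

// Switch back to the native reader/writer afterwards
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")
```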

Java Integration Classes

Low-level Java classes for ORC integration.

```scala { .api }
// Note: These are Java classes with Scala signatures for documentation

/**
 * Custom ORC record reader optimized for Spark
 */
class SparkOrcNewRecordReader extends RecordReader[NullWritable, VectorizedRowBatch] {
  
  /**
   * Initialize the record reader
   * @param inputSplit Input split to read
   * @param context Task attempt context
   */
  def initialize(inputSplit: InputSplit, context: TaskAttemptContext): Unit
  
  /**
   * Read next batch of records
   * @return true if more records are available
   */
  def nextKeyValue(): Boolean
  
  /**
   * Get current key (always null for ORC)
   * @return NullWritable key
   */
  def getCurrentKey(): NullWritable
  
  /**
   * Get current batch of records
   * @return VectorizedRowBatch containing records
   */
  def getCurrentValue(): VectorizedRowBatch
  
  /**
   * Get reading progress as percentage
   * @return Progress between 0.0 and 1.0
   */
  def getProgress(): Float
  
  /** Close the record reader */
  def close(): Unit
}

/**
 * Input format for handling symlinked text files
 */
class DelegateSymlinkTextInputFormat extends TextInputFormat {
  
  /**
   * Get input splits for symlinked files
   * @param job Job configuration
   * @return List of input splits
   */
  def getSplits(job: JobContext): java.util.List[InputSplit]
}
```
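
Both classes follow the standard Hadoop mapreduce contracts, so consuming the record reader is the usual nextKeyValue loop. A minimal sketch that assumes an already initialized reader instance (construction and initialization are handled by the surrounding input format):

```scala
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.RecordReader

// Count rows by draining a record reader; VectorizedRowBatch.size is the
// number of rows in the current batch.
def countRows(reader: RecordReader[NullWritable, VectorizedRowBatch]): Long = {
  var rows = 0L
  try {
    while (reader.nextKeyValue()) {
      rows += reader.getCurrentValue().size
    }
  } finally {
    reader.close()
  }
  rows
}
```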

Usage Examples

Reading ORC Files

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()

// Read ORC files directly
val orcDF = spark.read
  .format("orc")
  .option("mergeSchema", "true")
  .load("/path/to/orc/files")

orcDF.printSchema()
orcDF.show()

// Read Hive ORC table
val hiveOrcTable = spark.sql("SELECT * FROM hive_orc_table")
hiveOrcTable.explain(true)
```

Writing ORC Files

```scala
import org.apache.spark.sql.SaveMode
import spark.implicits._  // required for toDF on a local Seq

// Create sample data
val data = Seq(
  (1, "Alice", 25),
  (2, "Bob", 30),
  (3, "Charlie", 35)
).toDF("id", "name", "age")

// Write as ORC with Hive compatibility
data.write
  .mode(SaveMode.Overwrite)
  .option("compression", "snappy")
  .format("orc")
  .save("/path/to/output/orc")

// Write to Hive table using ORC format
data.write
  .mode(SaveMode.Overwrite)
  .saveAsTable("my_database.orc_table")
```

Schema Evolution and Merging

```scala
// Enable schema merging for ORC files
val mergedDF = spark.read
  .format("orc")
  .option("mergeSchema", "true")
  .load("/path/to/orc/files/*")

// Handle schema evolution gracefully
val evolvedDF = spark.read
  .format("orc")
  .option("recursiveFileLookup", "true")
  .load("/path/to/evolved/schema/files")

// Check for schema differences
mergedDF.printSchema()
evolvedDF.printSchema()
```

Configuration Options

ORC-Specific Options

```scala
// Read options
val orcOptions = Map(
  "mergeSchema" -> "true",           // Merge schemas from multiple files
  "recursiveFileLookup" -> "true",   // Recursively look for files
  "ignoreCorruptFiles" -> "false",   // Fail on corrupt files
  "compression" -> "snappy"          // Compression codec
)

// Write options  
val writeOptions = Map(
  "compression" -> "zlib",           // zlib, snappy, lzo, lz4, none
  "orc.compress" -> "SNAPPY",        // ORC compression
  "orc.stripe.size" -> "67108864",   // 64MB stripe size
  "orc.block.size" -> "268435456"    // 256MB block size
)
```
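
These option maps plug directly into the standard reader and writer APIs via options(...); the paths below are placeholders:

```scala
// Apply the read options when loading and the write options when saving
val optionedDF = spark.read.format("orc").options(orcOptions).load("/path/to/orc/files")

optionedDF.write
  .format("orc")
  .options(writeOptions)
  .mode("overwrite")
  .save("/path/to/output/orc")
```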

Hive Integration Settings

```scala
// Configure ORC conversion from Hive metastore
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")
spark.conf.set("spark.sql.orc.impl", "native")
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")

Error Handling

Common failure modes when working with these file formats:

  • Corrupt or truncated ORC files, typically surfaced as a java.io.IOException (for example, "Malformed ORC file")
  • Files that are not in a format the selected reader supports
  • Schemas that cannot be merged or converted, typically surfaced as an AnalysisException

```scala
import org.apache.spark.sql.AnalysisException

try {
  val df = spark.read.format("orc").load("/path/to/corrupt/files")
  df.count()
} catch {
  case e: java.io.IOException if e.getMessage.contains("Malformed ORC file") =>
    println("ORC file is corrupted")
  case e: AnalysisException if e.getMessage.contains("Unable to infer schema") =>
    println("Cannot determine schema from ORC files")
}
```

Types

File Format Types

```scala { .api }
case class OrcOptions(
  parameters: Map[String, String]
) {
  def mergeSchema: Boolean
  def ignoreCorruptFiles: Boolean  
  def recursiveFileLookup: Boolean
  def compression: String
}
```
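
A sketch of how the typed accessors relate to the raw option strings, following the shape documented above (real constructors may take additional arguments such as the session configuration):

```scala
// Hypothetical construction, per the case class as documented above
val opts = OrcOptions(Map(
  "mergeSchema" -> "true",
  "ignoreCorruptFiles" -> "false",
  "recursiveFileLookup" -> "true",
  "compression" -> "snappy"
))

println(s"mergeSchema=${opts.mergeSchema}, compression=${opts.compression}")
```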

```scala { .api }
trait FileFormat {
  def inferSchema(
    sparkSession: SparkSession,
    options: Map[String, String], 
    files: Seq[FileStatus]
  ): Option[StructType]
  
  def prepareWrite(
    sparkSession: SparkSession,
    job: Job,
    options: Map[String, String],
    dataSchema: StructType
  ): OutputWriterFactory
}