
ORC Support

Apache Spark Hive integration provides comprehensive support for ORC (Optimized Row Columnar) file format with advanced features including predicate pushdown, column pruning, schema evolution, and vectorized execution. This integration offers both Hive-compatible ORC access and native Spark ORC optimizations.

Core ORC Integration

DefaultSource

class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
  def shortName(): String = "orc"
  
  def createRelation(
    sqlContext: SQLContext,
    paths: Array[String],
    dataSchema: Option[StructType],
    partitionColumns: Option[StructType],
    parameters: Map[String, String]
  ): HadoopFsRelation
}

DefaultSource - Data source provider for ORC files

shortName - Returns "orc" as the data source identifier

createRelation - Creates ORC relation with schema and partitioning support

Usage Examples:

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)

// Read ORC files using data source API
val orcData = hiveContext.read
  .format("orc")
  .option("path", "/data/orc_files")
  .load()

// Write data as ORC
dataFrame.write
  .format("orc")
  .option("compression", "snappy")
  .save("/output/orc_data")

// Create table using ORC format
hiveContext.sql("""
  CREATE TABLE sales_orc (
    order_id bigint,
    customer_id string,
    amount decimal(10,2),
    order_date timestamp
  )
  USING ORC
  OPTIONS (
    path '/warehouse/sales_orc'
  )
""")

OrcRelation

case class OrcRelation(
  location: String,
  parameters: Map[String, String]
)(sqlContext: SQLContext) extends HadoopFsRelation

OrcRelation - Represents ORC file relation with location and configuration

  • location - File system path to ORC files
  • parameters - Configuration options for ORC reading/writing
  • sqlContext - SQL context for execution

Configuration Parameters:

  • compression - Compression codec (NONE, ZLIB, SNAPPY, LZO, LZ4)
  • stripe.size - ORC stripe size in bytes
  • block.size - HDFS block size
  • row.index.stride - Number of rows between index entries

Usage Example:

// Create ORC relation with custom parameters
val orcRelation = OrcRelation(
  location = "/data/optimized_orc",
  parameters = Map(
    "compression" -> "snappy",
    "stripe.size" -> "67108864",  // 64MB stripes
    "row.index.stride" -> "10000"
  )
)(hiveContext)

Predicate Pushdown

ORC Filter Support

The ORC integration supports pushing down various filter types to the ORC reader for efficient data scanning:

Supported Filter Types:

  • Equality filters: column = value
  • Range filters: column > value, column < value, column BETWEEN x AND y
  • IN filters: column IN (value1, value2, ...)
  • NULL filters: column IS NULL, column IS NOT NULL
  • String filters: column LIKE pattern (limited patterns)

Usage Examples:

// These filters will be pushed down to ORC reader
val filteredData = hiveContext.sql("""
  SELECT order_id, amount
  FROM sales_orc
  WHERE amount > 100 
    AND customer_id IN ('CUST001', 'CUST002')
    AND order_date >= '2023-01-01'
""")

// Complex filters with pushdown
val complexFilters = hiveContext.sql("""
  SELECT *
  FROM sales_orc
  WHERE (amount BETWEEN 50 AND 500)
    AND customer_id IS NOT NULL
    AND order_date > date_sub(current_date(), 30)
""")

Filter Performance

Predicate pushdown provides significant performance benefits (verified in the sketch after this list):

  • I/O Reduction: Skips irrelevant stripes and row groups
  • CPU Optimization: Reduces deserialization overhead
  • Memory Efficiency: Lower memory usage for filtered results
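
A quick way to confirm that pushdown is in effect is to enable the ORC filter pushdown flag and inspect the physical plan. A minimal sketch, assuming the sales_orc table and hiveContext from the earlier examples:

// Enable ORC predicate pushdown (off by default in Spark 1.6)
hiveContext.setConf("spark.sql.orc.filterPushdown", "true")

val pushedDown = hiveContext.table("sales_orc")
  .select("order_id", "amount")
  .filter("amount > 100")

// The physical plan lists the pushed filters next to the ORC scan node
pushedDown.explain(true)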

Column Pruning

Projection Pushdown

ORC supports reading only required columns, providing columnar access benefits:

// Only reads order_id and amount columns from ORC files
val projectedData = hiveContext.sql("""
  SELECT order_id, amount
  FROM large_sales_orc
""")

// Column pruning with aggregation
val aggregatedData = hiveContext.sql("""
  SELECT customer_id, SUM(amount) as total
  FROM sales_orc
  GROUP BY customer_id
""")

Performance Benefits:

  • Reduced I/O by reading only necessary columns
  • Lower memory usage
  • Faster query execution for wide tables
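
These benefits carry over to the DataFrame API; a small sketch, assuming the ORC path from the earlier read example:

// Only order_id and amount are read from the ORC files; the scan node
// in the physical plan reports the projected columns
val pruned = hiveContext.read
  .format("orc")
  .load("/data/orc_files")
  .select("order_id", "amount")

pruned.explain()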

Schema Evolution

Compatible Schema Changes

ORC supports certain schema evolution scenarios:

Supported Changes:

  • Adding new columns at the end
  • Renaming columns (with proper mapping)
  • Changing column types (compatible conversions)

Usage Example:

// Original schema: (id: long, name: string)
// Evolved schema: (id: long, name: string, email: string, age: int)

val evolvedData = hiveContext.sql("""
  SELECT id, name, 
         COALESCE(email, 'unknown') as email,
         COALESCE(age, 0) as age
  FROM evolved_table_orc
""")

Partitioning Support

Hive-Style Partitioning

ORC integration supports Hive-style directory partitioning:

// Create partitioned ORC table
hiveContext.sql("""
  CREATE TABLE partitioned_sales_orc (
    order_id bigint,
    customer_id string,
    amount decimal(10,2)
  )
  USING ORC
  PARTITIONED BY (year int, month int)
  OPTIONS (
    path '/warehouse/partitioned_sales'
  )
""")

// Insert with dynamic partitioning
hiveContext.sql("""
  INSERT INTO partitioned_sales_orc
  PARTITION (year, month)
  SELECT order_id, customer_id, amount,
         YEAR(order_date), MONTH(order_date)
  FROM source_orders
""")

// Query with partition pruning
val partitionedQuery = hiveContext.sql("""
  SELECT customer_id, SUM(amount)
  FROM partitioned_sales_orc
  WHERE year = 2023 AND month IN (11, 12)
  GROUP BY customer_id
""")

Compression Options

Supported Compression Codecs

// Available compression options
val COMPRESSION_NONE = "NONE"
val COMPRESSION_ZLIB = "ZLIB"      // Default, good compression ratio
val COMPRESSION_SNAPPY = "SNAPPY"   // Fast compression/decompression
val COMPRESSION_LZO = "LZO"        // Fast with good ratio
val COMPRESSION_LZ4 = "LZ4"        // Very fast

Usage Examples:

// Write with different compression settings
dataFrame.write
  .format("orc")
  .option("compression", "snappy")  // Fast compression
  .save("/data/fast_orc")

dataFrame.write
  .format("orc")
  .option("compression", "zlib")    // Better compression ratio
  .save("/data/compressed_orc")

// Create table with compression
hiveContext.sql("""
  CREATE TABLE compressed_orc (
    id bigint,
    data string
  )
  USING ORC
  OPTIONS (
    compression 'lz4',
    path '/warehouse/compressed'
  )
""")

Advanced ORC Features

Vectorized Execution

ORC reads can be vectorized for improved performance. Note that the settings below belong to the native vectorized ORC reader added in later Spark releases (2.3+); the 1.6.x Hive integration reads ORC through the Hive reader and does not expose them:

// Native vectorized ORC reader settings (Spark 2.3+ only)
hiveContext.setConf("spark.sql.orc.impl", "native")
hiveContext.setConf("spark.sql.orc.enableVectorizedReader", "true")

// Vectorized operations benefit from batch processing
val vectorizedQuery = hiveContext.sql("""
  SELECT 
    customer_id,
    SUM(amount) as total,
    COUNT(*) as order_count,
    AVG(amount) as avg_amount
  FROM large_orc_table
  GROUP BY customer_id
""")

Bloom Filters

ORC supports bloom filters for efficient filtering:

// Create ORC table with bloom filters
hiveContext.sql("""
  CREATE TABLE bloom_filtered_orc (
    id bigint,
    customer_id string,
    product_id string,
    amount decimal(10,2)
  )
  USING ORC
  OPTIONS (
    'orc.bloom.filter.columns' 'customer_id,product_id',
    'orc.bloom.filter.fpp' '0.01'
  )
""")

ORC Statistics

ORC automatically maintains column statistics for optimization:

// Statistics are automatically used for:
// - Query planning and optimization
// - Predicate pushdown decisions
// - Join ordering

val statsOptimizedQuery = hiveContext.sql("""
  SELECT *
  FROM large_orc_table l
  JOIN small_orc_table s ON l.id = s.id
  WHERE l.amount > 1000
""")

File Operations

ORC File Utilities

object OrcFileOperator {
  def readSchema(path: String): StructType
  def readFooter(path: String): OrcProto.Footer
  def getFileMetadata(path: String): Map[String, String]
}

readSchema - Extracts schema from ORC file footer

readFooter - Reads ORC file footer with metadata

getFileMetadata - Retrieves custom metadata from ORC files

Usage Examples:

// Read ORC file schema without loading data
val schema = OrcFileOperator.readSchema("/data/sample.orc")
println(s"Schema: ${schema.prettyJson}")

// Get file statistics
val metadata = OrcFileOperator.getFileMetadata("/data/sample.orc")
metadata.foreach { case (key, value) =>
  println(s"$key: $value")
}

Performance Tuning

Optimal Stripe Size

// Configure stripe size based on data characteristics
dataFrame.write
  .format("orc")
  .option("orc.stripe.size", "134217728")  // 128MB for large files
  .option("orc.block.size", "268435456")   // 256MB HDFS blocks
  .save("/data/tuned_orc")

Row Group Size

// Optimize row group size for memory usage
dataFrame.write
  .format("orc")
  .option("orc.row.index.stride", "10000")  // 10K rows per group
  .save("/data/row_optimized_orc")

Memory Configuration

// Configure ORC scan behavior (filterPushdown exists in Spark 1.6;
// columnarReaderBatchSize applies to the native reader in Spark 2.3+)
hiveContext.setConf("spark.sql.orc.filterPushdown", "true")
hiveContext.setConf("spark.sql.orc.columnarReaderBatchSize", "4096")

Integration with Hive Metastore

Metastore Table Creation

// Create ORC table in Hive metastore
hiveContext.sql("""
  CREATE TABLE hive_orc_table (
    id bigint,
    name string,
    score double
  )
  STORED AS ORC
  LOCATION '/warehouse/hive_orc'
  TBLPROPERTIES (
    'orc.compress' = 'SNAPPY',
    'orc.stripe.size' = '67108864'
  )
""")

SerDe Properties

// ORC SerDe configuration
hiveContext.sql("""
  CREATE TABLE custom_orc (
    data string
  )
  ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
  STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
  SERDEPROPERTIES (
    'orc.compress' = 'ZLIB',
    'orc.bloom.filter.columns' = 'data'
  )
""")

Error Handling

Common ORC Errors

import java.io.{FileNotFoundException, IOException}
import org.apache.spark.sql.AnalysisException

try {
  val orcData = hiveContext.read.format("orc").load("/invalid/path")
} catch {
  case e: FileNotFoundException =>
    println("ORC files not found at specified path")
  case e: AnalysisException =>
    println(s"Could not resolve the ORC data source: ${e.getMessage}")
  case e: IOException =>
    println("ORC file is corrupted or could not be read")
}

Schema Validation

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Validate schema compatibility before reading
try {
  val expectedSchema = StructType(Seq(
    StructField("id", LongType),
    StructField("name", StringType)
  ))
  
  val orcData = hiveContext.read
    .schema(expectedSchema)
    .format("orc")
    .load("/data/orc_files")
} catch {
  case e: AnalysisException if e.getMessage.contains("schema") =>
    println("Schema mismatch - check ORC file structure")
}

Best Practices

Writing ORC Files

  1. Choose appropriate stripe size (64-256MB)
  2. Enable compression (SNAPPY for speed, ZLIB for size)
  3. Use column ordering (frequently filtered columns first)
  4. Enable bloom filters for high-cardinality columns
  5. Partition large datasets for better query performance (combined in the sketch below)
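
A sketch that combines these recommendations, assuming a dataFrame with a high-cardinality customer_id column and year/month columns; the path, sizes, and column names are illustrative:

// Combined write settings: fast codec, large stripes, bloom filter on a
// high-cardinality column, and Hive-style partitioning
dataFrame.write
  .format("orc")
  .option("compression", "snappy")
  .option("orc.stripe.size", "134217728")            // 128MB stripes
  .option("orc.bloom.filter.columns", "customer_id") // assumed high-cardinality column
  .partitionBy("year", "month")
  .save("/warehouse/tuned_sales_orc")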

Reading ORC Files

  1. Enable predicate pushdown for filtering
  2. Use column pruning for projection
  3. Enable vectorized reading for analytical queries
  4. Configure appropriate batch sizes
  5. Monitor memory usage for large files (see the combined sketch below)
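
A matching read-side sketch, reusing the path and columns assumed above:

// Enable predicate pushdown, prune columns, and verify the plan
hiveContext.setConf("spark.sql.orc.filterPushdown", "true")

val tunedRead = hiveContext.read
  .format("orc")
  .load("/warehouse/tuned_sales_orc")
  .select("customer_id", "amount")
  .filter("amount > 1000")

tunedRead.explain()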