tessl install tessl/maven-org-apache-spark--spark-hive@1.6.0

Apache Spark SQL Hive integration module providing HiveContext, metastore operations, HiveQL parsing, and Hive data format compatibility.
Apache Spark Hive integration provides comprehensive support for ORC (Optimized Row Columnar) file format with advanced features including predicate pushdown, column pruning, schema evolution, and vectorized execution. This integration offers both Hive-compatible ORC access and native Spark ORC optimizations.
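The examples below assume a HiveContext is available; a minimal setup sketch (note that spark.sql.orc.filterPushdown is not enabled by default in the Spark 1.6 line, so pushdown must be switched on explicitly):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Build the contexts used throughout this page
val sc = new SparkContext(new SparkConf().setAppName("orc-integration"))
val hiveContext = new HiveContext(sc)
// ORC predicate pushdown is off by default in Spark 1.6
hiveContext.setConf("spark.sql.orc.filterPushdown", "true")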
class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
def shortName(): String = "orc"
def createRelation(
sqlContext: SQLContext,
paths: Array[String],
dataSchema: Option[StructType],
partitionColumns: Option[StructType],
parameters: Map[String, String]
): HadoopFsRelation
}

DefaultSource - Data source provider for ORC files
shortName - Returns "orc" as the data source identifier
createRelation - Creates ORC relation with schema and partitioning support
Usage Examples:
import org.apache.spark.sql.hive.HiveContext
val hiveContext = new HiveContext(sc)
// Read ORC files using data source API
val orcData = hiveContext.read
.format("orc")
.option("path", "/data/orc_files")
.load()
// Write data as ORC
dataFrame.write
.format("orc")
.option("compression", "snappy")
.save("/output/orc_data")
// Create table using ORC format
hiveContext.sql("""
CREATE TABLE sales_orc (
order_id bigint,
customer_id string,
amount decimal(10,2),
order_date timestamp
)
USING ORC
OPTIONS (
path '/warehouse/sales_orc'
)
""")case class OrcRelation(
location: String,
parameters: Map[String, String]
)(sqlContext: SQLContext) extends HadoopFsRelationOrcRelation - Represents ORC file relation with location and configuration
Configuration Parameters:
compression - Compression codec (NONE, ZLIB, SNAPPY, LZO, LZ4)
stripe.size - ORC stripe size in bytes
block.size - HDFS block size
row.index.stride - Number of rows between index entries

Usage Example:
// Create ORC relation with custom parameters
val orcRelation = OrcRelation(
location = "/data/optimized_orc",
parameters = Map(
"compression" -> "snappy",
"stripe.size" -> "67108864", // 64MB stripes
"row.index.stride" -> "10000"
)
)(hiveContext)
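The same parameters can be supplied when writing through the DataFrame API; a sketch (option keys follow the list above, and dataFrame is assumed from the earlier examples):

dataFrame.write
  .format("orc")
  .option("compression", "snappy")
  .option("stripe.size", "67108864")
  .option("row.index.stride", "10000")
  .save("/data/optimized_orc")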
The ORC integration supports pushing down various filter types to the ORC reader for efficient data scanning:

Supported Filter Types:
column = value
column > value, column < value, column BETWEEN x AND y
column IN (value1, value2, ...)
column IS NULL, column IS NOT NULL
column LIKE pattern (limited patterns)

Usage Examples:
// These filters will be pushed down to ORC reader
val filteredData = hiveContext.sql("""
SELECT order_id, amount
FROM sales_orc
WHERE amount > 100
AND customer_id IN ('CUST001', 'CUST002')
AND order_date >= '2023-01-01'
""")
// Complex filters with pushdown
val complexFilters = hiveContext.sql("""
SELECT *
FROM sales_orc
WHERE (amount BETWEEN 50 AND 500)
AND customer_id IS NOT NULL
AND order_date > date_sub(current_date(), 30)
""")Predicate pushdown provides significant performance benefits:
ORC supports reading only required columns, providing columnar access benefits:
// Only reads order_id and amount columns from ORC files
val projectedData = hiveContext.sql("""
SELECT order_id, amount
FROM large_sales_orc
""")
// Column pruning with aggregation
val aggregatedData = hiveContext.sql("""
SELECT customer_id, SUM(amount) as total
FROM sales_orc
GROUP BY customer_id
""")Performance Benefits:
ORC supports certain schema evolution scenarios:
Supported Changes:
adding new columns at the end of the schema (columns absent from older files read as null)
widening type promotions, subject to the ORC version in use

Usage Example:
// Original schema: (id: long, name: string)
// Evolved schema: (id: long, name: string, email: string, age: int)
val evolvedData = hiveContext.sql("""
SELECT id, name,
COALESCE(email, 'unknown') as email,
COALESCE(age, 0) as age
FROM evolved_table_orc
""")ORC integration supports Hive-style directory partitioning:
ORC integration supports Hive-style directory partitioning:

// Create partitioned ORC table
hiveContext.sql("""
CREATE TABLE partitioned_sales_orc (
order_id bigint,
customer_id string,
amount decimal(10,2)
)
USING ORC
PARTITIONED BY (year int, month int)
OPTIONS (
path '/warehouse/partitioned_sales'
)
""")
// Insert with dynamic partitioning
hiveContext.sql("""
INSERT INTO partitioned_sales_orc
PARTITION (year, month)
SELECT order_id, customer_id, amount,
YEAR(order_date), MONTH(order_date)
FROM source_orders
""")
// Query with partition pruning
val partitionedQuery = hiveContext.sql("""
SELECT customer_id, SUM(amount)
FROM partitioned_sales_orc
WHERE year = 2023 AND month IN (11, 12)
GROUP BY customer_id
""")// Available compression options
// Available compression options
val COMPRESSION_NONE = "NONE"
val COMPRESSION_ZLIB = "ZLIB" // Default, good compression ratio
val COMPRESSION_SNAPPY = "SNAPPY" // Fast compression/decompression
val COMPRESSION_LZO = "LZO" // Fast with good ratio
val COMPRESSION_LZ4 = "LZ4" // Very fast

Usage Examples:
// Write with different compression settings
dataFrame.write
.format("orc")
.option("compression", "snappy") // Fast compression
.save("/data/fast_orc")
dataFrame.write
.format("orc")
.option("compression", "zlib") // Better compression ratio
.save("/data/compressed_orc")
// Create table with compression
hiveContext.sql("""
CREATE TABLE compressed_orc (
id bigint,
data string
)
USING ORC
OPTIONS (
compression 'lz4',
path '/warehouse/compressed'
)
""")ORC supports vectorized reading for improved performance:
// Vectorized ORC reading is governed by these settings in newer Spark
// releases (2.3+ with the native ORC reader); Spark 1.6 itself predates them
hiveContext.setConf("spark.sql.orc.impl", "native")
hiveContext.setConf("spark.sql.orc.enableVectorizedReader", "true")
// Vectorized operations benefit from batch processing
val vectorizedQuery = hiveContext.sql("""
SELECT
customer_id,
SUM(amount) as total,
COUNT(*) as order_count,
AVG(amount) as avg_amount
FROM large_orc_table
GROUP BY customer_id
""")ORC supports bloom filters for efficient filtering:
// Create ORC table with bloom filters
hiveContext.sql("""
CREATE TABLE bloom_filtered_orc (
id bigint,
customer_id string,
product_id string,
amount decimal(10,2)
)
USING ORC
OPTIONS (
'orc.bloom.filter.columns' 'customer_id,product_id',
'orc.bloom.filter.fpp' '0.01'
)
""")ORC automatically maintains column statistics for optimization:
ORC automatically maintains column statistics for optimization:

// Statistics are automatically used for:
// - Query planning and optimization
// - Predicate pushdown decisions
// - Join ordering
val statsOptimizedQuery = hiveContext.sql("""
SELECT *
FROM large_orc_table l
JOIN small_orc_table s ON l.id = s.id
WHERE l.amount > 1000
""")object OrcFileOperator {
def readSchema(path: String): StructType
def readFooter(path: String): OrcProto.Footer
def getFileMetadata(path: String): Map[String, String]
}

readSchema - Extracts schema from ORC file footer
readFooter - Reads ORC file footer with metadata
getFileMetadata - Retrieves custom metadata from ORC files
Usage Examples:
// Read ORC file schema without loading data
val schema = OrcFileOperator.readSchema("/data/sample.orc")
println(s"Schema: ${schema.prettyJson}")
// Get file statistics
val metadata = OrcFileOperator.getFileMetadata("/data/sample.orc")
metadata.foreach { case (key, value) =>
println(s"$key: $value")
}

// Configure stripe size based on data characteristics
dataFrame.write
.format("orc")
.option("orc.stripe.size", "134217728") // 128MB for large files
.option("orc.block.size", "268435456") // 256MB HDFS blocks
.save("/data/tuned_orc")// Optimize row group size for memory usage
dataFrame.write
.format("orc")
.option("orc.row.index.stride", "10000") // 10K rows per group
.save("/data/row_optimized_orc")// Configure memory for ORC operations
hiveContext.setConf("spark.sql.orc.columnarReaderBatchSize", "4096")
hiveContext.setConf("spark.sql.orc.filterPushdown", "true")// Create ORC table in Hive metastore
hiveContext.sql("""
CREATE TABLE hive_orc_table (
id bigint,
name string,
score double
)
STORED AS ORC
LOCATION '/warehouse/hive_orc'
TBLPROPERTIES (
'orc.compress' = 'SNAPPY',
'orc.stripe.size' = '67108864'
)
""")// ORC SerDe configuration
hiveContext.sql("""
CREATE TABLE custom_orc (
data string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
WITH SERDEPROPERTIES (
  'orc.compress' = 'ZLIB',
  'orc.bloom.filter.columns' = 'data'
)
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
""")

try {
  val orcData = hiveContext.read.format("orc").load("/invalid/path")
} catch {
  case e: org.apache.spark.sql.AnalysisException =>
    // Spark surfaces missing paths and unresolvable schemas as AnalysisException
    println(s"Could not resolve ORC source: ${e.getMessage}")
  case e: java.io.IOException =>
    // Corrupted or truncated ORC files typically fail with an IOException
    println(s"ORC file is unreadable: ${e.getMessage}")
}

// Validate schema compatibility before reading
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

try {
  val expectedSchema = StructType(Seq(
StructField("id", LongType),
StructField("name", StringType)
))
val orcData = hiveContext.read
.schema(expectedSchema)
.format("orc")
.load("/data/orc_files")
} catch {
case e: AnalysisException if e.getMessage.contains("schema") =>
println("Schema mismatch - check ORC file structure")
}