The Legacy Data Source V1 APIs provide backward compatibility with older Spark data source implementations. Although they have been superseded by Data Source V2, they remain widely used and are still supported for existing implementations.
The core filter abstraction for predicate pushdown:
package org.apache.spark.sql.sources
abstract class Filter {
/**
* List of columns referenced by this filter
*/
def references: Array[String]
/**
* V2-style references (supporting nested fields)
*/
def v2references: Array[Array[String]]
/**
* Convert to V2 predicate format
*/
private[sql] def toV2: Predicate
}

case class EqualTo(attribute: String, value: Any) extends Filter {
override def references: Array[String] = Array(attribute)
}

case class EqualNullSafe(attribute: String, value: Any) extends Filter {
override def references: Array[String] = Array(attribute)
}

case class GreaterThan(attribute: String, value: Any) extends Filter {
override def references: Array[String] = Array(attribute)
}

case class GreaterThanOrEqual(attribute: String, value: Any) extends Filter {
override def references: Array[String] = Array(attribute)
}

case class LessThan(attribute: String, value: Any) extends Filter {
override def references: Array[String] = Array(attribute)
}

case class LessThanOrEqual(attribute: String, value: Any) extends Filter {
override def references: Array[String] = Array(attribute)
}

case class In(attribute: String, values: Array[Any]) extends Filter {
override def references: Array[String] = Array(attribute)
}

case class IsNull(attribute: String) extends Filter {
override def references: Array[String] = Array(attribute)
}

case class IsNotNull(attribute: String) extends Filter {
override def references: Array[String] = Array(attribute)
}

case class And(left: Filter, right: Filter) extends Filter {
override def references: Array[String] = left.references ++ right.references
}

case class Or(left: Filter, right: Filter) extends Filter {
override def references: Array[String] = left.references ++ right.references
}

case class Not(child: Filter) extends Filter {
override def references: Array[String] = child.references
}

case class StringStartsWith(attribute: String, value: String) extends Filter {
override def references: Array[String] = Array(attribute)
}

case class StringEndsWith(attribute: String, value: String) extends Filter {
override def references: Array[String] = Array(attribute)
}

case class StringContains(attribute: String, value: String) extends Filter {
override def references: Array[String] = Array(attribute)
}

case class AlwaysTrue() extends Filter {
override def references: Array[String] = Array.empty
}

case class AlwaysFalse() extends Filter {
override def references: Array[String] = Array.empty
}

import org.apache.spark.sql.sources._
// Comparison filters
val equalFilter = EqualTo("status", "active")
val nullSafeEqual = EqualNullSafe("status", "active")
val greaterThan = GreaterThan("age", 18)
val lessThanOrEqual = LessThanOrEqual("age", 65)
// Set-based filters
val inFilter = In("category", Array("A", "B", "C"))
val isNullFilter = IsNull("description")
val isNotNullFilter = IsNotNull("email")
// String pattern filters
val startsWithFilter = StringStartsWith("name", "John")
val endsWithFilter = StringEndsWith("email", "@company.com")
val containsFilter = StringContains("description", "important")

// Logical combinations
val activeAdults = And(
EqualTo("status", "active"),
GreaterThan("age", 18)
)
val eligibleUsers = Or(
And(EqualTo("status", "active"), GreaterThan("age", 18)),
EqualTo("priority", "VIP")
)
val complexFilter = And(
Or(
EqualTo("category", "premium"),
In("tier", Array("gold", "platinum"))
),
And(
IsNotNull("email"),
Not(StringContains("email", "temp"))
)
)
// Age range filter
val workingAge = And(
GreaterThanOrEqual("age", 18),
LessThan("age", 65)
)

Because filters are ordinary case classes, a pattern match can describe them, for example when logging what a data source received:

def analyzeFilter(filter: Filter): String = filter match {
case EqualTo(attr, value) =>
s"Equality check: $attr = $value"
case GreaterThan(attr, value) =>
s"Range filter: $attr > $value"
case In(attr, values) =>
s"Set membership: $attr IN [${values.mkString(", ")}]"
case And(left, right) =>
s"Conjunction: (${analyzeFilter(left)}) AND (${analyzeFilter(right)})"
case Or(left, right) =>
s"Disjunction: (${analyzeFilter(left)}) OR (${analyzeFilter(right)})"
case Not(child) =>
s"Negation: NOT (${analyzeFilter(child)})"
case IsNull(attr) =>
s"Null check: $attr IS NULL"
case IsNotNull(attr) =>
s"Not null check: $attr IS NOT NULL"
case StringStartsWith(attr, value) =>
s"Prefix match: $attr LIKE '$value%'"
case StringEndsWith(attr, value) =>
s"Suffix match: $attr LIKE '%$value'"
case StringContains(attr, value) =>
s"Substring match: $attr LIKE '%$value%'"
case AlwaysTrue() =>
"Always true"
case AlwaysFalse() =>
"Always false"
case _ =>
s"Unknown filter: ${filter.getClass.getSimpleName}"
}
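For example, applied to the eligibleUsers filter defined earlier, this produces:

println(analyzeFilter(eligibleUsers))
// Disjunction: (Conjunction: (Equality check: status = active) AND (Range filter: age > 18)) OR (Equality check: priority = VIP)

Boolean simplifications and simple selectivity heuristics can be collected in a small optimizer:

object FilterOptimizer {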
def simplifyFilter(filter: Filter): Filter = filter match {
// Identity optimizations
case And(AlwaysTrue(), right) => simplifyFilter(right)
case And(left, AlwaysTrue()) => simplifyFilter(left)
case Or(AlwaysFalse(), right) => simplifyFilter(right)
case Or(left, AlwaysFalse()) => simplifyFilter(left)
// Contradiction optimizations
case And(AlwaysFalse(), _) => AlwaysFalse()
case And(_, AlwaysFalse()) => AlwaysFalse()
case Or(AlwaysTrue(), _) => AlwaysTrue()
case Or(_, AlwaysTrue()) => AlwaysTrue()
// Double negation
case Not(Not(child)) => simplifyFilter(child)
// Recursive simplification
case And(left, right) =>
val simplifiedLeft = simplifyFilter(left)
val simplifiedRight = simplifyFilter(right)
if (simplifiedLeft != left || simplifiedRight != right) {
simplifyFilter(And(simplifiedLeft, simplifiedRight))
} else {
And(simplifiedLeft, simplifiedRight)
}
case Or(left, right) =>
val simplifiedLeft = simplifyFilter(left)
val simplifiedRight = simplifyFilter(right)
if (simplifiedLeft != left || simplifiedRight != right) {
simplifyFilter(Or(simplifiedLeft, simplifiedRight))
} else {
Or(simplifiedLeft, simplifiedRight)
}
case Not(child) =>
val simplifiedChild = simplifyFilter(child)
if (simplifiedChild != child) {
simplifyFilter(Not(simplifiedChild))
} else {
Not(simplifiedChild)
}
case other => other
}
def extractColumnReferences(filter: Filter): Set[String] = {
filter.references.toSet
}
def isSelectiveFilter(filter: Filter): Boolean = filter match {
case EqualTo(_, _) => true
case In(_, values) => values.length <= 10
case And(left, right) => isSelectiveFilter(left) && isSelectiveFilter(right)
case Or(left, right) => isSelectiveFilter(left) || isSelectiveFilter(right)
case _ => false
}
}
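A quick check of the simplifier on filters built above:

FilterOptimizer.simplifyFilter(And(AlwaysTrue(), EqualTo("status", "active")))
// EqualTo(status,active)
FilterOptimizer.simplifyFilter(Not(Not(IsNull("description"))))
// IsNull(description)

Filters can also be rendered as SQL fragments, which is handy for sources backed by SQL engines:

object FilterConverter {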
def toSqlString(filter: Filter): String = filter match {
case EqualTo(attr, value) => s"$attr = ${formatValue(value)}"
case EqualNullSafe(attr, value) => s"$attr <=> ${formatValue(value)}"
case GreaterThan(attr, value) => s"$attr > ${formatValue(value)}"
case GreaterThanOrEqual(attr, value) => s"$attr >= ${formatValue(value)}"
case LessThan(attr, value) => s"$attr < ${formatValue(value)}"
case LessThanOrEqual(attr, value) => s"$attr <= ${formatValue(value)}"
case In(attr, values) => s"$attr IN (${values.map(formatValue).mkString(", ")})"
case IsNull(attr) => s"$attr IS NULL"
case IsNotNull(attr) => s"$attr IS NOT NULL"
case And(left, right) => s"(${toSqlString(left)}) AND (${toSqlString(right)})"
case Or(left, right) => s"(${toSqlString(left)}) OR (${toSqlString(right)})"
case Not(child) => s"NOT (${toSqlString(child)})"
case StringStartsWith(attr, value) => s"$attr LIKE '${escapeString(value)}%'"
case StringEndsWith(attr, value) => s"$attr LIKE '%${escapeString(value)}'"
case StringContains(attr, value) => s"$attr LIKE '%${escapeString(value)}%'"
case AlwaysTrue() => "TRUE"
case AlwaysFalse() => "FALSE"
case _ => filter.toString
}
private def formatValue(value: Any): String = value match {
case null => "NULL"
case s: String => s"'${escapeString(s)}'"
case _ => value.toString
}
private def escapeString(s: String): String = {
s.replace("'", "''").replace("\\", "\\\\")
}
// Note: Filter.toV2 is private[sql], so this shortcut only compiles for code that
// lives inside the org.apache.spark.sql package; external sources must convert
// manually (see V1ToV2Migration below).
def toV2Predicate(filter: Filter): Predicate = {
filter.toV2
}
}
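For the complexFilter defined earlier:

println(FilterConverter.toSqlString(complexFilter))
// ((category = 'premium') OR (tier IN ('gold', 'platinum'))) AND ((email IS NOT NULL) AND (NOT (email LIKE '%temp%')))

A pushdown-capable source typically splits incoming filters into those it can evaluate itself and those Spark must re-apply afterwards:

trait PushdownDataSource {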
def buildScan(filters: Array[Filter]): RDD[Row] = {
val (pushable, nonPushable) = partitionFilters(filters)
// Build scan with pushed filters
val baseRDD = buildScanWithFilters(pushable)
// Apply remaining filters in Spark
if (nonPushable.nonEmpty) {
val combinedFilter = nonPushable.reduce(And)
baseRDD.filter(row => evaluateFilter(combinedFilter, row))
} else {
baseRDD
}
}
def partitionFilters(filters: Array[Filter]): (Array[Filter], Array[Filter]) = {
filters.partition(canPushDown)
}
def canPushDown(filter: Filter): Boolean = filter match {
case EqualTo(_, _) => true
case GreaterThan(_, _) => true
case LessThan(_, _) => true
case GreaterThanOrEqual(_, _) => true
case LessThanOrEqual(_, _) => true
case In(_, _) => true
case IsNull(_) => true
case IsNotNull(_) => true
case And(left, right) => canPushDown(left) && canPushDown(right)
case Or(left, right) => canPushDown(left) && canPushDown(right)
case Not(child) => canPushDown(child)
case _ => false
}
def buildScanWithFilters(filters: Array[Filter]): RDD[Row]
def evaluateFilter(filter: Filter, row: Row): Boolean
}
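An implementing class supplies the two abstract members. A minimal sketch of evaluateFilter, assuming rows carry a schema (so fields can be looked up by name) and that range comparisons only involve numeric values, might look like:

override def evaluateFilter(filter: Filter, row: Row): Boolean = filter match {
  case EqualTo(attr, value) => row.getAs[Any](attr) == value
  case IsNull(attr) => row.isNullAt(row.fieldIndex(attr))
  case IsNotNull(attr) => !row.isNullAt(row.fieldIndex(attr))
  case GreaterThan(attr, value: Number) =>
    row.getAs[Number](attr).doubleValue() > value.doubleValue()
  case LessThan(attr, value: Number) =>
    row.getAs[Number](attr).doubleValue() < value.doubleValue()
  case And(left, right) => evaluateFilter(left, row) && evaluateFilter(right, row)
  case Or(left, right) => evaluateFilter(left, row) || evaluateFilter(right, row)
  case Not(child) => !evaluateFilter(child, row)
  case other =>
    // A real implementation must cover every filter it declines to push down
    throw new UnsupportedOperationException(s"Cannot evaluate filter: $other")
}

A complete V1 relation combines scanning, pruning, filtering and insertion (the SQLContext required by BaseRelation is taken as a constructor parameter):

class MyLegacyDataSource(override val sqlContext: SQLContext) extends BaseRelation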
with TableScan
with PrunedFilteredScan
with InsertableRelation {
override def schema: StructType = {
// Define schema for the data source
StructType(Seq(
StructField("id", IntegerType, nullable = false),
StructField("name", StringType, nullable = true),
StructField("age", IntegerType, nullable = true),
StructField("status", StringType, nullable = true)
))
}
override def buildScan(): RDD[Row] = {
// Full scan: request all columns and push no filters
buildScan(schema.fieldNames, Array.empty)
}
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
// Implement filtered and pruned scan
val pushableFilters = filters.filter(canPushDown)
// Preserve the requested column order when pruning the schema
val prunedSchema = StructType(requiredColumns.map(schema(_)))
loadDataWithFiltersAndProjection(pushableFilters, prunedSchema)
}
override def insert(data: DataFrame, overwrite: Boolean): Unit = {
// Implement data insertion
if (overwrite) {
// Clear existing data
clearData()
}
// Write new data
writeData(data)
}
// Implementation-specific write helpers
private def clearData(): Unit = ???
private def writeData(data: DataFrame): Unit = ???
private def loadDataWithFiltersAndProjection(filters: Array[Filter],
schema: StructType): RDD[Row] = {
// Implementation-specific data loading
// This would typically:
// 1. Apply filters during data loading
// 2. Project only required columns
// 3. Return RDD of rows
???
}
private def canPushDown(filter: Filter): Boolean = {
// Define which filters can be pushed to the data source
filter match {
case EqualTo(_, _) | GreaterThan(_, _) | LessThan(_, _) => true
case In(_, _) | IsNull(_) | IsNotNull(_) => true
case And(left, right) => canPushDown(left) && canPushDown(right)
case _ => false
}
}
}
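To make this relation reachable from spark.read, the V1 API looks up a class named DefaultSource in the package passed to format(). A minimal sketch, assuming DefaultSource lives in a package named com.example.legacy:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider}

class DefaultSource extends RelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    new MyLegacyDataSource(sqlContext)
  }
}

// val df = spark.read.format("com.example.legacy").load()
// df.filter($"status" === "active" && $"age" > 18).select("name").show()
// The condition above reaches buildScan as V1 filters such as
// EqualTo("status", "active") and GreaterThan("age", 18).

When migrating to Data Source V2, V1 filters and relations can be adapted rather than rewritten from scratch:

object V1ToV2Migration {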
import org.apache.spark.sql.connector.expressions.{Expression, Expressions}
import org.apache.spark.sql.connector.expressions.filter.Predicate

// In the V2 API, comparisons, IN and null checks are expressed with the generic
// Predicate(name, children); only And, Or, Not, AlwaysTrue and AlwaysFalse have
// dedicated classes under org.apache.spark.sql.connector.expressions.filter.
// Building a literal child requires the column's Catalyst DataType, so this helper
// is left as a placeholder to be backed by a schema lookup in a real conversion.
private def toLiteral(value: Any): Expression = ???

def convertFilter(v1Filter: Filter): Predicate = v1Filter match {
case EqualTo(attr, value) =>
new Predicate("=", Array(Expressions.column(attr), toLiteral(value)))
case GreaterThan(attr, value) =>
new Predicate(">", Array(Expressions.column(attr), toLiteral(value)))
case LessThan(attr, value) =>
new Predicate("<", Array(Expressions.column(attr), toLiteral(value)))
case In(attr, values) =>
new Predicate("IN", Expressions.column(attr) +: values.map(toLiteral))
case IsNull(attr) =>
new Predicate("IS_NULL", Array[Expression](Expressions.column(attr)))
case And(left, right) =>
new org.apache.spark.sql.connector.expressions.filter.And(
convertFilter(left),
convertFilter(right)
)
case Or(left, right) =>
new org.apache.spark.sql.connector.expressions.filter.Or(
convertFilter(left),
convertFilter(right)
)
case Not(child) =>
new org.apache.spark.sql.connector.expressions.filter.Not(
convertFilter(child)
)
case AlwaysTrue() =>
new org.apache.spark.sql.connector.expressions.filter.AlwaysTrue()
case AlwaysFalse() =>
new org.apache.spark.sql.connector.expressions.filter.AlwaysFalse()
case _ =>
throw new UnsupportedOperationException(s"Cannot convert filter: $v1Filter")
}
def migrateDataSource(v1Source: BaseRelation): Table = {
new V2TableWrapper(v1Source)
}
}
class V2TableWrapper(v1Source: BaseRelation) extends Table with SupportsRead {
override def name(): String = v1Source.getClass.getSimpleName
override def schema(): StructType = v1Source.schema
override def capabilities(): java.util.Set[TableCapability] = {
java.util.Set.of(TableCapability.BATCH_READ)
}
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
new V1CompatScanBuilder(v1Source, schema())
}
}
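The scan builder referenced above is not part of Spark; a minimal sketch, handling only filter pushdown and leaving the actual Scan construction open, might look like:

import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters}

class V1CompatScanBuilder(v1Source: BaseRelation, schema: StructType)
  extends ScanBuilder with SupportsPushDownFilters {

  private var pushed: Array[Filter] = Array.empty

  // Remember the filters so the eventual scan can hand them to the V1 buildScan.
  // Returning them all tells Spark to re-apply every filter after the scan, which
  // is the safe choice when the V1 source exposes no unhandledFilters information.
  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    pushed = filters
    filters
  }

  override def pushedFilters(): Array[Filter] = pushed

  // Building the Scan/Batch that reads through the V1 relation is omitted here.
  override def build(): Scan = ???
}

Ordering and selectivity heuristics help decide which filters to push first:

object FilterOptimizationStrategies {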
def optimizeFilterOrder(filters: Array[Filter]): Array[Filter] = {
// Order filters by selectivity (most selective first)
filters.sortBy(calculateSelectivity)
}
private def calculateSelectivity(filter: Filter): Double = filter match {
case EqualTo(_, _) => 0.01 // Very selective
case In(_, values) => Math.min(0.1, values.length * 0.01) // Based on value count
case IsNull(_) => 0.05 // Usually selective
case IsNotNull(_) => 0.95 // Usually not selective
case GreaterThan(_, _) | LessThan(_, _) => 0.3 // Moderately selective
case StringStartsWith(_, _) => 0.1 // Quite selective
case StringContains(_, _) => 0.2 // Less selective
case And(left, right) => calculateSelectivity(left) * calculateSelectivity(right)
case Or(left, right) => calculateSelectivity(left) + calculateSelectivity(right) -
(calculateSelectivity(left) * calculateSelectivity(right))
case Not(child) => 1.0 - calculateSelectivity(child)
case _ => 0.5 // Default moderate selectivity
}
def canUseIndex(filter: Filter, indexedColumns: Set[String]): Boolean = {
filter.references.exists(indexedColumns.contains)
}
}
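For example, ordering filters from the earlier examples by estimated selectivity:

val ordered = FilterOptimizationStrategies.optimizeFilterOrder(
  Array(IsNotNull("email"), StringContains("description", "important"), EqualTo("status", "active")))
// ordered: EqualTo(status,active), StringContains(description,important), IsNotNull(email)

Finally, a V1 source can cache its filter analysis and prune columns as early as possible:

trait PerformantV1DataSource extends BaseRelation with PrunedFilteredScan {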
// Cache optimized filters to avoid re-analyzing identical predicate sets
// (keyed by Seq rather than Array so that equal filter sets actually hit the cache)
private val filterCache = new java.util.concurrent.ConcurrentHashMap[Seq[Filter], Array[Filter]]()
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
// Re-use cached filter analysis when the same predicate set is seen again
val optimizedFilters = filterCache.computeIfAbsent(filters.toSeq, k => optimizeFilters(k.toArray))
// Minimize data transfer by projecting early
val projectedRDD = loadData(optimizedFilters)
// Apply column pruning
if (requiredColumns.length < schema.fields.length) {
val columnIndices = requiredColumns.map(schema.fieldIndex)
projectedRDD.map(row => Row.fromSeq(columnIndices.map(row.get)))
} else {
projectedRDD
}
}
// Supplied by the concrete source: loads rows with the given filters applied
protected def loadData(filters: Array[Filter]): RDD[Row]
private def optimizeFilters(filters: Array[Filter]): Array[Filter] = {
filters
.map(FilterOptimizer.simplifyFilter)
.filter(_ != AlwaysTrue())
}
}

The Legacy Data Source V1 APIs provide essential compatibility for existing Spark integrations while offering a migration path to the more powerful and flexible V2 APIs. Understanding these APIs is crucial for maintaining and upgrading existing Spark data source implementations.