Legacy Data Source V1 APIs

The Legacy Data Source V1 APIs provide backward compatibility with older Spark data source implementations. While these APIs are deprecated in favor of Data Source V2, they are still widely used and supported for existing implementations.
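
Most of the filters described below reach a V1 implementation through the PrunedFilteredScan contract in org.apache.spark.sql.sources, which hands the source the projected columns together with the predicates Spark would like pushed down (shown here for orientation):

trait PrunedFilteredScan {
  // Spark supplies the projected columns and candidate filters; the source may
  // apply the filters or ignore them, since Spark re-evaluates any filter the
  // relation reports as unhandled.
  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}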

Filter System

Filter Base Class

The core filter abstraction for predicate pushdown:

package org.apache.spark.sql.sources

abstract class Filter {
  /**
   * List of columns referenced by this filter
   */
  def references: Array[String]
  
  /**
   * V2-style references (supporting nested fields)
   */
  def v2references: Array[Array[String]]
  
  /**
   * Convert to V2 predicate format
   */
  private[sql] def toV2: Predicate
}

Comparison Filters

EqualTo

case class EqualTo(attribute: String, value: Any) extends Filter {
  override def references: Array[String] = Array(attribute)
}

EqualNullSafe

case class EqualNullSafe(attribute: String, value: Any) extends Filter {
  override def references: Array[String] = Array(attribute)
}

Inequality Filters

case class GreaterThan(attribute: String, value: Any) extends Filter {
  override def references: Array[String] = Array(attribute)
}

case class GreaterThanOrEqual(attribute: String, value: Any) extends Filter {
  override def references: Array[String] = Array(attribute)
}

case class LessThan(attribute: String, value: Any) extends Filter {
  override def references: Array[String] = Array(attribute)
}

case class LessThanOrEqual(attribute: String, value: Any) extends Filter {
  override def references: Array[String] = Array(attribute)
}

Set-Based Filters

In Filter

case class In(attribute: String, values: Array[Any]) extends Filter {
  override def references: Array[String] = Array(attribute)
}

IsNull and IsNotNull

case class IsNull(attribute: String) extends Filter {
  override def references: Array[String] = Array(attribute)
}

case class IsNotNull(attribute: String) extends Filter {
  override def references: Array[String] = Array(attribute)
}

Logical Filters

And Filter

case class And(left: Filter, right: Filter) extends Filter {
  override def references: Array[String] = left.references ++ right.references
}

Or Filter

case class Or(left: Filter, right: Filter) extends Filter {
  override def references: Array[String] = left.references ++ right.references
}

Not Filter

case class Not(child: Filter) extends Filter {
  override def references: Array[String] = child.references
}
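
Because the logical filters accumulate their children's references, a composite filter reports every column it touches:

val composite = And(EqualTo("status", "active"), GreaterThan("age", 18))
composite.references  // Array("status", "age")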

String Pattern Filters

StringStartsWith

case class StringStartsWith(attribute: String, value: String) extends Filter {
  override def references: Array[String] = Array(attribute)
}

StringEndsWith

case class StringEndsWith(attribute: String, value: String) extends Filter {
  override def references: Array[String] = Array(attribute)
}

StringContains

case class StringContains(attribute: String, value: String) extends Filter {
  override def references: Array[String] = Array(attribute)
}

Constant Filters

AlwaysTrue and AlwaysFalse

case class AlwaysTrue() extends Filter {
  override def references: Array[String] = Array.empty
}

case class AlwaysFalse() extends Filter {
  override def references: Array[String] = Array.empty
}

Filter Usage Examples

Basic Filter Construction

import org.apache.spark.sql.sources._

// Comparison filters
val equalFilter = EqualTo("status", "active")
val nullSafeEqual = EqualNullSafe("status", "active")
val greaterThan = GreaterThan("age", 18)
val lessThanOrEqual = LessThanOrEqual("age", 65)

// Set-based filters
val inFilter = In("category", Array("A", "B", "C"))
val isNullFilter = IsNull("description")
val isNotNullFilter = IsNotNull("email")

// String pattern filters
val startsWithFilter = StringStartsWith("name", "John")
val endsWithFilter = StringEndsWith("email", "@company.com")
val containsFilter = StringContains("description", "important")

Complex Filter Combinations

// Logical combinations
val activeAdults = And(
  EqualTo("status", "active"),
  GreaterThan("age", 18)
)

val eligibleUsers = Or(
  And(EqualTo("status", "active"), GreaterThan("age", 18)),
  EqualTo("priority", "VIP")
)

val complexFilter = And(
  Or(
    EqualTo("category", "premium"),
    In("tier", Array("gold", "platinum"))
  ),
  And(
    IsNotNull("email"),
    Not(StringContains("email", "temp"))
  )
)

// Age range filter
val workingAge = And(
  GreaterThanOrEqual("age", 18),
  LessThan("age", 65)
)

Filter Pattern Matching

def analyzeFilter(filter: Filter): String = filter match {
  case EqualTo(attr, value) =>
    s"Equality check: $attr = $value"
    
  case GreaterThan(attr, value) =>
    s"Range filter: $attr > $value"
    
  case In(attr, values) =>
    s"Set membership: $attr IN [${values.mkString(", ")}]"
    
  case And(left, right) =>
    s"Conjunction: (${analyzeFilter(left)}) AND (${analyzeFilter(right)})"
    
  case Or(left, right) =>
    s"Disjunction: (${analyzeFilter(left)}) OR (${analyzeFilter(right)})"
    
  case Not(child) =>
    s"Negation: NOT (${analyzeFilter(child)})"
    
  case IsNull(attr) =>
    s"Null check: $attr IS NULL"
    
  case IsNotNull(attr) =>
    s"Not null check: $attr IS NOT NULL"
    
  case StringStartsWith(attr, value) =>
    s"Prefix match: $attr LIKE '$value%'"
    
  case StringEndsWith(attr, value) =>
    s"Suffix match: $attr LIKE '%$value'"
    
  case StringContains(attr, value) =>
    s"Substring match: $attr LIKE '%$value%'"
    
  case AlwaysTrue() =>
    "Always true"
    
  case AlwaysFalse() =>
    "Always false"
    
  case _ =>
    s"Unknown filter: ${filter.getClass.getSimpleName}"
}
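
For example, describing the activeAdults filter built earlier:

analyzeFilter(And(EqualTo("status", "active"), GreaterThan("age", 18)))
// "Conjunction: (Equality check: status = active) AND (Range filter: age > 18)"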

Filter Optimization Utilities

Filter Simplification

object FilterOptimizer {
  
  def simplifyFilter(filter: Filter): Filter = filter match {
    // Identity optimizations
    case And(AlwaysTrue(), right) => simplifyFilter(right)
    case And(left, AlwaysTrue()) => simplifyFilter(left)
    case Or(AlwaysFalse(), right) => simplifyFilter(right)
    case Or(left, AlwaysFalse()) => simplifyFilter(left)
    
    // Contradiction optimizations
    case And(AlwaysFalse(), _) => AlwaysFalse()
    case And(_, AlwaysFalse()) => AlwaysFalse()
    case Or(AlwaysTrue(), _) => AlwaysTrue()
    case Or(_, AlwaysTrue()) => AlwaysTrue()
    
    // Double negation
    case Not(Not(child)) => simplifyFilter(child)
    
    // Recursive simplification
    case And(left, right) =>
      val simplifiedLeft = simplifyFilter(left)
      val simplifiedRight = simplifyFilter(right)
      if (simplifiedLeft != left || simplifiedRight != right) {
        simplifyFilter(And(simplifiedLeft, simplifiedRight))
      } else {
        And(simplifiedLeft, simplifiedRight)
      }
      
    case Or(left, right) =>
      val simplifiedLeft = simplifyFilter(left)
      val simplifiedRight = simplifyFilter(right)
      if (simplifiedLeft != left || simplifiedRight != right) {
        simplifyFilter(Or(simplifiedLeft, simplifiedRight))
      } else {
        Or(simplifiedLeft, simplifiedRight)
      }
      
    case Not(child) =>
      val simplifiedChild = simplifyFilter(child)
      if (simplifiedChild != child) {
        simplifyFilter(Not(simplifiedChild))
      } else {
        Not(simplifiedChild)
      }
      
    case other => other
  }
  
  def extractColumnReferences(filter: Filter): Set[String] = {
    filter.references.toSet
  }
  
  def isSelectiveFilter(filter: Filter): Boolean = filter match {
    case EqualTo(_, _) => true
    case In(_, values) => values.length <= 10
    case And(left, right) => isSelectiveFilter(left) && isSelectiveFilter(right)
    case Or(left, right) => isSelectiveFilter(left) || isSelectiveFilter(right)
    case _ => false
  }
}
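
A quick illustration of the simplifier removing an identity term and a double negation:

val simplified = FilterOptimizer.simplifyFilter(
  And(AlwaysTrue(), Not(Not(EqualTo("status", "active"))))
)
// simplified == EqualTo("status", "active")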

Filter Conversion Utilities

object FilterConverter {
  
  def toSqlString(filter: Filter): String = filter match {
    case EqualTo(attr, value) => s"$attr = ${formatValue(value)}"
    case EqualNullSafe(attr, value) => s"$attr <=> ${formatValue(value)}"
    case GreaterThan(attr, value) => s"$attr > ${formatValue(value)}"
    case GreaterThanOrEqual(attr, value) => s"$attr >= ${formatValue(value)}"
    case LessThan(attr, value) => s"$attr < ${formatValue(value)}"
    case LessThanOrEqual(attr, value) => s"$attr <= ${formatValue(value)}"
    case In(attr, values) => s"$attr IN (${values.map(formatValue).mkString(", ")})"
    case IsNull(attr) => s"$attr IS NULL"
    case IsNotNull(attr) => s"$attr IS NOT NULL"
    case And(left, right) => s"(${toSqlString(left)}) AND (${toSqlString(right)})"
    case Or(left, right) => s"(${toSqlString(left)}) OR (${toSqlString(right)})"
    case Not(child) => s"NOT (${toSqlString(child)})"
    case StringStartsWith(attr, value) => s"$attr LIKE '${escapeString(value)}%'"
    case StringEndsWith(attr, value) => s"$attr LIKE '%${escapeString(value)}'"
    case StringContains(attr, value) => s"$attr LIKE '%${escapeString(value)}%'"
    case AlwaysTrue() => "TRUE"
    case AlwaysFalse() => "FALSE"
    case _ => filter.toString
  }
  
  private def formatValue(value: Any): String = value match {
    case null => "NULL"
    case s: String => s"'${escapeString(s)}'"
    case _ => value.toString
  }
  
  private def escapeString(s: String): String = {
    s.replace("'", "''").replace("\\", "\\\\")
  }
  
  def toV2Predicate(filter: Filter): Predicate = {
    // Delegate to Spark's built-in conversion. Note that Filter.toV2 is
    // private[sql], so this only compiles from code inside the
    // org.apache.spark.sql package.
    filter.toV2
  }
}
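
For example, converting the workingAge range filter from the earlier examples into a SQL fragment:

FilterConverter.toSqlString(
  And(GreaterThanOrEqual("age", 18), LessThan("age", 65))
)
// "(age >= 18) AND (age < 65)"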

Data Source V1 Integration Patterns

Filter Pushdown Implementation

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources._

trait PushdownDataSource {
  
  def buildScan(filters: Array[Filter]): RDD[Row] = {
    val (pushable, nonPushable) = partitionFilters(filters)
    
    // Build scan with pushed filters
    val baseRDD = buildScanWithFilters(pushable)
    
    // Apply remaining filters in Spark
    if (nonPushable.nonEmpty) {
      val combinedFilter = nonPushable.reduce(And)
      baseRDD.filter(row => evaluateFilter(combinedFilter, row))
    } else {
      baseRDD
    }
  }
  
  def partitionFilters(filters: Array[Filter]): (Array[Filter], Array[Filter]) = {
    filters.partition(canPushDown)
  }
  
  def canPushDown(filter: Filter): Boolean = filter match {
    case EqualTo(_, _) => true
    case GreaterThan(_, _) => true
    case LessThan(_, _) => true
    case GreaterThanOrEqual(_, _) => true
    case LessThanOrEqual(_, _) => true
    case In(_, _) => true
    case IsNull(_) => true
    case IsNotNull(_) => true
    case And(left, right) => canPushDown(left) && canPushDown(right)
    case Or(left, right) => canPushDown(left) && canPushDown(right)
    case Not(child) => canPushDown(child)
    case _ => false
  }
  
  def buildScanWithFilters(filters: Array[Filter]): RDD[Row]
  
  def evaluateFilter(filter: Filter, row: Row): Boolean
}
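
The trait leaves evaluateFilter abstract. A minimal sketch of one possible implementation is shown below; it assumes attribute values are Comparable and ignores Spark's null and numeric-coercion semantics:

def evaluateFilter(filter: Filter, row: Row): Boolean = {
  def value(attr: String): Any = row.getAs[Any](attr)
  def cmp(l: Any, r: Any): Int = l.asInstanceOf[Comparable[Any]].compareTo(r)

  filter match {
    case EqualTo(attr, v)            => value(attr) == v
    case GreaterThan(attr, v)        => cmp(value(attr), v) > 0
    case GreaterThanOrEqual(attr, v) => cmp(value(attr), v) >= 0
    case LessThan(attr, v)           => cmp(value(attr), v) < 0
    case LessThanOrEqual(attr, v)    => cmp(value(attr), v) <= 0
    case In(attr, vs)                => vs.contains(value(attr))
    case IsNull(attr)                => value(attr) == null
    case IsNotNull(attr)             => value(attr) != null
    case And(l, r)                   => evaluateFilter(l, row) && evaluateFilter(r, row)
    case Or(l, r)                    => evaluateFilter(l, row) || evaluateFilter(r, row)
    case Not(c)                      => !evaluateFilter(c, row)
    case StringStartsWith(attr, v)   => Option(value(attr)).exists(_.toString.startsWith(v))
    case StringEndsWith(attr, v)     => Option(value(attr)).exists(_.toString.endsWith(v))
    case StringContains(attr, v)     => Option(value(attr)).exists(_.toString.contains(v))
    case AlwaysTrue()                => true
    case AlwaysFalse()               => false
    case other =>
      throw new UnsupportedOperationException(s"Cannot evaluate filter: $other")
  }
}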

Legacy Data Source Implementation

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._

class MyLegacyDataSource(override val sqlContext: SQLContext) extends BaseRelation
    with TableScan
    with PrunedFilteredScan
    with InsertableRelation {
  
  override def schema: StructType = {
    // Define schema for the data source
    StructType(Seq(
      StructField("id", IntegerType, nullable = false),
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true),
      StructField("status", StringType, nullable = true)
    ))
  }
  
  override def buildScan(): RDD[Row] = {
    // Full scan: request every column and push no filters
    buildScan(schema.fieldNames, Array.empty)
  }
  
  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    // Implement filtered and pruned scan
    val pushableFilters = filters.filter(canPushDown)
    val prunedSchema = StructType(requiredColumns.map(schema(_)))
    
    loadDataWithFiltersAndProjection(pushableFilters, prunedSchema)
  }
  
  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
    // Implement data insertion
    if (overwrite) {
      // Clear existing data
      clearData()
    }
    
    // Write new data
    writeData(data)
  }
  
  private def loadDataWithFiltersAndProjection(filters: Array[Filter],
                                               schema: StructType): RDD[Row] = {
    // Implementation-specific data loading. This would typically:
    // 1. Apply filters during data loading
    // 2. Project only the required columns
    // 3. Return an RDD of rows
    ???
  }

  // Implementation-specific storage operations used by insert()
  private def clearData(): Unit = ???
  private def writeData(data: DataFrame): Unit = ???
  
  private def canPushDown(filter: Filter): Boolean = {
    // Define which filters can be pushed to the data source
    filter match {
      case EqualTo(_, _) | GreaterThan(_, _) | LessThan(_, _) => true
      case In(_, _) | IsNull(_) | IsNotNull(_) => true
      case And(left, right) => canPushDown(left) && canPushDown(right)
      case _ => false
    }
  }
}
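
A V1 relation is typically exposed to spark.read through a RelationProvider. The sketch below assumes the hypothetical package com.example.legacy and reuses MyLegacyDataSource from above:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider}

// Looked up when the package name is passed to spark.read.format(...)
class DefaultSource extends RelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    new MyLegacyDataSource(sqlContext)
  }
}

// Usage: eligible predicates arrive in buildScan as V1 Filter instances
val users = spark.read
  .format("com.example.legacy")
  .load()
  .where("status = 'active' AND age > 18")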

Migration from V1 to V2

Filter Conversion Helper

import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.{Expression, Expressions}
import org.apache.spark.sql.connector.expressions.filter.Predicate

object V1ToV2Migration {

  private def ref(attribute: String): Expression = Expressions.column(attribute)
  private def lit(value: Any): Expression = Expressions.literal(value)

  def convertFilter(v1Filter: Filter): Predicate = v1Filter match {
    // Comparison and null-check filters map to the generic V2 Predicate,
    // identified by its name
    case EqualTo(attr, value) =>
      new Predicate("=", Array(ref(attr), lit(value)))

    case GreaterThan(attr, value) =>
      new Predicate(">", Array(ref(attr), lit(value)))

    case LessThan(attr, value) =>
      new Predicate("<", Array(ref(attr), lit(value)))

    case In(attr, values) =>
      new Predicate("IN", ref(attr) +: values.map(lit))

    case IsNull(attr) =>
      new Predicate("IS_NULL", Array(ref(attr)))

    // Logical and constant filters have dedicated V2 classes
    case And(left, right) =>
      new org.apache.spark.sql.connector.expressions.filter.And(
        convertFilter(left),
        convertFilter(right)
      )

    case Or(left, right) =>
      new org.apache.spark.sql.connector.expressions.filter.Or(
        convertFilter(left),
        convertFilter(right)
      )

    case Not(child) =>
      new org.apache.spark.sql.connector.expressions.filter.Not(
        convertFilter(child)
      )

    case AlwaysTrue() =>
      new org.apache.spark.sql.connector.expressions.filter.AlwaysTrue()

    case AlwaysFalse() =>
      new org.apache.spark.sql.connector.expressions.filter.AlwaysFalse()

    case _ =>
      throw new UnsupportedOperationException(s"Cannot convert filter: $v1Filter")
  }
  
  def migrateDataSource(v1Source: BaseRelation): Table = {
    new V2TableWrapper(v1Source)
  }
}

import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class V2TableWrapper(v1Source: BaseRelation) extends Table with SupportsRead {
  
  override def name(): String = v1Source.getClass.getSimpleName
  
  override def schema(): StructType = v1Source.schema
  
  override def capabilities(): java.util.Set[TableCapability] = {
    java.util.Set.of(TableCapability.BATCH_READ)
  }
  
  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
    new V1CompatScanBuilder(v1Source, schema())
  }
}
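
V1CompatScanBuilder is not part of Spark; a minimal sketch of such a wrapper could record the pushed V1 filters via SupportsPushDownFilters and leave the RDD-to-Batch bridging implementation-specific:

import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters}

class V1CompatScanBuilder(v1Source: BaseRelation, tableSchema: StructType)
    extends ScanBuilder with SupportsPushDownFilters {

  private var pushed: Array[Filter] = Array.empty

  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    pushed = filters
    // Return every filter as a residual so Spark still evaluates it;
    // the pushed copies are only hints for the underlying V1 scan.
    filters
  }

  override def pushedFilters(): Array[Filter] = pushed

  override def build(): Scan = new Scan {
    override def readSchema(): StructType = tableSchema
    // Bridging the V1 RDD[Row] into a V2 Batch is implementation-specific
    // and omitted here.
  }
}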

Best Practices and Performance

Filter Optimization Strategies

object FilterOptimizationStrategies {
  
  def optimizeFilterOrder(filters: Array[Filter]): Array[Filter] = {
    // Order filters by selectivity (most selective first)
    filters.sortBy(calculateSelectivity)
  }
  
  private def calculateSelectivity(filter: Filter): Double = filter match {
    case EqualTo(_, _) => 0.01 // Very selective
    case In(_, values) => Math.min(0.1, values.length * 0.01) // Based on value count
    case IsNull(_) => 0.05 // Usually selective
    case IsNotNull(_) => 0.95 // Usually not selective
    case GreaterThan(_, _) | LessThan(_, _) => 0.3 // Moderately selective
    case StringStartsWith(_, _) => 0.1 // Quite selective
    case StringContains(_, _) => 0.2 // Less selective
    case And(left, right) => calculateSelectivity(left) * calculateSelectivity(right)
    case Or(left, right) => calculateSelectivity(left) + calculateSelectivity(right) - 
                           (calculateSelectivity(left) * calculateSelectivity(right))
    case Not(child) => 1.0 - calculateSelectivity(child)
    case _ => 0.5 // Default moderate selectivity
  }
  
  def canUseIndex(filter: Filter, indexedColumns: Set[String]): Boolean = {
    filter.references.exists(indexedColumns.contains)
  }
}
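
For example, reordering a set of filters by estimated selectivity:

val ordered = FilterOptimizationStrategies.optimizeFilterOrder(Array[Filter](
  IsNotNull("email"),          // estimated 0.95, evaluated last
  GreaterThan("age", 18),      // estimated 0.3
  EqualTo("status", "active")  // estimated 0.01, evaluated first
))
// ordered: EqualTo("status", "active"), GreaterThan("age", 18), IsNotNull("email")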

Legacy Performance Considerations

import java.util.concurrent.ConcurrentHashMap

trait PerformantV1DataSource extends BaseRelation with PrunedFilteredScan {

  // Cache filter analysis results; keyed on a Seq because arrays compare by reference
  private val filterCache = new ConcurrentHashMap[Seq[Filter], Array[Filter]]()

  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    // Reuse cached filter analysis for repeated scans with the same filters
    val optimizedFilters =
      filterCache.computeIfAbsent(filters.toSeq, _ => optimizeFilters(filters))

    // Minimize data transfer by pushing the optimized filters into the load
    val loadedRDD = loadData(optimizedFilters)

    // Apply column pruning
    if (requiredColumns.length < schema.fields.length) {
      val columnIndices = requiredColumns.map(schema.fieldIndex)
      loadedRDD.map(row => Row.fromSeq(columnIndices.map(row.get)))
    } else {
      loadedRDD
    }
  }

  // Implementation-specific loading that honors the pushed filters
  protected def loadData(filters: Array[Filter]): RDD[Row]

  private def optimizeFilters(filters: Array[Filter]): Array[Filter] = {
    filters
      .map(FilterOptimizer.simplifyFilter)
      .filter(_ != AlwaysTrue())
  }
}

The Legacy Data Source V1 APIs provide essential compatibility for existing Spark integrations while offering a migration path to the more powerful and flexible V2 APIs. Understanding these APIs is crucial for maintaining and upgrading existing Spark data source implementations.