# Legacy Data Source V1 APIs

The Legacy Data Source V1 APIs provide backward compatibility with older Spark data source implementations. While these APIs are deprecated in favor of Data Source V2, they are still widely used and supported for existing implementations.

## Filter System

### Filter Base Class

The core filter abstraction for predicate pushdown:

```scala { .api }
package org.apache.spark.sql.sources

/**
 * Base class of the V1 filter predicates that Spark can push down to a
 * data source during scan planning.
 */
abstract class Filter {

  /**
   * List of top-level columns referenced by this filter.
   */
  def references: Array[String]

  /**
   * V2-style references (supporting nested fields); each entry is a
   * field-name path.
   */
  def v2references: Array[Array[String]]

  /**
   * Convert this filter to the V2 predicate format.
   */
  private[sql] def toV2: Predicate
}
```

### Comparison Filters

#### EqualTo
```scala { .api }
/**
 * Evaluates to `true` iff the column `attribute` equals `value`
 * (null-unsafe equality).
 */
case class EqualTo(attribute: String, value: Any) extends Filter {
  override def references: Array[String] = Array(attribute)
}
```

#### EqualNullSafe
```scala { .api }
/**
 * Null-safe equality: like `EqualTo`, but two `null`s compare equal
 * (SQL `<=>` semantics).
 */
case class EqualNullSafe(attribute: String, value: Any) extends Filter {
  override def references: Array[String] = Array(attribute)
}
```

#### Inequality Filters
```scala { .api }
/** Evaluates to `true` iff `attribute > value`. */
case class GreaterThan(attribute: String, value: Any) extends Filter {
  override def references: Array[String] = Array(attribute)
}

/** Evaluates to `true` iff `attribute >= value`. */
case class GreaterThanOrEqual(attribute: String, value: Any) extends Filter {
  override def references: Array[String] = Array(attribute)
}

/** Evaluates to `true` iff `attribute < value`. */
case class LessThan(attribute: String, value: Any) extends Filter {
  override def references: Array[String] = Array(attribute)
}

/** Evaluates to `true` iff `attribute <= value`. */
case class LessThanOrEqual(attribute: String, value: Any) extends Filter {
  override def references: Array[String] = Array(attribute)
}
```

### Set-Based Filters

#### In Filter
```scala { .api }
/**
 * Evaluates to `true` iff `attribute` equals one of the entries in `values`
 * (SQL `IN` semantics).
 */
case class In(attribute: String, values: Array[Any]) extends Filter {
  override def references: Array[String] = Array(attribute)
}
```

#### IsNull and IsNotNull
```scala { .api }
/** Evaluates to `true` iff `attribute` is `null`. */
case class IsNull(attribute: String) extends Filter {
  override def references: Array[String] = Array(attribute)
}

/** Evaluates to `true` iff `attribute` is not `null`. */
case class IsNotNull(attribute: String) extends Filter {
  override def references: Array[String] = Array(attribute)
}
```

### Logical Filters

#### And Filter
```scala { .api }
/**
 * Logical conjunction: `true` iff both `left` and `right` evaluate to `true`.
 * References are the union (with duplicates) of both children's references.
 */
case class And(left: Filter, right: Filter) extends Filter {
  override def references: Array[String] = left.references ++ right.references
}
```

#### Or Filter
```scala { .api }
/**
 * Logical disjunction: `true` iff at least one of `left`/`right` evaluates
 * to `true`. References are the union (with duplicates) of both children's.
 */
case class Or(left: Filter, right: Filter) extends Filter {
  override def references: Array[String] = left.references ++ right.references
}
```

#### Not Filter
```scala { .api }
/** Logical negation of `child`; references pass through unchanged. */
case class Not(child: Filter) extends Filter {
  override def references: Array[String] = child.references
}
```

### String Pattern Filters

#### StringStartsWith
```scala { .api }
/** Evaluates to `true` iff the string column `attribute` starts with `value`. */
case class StringStartsWith(attribute: String, value: String) extends Filter {
  override def references: Array[String] = Array(attribute)
}
```

#### StringEndsWith
```scala { .api }
/** Evaluates to `true` iff the string column `attribute` ends with `value`. */
case class StringEndsWith(attribute: String, value: String) extends Filter {
  override def references: Array[String] = Array(attribute)
}
```

#### StringContains
```scala { .api }
/** Evaluates to `true` iff the string column `attribute` contains `value`. */
case class StringContains(attribute: String, value: String) extends Filter {
  override def references: Array[String] = Array(attribute)
}
```

### Constant Filters

#### AlwaysTrue and AlwaysFalse
```scala { .api }
/** Constant filter that accepts every row; references no columns. */
case class AlwaysTrue() extends Filter {
  override def references: Array[String] = Array.empty
}

/** Constant filter that rejects every row; references no columns. */
case class AlwaysFalse() extends Filter {
  override def references: Array[String] = Array.empty
}
```

## Filter Usage Examples

### Basic Filter Construction

```scala
import org.apache.spark.sql.sources._

// Comparison filters: null-unsafe and null-safe equality, plus range bounds.
val equalFilter = EqualTo("status", "active")
val nullSafeEqual = EqualNullSafe("status", "active")
val greaterThan = GreaterThan("age", 18)
val lessThanOrEqual = LessThanOrEqual("age", 65)

// Set-based filters: membership and null checks.
val inFilter = In("category", Array("A", "B", "C"))
val isNullFilter = IsNull("description")
val isNotNullFilter = IsNotNull("email")

// String pattern filters: LIKE-style prefix, suffix, and substring matches.
val startsWithFilter = StringStartsWith("name", "John")
val endsWithFilter = StringEndsWith("email", "@company.com")
val containsFilter = StringContains("description", "important")
```

### Complex Filter Combinations

```scala
// Logical combinations
// status = 'active' AND age > 18
val activeAdults = And(
  EqualTo("status", "active"),
  GreaterThan("age", 18)
)

// (status = 'active' AND age > 18) OR priority = 'VIP'
val eligibleUsers = Or(
  And(EqualTo("status", "active"), GreaterThan("age", 18)),
  EqualTo("priority", "VIP")
)

// (category = 'premium' OR tier IN ('gold','platinum'))
//   AND (email IS NOT NULL AND email NOT LIKE '%temp%')
val complexFilter = And(
  Or(
    EqualTo("category", "premium"),
    In("tier", Array("gold", "platinum"))
  ),
  And(
    IsNotNull("email"),
    Not(StringContains("email", "temp"))
  )
)

// Age range filter: 18 <= age < 65
val workingAge = And(
  GreaterThanOrEqual("age", 18),
  LessThan("age", 65)
)
```

### Filter Pattern Matching

```scala
/**
 * Renders a human-readable description of a V1 filter tree.
 *
 * Logical connectives (`And`/`Or`/`Not`) recurse into their children;
 * filter types not listed fall through to a generic description derived
 * from the runtime class name.
 */
def analyzeFilter(filter: Filter): String = filter match {
  case EqualTo(attr, value)          => s"Equality check: $attr = $value"
  case GreaterThan(attr, value)      => s"Range filter: $attr > $value"
  case In(attr, values)              => s"Set membership: $attr IN [${values.mkString(", ")}]"
  case And(left, right)              => s"Conjunction: (${analyzeFilter(left)}) AND (${analyzeFilter(right)})"
  case Or(left, right)               => s"Disjunction: (${analyzeFilter(left)}) OR (${analyzeFilter(right)})"
  case Not(child)                    => s"Negation: NOT (${analyzeFilter(child)})"
  case IsNull(attr)                  => s"Null check: $attr IS NULL"
  case IsNotNull(attr)               => s"Not null check: $attr IS NOT NULL"
  case StringStartsWith(attr, value) => s"Prefix match: $attr LIKE '$value%'"
  case StringEndsWith(attr, value)   => s"Suffix match: $attr LIKE '%$value'"
  case StringContains(attr, value)   => s"Substring match: $attr LIKE '%$value%'"
  case AlwaysTrue()                  => "Always true"
  case AlwaysFalse()                 => "Always false"
  case _                             => s"Unknown filter: ${filter.getClass.getSimpleName}"
}
```

## Filter Optimization Utilities

### Filter Simplification

```scala
object FilterOptimizer {

  /**
   * Applies boolean-algebra rewrites to a filter tree until a fixed point:
   * identity elements (x AND TRUE = x, x OR FALSE = x), annihilators
   * (x AND FALSE = FALSE, x OR TRUE = TRUE), and double negation.
   */
  def simplifyFilter(filter: Filter): Filter = filter match {
    // Identity elements.
    case And(AlwaysTrue(), rhs) => simplifyFilter(rhs)
    case And(lhs, AlwaysTrue()) => simplifyFilter(lhs)
    case Or(AlwaysFalse(), rhs) => simplifyFilter(rhs)
    case Or(lhs, AlwaysFalse()) => simplifyFilter(lhs)

    // Annihilating elements.
    case And(AlwaysFalse(), _) => AlwaysFalse()
    case And(_, AlwaysFalse()) => AlwaysFalse()
    case Or(AlwaysTrue(), _)   => AlwaysTrue()
    case Or(_, AlwaysTrue())   => AlwaysTrue()

    // NOT (NOT x) = x.
    case Not(Not(inner)) => simplifyFilter(inner)

    // Recurse into children; re-run the top-level rules if anything changed.
    case And(lhs, rhs) => simplifyChildren(lhs, rhs, And.apply)
    case Or(lhs, rhs)  => simplifyChildren(lhs, rhs, Or.apply)

    case Not(inner) =>
      val simplified = simplifyFilter(inner)
      if (simplified == inner) Not(simplified)
      else simplifyFilter(Not(simplified))

    // Leaves (comparisons, string filters, constants) are already minimal.
    case leaf => leaf
  }

  // Simplifies both children of a binary node and rebuilds it; when either
  // child changed, the rebuilt node is simplified again so constants exposed
  // by the children are folded at the new top level.
  private def simplifyChildren(
      lhs: Filter,
      rhs: Filter,
      rebuild: (Filter, Filter) => Filter): Filter = {
    val l = simplifyFilter(lhs)
    val r = simplifyFilter(rhs)
    if (l == lhs && r == rhs) rebuild(l, r)
    else simplifyFilter(rebuild(l, r))
  }

  /** Distinct top-level column names referenced anywhere in the filter. */
  def extractColumnReferences(filter: Filter): Set[String] =
    filter.references.toSet

  /**
   * Heuristic selectivity test: equality and small IN lists count as
   * selective; a conjunction requires both sides selective, a disjunction
   * either side.
   */
  def isSelectiveFilter(filter: Filter): Boolean = filter match {
    case _: EqualTo    => true
    case In(_, values) => values.length <= 10
    case And(lhs, rhs) => isSelectiveFilter(lhs) && isSelectiveFilter(rhs)
    case Or(lhs, rhs)  => isSelectiveFilter(lhs) || isSelectiveFilter(rhs)
    case _             => false
  }
}
```

### Filter Conversion Utilities

```scala
object FilterConverter {

  /**
   * Renders a V1 filter as a SQL-like predicate string.
   *
   * NOTE(review): `%` and `_` inside the StringStartsWith/EndsWith/Contains
   * values are not escaped, so they act as LIKE wildcards in the rendered
   * SQL — confirm that is intended before using this output against a real
   * engine.
   */
  def toSqlString(filter: Filter): String = filter match {
    case EqualTo(a, v)            => s"$a = ${formatValue(v)}"
    case EqualNullSafe(a, v)      => s"$a <=> ${formatValue(v)}"
    case GreaterThan(a, v)        => s"$a > ${formatValue(v)}"
    case GreaterThanOrEqual(a, v) => s"$a >= ${formatValue(v)}"
    case LessThan(a, v)           => s"$a < ${formatValue(v)}"
    case LessThanOrEqual(a, v)    => s"$a <= ${formatValue(v)}"
    case In(a, vs)                => s"$a IN (${vs.map(formatValue).mkString(", ")})"
    case IsNull(a)                => s"$a IS NULL"
    case IsNotNull(a)             => s"$a IS NOT NULL"
    case And(l, r)                => s"(${toSqlString(l)}) AND (${toSqlString(r)})"
    case Or(l, r)                 => s"(${toSqlString(l)}) OR (${toSqlString(r)})"
    case Not(c)                   => s"NOT (${toSqlString(c)})"
    case StringStartsWith(a, v)   => s"$a LIKE '${escapeString(v)}%'"
    case StringEndsWith(a, v)     => s"$a LIKE '%${escapeString(v)}'"
    case StringContains(a, v)     => s"$a LIKE '%${escapeString(v)}%'"
    case AlwaysTrue()             => "TRUE"
    case AlwaysFalse()            => "FALSE"
    case other                    => other.toString
  }

  // Renders a literal value: NULL for null, quoted-and-escaped for strings,
  // plain toString for everything else (numbers, booleans, ...).
  private def formatValue(value: Any): String = value match {
    case null      => "NULL"
    case s: String => s"'${escapeString(s)}'"
    case other     => other.toString
  }

  // Doubles single quotes and backslashes for embedding in a SQL string
  // literal.
  private def escapeString(s: String): String =
    s.replace("'", "''").replace("\\", "\\\\")

  /** Bridges a V1 filter to its V2 form via `Filter.toV2`. */
  def toV2Predicate(filter: Filter): Predicate =
    filter.toV2
}
```

## Data Source V1 Integration Patterns

### Filter Pushdown Implementation

```scala
/**
 * Mixin implementing the standard V1 pushdown split: predicates the source
 * can evaluate natively are pushed into the scan, and the remainder are
 * re-applied row-by-row in Spark.
 */
trait PushdownDataSource {

  /** Builds the scan, splitting filters into pushed and residual sets. */
  def buildScan(filters: Array[Filter]): RDD[Row] = {
    val (pushable, residual) = partitionFilters(filters)

    // Scan with whatever the source can evaluate natively.
    val scanned = buildScanWithFilters(pushable)

    // Fold any leftover predicates into one And tree and evaluate in Spark.
    if (residual.isEmpty) scanned
    else {
      val remaining = residual.reduceLeft(And.apply)
      scanned.filter(row => evaluateFilter(remaining, row))
    }
  }

  /** Splits filters into (pushable, not pushable). */
  def partitionFilters(filters: Array[Filter]): (Array[Filter], Array[Filter]) =
    filters.partition(canPushDown)

  /**
   * Whether the underlying source can evaluate this filter. Logical nodes
   * are pushable only when all of their children are.
   */
  def canPushDown(filter: Filter): Boolean = filter match {
    case _: EqualTo | _: GreaterThan | _: LessThan          => true
    case _: GreaterThanOrEqual | _: LessThanOrEqual         => true
    case _: In | _: IsNull | _: IsNotNull                   => true
    case And(l, r) => canPushDown(l) && canPushDown(r)
    case Or(l, r)  => canPushDown(l) && canPushDown(r)
    case Not(c)    => canPushDown(c)
    case _         => false
  }

  /** Source-specific scan applying the given filters natively. */
  def buildScanWithFilters(filters: Array[Filter]): RDD[Row]

  /** Row-level evaluation used for predicates that were not pushed down. */
  def evaluateFilter(filter: Filter, row: Row): Boolean
}
```

### Legacy Data Source Implementation

```scala
/**
 * Example legacy (V1) relation supporting full scans, pruned/filtered scans,
 * and insertion.
 *
 * NOTE(review): `clearData`, `writeData`, and the body of
 * `loadDataWithFiltersAndProjection` are source-specific placeholders and
 * are not defined in this snippet.
 */
class MyLegacyDataSource extends BaseRelation
  with TableScan
  with PrunedFilteredScan
  with InsertableRelation {

  /** Fixed four-column schema exposed by this relation. */
  override def schema: StructType = {
    StructType(Seq(
      StructField("id", IntegerType, nullable = false),
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true),
      StructField("status", StringType, nullable = true)
    ))
  }

  /**
   * Full-table scan: all columns, no filters.
   *
   * Fixed: the original called `buildScan(Array.empty, schema.fieldNames)`,
   * passing the column names where the `Array[Filter]` parameter is
   * expected — the arguments were swapped (and would not type-check).
   */
  override def buildScan(): RDD[Row] = {
    buildScan(schema.fieldNames, Array.empty[Filter])
  }

  /** Pruned and filtered scan entry point used by Spark's planner. */
  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    // Push only the filters the source understands; Spark re-applies the rest.
    val pushableFilters = filters.filter(canPushDown)
    // Keep only the requested columns in the projected schema.
    val prunedSchema = StructType(schema.fields.filter(f => requiredColumns.contains(f.name)))

    loadDataWithFiltersAndProjection(pushableFilters, prunedSchema)
  }

  /** Inserts `data`, optionally replacing all existing rows first. */
  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
    if (overwrite) {
      // Clear existing data before writing the replacement rows.
      clearData()
    }

    // Write new data
    writeData(data)
  }

  // Implementation-specific data loading. A real source would:
  //   1. Apply the filters while reading,
  //   2. Project only the pruned columns,
  //   3. Return the resulting rows.
  private def loadDataWithFiltersAndProjection(filters: Array[Filter],
      schema: StructType): RDD[Row] = {
    ???
  }

  // Filters this source can evaluate natively; conjunctions are pushable
  // only when both sides are.
  private def canPushDown(filter: Filter): Boolean = {
    filter match {
      case EqualTo(_, _) | GreaterThan(_, _) | LessThan(_, _) => true
      case In(_, _) | IsNull(_) | IsNotNull(_) => true
      case And(left, right) => canPushDown(left) && canPushDown(right)
      case _ => false
    }
  }
}
```

## Migration from V1 to V2

### Filter Conversion Helper

```scala
object V1ToV2Migration {

  // Alias for the V2 filter-expression package to keep the cases readable.
  import org.apache.spark.sql.connector.expressions.{filter => v2f}

  /**
   * Translates a legacy V1 `Filter` into a V2 `Predicate`.
   *
   * NOTE(review): recent Spark releases model V2 comparisons as
   * `new Predicate("=", ...)` rather than dedicated classes such as
   * `filter.EqualTo` — confirm these classes exist in the targeted Spark
   * version.
   *
   * @throws UnsupportedOperationException for filters with no V2 equivalent
   */
  def convertFilter(v1Filter: Filter): Predicate = v1Filter match {
    case EqualTo(attr, value) =>
      new v2f.EqualTo(Expressions.column(attr), Expressions.literal(value))

    case GreaterThan(attr, value) =>
      new v2f.GreaterThan(Expressions.column(attr), Expressions.literal(value))

    case LessThan(attr, value) =>
      new v2f.LessThan(Expressions.column(attr), Expressions.literal(value))

    case In(attr, values) =>
      new v2f.In(Expressions.column(attr), values.map(Expressions.literal))

    case IsNull(attr) =>
      new v2f.IsNull(Expressions.column(attr))

    case And(left, right) =>
      new v2f.And(convertFilter(left), convertFilter(right))

    case Or(left, right) =>
      new v2f.Or(convertFilter(left), convertFilter(right))

    case Not(child) =>
      new v2f.Not(convertFilter(child))

    case AlwaysTrue() =>
      new v2f.AlwaysTrue()

    case AlwaysFalse() =>
      new v2f.AlwaysFalse()

    case other =>
      throw new UnsupportedOperationException(s"Cannot convert filter: $other")
  }

  /** Wraps a V1 relation in a minimal V2 `Table`. */
  def migrateDataSource(v1Source: BaseRelation): Table =
    new V2TableWrapper(v1Source)
}
/**
 * Minimal V2 `Table` adapter around a legacy `BaseRelation`, exposing
 * batch-read capability only.
 */
class V2TableWrapper(v1Source: BaseRelation) extends Table with SupportsRead {

  // Table name derived from the wrapped relation's class.
  override def name(): String = v1Source.getClass.getSimpleName

  // Schema passes straight through from the V1 relation.
  override def schema(): StructType = v1Source.schema

  // This wrapper only supports batch reads.
  override def capabilities(): java.util.Set[TableCapability] =
    java.util.Collections.singleton(TableCapability.BATCH_READ)

  // Delegate scan construction to the V1-compatibility builder.
  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
    new V1CompatScanBuilder(v1Source, schema())
}
```

## Best Practices and Performance

### Filter Optimization Strategies

```scala
object FilterOptimizationStrategies {

  /** Orders filters so the most selective (lowest estimate) come first. */
  def optimizeFilterOrder(filters: Array[Filter]): Array[Filter] =
    filters.sortBy(calculateSelectivity)

  // Heuristic fraction of rows expected to pass each filter kind; logical
  // nodes combine child estimates assuming independence.
  private def calculateSelectivity(filter: Filter): Double = filter match {
    case _: EqualTo                   => 0.01 // Very selective
    case In(_, values)                => Math.min(0.1, values.length * 0.01) // Based on value count
    case _: IsNull                    => 0.05 // Usually selective
    case _: IsNotNull                 => 0.95 // Usually not selective
    case _: GreaterThan | _: LessThan => 0.3  // Moderately selective
    case _: StringStartsWith          => 0.1  // Quite selective
    case _: StringContains            => 0.2  // Less selective
    case And(lhs, rhs) =>
      calculateSelectivity(lhs) * calculateSelectivity(rhs)
    case Or(lhs, rhs) =>
      // Inclusion-exclusion under independence.
      val sl = calculateSelectivity(lhs)
      val sr = calculateSelectivity(rhs)
      sl + sr - sl * sr
    case Not(child) =>
      1.0 - calculateSelectivity(child)
    case _ => 0.5 // Default moderate selectivity
  }

  /** True when any column referenced by the filter is indexed. */
  def canUseIndex(filter: Filter, indexedColumns: Set[String]): Boolean =
    filter.references.exists(indexedColumns.contains)
}
```

### Legacy Performance Considerations

```scala
/**
 * Mixin adding filter-analysis caching and early column pruning to a V1
 * relation.
 *
 * NOTE(review): `loadData` is expected to be supplied by the concrete
 * relation — it is not declared in this snippet.
 */
trait PerformantV1DataSource extends BaseRelation with PrunedFilteredScan {

  // Cache optimized filter sets so repeated scans with equal filters skip
  // re-analysis.
  //
  // Fixed: the original keyed the map on Array[Filter]; JVM arrays use
  // reference equality/hashCode, so structurally equal filter arrays never
  // hit the cache. Keying on an immutable Seq restores structural equality.
  private val filterCache = new ConcurrentHashMap[Seq[Filter], Array[Filter]]()

  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    // Use cached filter analysis (structural key — see filterCache note).
    val optimizedFilters = filterCache.computeIfAbsent(filters.toSeq, _ => optimizeFilters(filters))

    // Minimize data transfer by projecting early.
    val projectedRDD = loadData(optimizedFilters)

    // Apply column pruning only when a strict subset of columns is needed.
    if (requiredColumns.length < schema.fields.length) {
      val columnIndices = requiredColumns.map(schema.fieldIndex)
      projectedRDD.map(row => Row.fromSeq(columnIndices.map(row.get)))
    } else {
      projectedRDD
    }
  }

  // Simplify each filter and drop no-op AlwaysTrue entries.
  private def optimizeFilters(filters: Array[Filter]): Array[Filter] = {
    filters
      .map(FilterOptimizer.simplifyFilter)
      .filter(_ != AlwaysTrue())
  }
}
```

The Legacy Data Source V1 APIs provide essential compatibility for existing Spark integrations while offering a migration path to the more powerful and flexible V2 APIs. Understanding these APIs is crucial for maintaining and upgrading existing Spark data source implementations.