
# Legacy Data Source V1 APIs

The Legacy Data Source V1 APIs provide backward compatibility with older Spark data source implementations. While these APIs are deprecated in favor of Data Source V2, they are still widely used and supported for existing implementations.

## Filter System

### Filter Base Class

The core filter abstraction for predicate pushdown:

```scala { .api }
package org.apache.spark.sql.sources

abstract class Filter {
  /**
   * List of columns referenced by this filter
   */
  def references: Array[String]

  /**
   * V2-style references (supporting nested fields)
   */
  def v2references: Array[Array[String]]

  /**
   * Convert to V2 predicate format
   */
  private[sql] def toV2: Predicate
}
```

### Comparison Filters

#### EqualTo
```scala { .api }
case class EqualTo(attribute: String, value: Any) extends Filter {
  override def references: Array[String] = Array(attribute)
}
```

#### EqualNullSafe
```scala { .api }
case class EqualNullSafe(attribute: String, value: Any) extends Filter {
  override def references: Array[String] = Array(attribute)
}
```

#### Inequality Filters
```scala { .api }
case class GreaterThan(attribute: String, value: Any) extends Filter {
  override def references: Array[String] = Array(attribute)
}

case class GreaterThanOrEqual(attribute: String, value: Any) extends Filter {
  override def references: Array[String] = Array(attribute)
}

case class LessThan(attribute: String, value: Any) extends Filter {
  override def references: Array[String] = Array(attribute)
}

case class LessThanOrEqual(attribute: String, value: Any) extends Filter {
  override def references: Array[String] = Array(attribute)
}
```

### Set-Based Filters

#### In Filter
```scala { .api }
case class In(attribute: String, values: Array[Any]) extends Filter {
  override def references: Array[String] = Array(attribute)
}
```

#### IsNull and IsNotNull
```scala { .api }
case class IsNull(attribute: String) extends Filter {
  override def references: Array[String] = Array(attribute)
}

case class IsNotNull(attribute: String) extends Filter {
  override def references: Array[String] = Array(attribute)
}
```

### Logical Filters

#### And Filter
```scala { .api }
case class And(left: Filter, right: Filter) extends Filter {
  override def references: Array[String] = left.references ++ right.references
}
```

#### Or Filter
```scala { .api }
case class Or(left: Filter, right: Filter) extends Filter {
  override def references: Array[String] = left.references ++ right.references
}
```

#### Not Filter
```scala { .api }
case class Not(child: Filter) extends Filter {
  override def references: Array[String] = child.references
}
```

### String Pattern Filters

#### StringStartsWith
```scala { .api }
case class StringStartsWith(attribute: String, value: String) extends Filter {
  override def references: Array[String] = Array(attribute)
}
```

#### StringEndsWith
```scala { .api }
case class StringEndsWith(attribute: String, value: String) extends Filter {
  override def references: Array[String] = Array(attribute)
}
```

#### StringContains
```scala { .api }
case class StringContains(attribute: String, value: String) extends Filter {
  override def references: Array[String] = Array(attribute)
}
```

### Constant Filters

#### AlwaysTrue and AlwaysFalse
```scala { .api }
case class AlwaysTrue() extends Filter {
  override def references: Array[String] = Array.empty
}

case class AlwaysFalse() extends Filter {
  override def references: Array[String] = Array.empty
}
```

## Filter Usage Examples

### Basic Filter Construction

```scala
import org.apache.spark.sql.sources._

// Comparison filters
val equalFilter = EqualTo("status", "active")
val nullSafeEqual = EqualNullSafe("status", "active")
val greaterThan = GreaterThan("age", 18)
val lessThanOrEqual = LessThanOrEqual("age", 65)

// Set-based filters
val inFilter = In("category", Array("A", "B", "C"))
val isNullFilter = IsNull("description")
val isNotNullFilter = IsNotNull("email")

// String pattern filters
val startsWithFilter = StringStartsWith("name", "John")
val endsWithFilter = StringEndsWith("email", "@company.com")
val containsFilter = StringContains("description", "important")
```

### Complex Filter Combinations

```scala
// Logical combinations
val activeAdults = And(
  EqualTo("status", "active"),
  GreaterThan("age", 18)
)

val eligibleUsers = Or(
  And(EqualTo("status", "active"), GreaterThan("age", 18)),
  EqualTo("priority", "VIP")
)

val complexFilter = And(
  Or(
    EqualTo("category", "premium"),
    In("tier", Array("gold", "platinum"))
  ),
  And(
    IsNotNull("email"),
    Not(StringContains("email", "temp"))
  )
)

// Age range filter
val workingAge = And(
  GreaterThanOrEqual("age", 18),
  LessThan("age", 65)
)
```

200

201

### Filter Pattern Matching

202

203

```scala

204

def analyzeFilter(filter: Filter): String = filter match {

205

case EqualTo(attr, value) =>

206

s"Equality check: $attr = $value"

207

208

case GreaterThan(attr, value) =>

209

s"Range filter: $attr > $value"

210

211

case In(attr, values) =>

212

s"Set membership: $attr IN [${values.mkString(", ")}]"

213

214

case And(left, right) =>

215

s"Conjunction: (${analyzeFilter(left)}) AND (${analyzeFilter(right)})"

216

217

case Or(left, right) =>

218

s"Disjunction: (${analyzeFilter(left)}) OR (${analyzeFilter(right)})"

219

220

case Not(child) =>

221

s"Negation: NOT (${analyzeFilter(child)})"

222

223

case IsNull(attr) =>

224

s"Null check: $attr IS NULL"

225

226

case IsNotNull(attr) =>

227

s"Not null check: $attr IS NOT NULL"

228

229

case StringStartsWith(attr, value) =>

230

s"Prefix match: $attr LIKE '$value%'"

231

232

case StringEndsWith(attr, value) =>

233

s"Suffix match: $attr LIKE '%$value'"

234

235

case StringContains(attr, value) =>

236

s"Substring match: $attr LIKE '%$value%'"

237

238

case AlwaysTrue() =>

239

"Always true"

240

241

case AlwaysFalse() =>

242

"Always false"

243

244

case _ =>

245

s"Unknown filter: ${filter.getClass.getSimpleName}"

246

}

247

```

## Filter Optimization Utilities

### Filter Simplification

```scala
object FilterOptimizer {

  def simplifyFilter(filter: Filter): Filter = filter match {
    // Identity optimizations
    case And(AlwaysTrue(), right) => simplifyFilter(right)
    case And(left, AlwaysTrue()) => simplifyFilter(left)
    case Or(AlwaysFalse(), right) => simplifyFilter(right)
    case Or(left, AlwaysFalse()) => simplifyFilter(left)

    // Contradiction optimizations
    case And(AlwaysFalse(), _) => AlwaysFalse()
    case And(_, AlwaysFalse()) => AlwaysFalse()
    case Or(AlwaysTrue(), _) => AlwaysTrue()
    case Or(_, AlwaysTrue()) => AlwaysTrue()

    // Double negation
    case Not(Not(child)) => simplifyFilter(child)

    // Recursive simplification
    case And(left, right) =>
      val simplifiedLeft = simplifyFilter(left)
      val simplifiedRight = simplifyFilter(right)
      if (simplifiedLeft != left || simplifiedRight != right) {
        simplifyFilter(And(simplifiedLeft, simplifiedRight))
      } else {
        And(simplifiedLeft, simplifiedRight)
      }

    case Or(left, right) =>
      val simplifiedLeft = simplifyFilter(left)
      val simplifiedRight = simplifyFilter(right)
      if (simplifiedLeft != left || simplifiedRight != right) {
        simplifyFilter(Or(simplifiedLeft, simplifiedRight))
      } else {
        Or(simplifiedLeft, simplifiedRight)
      }

    case Not(child) =>
      val simplifiedChild = simplifyFilter(child)
      if (simplifiedChild != child) {
        simplifyFilter(Not(simplifiedChild))
      } else {
        Not(simplifiedChild)
      }

    case other => other
  }

  def extractColumnReferences(filter: Filter): Set[String] = {
    filter.references.toSet
  }

  def isSelectiveFilter(filter: Filter): Boolean = filter match {
    case EqualTo(_, _) => true
    case In(_, values) => values.length <= 10
    case And(left, right) => isSelectiveFilter(left) && isSelectiveFilter(right)
    case Or(left, right) => isSelectiveFilter(left) || isSelectiveFilter(right)
    case _ => false
  }
}
```

### Filter Conversion Utilities

```scala
object FilterConverter {

  def toSqlString(filter: Filter): String = filter match {
    case EqualTo(attr, value) => s"$attr = ${formatValue(value)}"
    case EqualNullSafe(attr, value) => s"$attr <=> ${formatValue(value)}"
    case GreaterThan(attr, value) => s"$attr > ${formatValue(value)}"
    case GreaterThanOrEqual(attr, value) => s"$attr >= ${formatValue(value)}"
    case LessThan(attr, value) => s"$attr < ${formatValue(value)}"
    case LessThanOrEqual(attr, value) => s"$attr <= ${formatValue(value)}"
    case In(attr, values) => s"$attr IN (${values.map(formatValue).mkString(", ")})"
    case IsNull(attr) => s"$attr IS NULL"
    case IsNotNull(attr) => s"$attr IS NOT NULL"
    case And(left, right) => s"(${toSqlString(left)}) AND (${toSqlString(right)})"
    case Or(left, right) => s"(${toSqlString(left)}) OR (${toSqlString(right)})"
    case Not(child) => s"NOT (${toSqlString(child)})"
    case StringStartsWith(attr, value) => s"$attr LIKE '${escapeString(value)}%'"
    case StringEndsWith(attr, value) => s"$attr LIKE '%${escapeString(value)}'"
    case StringContains(attr, value) => s"$attr LIKE '%${escapeString(value)}%'"
    case AlwaysTrue() => "TRUE"
    case AlwaysFalse() => "FALSE"
    case _ => filter.toString
  }

  private def formatValue(value: Any): String = value match {
    case null => "NULL"
    case s: String => s"'${escapeString(s)}'"
    case _ => value.toString
  }

  private def escapeString(s: String): String = {
    s.replace("'", "''").replace("\\", "\\\\")
  }

  def toV2Predicate(filter: Filter): Predicate = {
    // Convert V1 filter to V2 predicate
    filter.toV2
  }
}
```

## Data Source V1 Integration Patterns

### Filter Pushdown Implementation

```scala
trait PushdownDataSource {

  def buildScan(filters: Array[Filter]): RDD[Row] = {
    val (pushable, nonPushable) = partitionFilters(filters)

    // Build scan with pushed filters
    val baseRDD = buildScanWithFilters(pushable)

    // Apply remaining filters in Spark
    if (nonPushable.nonEmpty) {
      val combinedFilter = nonPushable.reduce(And)
      baseRDD.filter(row => evaluateFilter(combinedFilter, row))
    } else {
      baseRDD
    }
  }

  def partitionFilters(filters: Array[Filter]): (Array[Filter], Array[Filter]) = {
    filters.partition(canPushDown)
  }

  def canPushDown(filter: Filter): Boolean = filter match {
    case EqualTo(_, _) => true
    case GreaterThan(_, _) => true
    case LessThan(_, _) => true
    case GreaterThanOrEqual(_, _) => true
    case LessThanOrEqual(_, _) => true
    case In(_, _) => true
    case IsNull(_) => true
    case IsNotNull(_) => true
    case And(left, right) => canPushDown(left) && canPushDown(right)
    case Or(left, right) => canPushDown(left) && canPushDown(right)
    case Not(child) => canPushDown(child)
    case _ => false
  }

  def buildScanWithFilters(filters: Array[Filter]): RDD[Row]

  def evaluateFilter(filter: Filter, row: Row): Boolean
}
```

### Legacy Data Source Implementation

```scala
class MyLegacyDataSource extends BaseRelation
  with TableScan
  with PrunedFilteredScan
  with InsertableRelation {

  override def schema: StructType = {
    // Define schema for the data source
    StructType(Seq(
      StructField("id", IntegerType, nullable = false),
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true),
      StructField("status", StringType, nullable = true)
    ))
  }

  override def buildScan(): RDD[Row] = {
    buildScan(schema.fieldNames, Array.empty)
  }

  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    // Implement filtered and pruned scan
    val pushableFilters = filters.filter(canPushDown)
    val prunedSchema = StructType(schema.fields.filter(f => requiredColumns.contains(f.name)))

    loadDataWithFiltersAndProjection(pushableFilters, prunedSchema)
  }

  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
    // Implement data insertion
    if (overwrite) {
      // Clear existing data
      clearData()
    }

    // Write new data
    writeData(data)
  }

  private def loadDataWithFiltersAndProjection(filters: Array[Filter],
                                               schema: StructType): RDD[Row] = {
    // Implementation-specific data loading
    // This would typically:
    // 1. Apply filters during data loading
    // 2. Project only required columns
    // 3. Return RDD of rows
    ???
  }

  private def canPushDown(filter: Filter): Boolean = {
    // Define which filters can be pushed to the data source
    filter match {
      case EqualTo(_, _) | GreaterThan(_, _) | LessThan(_, _) => true
      case In(_, _) | IsNull(_) | IsNotNull(_) => true
      case And(left, right) => canPushDown(left) && canPushDown(right)
      case _ => false
    }
  }
}
```

## Migration from V1 to V2

### Filter Conversion Helper

```scala
object V1ToV2Migration {

  def convertFilter(v1Filter: Filter): Predicate = v1Filter match {
    case EqualTo(attr, value) =>
      new org.apache.spark.sql.connector.expressions.filter.EqualTo(
        Expressions.column(attr),
        Expressions.literal(value)
      )

    case GreaterThan(attr, value) =>
      new org.apache.spark.sql.connector.expressions.filter.GreaterThan(
        Expressions.column(attr),
        Expressions.literal(value)
      )

    case LessThan(attr, value) =>
      new org.apache.spark.sql.connector.expressions.filter.LessThan(
        Expressions.column(attr),
        Expressions.literal(value)
      )

    case In(attr, values) =>
      new org.apache.spark.sql.connector.expressions.filter.In(
        Expressions.column(attr),
        values.map(Expressions.literal)
      )

    case IsNull(attr) =>
      new org.apache.spark.sql.connector.expressions.filter.IsNull(
        Expressions.column(attr)
      )

    case And(left, right) =>
      new org.apache.spark.sql.connector.expressions.filter.And(
        convertFilter(left),
        convertFilter(right)
      )

    case Or(left, right) =>
      new org.apache.spark.sql.connector.expressions.filter.Or(
        convertFilter(left),
        convertFilter(right)
      )

    case Not(child) =>
      new org.apache.spark.sql.connector.expressions.filter.Not(
        convertFilter(child)
      )

    case AlwaysTrue() =>
      new org.apache.spark.sql.connector.expressions.filter.AlwaysTrue()

    case AlwaysFalse() =>
      new org.apache.spark.sql.connector.expressions.filter.AlwaysFalse()

    case _ =>
      throw new UnsupportedOperationException(s"Cannot convert filter: $v1Filter")
  }

  def migrateDataSource(v1Source: BaseRelation): Table = {
    new V2TableWrapper(v1Source)
  }
}

class V2TableWrapper(v1Source: BaseRelation) extends Table with SupportsRead {

  override def name(): String = v1Source.getClass.getSimpleName

  override def schema(): StructType = v1Source.schema

  override def capabilities(): java.util.Set[TableCapability] = {
    java.util.Set.of(TableCapability.BATCH_READ)
  }

  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
    new V1CompatScanBuilder(v1Source, schema())
  }
}
```

## Best Practices and Performance

### Filter Optimization Strategies

```scala
object FilterOptimizationStrategies {

  def optimizeFilterOrder(filters: Array[Filter]): Array[Filter] = {
    // Order filters by selectivity (most selective first)
    filters.sortBy(calculateSelectivity)
  }

  private def calculateSelectivity(filter: Filter): Double = filter match {
    case EqualTo(_, _) => 0.01 // Very selective
    case In(_, values) => Math.min(0.1, values.length * 0.01) // Based on value count
    case IsNull(_) => 0.05 // Usually selective
    case IsNotNull(_) => 0.95 // Usually not selective
    case GreaterThan(_, _) | LessThan(_, _) => 0.3 // Moderately selective
    case StringStartsWith(_, _) => 0.1 // Quite selective
    case StringContains(_, _) => 0.2 // Less selective
    case And(left, right) => calculateSelectivity(left) * calculateSelectivity(right)
    case Or(left, right) => calculateSelectivity(left) + calculateSelectivity(right) -
      (calculateSelectivity(left) * calculateSelectivity(right))
    case Not(child) => 1.0 - calculateSelectivity(child)
    case _ => 0.5 // Default moderate selectivity
  }

  def canUseIndex(filter: Filter, indexedColumns: Set[String]): Boolean = {
    filter.references.exists(indexedColumns.contains)
  }
}
```

586

587

### Legacy Performance Considerations

588

589

```scala

590

trait PerformantV1DataSource extends BaseRelation with PrunedFilteredScan {

591

592

// Cache parsed filters to avoid repeated parsing

593

private val filterCache = new ConcurrentHashMap[Array[Filter], Array[Filter]]()

594

595

override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {

596

// Use cached filter analysis

597

val optimizedFilters = filterCache.computeIfAbsent(filters, optimizeFilters)

598

599

// Minimize data transfer by projecting early

600

val projectedRDD = loadData(optimizedFilters)

601

602

// Apply column pruning

603

if (requiredColumns.length < schema.fields.length) {

604

val columnIndices = requiredColumns.map(schema.fieldIndex)

605

projectedRDD.map(row => Row.fromSeq(columnIndices.map(row.get)))

606

} else {

607

projectedRDD

608

}

609

}

610

611

private def optimizeFilters(filters: Array[Filter]): Array[Filter] = {

612

filters

613

.map(FilterOptimizer.simplifyFilter)

614

.filter(_ != AlwaysTrue())

615

}

616

}

617

```

The Legacy Data Source V1 APIs provide essential compatibility for existing Spark integrations while offering a migration path to the more powerful and flexible V2 APIs. Understanding these APIs is crucial for maintaining and upgrading existing Spark data source implementations.