or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

catalog-apis.md, data-source-v2-apis.md, distributions-api.md, expression-apis.md, index.md, legacy-data-source-v1.md, metrics-api.md, streaming-apis.md, utilities-helpers.md, vectorized-processing.md

data-source-v2-apis.md (docs/)

0

# Data Source V2 APIs

1

2

The Data Source V2 APIs provide a modern, comprehensive framework for implementing custom data sources in Apache Spark. These APIs support advanced optimizations like predicate pushdown, column pruning, and vectorized processing while maintaining clean separation of concerns.

3

4

## Core Read APIs

5

6

### ScanBuilder

7

8

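The entry point for building a scan. On its own `ScanBuilder` only exposes `build()`; optimizations such as filter pushdown, column pruning, aggregate pushdown, and limit/TopN pushdown are enabled by additionally implementing the `SupportsPushDown*` mix-in interfaces described under "Pushdown Optimizations" below: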

9

10

```java { .api }

11

package org.apache.spark.sql.connector.read;

12

13

public interface ScanBuilder {

14

/**

15

* Build the final Scan object

16

*/

17

Scan build();

18

}

19

```

20

21

### Scan

22

23

Logical representation of a data scan:

24

25

```java { .api }

26

public interface Scan {

27

/**

28

* Returns the actual schema of this data source scan

29

*/

30

StructType readSchema();

31

32

/**

33

* Returns a human-readable description of this scan

34

*/

35

default String description();

36

37

/**

38

* Returns a Batch for batch queries (must implement if table supports BATCH_READ)

39

*/

40

default Batch toBatch();

41

42

/**

43

* Returns a MicroBatchStream for streaming queries (must implement if table supports MICRO_BATCH_READ)

44

*/

45

default MicroBatchStream toMicroBatchStream(String checkpointLocation);

46

47

/**

48

* Returns a ContinuousStream for continuous streaming queries (must implement if table supports CONTINUOUS_READ)

49

*/

50

default ContinuousStream toContinuousStream(String checkpointLocation);

51

52

/**

53

* Returns custom metrics that this scan supports

54

*/

55

default CustomMetric[] supportedCustomMetrics();

56

57

/**

58

* Returns custom task metrics reported from driver side

59

*/

60

default CustomTaskMetric[] reportDriverMetrics();

61

62

/**

63

* Returns the columnar support mode for vectorized processing

64

*/

65

default ColumnarSupportMode columnarSupportMode();

66

}

67

```

68

69

### Batch

70

71

Physical representation for batch execution:

72

73

```java { .api }

74

public interface Batch {

75

/**

76

* Plan input partitions for parallel processing

77

*/

78

InputPartition[] planInputPartitions();

79

80

/**

81

* Create reader factory for processing partitions

82

*/

83

PartitionReaderFactory createReaderFactory();

84

}

85

```

86

87

### Basic Data Source Implementation

88

89

```java

90

public class MyDataSource implements Table, SupportsRead {

91

private final String name;

92

private final StructType schema;

93

private final String[] paths;

94

95

public MyDataSource(String name, StructType schema, String[] paths) {

96

this.name = name;

97

this.schema = schema;

98

this.paths = paths;

99

}

100

101

@Override

102

public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {

103

return new MyScanBuilder(schema, paths, options);

104

}

105

}

106

107

public class MyScanBuilder implements ScanBuilder {

108

private final StructType schema;

109

private final String[] paths;

110

private final CaseInsensitiveStringMap options;

111

112

public MyScanBuilder(StructType schema, String[] paths,

113

CaseInsensitiveStringMap options) {

114

this.schema = schema;

115

this.paths = paths;

116

this.options = options;

117

}

118

119

@Override

120

public Scan build() {

121

return new MyScan(schema, paths);

122

}

123

}

124

125

public class MyScan implements Scan {

126

private final StructType schema;

127

private final String[] paths;

128

129

@Override

130

public StructType readSchema() {

131

return schema;

132

}

133

134

@Override

135

public String description() {

136

return String.format("MyScan[paths=%s]", Arrays.toString(paths));

137

}

138

139

@Override

140

public Batch toBatch() {

141

return new MyBatch(schema, paths);

142

}

143

}

144

```

145

146

## Partition Processing

147

148

### InputPartition

149

150

Represents a partition of input data:

151

152

```java { .api }

153

public interface InputPartition extends Serializable {

154

// Marker interface - implementations can add partition-specific data

155

}

156

```

157

158

### PartitionReader

159

160

Reads data from a single partition:

161

162

```java { .api }

163

public interface PartitionReader<T> extends Closeable {

164

/**

165

* Advance to next record

166

*/

167

boolean next() throws IOException;

168

169

/**

170

* Get current record

171

*/

172

T get();

173

}

174

```

175

176

### PartitionReaderFactory

177

178

Factory for creating partition readers:

179

180

```java { .api }

181

public interface PartitionReaderFactory extends Serializable {

182

/**

183

* Create reader for given partition

184

*/

185

PartitionReader<InternalRow> createReader(InputPartition partition);

186

}

187

```

188

189

**Partition Processing Implementation** (the `MyBatch` constructor and the `loadDataFromPath` helper are omitted for brevity):

190

191

```java

192

public class MyBatch implements Batch {

193

private final StructType schema;

194

private final String[] paths;

195

196

@Override

197

public InputPartition[] planInputPartitions() {

198

// Create one partition per file/path

199

return Arrays.stream(paths)

200

.map(MyInputPartition::new)

201

.toArray(InputPartition[]::new);

202

}

203

204

@Override

205

public PartitionReaderFactory createReaderFactory() {

206

return new MyPartitionReaderFactory(schema);

207

}

208

}

209

210

public class MyInputPartition implements InputPartition {

211

private final String path;

212

213

public MyInputPartition(String path) {

214

this.path = path;

215

}

216

217

public String getPath() {

218

return path;

219

}

220

}

221

222

public class MyPartitionReaderFactory implements PartitionReaderFactory {

223

private final StructType schema;

224

225

public MyPartitionReaderFactory(StructType schema) {

226

this.schema = schema;

227

}

228

229

@Override

230

public PartitionReader<InternalRow> createReader(InputPartition partition) {

231

MyInputPartition myPartition = (MyInputPartition) partition;

232

return new MyPartitionReader(schema, myPartition.getPath());

233

}

234

}

235

236

public class MyPartitionReader implements PartitionReader<InternalRow> {

237

private final StructType schema;

238

private final String path;

239

private Iterator<InternalRow> iterator;

240

private InternalRow currentRow;

241

242

public MyPartitionReader(StructType schema, String path) {

243

this.schema = schema;

244

this.path = path;

245

this.iterator = loadDataFromPath(path);

246

}

247

248

@Override

249

public boolean next() {

250

if (iterator.hasNext()) {

251

currentRow = iterator.next();

252

return true;

253

}

254

return false;

255

}

256

257

@Override

258

public InternalRow get() {

259

return currentRow;

260

}

261

262

@Override

263

public void close() throws IOException {

264

// Clean up resources

265

}

266

}

267

```

268

269

## Pushdown Optimizations

270

271

### Filter Pushdown

272

273

#### Legacy Filter Pushdown (V1 `Filter` expressions)

274

```java { .api }

275

public interface SupportsPushDownFilters {

276

/**

277

* Push filters down to data source

278

* @return filters that could not be pushed down

279

*/

280

Filter[] pushFilters(Filter[] filters);

281

282

/**

283

* Get filters that were successfully pushed down

284

*/

285

Filter[] pushedFilters();

286

}

287

```

288

289

#### Modern Filter Pushdown (V2 `Predicate` expressions)

290

```java { .api }

291

public interface SupportsPushDownV2Filters {

292

/**

293

* Push V2 predicates down to data source

294

* @return predicates that could not be pushed down

295

*/

296

Predicate[] pushPredicates(Predicate[] predicates);

297

298

/**

299

* Get predicates that were successfully pushed down

300

*/

301

Predicate[] pushedPredicates();

302

}

303

```

304

305

**Filter Pushdown Implementation:**

306

307

```java

308

public class MyScanBuilder implements ScanBuilder, SupportsPushDownV2Filters {

309

private final StructType schema;

310

private final String[] paths;

311

private Predicate[] pushedPredicates = new Predicate[0];

312

313

@Override

314

public Predicate[] pushPredicates(Predicate[] predicates) {

315

List<Predicate> supported = new ArrayList<>();

316

List<Predicate> unsupported = new ArrayList<>();

317

318

for (Predicate predicate : predicates) {

319

if (canPushDown(predicate)) {

320

supported.add(predicate);

321

} else {

322

unsupported.add(predicate);

323

}

324

}

325

326

this.pushedPredicates = supported.toArray(new Predicate[0]);

327

return unsupported.toArray(new Predicate[0]);

328

}

329

330

@Override

331

public Predicate[] pushedPredicates() {

332

return pushedPredicates.clone();

333

}

334

335

private boolean canPushDown(Predicate predicate) {

336

// Check if predicate can be evaluated by data source

337

if (predicate instanceof EqualTo) {

338

return true;

339

}

340

if (predicate instanceof GreaterThan || predicate instanceof LessThan) {

341

return true;

342

}

343

if (predicate instanceof And || predicate instanceof Or) {

344

return true;

345

}

346

return false;

347

}

348

349

@Override

350

public Scan build() {

351

return new MyScan(schema, paths, pushedPredicates);

352

}

353

}

354

```

355

356

### Column Pruning

357

358

```java { .api }

359

public interface SupportsPushDownRequiredColumns {

360

/**

361

* Prune columns to only those required

362

*/

363

void pruneColumns(StructType requiredSchema);

364

}

365

```

366

367

**Implementation:**

368

369

```java

370

public class MyScanBuilder implements ScanBuilder, SupportsPushDownRequiredColumns {

371

private StructType schema;

372

private StructType prunedSchema;

373

374

@Override

375

public void pruneColumns(StructType requiredSchema) {

376

// Only read required columns

377

this.prunedSchema = requiredSchema;

378

}

379

380

@Override

381

public Scan build() {

382

StructType finalSchema = prunedSchema != null ? prunedSchema : schema;

383

return new MyScan(finalSchema, paths, pushedPredicates);

384

}

385

}

386

```

387

388

### Aggregate Pushdown

389

390

```java { .api }

391

public interface SupportsPushDownAggregates {

392

/**

393

* Push aggregation down to data source

394

* @return true if aggregation can be completely pushed down

395

*/

396

boolean pushAggregation(Aggregation aggregation);

397

398

/**

399

* Whether data source can completely handle the aggregation

400

*/

401

boolean supportCompletePushDown(Aggregation aggregation);

402

}

403

```

404

405

**Implementation:**

406

407

```java

408

public class MyScanBuilder implements ScanBuilder, SupportsPushDownAggregates {

409

private Aggregation pushedAggregation;

410

private boolean completeAggregation;

411

412

@Override

413

public boolean pushAggregation(Aggregation aggregation) {

414

// Check if we can handle this aggregation

415

AggregateFunc[] aggregates = aggregation.aggregateExpressions();

416

Expression[] groupBy = aggregation.groupByExpressions();

417

418

// Simple aggregations we can handle

419

for (AggregateFunc func : aggregates) {

420

if (!(func instanceof Count || func instanceof Sum)) {

421

return false; // Can't handle complex aggregations

422

}

423

}

424

425

this.pushedAggregation = aggregation;

426

this.completeAggregation = true;

427

return true;

428

}

429

430

@Override

431

public boolean supportCompletePushDown(Aggregation aggregation) {

432

return completeAggregation;

433

}

434

}

435

```

436

437

### Limit and Offset Pushdown

438

439

```java { .api }

440

public interface SupportsPushDownLimit {

441

boolean pushLimit(int limit);

442

int pushedLimit();

443

}

444

445

public interface SupportsPushDownOffset {

446

boolean pushOffset(long offset);

447

long pushedOffset();

448

}

449

```

450

451

### TopN Pushdown

452

453

```java { .api }

454

public interface SupportsPushDownTopN {

455

boolean pushTopN(SortOrder[] orders, int limit);

456

SortOrder[] pushedTopNOrders();

457

int pushedTopNLimit();

458

}

459

```

460

461

**TopN Implementation:**

462

463

```java

464

public class MyScanBuilder implements ScanBuilder, SupportsPushDownTopN {

465

private SortOrder[] pushedOrders;

466

private int pushedLimit = -1;

467

468

@Override

469

public boolean pushTopN(SortOrder[] orders, int limit) {

470

// Check if we can handle the sort orders

471

for (SortOrder order : orders) {

472

if (!canSortBy(order)) {

473

return false;

474

}

475

}

476

477

this.pushedOrders = orders;

478

this.pushedLimit = limit;

479

return true;

480

}

481

482

private boolean canSortBy(SortOrder order) {

483

// Check if column is sortable in our data source

484

return order.expression() instanceof NamedReference;

485

}

486

}

487

```

488

489

## Write APIs

490

491

### WriteBuilder

492

493

Entry point for building write operations:

494

495

```java { .api }

496

package org.apache.spark.sql.connector.write;

497

498

public interface WriteBuilder {

499

/**

500

* Build the final Write object

501

*/

502

Write build();

503

504

/**

505

* Set the save mode for this write

506

*/

507

WriteBuilder mode(SaveMode mode);

508

}

509

```

510

511

### Write

512

513

Logical representation of a write operation:

514

515

```java { .api }

516

public interface Write {

517

/**

518

* Returns the description associated with this write

519

*/

520

default String description();

521

522

/**

523

* Returns a BatchWrite to write data to batch source (must implement if table supports BATCH_WRITE)

524

*/

525

default BatchWrite toBatch();

526

527

/**

528

* Returns a StreamingWrite for streaming writes (must implement if table supports STREAMING_WRITE)

529

*/

530

default StreamingWrite toStreaming();

531

532

/**

533

* Returns custom metrics that this write supports

534

*/

535

default CustomMetric[] supportedCustomMetrics();

536

}

537

```

538

539

### BatchWrite

540

541

Physical batch write implementation:

542

543

```java { .api }

544

public interface BatchWrite {

545

/**

546

* Create writer factory for partitions

547

*/

548

DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info);

549

550

/**

551

* Whether to use Spark's commit coordinator

552

*/

553

boolean useCommitCoordinator();

554

555

/**

556

* Called when a data writer commits

557

*/

558

void onDataWriterCommit(WriterCommitMessage message);

559

560

/**

561

* Commit the entire write operation

562

*/

563

void commit(WriterCommitMessage[] messages);

564

565

/**

566

* Abort the write operation

567

*/

568

void abort(WriterCommitMessage[] messages);

569

}

570

```

571

572

### DataWriter

573

574

Writes data for a single partition:

575

576

```java { .api }

577

public interface DataWriter<T> extends Closeable {

578

/**

579

* Write a single record

580

*/

581

void write(T record) throws IOException;

582

583

/**

584

* Commit this writer's work

585

*/

586

WriterCommitMessage commit() throws IOException;

587

588

/**

589

* Abort this writer's work

590

*/

591

void abort() throws IOException;

592

}

593

```

594

595

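**Write Implementation** (constructors other than the ones shown, the `MyCommitMessage` class, and the `finalizePartition`, `cleanupPartition`, and `writeDataToStorage` helpers are omitted for brevity):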

596

597

```java

598

public class MyDataSource implements Table, SupportsWrite {

599

@Override

600

public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {

601

return new MyWriteBuilder(info);

602

}

603

}

604

605

public class MyWriteBuilder implements WriteBuilder {

606

private final LogicalWriteInfo info;

607

private SaveMode mode = SaveMode.ErrorIfExists;

608

609

@Override

610

public WriteBuilder mode(SaveMode mode) {

611

this.mode = mode;

612

return this;

613

}

614

615

@Override

616

public Write build() {

617

return new MyWrite(info, mode);

618

}

619

}

620

621

public class MyWrite implements Write {

622

private final LogicalWriteInfo info;

623

private final SaveMode mode;

624

625

@Override

626

public BatchWrite toBatch() {

627

return new MyBatchWrite(info, mode);

628

}

629

}

630

631

public class MyBatchWrite implements BatchWrite {

632

private final LogicalWriteInfo info;

633

private final SaveMode mode;

634

635

@Override

636

public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {

637

return new MyDataWriterFactory(info.schema());

638

}

639

640

@Override

641

public boolean useCommitCoordinator() {

642

return true; // Use Spark's coordinator for ACID guarantees

643

}

644

645

@Override

646

public void commit(WriterCommitMessage[] messages) {

647

// Commit all partition writes atomically

648

for (WriterCommitMessage message : messages) {

649

MyCommitMessage myMessage = (MyCommitMessage) message;

650

finalizePartition(myMessage);

651

}

652

}

653

654

@Override

655

public void abort(WriterCommitMessage[] messages) {

656

// Clean up any partially written data

657

for (WriterCommitMessage message : messages) {

658

MyCommitMessage myMessage = (MyCommitMessage) message;

659

cleanupPartition(myMessage);

660

}

661

}

662

}

663

664

public class MyDataWriterFactory implements DataWriterFactory {

665

private final StructType schema;

666

667

@Override

668

public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {

669

return new MyDataWriter(schema, partitionId, taskId);

670

}

671

}

672

673

public class MyDataWriter implements DataWriter<InternalRow> {

674

private final StructType schema;

675

private final int partitionId;

676

private final long taskId;

677

private final List<InternalRow> buffer = new ArrayList<>();

678

679

@Override

680

public void write(InternalRow record) throws IOException {

681

buffer.add(record.copy()); // Make defensive copy

682

}

683

684

@Override

685

public WriterCommitMessage commit() throws IOException {

686

// Write buffered data to storage

687

String outputPath = writeDataToStorage(buffer);

688

return new MyCommitMessage(partitionId, taskId, outputPath, buffer.size());

689

}

690

691

@Override

692

public void abort() throws IOException {

693

buffer.clear();

694

// Clean up any temporary files

695

}

696

}

697

```

698

699

## Write Support Interfaces

700

701

### SupportsOverwrite

702

703

```java { .api }

704

public interface SupportsOverwrite {

705

WriteBuilder overwrite(Filter[] filters);

706

}

707

```

708

709

### SupportsOverwriteV2

710

711

```java { .api }

712

public interface SupportsOverwriteV2 {

713

WriteBuilder overwrite(Predicate[] predicates);

714

}

715

```

716

717

### SupportsDynamicOverwrite

718

719

```java { .api }

720

public interface SupportsDynamicOverwrite {

721

WriteBuilder overwriteDynamicPartitions();

722

}

723

```

724

725

### SupportsTruncate

726

727

```java { .api }

728

public interface SupportsTruncate {

729

WriteBuilder truncate();

730

}

731

```

732

733

**Complete Write Builder with All Support:**

734

735

```java

736

public class MyWriteBuilder implements WriteBuilder, SupportsOverwriteV2,

737

SupportsDynamicOverwrite, SupportsTruncate {

738

private final LogicalWriteInfo info;

739

private SaveMode mode = SaveMode.ErrorIfExists;

740

private Predicate[] overwritePredicates;

741

private boolean dynamicOverwrite = false;

742

private boolean truncate = false;

743

744

@Override

745

public WriteBuilder overwrite(Predicate[] predicates) {

746

this.overwritePredicates = predicates;

747

this.mode = SaveMode.Overwrite;

748

return this;

749

}

750

751

@Override

752

public WriteBuilder overwriteDynamicPartitions() {

753

this.dynamicOverwrite = true;

754

this.mode = SaveMode.Overwrite;

755

return this;

756

}

757

758

@Override

759

public WriteBuilder truncate() {

760

this.truncate = true;

761

return this;

762

}

763

764

@Override

765

public Write build() {

766

return new MyWrite(info, mode, overwritePredicates, dynamicOverwrite, truncate);

767

}

768

}

769

```

770

771

## Distribution APIs

772

773

### Distribution

774

775

Represents how data should be distributed across partitions:

776

777

```java { .api }

778

package org.apache.spark.sql.connector.distributions;

779

780

public interface Distribution {

781

// Marker interface for different distribution strategies

782

}

783

```

784

785

### Distributions Factory

786

787

```java { .api }

788

public class Distributions {

789

/**

790

* No specific distribution requirement

791

*/

792

public static Distribution unspecified() { ... }

793

794

/**

795

* Data clustered by expressions (hash partitioning)

796

*/

797

public static Distribution clustered(Expression[] expressions) { ... }

798

799

/**

800

* Data ordered by sort expressions

801

*/

802

public static Distribution ordered(SortOrder[] ordering) { ... }

803

}

804

```

805

806

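**Usage Example** (NOTE(review): in Spark the write-side `requiredDistribution()` hook is declared by `RequiresDistributionAndOrdering`, which is implemented on the `Write`, with `requiredNumPartitions()` as the partition-count hook; `SupportsReportPartitioning` is a read-side `Scan` mix-in. Confirm the interface names below against the Spark version you target):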

807

808

```java

809

public class MyBatchWrite implements BatchWrite, SupportsReportPartitioning {

810

@Override

811

public Distribution requiredDistribution() {

812

// Require data to be hash partitioned by user_id

813

return Distributions.clustered(new Expression[] {

814

Expressions.column("user_id")

815

});

816

}

817

818

@Override

819

public int numPartitions() {

820

return 10; // Write to 10 partitions

821

}

822

}

823

```

824

825

## Advanced Patterns

826

827

### Vectorized Reading

828

829

For high-performance reading, implement columnar batch processing:

830

831

```java

832

public class MyVectorizedPartitionReader implements PartitionReader<ColumnarBatch> {

833

private final ColumnVector[] columns;

834

private int batchSize = 1000;

835

836

@Override

837

public boolean next() throws IOException {

838

// Load next batch of data into column vectors

839

return loadNextBatch();

840

}

841

842

@Override

843

public ColumnarBatch get() {

844

return new ColumnarBatch(columns, batchSize);

845

}

846

847

private boolean loadNextBatch() {

848

// Efficient columnar data loading

849

for (int i = 0; i < columns.length; i++) {

850

loadColumnData(columns[i], i);

851

}

852

return true;

853

}

854

}

855

```

856

857

### Transactional Writes

858

859

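Implement an all-or-nothing job commit by wrapping the driver-side `commit` in a transaction provided by the data source. Spark's commit coordinator only ensures that at most one task attempt commits for each partition; atomicity and isolation must come from the sink itself: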

860

861

```java

862

public class TransactionalBatchWrite implements BatchWrite {

863

private final String transactionId;

864

865

@Override

866

public boolean useCommitCoordinator() {

867

return true; // Essential for transactions

868

}

869

870

@Override

871

public void commit(WriterCommitMessage[] messages) {

872

try {

873

// Start transaction

874

beginTransaction(transactionId);

875

876

// Commit all partitions

877

for (WriterCommitMessage message : messages) {

878

commitPartition(message);

879

}

880

881

// Commit transaction

882

commitTransaction(transactionId);

883

} catch (Exception e) {

884

abortTransaction(transactionId);

885

throw new RuntimeException("Transaction failed", e);

886

}

887

}

888

}

889

```

890

891

### Partition-Aware Writing

892

893

Optimize writes for partitioned tables:

894

895

```java

896

public class PartitionAwareDataWriter implements DataWriter<InternalRow> {

897

private final Map<String, List<InternalRow>> partitionBuffers = new HashMap<>();

898

private final String[] partitionColumns;

899

900

@Override

901

public void write(InternalRow record) throws IOException {

902

String partitionKey = extractPartitionKey(record);

903

partitionBuffers.computeIfAbsent(partitionKey, k -> new ArrayList<>())

904

.add(record.copy());

905

}

906

907

@Override

908

public WriterCommitMessage commit() throws IOException {

909

Map<String, String> partitionPaths = new HashMap<>();

910

911

// Write each partition separately

912

for (Map.Entry<String, List<InternalRow>> entry : partitionBuffers.entrySet()) {

913

String partitionKey = entry.getKey();

914

List<InternalRow> rows = entry.getValue();

915

String path = writePartition(partitionKey, rows);

916

partitionPaths.put(partitionKey, path);

917

}

918

919

return new PartitionedCommitMessage(partitionPaths);

920

}

921

}

922

```

923

924

The Data Source V2 APIs provide a powerful, flexible framework for implementing high-performance, feature-rich data sources with comprehensive optimization support and clean architectural patterns.