or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

connectors.mddata-types.mdexpressions.mdindex.mdquery-plans.md

connectors.mddocs/

0

# Data Source Connectors

1

2

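Spark SQL's Data Source V2 (DSv2) API, defined in the `org.apache.spark.sql.connector` packages, provides standardized interfaces for integrating external data sources with Spark SQL. The connector framework supports pushdown of predicates, required columns, limits, top-N and aggregates; batch and streaming reads and writes; and a job-level commit/abort protocol for writes. The interfaces are Java interfaces; below they are rendered as simplified Scala traits, and many of their methods have default implementations that a connector only overrides when it needs to.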

3

4

## Core Imports

5

6

```scala

7

import java.util

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions._
import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, Aggregation}
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.read._
import org.apache.spark.sql.connector.read.streaming.{ContinuousPartitionReaderFactory, ContinuousStream, MicroBatchStream, Offset, PartitionOffset, ReadLimit, SparkDataStream}
import org.apache.spark.sql.connector.write._
import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.sql.vectorized.ColumnarBatch

13

```

14

15

## Catalog APIs

16

17

### Catalog Interface

18

19

```scala { .api }

20

/** Root interface shared by every catalog plugin. */
trait CatalogPlugin {
  /** Configures the plugin with the name it was registered under and its options. */
  def initialize(name: String, options: CaseInsensitiveStringMap): Unit

  /** The catalog's name. */
  def name(): String
}

/** A catalog that manages tables addressed by [[Identifier]]s. */
trait TableCatalog extends CatalogPlugin {
  // ---- lookup ----
  def listTables(namespace: Array[String]): Array[Identifier]
  def tableExists(ident: Identifier): Boolean
  def loadTable(ident: Identifier): Table

  // ---- DDL ----
  def createTable(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String]): Table
  def alterTable(ident: Identifier, changes: TableChange*): Table
  def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit
  def dropTable(ident: Identifier): Boolean
}

/** Optional mix-in for catalogs that also manage namespaces (databases / schemas). */
trait SupportsNamespaces extends CatalogPlugin {
  // ---- lookup ----
  def listNamespaces(): Array[Array[String]]
  def listNamespaces(namespace: Array[String]): Array[Array[String]]
  def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String]

  // ---- DDL ----
  def createNamespace(namespace: Array[String], metadata: util.Map[String, String]): Unit
  def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit
  def dropNamespace(namespace: Array[String], cascade: Boolean): Boolean
}

43

```

44

45

**Usage Example:**

46

```scala

47

/**
 * Example [[TableCatalog]] backed by an external metadata store.
 *
 * A concrete catalog must implement every abstract `TableCatalog` method. Operations this
 * example does not support fail fast with `UnsupportedOperationException`. The private `???`
 * helpers at the bottom mark the points where your external system is called.
 */
class MyTableCatalog extends TableCatalog {

  // Assigned in initialize(); read by name() and by the error messages below.
  private var catalogName: String = _

  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
    this.catalogName = name
  }

  override def name(): String = catalogName

  override def listTables(namespace: Array[String]): Array[Identifier] =
    listExternalTables(namespace)

  override def loadTable(ident: Identifier): Table = {
    // Load table metadata from the external catalog. A real implementation should throw
    // NoSuchTableException when `ident` does not exist.
    new MyTable(ident, loadSchemaFromExternal(ident))
  }

  override def createTable(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String]
  ): Table = {
    // Create the table in the external system first, so a failure leaves no Table object behind.
    createExternalTable(ident, schema, partitions, properties)
    new MyTable(ident, schema, partitions, properties)
  }

  override def alterTable(ident: Identifier, changes: TableChange*): Table =
    throw new UnsupportedOperationException(s"ALTER TABLE is not supported by catalog $catalogName")

  override def dropTable(ident: Identifier): Boolean =
    throw new UnsupportedOperationException(s"DROP TABLE is not supported by catalog $catalogName")

  override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit =
    throw new UnsupportedOperationException(s"RENAME TABLE is not supported by catalog $catalogName")

  // ---- Integration points: replace these with calls to your external system. ----

  private def listExternalTables(namespace: Array[String]): Array[Identifier] = ???

  private def loadSchemaFromExternal(ident: Identifier): StructType = ???

  private def createExternalTable(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String]
  ): Unit = ???
}

72

```

73

74

### Table Interface

75

76

```scala { .api }

77

/** A logical table exposed by a catalog or a TableProvider. */
trait Table {
  def name(): String
  def schema(): StructType
  /** Default: no partitioning. */
  def partitioning(): Array[Transform] = Array.empty
  /** Default: no properties. */
  def properties(): util.Map[String, String] = util.Collections.emptyMap[String, String]()
  /** What the table supports (BATCH_READ, BATCH_WRITE, TRUNCATE, ...); checked before planning. */
  def capabilities(): util.Set[TableCapability]
}

trait SupportsRead extends Table {
  def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder
}

trait SupportsWrite extends Table {
  def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder
}

/**
 * Metadata-only DELETE: Spark passes the DELETE condition as V1 `sources.Filter`s.
 * Recent releases place this under SupportsDeleteV2 / TruncatableTable; it is flattened here.
 */
trait SupportsDelete extends Table {
  /** Whether the table can delete exactly the rows matching all `filters`. */
  def canDeleteWhere(filters: Array[Filter]): Boolean = true
  def deleteWhere(filters: Array[Filter]): Unit
}

/**
 * Row-level DELETE / UPDATE / MERGE (Spark 3.3+). Spark has no separate SupportsUpdate or
 * SupportsMerge mix-ins. The requested command is available from `info.command()`.
 */
trait SupportsRowLevelOperations extends Table {
  def newRowLevelOperationBuilder(info: RowLevelOperationInfo): RowLevelOperationBuilder
}

104

```

105

106

**Usage Example:**

107

```scala

108

/**
 * Example table that supports batch reads and batch writes (append, truncate, overwrite).
 *
 * @param identifier      catalog identifier; its string form is reported as the table name
 * @param tableSchema     schema reported to Spark and handed to scans
 * @param tablePartitions partition transforms (none by default)
 * @param tableProperties table properties (empty by default)
 */
class MyTable(
    identifier: Identifier,
    tableSchema: StructType,
    tablePartitions: Array[Transform] = Array.empty,
    // `Map.empty.asJava` would infer util.Map[Nothing, Nothing] and not type-check.
    tableProperties: util.Map[String, String] = util.Collections.emptyMap[String, String]()
) extends Table with SupportsRead with SupportsWrite {

  override def name(): String = identifier.toString
  override def schema(): StructType = tableSchema
  override def partitioning(): Array[Transform] = tablePartitions
  override def properties(): util.Map[String, String] = tableProperties

  // Advertise only what the table actually implements:
  // - TRUNCATE / OVERWRITE_BY_FILTER because MyWriteBuilder mixes in SupportsTruncate and
  //   SupportsOverwrite. Without them, Spark rejects overwrite-mode writes during analysis.
  // - No streaming read: TableCapability has no STREAMING_READ, and MyScan only implements
  //   toBatch. Add MICRO_BATCH_READ once the Scan overrides toMicroBatchStream.
  // - ACCEPT_ANY_SCHEMA disables Spark's write-schema validation; the writer must handle
  //   whatever schema it receives.
  override def capabilities(): util.Set[TableCapability] = util.EnumSet.of(
    TableCapability.BATCH_READ,
    TableCapability.BATCH_WRITE,
    TableCapability.TRUNCATE,
    TableCapability.OVERWRITE_BY_FILTER,
    TableCapability.ACCEPT_ANY_SCHEMA
  )

  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
    new MyScanBuilder(tableSchema, options)

  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
    new MyWriteBuilder(info)
}

135

```

136

137

## Read APIs

138

139

### Scan Building

140

141

```scala { .api }

142

/** Configures a scan (pushdown, pruning) and then builds it. */
trait ScanBuilder {
  def build(): Scan
}

/** Pushdown of V1 filters (`org.apache.spark.sql.sources.Filter`). */
trait SupportsPushDownFilters extends ScanBuilder {
  /** Accepts filters to push; returns those Spark must still evaluate after the scan. */
  def pushFilters(filters: Array[Filter]): Array[Filter]
  /** Filters that were pushed (reported in EXPLAIN). */
  def pushedFilters(): Array[Filter]
}

/** Pushdown of V2 predicates (`connector.expressions.filter.Predicate`, Spark 3.3+). */
trait SupportsPushDownV2Filters extends ScanBuilder {
  /** Accepts predicates to push; returns those Spark must still evaluate after the scan. */
  def pushPredicates(predicates: Array[Predicate]): Array[Predicate]
  /** Predicates that were pushed (reported in EXPLAIN). */
  def pushedPredicates(): Array[Predicate]
}

trait SupportsPushDownRequiredColumns extends ScanBuilder {
  def pruneColumns(requiredSchema: StructType): Unit
}

trait SupportsPushDownLimit extends ScanBuilder {
  def pushLimit(limit: Int): Boolean
}

trait SupportsPushDownTopN extends ScanBuilder {
  def pushTopN(orders: Array[SortOrder], limit: Int): Boolean
}

trait SupportsPushDownAggregates extends ScanBuilder {
  /** Whether the source can compute the aggregation completely, so Spark skips the final merge. */
  def supportCompletePushDown(aggregation: Aggregation): Boolean = false
  def pushAggregation(aggregation: Aggregation): Boolean
}

167

```

168

169

**Usage Example:**

170

```scala

171

/**
 * Scan builder that accepts simple V2 comparison predicates and column pruning.
 *
 * `pushPredicates` belongs to SupportsPushDownV2Filters. SupportsPushDownFilters is the V1 API
 * and works with `sources.Filter` instead of `Predicate`.
 *
 * @param schema  full table schema; narrowed by `pruneColumns`
 * @param options scan options, forwarded to [[MyScan]]
 */
class MyScanBuilder(
    schema: StructType,
    options: CaseInsensitiveStringMap
) extends ScanBuilder with SupportsPushDownV2Filters with SupportsPushDownRequiredColumns {

  private var pushedFilters: Array[Predicate] = Array.empty
  private var requiredSchema: StructType = schema

  /**
   * Keeps the predicates the source can evaluate and returns the rest for Spark to evaluate.
   * If the source applies pushed predicates only approximately (e.g. for block skipping),
   * return them in the result as well.
   */
  override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = {
    val (supported, unsupported) = predicates.partition(canPushDown)
    pushedFilters = supported
    unsupported
  }

  override def pushedPredicates(): Array[Predicate] = pushedFilters

  override def pruneColumns(requiredSchema: StructType): Unit = {
    this.requiredSchema = requiredSchema
  }

  override def build(): Scan = new MyScan(requiredSchema, pushedFilters, options)

  /**
   * V2 predicates are identified by `name()` and `children()`, not by Scala types. This accepts
   * `column = literal`, `column > literal` and `column < literal`.
   */
  private def canPushDown(predicate: Predicate): Boolean =
    (predicate.name(), predicate.children()) match {
      case ("=" | ">" | "<", Array(_: NamedReference, _: Literal[_])) => true
      case _ => false
    }
}

205

```

206

207

### Scan Execution

208

209

```scala { .api }

210

/** A logical scan; converted to a batch or streaming physical form depending on the query. */
trait Scan {
  def readSchema(): StructType
  def description(): String = getClass.toString
  /** Only needed when the table declares BATCH_READ. */
  def toBatch: Batch =
    throw new UnsupportedOperationException(s"${description()}: batch scan is not supported")
  /** Only needed when the table declares MICRO_BATCH_READ. */
  def toMicroBatchStream(checkpointLocation: String): MicroBatchStream =
    throw new UnsupportedOperationException(s"${description()}: micro-batch scan is not supported")
  /** Only needed when the table declares CONTINUOUS_READ. */
  def toContinuousStream(checkpointLocation: String): ContinuousStream =
    throw new UnsupportedOperationException(s"${description()}: continuous scan is not supported")
}

trait Batch {
  def planInputPartitions(): Array[InputPartition]
  def createReaderFactory(): PartitionReaderFactory
}

/** Serializable description of one split of the input. */
trait InputPartition extends Serializable {
  /** Locality hints; none by default. */
  def preferredLocations(): Array[String] = Array.empty
}

/** Serializable factory that creates readers for input partitions. */
trait PartitionReaderFactory extends Serializable {
  def createReader(partition: InputPartition): PartitionReader[InternalRow]
  /** Only called when supportColumnarReads(partition) returns true. */
  def createColumnarReader(partition: InputPartition): PartitionReader[ColumnarBatch] =
    throw new UnsupportedOperationException("Cannot create columnar reader.")
  def supportColumnarReads(partition: InputPartition): Boolean = false
}

/** Iterator-style reader: call next() to advance, then get() to read the current record. */
trait PartitionReader[T] extends Closeable {
  def next(): Boolean
  def get(): T
}

233

```

234

235

**Usage Example:**

236

```scala

237

/**
 * Batch scan over the (possibly pruned) schema, carrying the predicates accepted during
 * pushdown. The scan is also its own [[Batch]].
 *
 * @param schema  columns to read; reported as the read schema
 * @param filters predicates the readers must apply
 * @param options scan options, forwarded to the reader factory
 */
class MyScan(
    schema: StructType,
    filters: Array[Predicate],
    options: CaseInsensitiveStringMap
) extends Scan with Batch {

  override def toBatch: Batch = this

  override def readSchema(): StructType = schema

  override def description(): String = {
    val columns = schema.fieldNames.mkString(", ")
    s"MyScan($columns)"
  }

  /** One partition per unit reported by `getPartitionCount` (data-source layout). */
  override def planInputPartitions(): Array[InputPartition] =
    Array.tabulate[InputPartition](getPartitionCount)(MyInputPartition(_))

  override def createReaderFactory(): PartitionReaderFactory =
    new MyPartitionReaderFactory(schema, filters, options)
}

/** Identifies one slice of the source by index; serializable because InputPartition is. */
case class MyInputPartition(partitionId: Int) extends InputPartition

258

259

/**
 * Creates row-based readers for [[MyInputPartition]]s. Columnar reads are not supported.
 *
 * @param schema  columns each reader produces
 * @param filters pushed predicates each reader applies
 * @param options scan options. No method references this parameter, so Scala keeps no field
 *                for it and it is not serialized with the factory.
 */
class MyPartitionReaderFactory(
    schema: StructType,
    filters: Array[Predicate],
    options: CaseInsensitiveStringMap
) extends PartitionReaderFactory {

  override def createReader(partition: InputPartition): PartitionReader[InternalRow] =
    partition match {
      case p: MyInputPartition => new MyPartitionReader(p, schema, filters)
      // Fail with a descriptive error instead of an opaque ClassCastException.
      case other =>
        throw new IllegalArgumentException(
          s"MyPartitionReaderFactory cannot read unexpected partition: $other")
    }

  override def supportColumnarReads(partition: InputPartition): Boolean = false
}

271

272

/**
 * Row reader for a single [[MyInputPartition]].
 *
 * The source iterator is opened when the reader is constructed. `next()` advances it and
 * caches the row that `get()` returns.
 */
class MyPartitionReader(
    partition: MyInputPartition,
    schema: StructType,
    filters: Array[Predicate]
) extends PartitionReader[InternalRow] {

  // Reads this partition's rows from the external source, with the pushed filters applied
  // and the requested schema.
  private val rows: Iterator[InternalRow] =
    loadDataFromExternalSource(partition.partitionId, schema, filters)

  private var current: InternalRow = _

  override def next(): Boolean = {
    val hasRow = rows.hasNext
    if (hasRow) current = rows.next()
    hasRow
  }

  override def get(): InternalRow = current

  override def close(): Unit = {
    // Nothing to release in this example; close any connection or stream opened by
    // loadDataFromExternalSource here.
  }
}

302

```

303

304

## Write APIs

305

306

### Write Building

307

308

```scala { .api }

309

/** Configures a write (append by default) and builds the logical [[Write]]. */
trait WriteBuilder {
  def build(): Write
}

/** Replace all existing data; requires the TRUNCATE table capability. */
trait SupportsTruncate extends WriteBuilder {
  def truncate(): WriteBuilder
}

/**
 * Replace the rows matching ALL of `filters` (V1 `sources.Filter`s); requires OVERWRITE_BY_FILTER.
 * Spark provides a default `truncate()` here, implemented as `overwrite(Array(AlwaysTrue))`.
 */
trait SupportsOverwrite extends SupportsTruncate {
  def overwrite(filters: Array[Filter]): WriteBuilder
}

/** Replace only the partitions that receive new data; requires OVERWRITE_DYNAMIC. */
trait SupportsDynamicOverwrite extends WriteBuilder {
  def overwriteDynamicPartitions(): WriteBuilder
}

// Streaming writes: Spark 3 has no SupportsStreamingWrite mix-in (and WriteBuilder.buildForStreaming
// is deprecated). Instead, the Write returned by build() implements `toStreaming: StreamingWrite`,
// and the table declares the STREAMING_WRITE capability.

328

```

329

330

**Usage Example:**

331

```scala

332

/**
 * Write builder supporting append (the default), truncate and overwrite-by-filter.
 *
 * @param info logical write information passed on to the built [[Write]]
 */
class MyWriteBuilder(info: LogicalWriteInfo) extends WriteBuilder with SupportsOverwrite with SupportsTruncate {

  // None = plain append. Some(filters) = delete the rows matching all filters, then write.
  // An Option is used because an empty filter array is still an overwrite request.
  private var overwriteFilters: Option[Array[Filter]] = None
  private var truncateTable: Boolean = false

  override def overwrite(filters: Array[Filter]): WriteBuilder = {
    overwriteFilters = Some(filters)
    this
  }

  override def truncate(): WriteBuilder = {
    truncateTable = true
    this
  }

  override def build(): Write =
    if (truncateTable) new MyTruncateWrite(info)
    else overwriteFilters match {
      // "All of no filters" matches every row, so an unfiltered overwrite is a truncate,
      // not an append.
      case Some(filters) if filters.isEmpty => new MyTruncateWrite(info)
      case Some(filters) => new MyOverwriteWrite(info, filters)
      case None => new MyAppendWrite(info)
    }
}

357

```

358

359

### Write Execution

360

361

```scala { .api }

362

/** Logical write; converted to a batch or streaming physical write depending on the query. */
trait Write {
  def description(): String = getClass.toString
  /** Only needed when the table declares BATCH_WRITE. */
  def toBatch: BatchWrite =
    throw new UnsupportedOperationException(s"${description()}: Batch write is not supported")
  /** Only needed when the table declares STREAMING_WRITE. */
  def toStreaming: StreamingWrite =
    throw new UnsupportedOperationException(s"${description()}: Streaming write is not supported")
}

/** Driver-side coordinator of a batch write job. */
trait BatchWrite {
  def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory
  /** Whether Spark's commit coordinator should arbitrate task commits. */
  def useCommitCoordinator(): Boolean = true
  /** Called on the driver as each task's commit message arrives. */
  def onDataWriterCommit(message: WriterCommitMessage): Unit = ()
  /** Commits the job using the commit messages of all tasks. */
  def commit(messages: Array[WriterCommitMessage]): Unit
  /** Aborts the job; entries for tasks that failed may be null. */
  def abort(messages: Array[WriterCommitMessage]): Unit
}

/** Serializable factory that creates one writer per task attempt. */
trait DataWriterFactory extends Serializable {
  def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow]
}

/**
 * Task-side writer. Spark calls close() (from Closeable) after commit() or abort(), so
 * close() must tolerate resources that are already released.
 */
trait DataWriter[T] extends Closeable {
  def write(record: T): Unit
  def commit(): WriterCommitMessage
  def abort(): Unit
}

/** Serializable result of a task commit, sent to the driver. */
trait WriterCommitMessage extends Serializable

384

```

385

386

**Usage Example:**

387

```scala

388

/**
 * Append-only [[Write]] that is also its own [[BatchWrite]].
 *
 * @param info logical write information; its schema is handed to the writer factory
 */
class MyAppendWrite(info: LogicalWriteInfo) extends Write with BatchWrite {

  override def description(): String = "MyAppendWrite"

  override def toBatch: BatchWrite = this

  // PhysicalWriteInfo only describes the physical plan (e.g. numPartitions()) and has no schema.
  // The schema comes from the LogicalWriteInfo this write was built with. The parameter is
  // renamed so it no longer shadows the constructor's `info`.
  override def createBatchWriterFactory(physicalInfo: PhysicalWriteInfo): DataWriterFactory =
    new MyDataWriterFactory(info.schema())

  override def commit(messages: Array[WriterCommitMessage]): Unit = {
    // Validate every message before committing any partition, so an unexpected message type
    // cannot leave the job half-committed. The per-partition commits are still not atomic as a
    // group; real atomicity needs a single publish step in the external system.
    val partitionMessages = messages.map {
      case msg: MyWriterCommitMessage => msg
      case other => throw new IllegalArgumentException(s"Unexpected commit message type: $other")
    }
    partitionMessages.foreach(commitPartition)
  }

  override def abort(messages: Array[WriterCommitMessage]): Unit = {
    // Clean up partial writes. The type pattern skips null entries (failed tasks) and unknown
    // message types alike.
    messages.foreach {
      case msg: MyWriterCommitMessage => abortPartition(msg)
      case _ =>
    }
  }
}

413

414

/**
 * Creates one [[MyDataWriter]] per task attempt, all sharing the write schema.
 * Serializable because DataWriterFactory is.
 */
class MyDataWriterFactory(schema: StructType) extends DataWriterFactory {

  override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] =
    new MyDataWriter(partitionId = partitionId, taskId = taskId, schema = schema)
}

419

420

/**
 * Task-side writer that streams rows to one output file per (partition, task).
 *
 * commit() and abort() both close the underlying writer, and Spark calls close() after either
 * of them. The writer is therefore closed through an idempotent helper, so it is closed once.
 *
 * @param partitionId output partition index
 * @param taskId      task attempt id, used to keep attempts' files apart
 * @param schema      schema of the incoming rows
 */
class MyDataWriter(
    partitionId: Int,
    taskId: Long,
    schema: StructType
) extends DataWriter[InternalRow] {

  private val outputPath = createOutputPath(partitionId, taskId)
  private val writer = createExternalWriter(outputPath, schema)
  private var recordCount = 0
  private var writerClosed = false

  override def write(record: InternalRow): Unit = {
    writer.writeRecord(record)
    recordCount += 1
  }

  override def commit(): WriterCommitMessage = {
    closeWriter()
    MyWriterCommitMessage(outputPath, recordCount)
  }

  override def abort(): Unit = {
    // Remove the partial output even if closing the writer fails.
    try closeWriter()
    finally deleteOutputFile(outputPath)
  }

  override def close(): Unit = closeWriter()

  /** Closes the external writer on the first call; later calls do nothing. */
  private def closeWriter(): Unit = {
    if (!writerClosed) {
      writerClosed = true
      writer.close()
    }
  }
}

449

450

/**
 * Commit message produced by MyDataWriter.commit() and consumed by MyAppendWrite.
 * It records where a task wrote its output and how many records the file holds.
 */
case class MyWriterCommitMessage(outputPath: String, recordCount: Int) extends WriterCommitMessage

454

```

455

456

## Streaming APIs

457

458

### Streaming Read

459

460

```scala { .api }

461

/** Base contract of every streaming source. Offsets are checkpointed as JSON strings. */
trait SparkDataStream {
  def initialOffset(): Offset
  /** Rebuilds an offset from the JSON produced by `Offset.json()`. */
  def deserializeOffset(json: String): Offset
  /** Data up to `end` has been processed and may be discarded by the source. */
  def commit(end: Offset): Unit
  def stop(): Unit
}

/** Rate limiting for streaming sources; mixed into the stream, not into the Table. */
trait SupportsAdmissionControl extends SparkDataStream {
  def getDefaultReadLimit(): ReadLimit = ReadLimit.allAvailable()
  def latestOffset(startOffset: Offset, limit: ReadLimit): Offset
}

trait ContinuousStream extends SparkDataStream {
  def mergeOffsets(offsets: Array[PartitionOffset]): Offset
  def planInputPartitions(start: Offset): Array[InputPartition]
  def createContinuousReaderFactory(): ContinuousPartitionReaderFactory
  def needsReconfiguration(): Boolean = false
}

trait MicroBatchStream extends SparkDataStream {
  def latestOffset(): Offset
  def planInputPartitions(start: Offset, end: Offset): Array[InputPartition]
  def createReaderFactory(): PartitionReaderFactory
}

476

```

477

478

**Usage Example:**

479

```scala

480

/**
 * Micro-batch source whose offsets are single Long positions in the external system.
 *
 * @param schema  schema of the rows the readers produce
 * @param options source options, forwarded to the reader factory
 */
class MyMicroBatchStream(
    schema: StructType,
    options: CaseInsensitiveStringMap
) extends MicroBatchStream {

  // MicroBatchStream has no readSchema(); the Scan that creates this stream reports the schema.
  // This is kept as a plain accessor, because the original `override` did not compile.
  def readSchema(): StructType = schema

  override def initialOffset(): Offset = MyOffset(getCurrentOffsetFromSource())

  override def latestOffset(): Offset = MyOffset(getLatestOffsetFromSource())

  /** Required by SparkDataStream: parses the JSON written by MyOffset.json(). */
  override def deserializeOffset(json: String): Offset = json match {
    case MyMicroBatchStream.OffsetJson(value) => MyOffset(value.toLong)
    case _ => throw new IllegalArgumentException(s"Malformed MyOffset JSON: $json")
  }

  override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] =
    createPartitionsForRange(offsetValue(start), offsetValue(end))

  override def createReaderFactory(): PartitionReaderFactory =
    new MyStreamingReaderFactory(schema, options)

  override def commit(end: Offset): Unit = {
    // Commit offset in external system
    commitOffsetInExternalSystem(offsetValue(end))
  }

  override def stop(): Unit = {
    // Clean up streaming resources
  }

  /** Extracts the position from an offset, failing clearly on a foreign offset type. */
  private def offsetValue(offset: Offset): Long = offset match {
    case MyOffset(value) => value
    case other => throw new IllegalArgumentException(s"Unexpected offset type: $other")
  }
}

object MyMicroBatchStream {
  // Matches the JSON produced by MyOffset.json(), e.g. {"offset":42}.
  private val OffsetJson = """\{\s*"offset"\s*:\s*(-?\d+)\s*\}""".r
}

515

516

/** Streaming offset holding one Long position, serialized as {"offset":<value>}. */
case class MyOffset(value: Long) extends Offset {
  override def json(): String = "{\"offset\":" + value + "}"
}

519

```

520

521

### Streaming Write

522

523

```scala { .api }

524

/** Driver-side coordinator of a streaming write; commit and abort are per epoch. */
trait StreamingWrite {
  /** Builds the serializable factory that creates the task-side writers. */
  def createStreamingWriterFactory(info: PhysicalWriteInfo): StreamingDataWriterFactory

  /** Commits epoch `epochId` using the commit messages of its tasks. */
  def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit

  /** Aborts epoch `epochId`. */
  def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit
}

/** Task-side factory: one writer per (partition, task, epoch). */
trait StreamingDataWriterFactory extends Serializable {
  def createWriter(partitionId: Int, taskId: Long, epochId: Long): DataWriter[InternalRow]
}

533

```

534

535

## Data Source Provider

536

537

### TableProvider Interface

538

539

```scala { .api }

540

/** Entry point for `spark.read.format(...)` / `df.write.format(...)` sources without a catalog. */
trait TableProvider {
  def inferSchema(options: CaseInsensitiveStringMap): StructType
  def inferPartitioning(options: CaseInsensitiveStringMap): Array[Transform] = Array.empty
  def getTable(schema: StructType, partitioning: Array[Transform], properties: util.Map[String, String]): Table
  /** Whether a user-specified schema/partitioning may be passed instead of inferring them. */
  def supportsExternalMetadata(): Boolean = false
}

/**
 * `org.apache.spark.sql.sources.DataSourceRegister`: gives a provider a short name for format().
 * Spark 3 has no `DataSourceV2` base trait (that was a Spark 2.x marker interface). A provider
 * implements TableProvider, mixes in DataSourceRegister, and is listed in
 * META-INF/services/org.apache.spark.sql.sources.DataSourceRegister.
 */
trait DataSourceRegister {
  def shortName(): String
}

548

```

549

550

**Usage Example:**

551

```scala

552

/**
 * Example provider, usable as `spark.read.format("mydatasource").option("path", ...)`.
 *
 * TableProvider and DataSourceRegister are implemented by the same class, because Spark
 * instantiates the registered class itself as the provider.
 */
class MyDataSourceV2 extends TableProvider with DataSourceRegister {

  override def shortName(): String = "mydatasource"

  override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
    // Infer schema from external data source. CaseInsensitiveStringMap.get returns null for a
    // missing key, so fail with a clear message instead of passing null downstream.
    val path = Option(options.get("path")).getOrElse(
      throw new IllegalArgumentException("Option 'path' is required for mydatasource"))
    inferSchemaFromPath(path)
  }

  override def getTable(
      schema: StructType,
      partitioning: Array[Transform],
      properties: util.Map[String, String]
  ): Table =
    new MyTable(Identifier.of(Array.empty, "my_table"), schema, partitioning, properties)
}

576

577

// Register the data source: Spark discovers short names through java.util.ServiceLoader.
// Create the provider-configuration file
//   src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
// and put in it the fully-qualified name of your provider class, i.e. the class that implements
// TableProvider together with DataSourceRegister (e.g. com.example.MyDataSourceV2 once it mixes
// in DataSourceRegister).
//
// Do not register a separate class that only implements DataSourceRegister. Spark instantiates
// the registered class as the provider, so such a class cannot serve any table. If two classes
// register the same short name, lookup fails as ambiguous.
//
// Without registration, the source is still usable by its class name:
//   spark.read.format("com.example.MyDataSourceV2")

581

```

582

583

## Expression Pushdown

584

585

### Predicate Types

586

587

```scala { .api }

588

/**
 * V1 filters (`org.apache.spark.sql.sources`). These are what SupportsPushDownFilters,
 * SupportsOverwrite and SupportsDelete receive. Each subclass implements `references`,
 * the column names it reads.
 */
abstract class Filter {
  def references: Array[String]
}

case class EqualTo(attribute: String, value: Any) extends Filter
case class EqualNullSafe(attribute: String, value: Any) extends Filter
case class GreaterThan(attribute: String, value: Any) extends Filter
case class GreaterThanOrEqual(attribute: String, value: Any) extends Filter
case class LessThan(attribute: String, value: Any) extends Filter
case class LessThanOrEqual(attribute: String, value: Any) extends Filter
case class In(attribute: String, values: Array[Any]) extends Filter
case class IsNull(attribute: String) extends Filter
case class IsNotNull(attribute: String) extends Filter
case class And(left: Filter, right: Filter) extends Filter
case class Or(left: Filter, right: Filter) extends Filter
case class Not(child: Filter) extends Filter
case class StringStartsWith(attribute: String, value: String) extends Filter
case class StringEndsWith(attribute: String, value: String) extends Filter
case class StringContains(attribute: String, value: String) extends Filter
case class AlwaysTrue() extends Filter
case class AlwaysFalse() extends Filter

// V2 predicates (`org.apache.spark.sql.connector.expressions.filter.Predicate`, Spark 3.3+), used
// by SupportsPushDownV2Filters, are not case classes. A Predicate is identified by name()
// (e.g. "=", "<", ">", "IS_NULL", "IN", "AND", "OR", "NOT", "STARTS_WITH") and its children()
// expressions (NamedReference columns, Literal values, nested predicates).

607

```

608

609

### Advanced Pushdown

610

611

```scala { .api }

612

// NOTE: Spark-internal (org.apache.spark.sql.internal.connector), used by Spark's built-in file
// sources. It is not a public connector API and its signature has changed between releases;
// third-party connectors should use SupportsPushDownFilters or SupportsPushDownV2Filters.
// It works with Catalyst expressions, not connector expressions.
trait SupportsPushDownCatalystFilters {
  def pushFilters(
      filters: Seq[org.apache.spark.sql.catalyst.expressions.Expression]
  ): Seq[org.apache.spark.sql.catalyst.expressions.Expression]
  def pushedFilters: Array[Predicate]
}

// org.apache.spark.sql.connector.expressions.aggregate.Aggregation (a Java class exposing
// aggregateExpressions() and groupByExpressions()); passed to SupportsPushDownAggregates.
case class Aggregation(
    aggregateExpressions: Array[AggregateFunc],
    groupByExpressions: Array[Expression]
)

// Marker interface for pushed aggregate functions; it declares no extra methods. Inspect the
// concrete classes instead: Count, CountStar, Sum, Avg, Min, Max, GeneralAggregateFunc.
trait AggregateFunc extends Expression

625

```

626

627

The Data Source V2 API provides a comprehensive framework for building high-performance, feature-rich connectors that integrate seamlessly with Spark SQL's query planning and optimization capabilities.