# Data Source Connectors

Catalyst's Data Source V2 API provides standardized interfaces for integrating external data sources with Spark SQL. The connector framework supports advanced features such as predicate pushdown, column pruning, streaming, and transactional writes.

## Core Imports

```scala
import java.util

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions._
import org.apache.spark.sql.connector.read._
import org.apache.spark.sql.connector.read.streaming._
import org.apache.spark.sql.connector.write._
import org.apache.spark.sql.connector.write.streaming._
import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap
```

## Catalog APIs

### Catalog Interface

```scala { .api }
trait CatalogPlugin {
  def initialize(name: String, options: CaseInsensitiveStringMap): Unit
  def name(): String
}

trait TableCatalog extends CatalogPlugin {
  def listTables(namespace: Array[String]): Array[Identifier]
  def loadTable(ident: Identifier): Table
  def tableExists(ident: Identifier): Boolean
  def createTable(ident: Identifier, schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): Table
  def alterTable(ident: Identifier, changes: TableChange*): Table
  def dropTable(ident: Identifier): Boolean
  def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit
}

trait SupportsNamespaces extends CatalogPlugin {
  def listNamespaces(): Array[Array[String]]
  def listNamespaces(namespace: Array[String]): Array[Array[String]]
  def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String]
  def createNamespace(namespace: Array[String], metadata: util.Map[String, String]): Unit
  def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit
  def dropNamespace(namespace: Array[String], cascade: Boolean): Boolean
}
```

**Usage Example:**
```scala
class MyTableCatalog extends TableCatalog {
  private var catalogName: String = _

  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
    this.catalogName = name
  }

  override def name(): String = catalogName

  override def loadTable(ident: Identifier): Table = {
    // Load the table definition from the external catalog
    new MyTable(ident, loadSchemaFromExternal(ident))
  }

  override def createTable(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String]
  ): Table = {
    // Create the table in the external system
    createExternalTable(ident, schema, partitions, properties)
    new MyTable(ident, schema, partitions, properties)
  }

  // Remaining TableCatalog methods (listTables, tableExists, alterTable,
  // dropTable, renameTable) are omitted here for brevity.
}
```
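
Namespace support follows the same pattern. Below is a compact, hedged sketch layered on the catalog above; the `external*` helpers stand in for calls to the backing metastore and are not part of the API:

```scala
class MyNamespaceCatalog extends MyTableCatalog with SupportsNamespaces {

  override def listNamespaces(): Array[Array[String]] =
    externalListNamespaces().map(ns => Array(ns))

  override def listNamespaces(namespace: Array[String]): Array[Array[String]] =
    // Single-level namespaces in this sketch: nothing is nested below them
    Array.empty

  override def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String] =
    externalNamespaceProperties(namespace)

  override def createNamespace(namespace: Array[String], metadata: util.Map[String, String]): Unit =
    externalCreateNamespace(namespace, metadata)

  override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit =
    externalAlterNamespace(namespace, changes)

  override def dropNamespace(namespace: Array[String], cascade: Boolean): Boolean =
    externalDropNamespace(namespace, cascade)
}
```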

### Table Interface

```scala { .api }
trait Table {
  def name(): String
  def schema(): StructType
  def partitioning(): Array[Transform]
  def properties(): util.Map[String, String]
  def capabilities(): util.Set[TableCapability]
}

trait SupportsRead extends Table {
  def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder
}

trait SupportsWrite extends Table {
  def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder
}

trait SupportsDelete extends Table {
  def newDeleteBuilder(info: LogicalWriteInfo): DeleteBuilder
}

trait SupportsUpdate extends Table {
  def newUpdateBuilder(info: LogicalWriteInfo): UpdateBuilder
}

trait SupportsMerge extends Table {
  def newMergeBuilder(info: LogicalWriteInfo): MergeBuilder
}
```

**Usage Example:**
```scala
class MyTable(
    identifier: Identifier,
    tableSchema: StructType,
    tablePartitions: Array[Transform] = Array.empty,
    tableProperties: util.Map[String, String] = Map.empty[String, String].asJava
) extends Table with SupportsRead with SupportsWrite {

  override def name(): String = identifier.toString
  override def schema(): StructType = tableSchema
  override def partitioning(): Array[Transform] = tablePartitions
  override def properties(): util.Map[String, String] = tableProperties

  override def capabilities(): util.Set[TableCapability] = Set(
    TableCapability.BATCH_READ,
    TableCapability.BATCH_WRITE,
    TableCapability.STREAMING_READ,
    TableCapability.ACCEPT_ANY_SCHEMA
  ).asJava

  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
    new MyScanBuilder(tableSchema, options)
  }

  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
    new MyWriteBuilder(info)
  }
}
```
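
Once the catalog and table implementations are wired together, Spark can address them through the `spark.sql.catalog.<name>` configuration. The sketch below shows the application side; the catalog name, table identifier, and `USING` clause are illustrative and assume the unimplemented catalog methods above are filled in:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("catalog-demo")
  // Register MyTableCatalog under the catalog name "mycat"
  .config("spark.sql.catalog.mycat", classOf[MyTableCatalog].getName)
  .getOrCreate()

// DDL and queries on mycat.* are routed to MyTableCatalog / MyTable
spark.sql("CREATE TABLE mycat.db.events (id BIGINT, status STRING) USING mydatasource")
spark.sql("SELECT id, status FROM mycat.db.events WHERE status = 'active'").show()
```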

## Read APIs

### Scan Building

```scala { .api }
trait ScanBuilder {
  def build(): Scan
}

trait SupportsPushDownFilters extends ScanBuilder {
  def pushPredicates(predicates: Array[Predicate]): Array[Predicate]
  def pushedPredicates(): Array[Predicate]
}

trait SupportsPushDownRequiredColumns extends ScanBuilder {
  def pruneColumns(requiredSchema: StructType): Unit
}

trait SupportsPushDownLimit extends ScanBuilder {
  def pushLimit(limit: Int): Boolean
}

trait SupportsPushDownTopN extends ScanBuilder {
  def pushTopN(orders: Array[SortOrder], limit: Int): Boolean
}

trait SupportsPushDownAggregates extends ScanBuilder {
  def supportCompletePushDown(aggregation: Aggregation): Boolean
  def pushAggregation(aggregation: Aggregation): Boolean
}
```

**Usage Example:**
```scala
class MyScanBuilder(
    schema: StructType,
    options: CaseInsensitiveStringMap
) extends ScanBuilder with SupportsPushDownFilters with SupportsPushDownRequiredColumns {

  private var pushedFilters: Array[Predicate] = Array.empty
  private var requiredSchema: StructType = schema

  override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = {
    val (supported, unsupported) = predicates.partition(canPushDown)
    this.pushedFilters = supported
    unsupported
  }

  override def pushedPredicates(): Array[Predicate] = pushedFilters

  override def pruneColumns(requiredSchema: StructType): Unit = {
    this.requiredSchema = requiredSchema
  }

  override def build(): Scan = {
    new MyScan(requiredSchema, pushedFilters, options)
  }

  private def canPushDown(predicate: Predicate): Boolean = {
    // Determine which predicates can be pushed down to the data source
    predicate match {
      case _: EqualTo => true
      case _: GreaterThan => true
      case _: LessThan => true
      case _ => false
    }
  }
}
```
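
The limit and top-N mix-ins follow the same pattern. Below is a hedged sketch of `SupportsPushDownLimit`; the `supportsLimit` option used to decide whether the external source honours limits is an illustrative assumption, not part of the API:

```scala
class MyLimitedScanBuilder(
    schema: StructType,
    options: CaseInsensitiveStringMap
) extends ScanBuilder with SupportsPushDownLimit {

  private var pushedLimit: Option[Int] = None

  override def pushLimit(limit: Int): Boolean = {
    // Returning true tells Spark the limit was accepted by the source;
    // returning false keeps the limit entirely on the Spark side
    if (options.getBoolean("supportsLimit", false)) {
      pushedLimit = Some(limit)
      true
    } else {
      false
    }
  }

  override def build(): Scan = new MyScan(schema, Array.empty, options)
}
```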

### Scan Execution

```scala { .api }
trait Scan {
  def readSchema(): StructType
  def description(): String
  def toBatch: Batch
}

trait Batch {
  def planInputPartitions(): Array[InputPartition]
  def createReaderFactory(): PartitionReaderFactory
}

trait InputPartition extends Serializable

trait PartitionReaderFactory extends Serializable {
  def createReader(partition: InputPartition): PartitionReader[InternalRow]
  def createColumnarReader(partition: InputPartition): PartitionReader[ColumnarBatch]
  def supportColumnarReads(partition: InputPartition): Boolean
}

trait PartitionReader[T] extends Closeable {
  def next(): Boolean
  def get(): T
}
```

**Usage Example:**
```scala
class MyScan(
    schema: StructType,
    filters: Array[Predicate],
    options: CaseInsensitiveStringMap
) extends Scan with Batch {

  override def readSchema(): StructType = schema
  override def description(): String = s"MyScan(${schema.fieldNames.mkString(", ")})"
  override def toBatch: Batch = this

  override def planInputPartitions(): Array[InputPartition] = {
    // Create partitions based on data source layout
    (0 until getPartitionCount).map(i => MyInputPartition(i)).toArray
  }

  override def createReaderFactory(): PartitionReaderFactory = {
    new MyPartitionReaderFactory(schema, filters, options)
  }
}

case class MyInputPartition(partitionId: Int) extends InputPartition

class MyPartitionReaderFactory(
    schema: StructType,
    filters: Array[Predicate],
    options: CaseInsensitiveStringMap
) extends PartitionReaderFactory {

  override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
    new MyPartitionReader(partition.asInstanceOf[MyInputPartition], schema, filters)
  }

  override def supportColumnarReads(partition: InputPartition): Boolean = false
}

class MyPartitionReader(
    partition: MyInputPartition,
    schema: StructType,
    filters: Array[Predicate]
) extends PartitionReader[InternalRow] {

  private val iterator = createDataIterator()
  private var currentRow: InternalRow = _

  override def next(): Boolean = {
    if (iterator.hasNext) {
      currentRow = iterator.next()
      true
    } else {
      false
    }
  }

  override def get(): InternalRow = currentRow

  override def close(): Unit = {
    // Clean up resources
  }

  private def createDataIterator(): Iterator[InternalRow] = {
    // Create iterator that reads data from external source,
    // applying filters and returning rows matching the schema
    loadDataFromExternalSource(partition.partitionId, schema, filters)
  }
}
```
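
The example above leaves `loadDataFromExternalSource` abstract. When rows are built by hand, external values must be converted into Spark's internal representations. The snippet below is a hedged sketch of that conversion; the `(Long, String)` record type is an assumption for illustration:

```scala
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.unsafe.types.UTF8String

// Converts raw (id, name) records into InternalRows matching a schema of
// (LongType, StringType); strings must be wrapped as UTF8String, and values
// such as timestamps or decimals need their Catalyst representations as well.
def toInternalRows(records: Iterator[(Long, String)]): Iterator[InternalRow] = {
  records.map { case (id, name) =>
    new GenericInternalRow(Array[Any](id, UTF8String.fromString(name)))
  }
}
```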

## Write APIs

### Write Building

```scala { .api }
trait WriteBuilder {
  def build(): Write
}

trait SupportsTruncate extends WriteBuilder {
  def truncate(): WriteBuilder
}

trait SupportsOverwrite extends WriteBuilder {
  def overwrite(filters: Array[Filter]): WriteBuilder
}

trait SupportsDynamicOverwrite extends WriteBuilder {
  def overwriteDynamicPartitions(): WriteBuilder
}

trait SupportsStreamingWrite extends WriteBuilder {
  def buildForStreaming(): StreamingWrite
}
```

**Usage Example:**
```scala
class MyWriteBuilder(info: LogicalWriteInfo) extends WriteBuilder with SupportsOverwrite with SupportsTruncate {

  private var overwriteFilters: Array[Filter] = Array.empty
  private var truncateTable: Boolean = false

  override def overwrite(filters: Array[Filter]): WriteBuilder = {
    this.overwriteFilters = filters
    this
  }

  override def truncate(): WriteBuilder = {
    this.truncateTable = true
    this
  }

  override def build(): Write = {
    if (truncateTable) {
      new MyTruncateWrite(info)
    } else if (overwriteFilters.nonEmpty) {
      new MyOverwriteWrite(info, overwriteFilters)
    } else {
      new MyAppendWrite(info)
    }
  }
}
```
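
From the application side, which `WriteBuilder` mix-in Spark exercises follows from the write API that is called. A brief sketch using the DataFrameWriterV2 API; the table identifier and partition column are illustrative:

```scala
import org.apache.spark.sql.DataFrame

def writeExamples(df: DataFrame): Unit = {
  // Append: Spark only needs WriteBuilder.build()
  df.writeTo("mycat.db.events").append()

  // Conditional overwrite: Spark calls SupportsOverwrite.overwrite(filters)
  df.writeTo("mycat.db.events").overwrite(df("day") === "2024-01-01")

  // Dynamic partition overwrite: requires SupportsDynamicOverwrite
  df.writeTo("mycat.db.events").overwritePartitions()
}
```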

### Write Execution

```scala { .api }
trait Write {
  def description(): String
  def toBatch: BatchWrite
}

trait BatchWrite {
  def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory
  def commit(messages: Array[WriterCommitMessage]): Unit
  def abort(messages: Array[WriterCommitMessage]): Unit
}

trait DataWriterFactory extends Serializable {
  def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow]
}

trait DataWriter[T] extends Closeable {
  def write(record: T): Unit
  def commit(): WriterCommitMessage
  def abort(): Unit
}

trait WriterCommitMessage extends Serializable
```

**Usage Example:**
```scala
class MyAppendWrite(info: LogicalWriteInfo) extends Write with BatchWrite {

  override def description(): String = "MyAppendWrite"
  override def toBatch: BatchWrite = this

  override def createBatchWriterFactory(physicalInfo: PhysicalWriteInfo): DataWriterFactory = {
    // The schema comes from the logical write info captured at planning time
    new MyDataWriterFactory(info.schema())
  }

  override def commit(messages: Array[WriterCommitMessage]): Unit = {
    // Commit all partition writes atomically
    messages.foreach {
      case msg: MyWriterCommitMessage => commitPartition(msg)
      case _ => throw new IllegalArgumentException("Unexpected commit message type")
    }
  }

  override def abort(messages: Array[WriterCommitMessage]): Unit = {
    // Clean up any partial writes
    messages.foreach {
      case msg: MyWriterCommitMessage => abortPartition(msg)
      case _ => // Ignore unknown message types during abort
    }
  }
}

class MyDataWriterFactory(schema: StructType) extends DataWriterFactory {
  override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = {
    new MyDataWriter(partitionId, taskId, schema)
  }
}

class MyDataWriter(
    partitionId: Int,
    taskId: Long,
    schema: StructType
) extends DataWriter[InternalRow] {

  private val outputPath = createOutputPath(partitionId, taskId)
  private val writer = createExternalWriter(outputPath, schema)
  private var recordCount = 0

  override def write(record: InternalRow): Unit = {
    writer.writeRecord(record)
    recordCount += 1
  }

  override def commit(): WriterCommitMessage = {
    writer.close()
    MyWriterCommitMessage(outputPath, recordCount)
  }

  override def abort(): Unit = {
    writer.close()
    deleteOutputFile(outputPath)
  }

  override def close(): Unit = {
    writer.close()
  }
}

case class MyWriterCommitMessage(
    outputPath: String,
    recordCount: Int
) extends WriterCommitMessage
```

## Streaming APIs

### Streaming Read

```scala { .api }
trait SparkDataStream {
  def initialOffset(): Offset
  def deserializeOffset(json: String): Offset
  def commit(end: Offset): Unit
  def stop(): Unit
}

trait SupportsAdmissionControl extends SparkDataStream {
  def latestOffset(startOffset: Offset, limit: ReadLimit): Offset
}

trait ContinuousStream extends SparkDataStream {
  def mergeOffsets(offsets: Array[PartitionOffset]): Offset
  def planInputPartitions(start: Offset): Array[InputPartition]
  def createContinuousReaderFactory(): ContinuousPartitionReaderFactory
}

trait MicroBatchStream extends SparkDataStream {
  def latestOffset(): Offset
  def planInputPartitions(start: Offset, end: Offset): Array[InputPartition]
  def createReaderFactory(): PartitionReaderFactory
}
```

**Usage Example:**
```scala
class MyMicroBatchStream(
    schema: StructType,
    options: CaseInsensitiveStringMap
) extends MicroBatchStream {

  override def initialOffset(): Offset = {
    MyOffset(getCurrentOffsetFromSource())
  }

  override def latestOffset(): Offset = {
    MyOffset(getLatestOffsetFromSource())
  }

  override def deserializeOffset(json: String): Offset = {
    // Recover the offset value from the JSON produced by MyOffset.json()
    MyOffset(json.replaceAll("[^0-9]", "").toLong)
  }

  override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] = {
    val startOffset = start.asInstanceOf[MyOffset].value
    val endOffset = end.asInstanceOf[MyOffset].value

    createPartitionsForRange(startOffset, endOffset)
  }

  override def createReaderFactory(): PartitionReaderFactory = {
    new MyStreamingReaderFactory(schema, options)
  }

  override def commit(end: Offset): Unit = {
    // Commit the processed offset in the external system
    commitOffsetInExternalSystem(end.asInstanceOf[MyOffset].value)
  }

  override def stop(): Unit = {
    // Clean up streaming resources
  }
}

case class MyOffset(value: Long) extends Offset {
  override def json(): String = s"""{"offset":$value}"""
}
```
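
To wire this stream into the table, the `Scan` also needs to hand out the micro-batch stream; in Spark's `Scan` interface this is the `toMicroBatchStream(checkpointLocation)` hook (not shown in the listing above). A minimal sketch reusing the classes from the Read APIs section:

```scala
class MyStreamingScan(
    schema: StructType,
    options: CaseInsensitiveStringMap
) extends Scan {

  override def readSchema(): StructType = schema
  override def description(): String = "MyStreamingScan"
  override def toBatch: Batch = throw new UnsupportedOperationException("streaming only")

  // Called when the table is used with spark.readStream; the checkpoint
  // location can be used to persist source-specific state
  override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
    new MyMicroBatchStream(schema, options)
  }
}

// Application side (requires STREAMING_READ in the table's capabilities):
//   val stream = spark.readStream.format("mydatasource").load()
```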

### Streaming Write

```scala { .api }
trait StreamingWrite {
  def createStreamingWriterFactory(info: PhysicalWriteInfo): StreamingDataWriterFactory
  def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit
  def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit
}

trait StreamingDataWriterFactory extends Serializable {
  def createWriter(partitionId: Int, taskId: Long, epochId: Long): DataWriter[InternalRow]
}
```
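
**Usage Example** (a minimal sketch of an epoch-based sink; the `commitEpoch`/`abortEpoch` helpers and the reuse of `MyDataWriter` from the batch example are illustrative assumptions, not part of the API):
```scala
class MyStreamingWrite(info: LogicalWriteInfo) extends StreamingWrite {

  override def createStreamingWriterFactory(physicalInfo: PhysicalWriteInfo): StreamingDataWriterFactory = {
    new MyStreamingDataWriterFactory(info.schema())
  }

  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
    // Make the epoch's output visible atomically; this should be idempotent
    // because an epoch may be committed again after a failure
    commitEpoch(epochId, messages)
  }

  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
    // Discard any partial output written for this epoch
    abortEpoch(epochId, messages)
  }
}

class MyStreamingDataWriterFactory(schema: StructType) extends StreamingDataWriterFactory {
  override def createWriter(partitionId: Int, taskId: Long, epochId: Long): DataWriter[InternalRow] = {
    // Reuse the batch DataWriter, scoping output per partition/task/epoch
    new MyDataWriter(partitionId, taskId, schema)
  }
}
```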

## Data Source Provider

### TableProvider Interface

```scala { .api }
trait TableProvider {
  def inferSchema(options: CaseInsensitiveStringMap): StructType
  def getTable(schema: StructType, partitioning: Array[Transform], properties: util.Map[String, String]): Table
}

trait DataSourceRegister {
  def shortName(): String
}
```

**Usage Example:**
```scala
class MyDataSourceV2 extends TableProvider with DataSourceRegister {

  override def shortName(): String = "mydatasource"

  override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
    // Infer schema from the external data source
    val path = options.get("path")
    inferSchemaFromPath(path)
  }

  override def getTable(
      schema: StructType,
      partitioning: Array[Transform],
      properties: util.Map[String, String]
  ): Table = {
    new MyTable(
      Identifier.of(Array.empty, "my_table"),
      schema,
      partitioning,
      properties
    )
  }
}

// Register the data source: list the provider class in
// META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
// so that shortName() resolves "mydatasource" in format(...) calls.
```
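
With the short name registered, a path-based read goes through `TableProvider`. A brief sketch from the user side; the option names and path are illustrative:

```scala
// spark.read.format(...) resolves "mydatasource" via DataSourceRegister,
// calls inferSchema(), then getTable() to obtain the Table to scan
val df = spark.read
  .format("mydatasource")
  .option("path", "/data/events")
  .load()

// Simple filters and projections here become candidates for pushdown
// into MyScanBuilder during query planning
df.filter(df("status") === "active").select("id", "status").show()
```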

## Expression Pushdown

### Predicate Types

```scala { .api }
sealed trait Predicate extends Serializable {
  def references: Array[String]
}

case class EqualTo(attribute: String, value: Any) extends Predicate
case class EqualNullSafe(attribute: String, value: Any) extends Predicate
case class GreaterThan(attribute: String, value: Any) extends Predicate
case class GreaterThanOrEqual(attribute: String, value: Any) extends Predicate
case class LessThan(attribute: String, value: Any) extends Predicate
case class LessThanOrEqual(attribute: String, value: Any) extends Predicate
case class In(attribute: String, values: Array[Any]) extends Predicate
case class IsNull(attribute: String) extends Predicate
case class IsNotNull(attribute: String) extends Predicate
case class And(left: Predicate, right: Predicate) extends Predicate
case class Or(left: Predicate, right: Predicate) extends Predicate
case class Not(child: Predicate) extends Predicate
case class StringStartsWith(attribute: String, value: String) extends Predicate
case class StringEndsWith(attribute: String, value: String) extends Predicate
case class StringContains(attribute: String, value: String) extends Predicate
```
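
For sources that speak SQL, a connector typically compiles these predicates into a `WHERE` clause. The sketch below is illustrative of that pattern; the supported subset, quoting, and literal rendering are simplified assumptions, and a real connector must escape values safely:

```scala
object PredicateCompiler {

  // Returns a WHERE fragment for predicates the source understands, or None
  // so that Spark evaluates the remaining predicates after the scan.
  def compilePredicate(p: Predicate): Option[String] = p match {
    case EqualTo(attr, value)     => Some(s"$attr = ${literal(value)}")
    case GreaterThan(attr, value) => Some(s"$attr > ${literal(value)}")
    case LessThan(attr, value)    => Some(s"$attr < ${literal(value)}")
    case IsNull(attr)             => Some(s"$attr IS NULL")
    case IsNotNull(attr)          => Some(s"$attr IS NOT NULL")
    case And(left, right) =>
      for (l <- compilePredicate(left); r <- compilePredicate(right)) yield s"($l AND $r)"
    case Or(left, right) =>
      for (l <- compilePredicate(left); r <- compilePredicate(right)) yield s"($l OR $r)"
    case _ => None
  }

  // Simplified literal rendering for the sketch
  private def literal(value: Any): String = value match {
    case s: String => s"'${s.replace("'", "''")}'"
    case other     => other.toString
  }
}
```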

### Advanced Pushdown

```scala { .api }
trait SupportsPushDownCatalystFilters extends ScanBuilder {
  def pushCatalystFilters(filters: Array[Expression]): Array[Expression]
  def pushedCatalystFilters(): Array[Expression]
}

case class Aggregation(
  aggregateExpressions: Array[AggregateFunc],
  groupByExpressions: Array[Expression]
)

trait AggregateFunc extends Expression {
  def aggregateFunction(): aggregate.AggregateFunction
}
```
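
As a sketch of how aggregate pushdown composes with the scan builder, the fragment below follows the shape of `MyScanBuilder` from the Read APIs section. Which aggregates count as pushable (`isSupportedAgg`) and how the aggregated output schema is derived (`schemaFor`) are source-specific assumptions left unimplemented here:

```scala
class MyAggregatingScanBuilder(
    schema: StructType,
    options: CaseInsensitiveStringMap
) extends ScanBuilder with SupportsPushDownAggregates {

  private var pushedAggregation: Option[Aggregation] = None

  override def supportCompletePushDown(aggregation: Aggregation): Boolean = {
    // Complete pushdown means the source returns final aggregate values,
    // so Spark can skip its own partial/final aggregation phases
    aggregation.aggregateExpressions.forall(isSupportedAgg)
  }

  override def pushAggregation(aggregation: Aggregation): Boolean = {
    if (aggregation.aggregateExpressions.forall(isSupportedAgg)) {
      pushedAggregation = Some(aggregation)
      true
    } else {
      false
    }
  }

  override def build(): Scan = {
    // When an aggregation has been pushed, the Scan's readSchema() must
    // describe the aggregated output, not the table's original columns
    new MyScan(schemaFor(pushedAggregation, schema), Array.empty, options)
  }

  // Whether the external source can evaluate this aggregate (source-specific)
  private def isSupportedAgg(func: AggregateFunc): Boolean = ???

  // Derives the output schema for the pushed aggregation (illustrative helper)
  private def schemaFor(agg: Option[Aggregation], base: StructType): StructType = ???
}
```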
The Data Source V2 API provides a comprehensive framework for building high-performance, feature-rich connectors that integrate seamlessly with Spark SQL's query planning and optimization capabilities.