# Sources and Sinks

The Flink Table API provides pluggable interfaces for integrating external data systems through table sources and sinks. Sources read data into tables, while sinks write table results to external systems.

## Capabilities

### Table Sources

Base interfaces for reading data from external systems into Flink tables.

```scala { .api }
/**
 * Base interface for table sources
 * @tparam T Type of records produced by the source
 */
trait TableSource[T] {
  /**
   * Gets the return type of the source
   * @returns Type information for produced records
   */
  def getReturnType: TypeInformation[T]

  /**
   * Gets the schema of the produced table
   * @returns TableSchema describing field names and types
   */
  def getTableSchema: TableSchema

  /**
   * Returns a string explanation of the source
   * @returns Description of the source for debugging
   */
  def explainSource(): String
}
```

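In practice, all three methods can be derived from a field-name array and a field-type array. The sketch below shows one way to factor that into a reusable base class; the name `RowSourceBase` is illustrative and not part of the API.

```scala
// Sketch: derive the TableSource contract from declared field names/types.
// Concrete sources would additionally mix in BatchTableSource or
// StreamTableSource and implement getDataSet/getDataStream.
abstract class RowSourceBase(
  fieldNames: Array[String],
  fieldTypes: Array[TypeInformation[_]]
) extends TableSource[Row] {

  // Row type information derived from the declared fields
  override def getReturnType: TypeInformation[Row] =
    Types.ROW(fieldNames, fieldTypes)

  // Schema exposed to the planner
  override def getTableSchema: TableSchema =
    new TableSchema(fieldNames, fieldTypes)

  // Human-readable description used when explaining query plans
  override def explainSource(): String =
    s"${getClass.getSimpleName}(${fieldNames.mkString(", ")})"
}
```

The batch and stream examples that follow implement the same three methods directly, which keeps each example self-contained.
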
### Batch Table Sources

Sources for batch processing that integrate with the DataSet API.

```scala { .api }
/**
 * Table source for batch processing
 * @tparam T Type of records produced by the source
 */
trait BatchTableSource[T] extends TableSource[T] {
  /**
   * Creates a DataSet from the source
   * @param execEnv Batch execution environment
   * @returns DataSet containing the source data
   */
  def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
}
```

**Usage Examples:**

```scala
// Custom CSV batch source
class CsvBatchSource(
  filePath: String,
  fieldNames: Array[String],
  fieldTypes: Array[TypeInformation[_]]
) extends BatchTableSource[Row] {

  override def getReturnType: TypeInformation[Row] = {
    Types.ROW(fieldNames, fieldTypes)
  }

  override def getTableSchema: TableSchema = {
    new TableSchema(fieldNames, fieldTypes)
  }

  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
    // Note: fields are emitted as strings for brevity; a full implementation
    // would parse each field into its declared type.
    execEnv.readTextFile(filePath)
      .map(line => {
        val fields = line.split(",")
        Row.of(fields: _*)
      })
  }

  override def explainSource(): String = s"CsvBatchSource($filePath)"
}

// Register and use batch source
val csvSource = new CsvBatchSource(
  "/path/to/data.csv",
  Array("id", "name", "age"),
  Array(Types.LONG, Types.STRING, Types.INT)
)

tEnv.registerTableSource("CsvData", csvSource)
val table = tEnv.scan("CsvData")
```

### Stream Table Sources

Sources for stream processing that integrate with the DataStream API.

```scala { .api }
/**
 * Table source for stream processing
 * @tparam T Type of records produced by the source
 */
trait StreamTableSource[T] extends TableSource[T] {
  /**
   * Creates a DataStream from the source
   * @param execEnv Stream execution environment
   * @returns DataStream containing the source data
   */
  def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
}
```

**Usage Examples:**

```scala
// Custom Kafka-like stream source
class KafkaStreamSource(
  topic: String,
  fieldNames: Array[String],
  fieldTypes: Array[TypeInformation[_]]
) extends StreamTableSource[Row] {

  override def getReturnType: TypeInformation[Row] = {
    Types.ROW(fieldNames, fieldTypes)
  }

  override def getTableSchema: TableSchema = {
    new TableSchema(fieldNames, fieldTypes)
  }

  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    // Simulate Kafka consumer
    execEnv.addSource(new SourceFunction[Row] {
      // volatile because cancel() is called from a different thread than run()
      @volatile var running = true

      override def run(ctx: SourceContext[Row]): Unit = {
        while (running) {
          // Emit sample data
          ctx.collect(Row.of(System.currentTimeMillis(), "sample_data", 42))
          Thread.sleep(1000)
        }
      }

      override def cancel(): Unit = {
        running = false
      }
    })
  }

  override def explainSource(): String = s"KafkaStreamSource($topic)"
}

// Register and use stream source
val kafkaSource = new KafkaStreamSource(
  "events",
  Array("timestamp", "message", "value"),
  Array(Types.LONG, Types.STRING, Types.INT)
)

tEnv.registerTableSource("KafkaEvents", kafkaSource)
val eventTable = tEnv.scan("KafkaEvents")
```

### Advanced Source Capabilities

Enhanced source interfaces supporting optimization features.

```scala { .api }
/**
 * Source that supports projection pushdown
 */
trait ProjectableTableSource[T] extends TableSource[T] {
  /**
   * Creates a new source with projected fields
   * @param fields Array of projected field indices
   * @returns New source instance with projection applied
   */
  def projectFields(fields: Array[Int]): TableSource[T]

  /**
   * Checks if projection pushdown is supported
   * @returns True if projection is supported
   */
  def supportsProjection: Boolean = true
}

/**
 * Source that supports filter pushdown
 */
trait FilterableTableSource[T] extends TableSource[T] {
  /**
   * Creates a new source with pushed-down filters
   * @param predicates List of filter expressions
   * @returns New source instance with filters applied
   */
  def applyPredicate(predicates: java.util.List[Expression]): TableSource[T]

  /**
   * Checks if filter pushdown is supported
   * @returns True if filtering is supported
   */
  def supportsFiltering: Boolean = true
}

/**
 * Source with custom field mapping, mixed into a TableSource implementation
 */
trait DefinedFieldMapping {
  /**
   * Defines the mapping from logical to physical fields
   * @returns Map from logical field name to physical field name
   */
  def getFieldMapping: java.util.Map[String, String]
}

/**
 * Source that defines rowtime attributes for event time, mixed into a TableSource implementation
 */
trait DefinedRowtimeAttributes {
  /**
   * Gets rowtime attribute descriptors
   * @returns List of rowtime attribute descriptors
   */
  def getRowtimeAttributeDescriptors: java.util.List[RowtimeAttributeDescriptor]
}

/**
 * Source that defines a processing time attribute, mixed into a TableSource implementation
 */
trait DefinedProctimeAttribute {
  /**
   * Gets the processing time attribute name
   * @returns Processing time attribute name, null if none
   */
  def getProctimeAttribute: String
}
```

**Usage Examples:**

```scala
import scala.collection.JavaConverters._

// Advanced source with projection and filtering
class OptimizedCsvSource(
  filePath: String,
  fieldNames: Array[String],
  fieldTypes: Array[TypeInformation[_]]
) extends BatchTableSource[Row]
  with ProjectableTableSource[Row]
  with FilterableTableSource[Row] {

  private var projectedFields: Option[Array[Int]] = None
  private var filters: List[Expression] = List.empty

  override def projectFields(fields: Array[Int]): TableSource[Row] = {
    val newSource = new OptimizedCsvSource(filePath, fieldNames, fieldTypes)
    newSource.projectedFields = Some(fields)
    newSource.filters = this.filters
    newSource
  }

  override def applyPredicate(predicates: java.util.List[Expression]): TableSource[Row] = {
    val newSource = new OptimizedCsvSource(filePath, fieldNames, fieldTypes)
    newSource.projectedFields = this.projectedFields
    newSource.filters = predicates.asScala.toList
    newSource
  }

  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
    var dataSet = execEnv.readTextFile(filePath)
      .map(line => {
        val fields = line.split(",")
        Row.of(fields: _*)
      })

    // Apply filters if present
    filters.foreach { filter =>
      // Apply filter logic (simplified)
      dataSet = dataSet.filter(row => evaluateFilter(row, filter))
    }

    // Apply projection if present
    projectedFields match {
      case Some(fields) => dataSet.map(row => projectRow(row, fields))
      case None => dataSet
    }
  }

  private def evaluateFilter(row: Row, filter: Expression): Boolean = {
    // Simplified filter evaluation
    true
  }

  private def projectRow(row: Row, fields: Array[Int]): Row = {
    Row.of(fields.map(row.getField): _*)
  }

  // Other required methods (a complete implementation would reflect the
  // projection in the returned type and schema)
  override def getReturnType: TypeInformation[Row] = Types.ROW(fieldNames, fieldTypes)
  override def getTableSchema: TableSchema = new TableSchema(fieldNames, fieldTypes)
  override def explainSource(): String = s"OptimizedCsvSource($filePath)"
}
```

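The time-attribute traits are typically mixed into a `StreamTableSource` so that event-time or processing-time windows can be defined over the resulting table. The following sketch extends the `KafkaStreamSource` from the stream source example above; it assumes the `ExistingField` timestamp extractor and `BoundedOutOfOrderTimestamps` watermark strategy that ship with Flink's table source utilities, so substitute your own extractor and strategy if your version differs.

```scala
// Sketch: declare an event-time attribute derived from the physical
// "timestamp" field plus an additional processing-time attribute.
// ExistingField and BoundedOutOfOrderTimestamps are assumed to be the
// extractor/strategy classes bundled with the Flink version in use.
class TimedKafkaSource(
  topic: String,
  fieldNames: Array[String],
  fieldTypes: Array[TypeInformation[_]]
) extends KafkaStreamSource(topic, fieldNames, fieldTypes)
  with DefinedRowtimeAttributes
  with DefinedProctimeAttribute {

  // Expose a rowtime attribute "ts" backed by the "timestamp" column,
  // tolerating events that arrive up to 5 seconds out of order
  override def getRowtimeAttributeDescriptors: java.util.List[RowtimeAttributeDescriptor] =
    java.util.Collections.singletonList(
      new RowtimeAttributeDescriptor(
        "ts",
        new ExistingField("timestamp"),
        new BoundedOutOfOrderTimestamps(5000L)))

  // Expose a processing-time attribute named "proctime"
  override def getProctimeAttribute: String = "proctime"
}
```

`DefinedFieldMapping` follows the same pattern: return a map from the logical schema's field names to the physical field names produced by `getReturnType`.
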
### Table Sinks

Base interfaces for writing table results to external systems.

```scala { .api }
/**
 * Base interface for table sinks
 * @tparam T Type of records consumed by the sink
 */
trait TableSink[T] {
  /**
   * Gets the expected input type
   * @returns Type information for consumed records
   */
  def getOutputType: TypeInformation[T]

  /**
   * Configures the sink with field information
   * @param fieldNames Array of field names
   * @param fieldTypes Array of field types
   * @returns Configured sink instance
   */
  def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[T]
}
```

### Batch Table Sinks

Sinks for batch processing that integrate with the DataSet API.

```scala { .api }
/**
 * Table sink for batch processing
 * @tparam T Type of records consumed by the sink
 */
trait BatchTableSink[T] extends TableSink[T] {
  /**
   * Emits the DataSet to the sink
   * @param dataSet DataSet to write
   * @param execEnv Batch execution environment
   */
  def emitDataSet(dataSet: DataSet[T], execEnv: ExecutionEnvironment): Unit
}
```

**Usage Examples:**

```scala
// Custom CSV batch sink
class CsvBatchSink(outputPath: String) extends BatchTableSink[Row] {
  private var fieldNames: Array[String] = _
  private var fieldTypes: Array[TypeInformation[_]] = _

  override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = {
    val newSink = new CsvBatchSink(outputPath)
    newSink.fieldNames = fieldNames
    newSink.fieldTypes = fieldTypes
    newSink
  }

  override def getOutputType: TypeInformation[Row] = {
    Types.ROW(fieldNames, fieldTypes)
  }

  override def emitDataSet(dataSet: DataSet[Row], execEnv: ExecutionEnvironment): Unit = {
    dataSet
      .map(row => (0 until row.getArity).map(row.getField).mkString(","))
      .writeAsText(outputPath)
  }
}

// Register and use batch sink
val csvSink = new CsvBatchSink("/path/to/output.csv")
val fieldNames = Array("id", "name", "age")
val fieldTypes = Array[TypeInformation[_]](Types.LONG, Types.STRING, Types.INT)
tEnv.registerTableSink("CsvOutput", fieldNames, fieldTypes, csvSink)
table.insertInto("CsvOutput")
```

### Stream Table Sinks

Sinks for stream processing with different consistency guarantees.

```scala { .api }
/**
 * Append-only stream sink for insert-only tables
 * @tparam T Type of records consumed by the sink
 */
trait AppendStreamTableSink[T] extends TableSink[T] {
  /**
   * Emits the DataStream to the sink
   * @param dataStream DataStream to write
   * @param execEnv Stream execution environment
   */
  def emitDataStream(dataStream: DataStream[T], execEnv: StreamExecutionEnvironment): Unit
}

/**
 * Retract stream sink for tables with updates and deletes
 * @tparam T Type of records consumed by the sink
 */
trait RetractStreamTableSink[T] extends TableSink[T] {
  /**
   * Emits the retract DataStream to the sink
   * @param dataStream DataStream of (Boolean, T) where Boolean indicates add/retract
   * @param execEnv Stream execution environment
   */
  def emitDataStream(dataStream: DataStream[(Boolean, T)], execEnv: StreamExecutionEnvironment): Unit
}

/**
 * Upsert stream sink for tables with primary keys
 * @tparam T Type of records consumed by the sink
 */
trait UpsertStreamTableSink[T] extends TableSink[T] {
  /**
   * Gets the primary key fields for upsert operations
   * @returns Array of primary key field names
   */
  def getKeys: Array[String]

  /**
   * Indicates if the sink expects upsert or retract stream
   * @returns True for upsert stream, false for retract stream
   */
  def isUpsertMode: Boolean

  /**
   * Emits the upsert DataStream to the sink
   * @param dataStream DataStream of (Boolean, T) for upsert/delete operations
   * @param execEnv Stream execution environment
   */
  def emitDataStream(dataStream: DataStream[(Boolean, T)], execEnv: StreamExecutionEnvironment): Unit
}
```

**Usage Examples:**

```scala
// Append-only stream sink
class PrintAppendSink extends AppendStreamTableSink[Row] {
  private var fieldNames: Array[String] = _
  private var fieldTypes: Array[TypeInformation[_]] = _

  override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = {
    val newSink = new PrintAppendSink()
    newSink.fieldNames = fieldNames
    newSink.fieldTypes = fieldTypes
    newSink
  }

  override def getOutputType: TypeInformation[Row] = Types.ROW(fieldNames, fieldTypes)

  override def emitDataStream(dataStream: DataStream[Row], execEnv: StreamExecutionEnvironment): Unit = {
    dataStream.print()
  }
}

// Retract stream sink for aggregated results
class DatabaseRetractSink(jdbcUrl: String) extends RetractStreamTableSink[Row] {
  private var fieldNames: Array[String] = _
  private var fieldTypes: Array[TypeInformation[_]] = _

  override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = {
    val newSink = new DatabaseRetractSink(jdbcUrl)
    newSink.fieldNames = fieldNames
    newSink.fieldTypes = fieldTypes
    newSink
  }

  override def getOutputType: TypeInformation[Row] = Types.ROW(fieldNames, fieldTypes)

  override def emitDataStream(dataStream: DataStream[(Boolean, Row)], execEnv: StreamExecutionEnvironment): Unit = {
    dataStream.addSink(new SinkFunction[(Boolean, Row)] {
      override def invoke(value: (Boolean, Row)): Unit = {
        val (isAdd, row) = value
        if (isAdd) {
          // Insert or update row in database
          insertOrUpdate(row)
        } else {
          // Delete row from database
          delete(row)
        }
      }
    })
  }

  private def insertOrUpdate(row: Row): Unit = {
    // Database insertion/update logic
  }

  private def delete(row: Row): Unit = {
    // Database deletion logic
  }
}

// Upsert stream sink with primary key
class KafkaUpsertSink(topic: String, keyFields: Array[String]) extends UpsertStreamTableSink[Row] {
  private var fieldNames: Array[String] = _
  private var fieldTypes: Array[TypeInformation[_]] = _

  override def getKeys: Array[String] = keyFields
  override def isUpsertMode: Boolean = true

  override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = {
    val newSink = new KafkaUpsertSink(topic, keyFields)
    newSink.fieldNames = fieldNames
    newSink.fieldTypes = fieldTypes
    newSink
  }

  override def getOutputType: TypeInformation[Row] = Types.ROW(fieldNames, fieldTypes)

  override def emitDataStream(dataStream: DataStream[(Boolean, Row)], execEnv: StreamExecutionEnvironment): Unit = {
    dataStream.addSink(new SinkFunction[(Boolean, Row)] {
      override def invoke(value: (Boolean, Row)): Unit = {
        val (isUpsert, row) = value
        val key = extractKey(row)
        val message = if (isUpsert) serializeRow(row) else null // null for delete
        sendToKafka(topic, key, message)
      }
    })
  }

  private def extractKey(row: Row): String = {
    // Extract key fields from row
    keyFields.map(field => row.getField(fieldNames.indexOf(field))).mkString("|")
  }

  private def serializeRow(row: Row): String = {
    // Serialize row to JSON or other format
    (0 until row.getArity).map(row.getField).mkString(",")
  }

  private def sendToKafka(topic: String, key: String, message: String): Unit = {
    // Send to Kafka
  }
}
```

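Which sink flavor a query can use depends on the kind of changes it produces. As a rough illustration, the snippet below reuses the `eventTable` from the stream source example; the sink names `AppendOutput` and `MessageCounts` are assumed to have been registered with matching schemas. Insert-only results such as projections and filters fit an append sink, while a non-windowed aggregation keeps updating previously emitted rows and therefore needs a retract or upsert sink.

```scala
// Insert-only result: projections/filters never update emitted rows,
// so they can be written to an AppendStreamTableSink
val appendResult = eventTable
  .filter('value > 0)
  .select('timestamp, 'message)
appendResult.insertInto("AppendOutput")       // assumed append sink registration

// Updating result: a grouped aggregation re-emits the count per message,
// so it needs a RetractStreamTableSink or an UpsertStreamTableSink
// keyed on the group-by field
val countsPerMessage = eventTable
  .groupBy('message)
  .select('message, 'value.count as 'cnt)
countsPerMessage.insertInto("MessageCounts")  // assumed upsert sink keyed on "message"
```
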
### Source and Sink Registration

Methods for registering sources and sinks with the table environment.

```scala { .api }
// TableEnvironment methods for source/sink registration
def registerTableSource(name: String, tableSource: TableSource[_]): Unit
def registerTableSink(name: String, fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]], tableSink: TableSink[_]): Unit

// Creating tables from sources
def fromTableSource(source: TableSource[_]): Table
```

**Usage Examples:**

```scala
// Register multiple sources and sinks. Batch sources are registered with a
// BatchTableEnvironment, stream sources and sinks with a StreamTableEnvironment.
val csvSource = new CsvBatchSource("/input/data.csv", Array("id", "name"), Array(Types.LONG, Types.STRING))
val kafkaSource = new KafkaStreamSource("events", Array("timestamp", "message"), Array(Types.LONG, Types.STRING))
val printSink = new PrintAppendSink()

batchTableEnv.registerTableSource("CsvInput", csvSource)
streamTableEnv.registerTableSource("KafkaInput", kafkaSource)
streamTableEnv.registerTableSink("PrintOutput", Array("result"), Array(Types.STRING), printSink)

// Use registered sources and sinks
val csvTable = batchTableEnv.scan("CsvInput")

// Project the stream to match the sink's single STRING column
val kafkaTable = streamTableEnv.scan("KafkaInput")
val result = kafkaTable.select('message as 'result)
result.insertInto("PrintOutput")
```

574
575
## Types
576
577
```scala { .api }
578
trait TableSource[T]
579
trait BatchTableSource[T] extends TableSource[T]
580
trait StreamTableSource[T] extends TableSource[T]
581
trait ProjectableTableSource[T] extends TableSource[T]
582
trait FilterableTableSource[T] extends TableSource[T]
583
trait DefinedFieldMapping extends TableSource[_]
584
trait DefinedRowtimeAttributes extends TableSource[_]
585
trait DefinedProctimeAttribute extends TableSource[_]
586
587
trait TableSink[T]
588
trait BatchTableSink[T] extends TableSink[T]
589
trait AppendStreamTableSink[T] extends TableSink[T]
590
trait RetractStreamTableSink[T] extends TableSink[T]
591
trait UpsertStreamTableSink[T] extends TableSink[T]
592
593
case class RowtimeAttributeDescriptor(attributeName: String, timestampExtractor: TimestampExtractor, watermarkStrategy: WatermarkStrategy)
594
```