# Sources and Sinks

The Flink Table API provides pluggable interfaces for integrating external data systems through table sources and sinks. Sources read data into tables, while sinks write table results to external systems.
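
The typical flow is to register a source, derive a result table with a query, and write it out through a registered sink. A minimal sketch of that flow, assuming a table environment `tEnv` and using the `CsvBatchSource` and `CsvBatchSink` example classes defined later on this page:

```scala
// Minimal source-to-sink flow (illustrative; CsvBatchSource and CsvBatchSink
// are the example implementations shown further down)
val source = new CsvBatchSource(
  "/path/to/data.csv",
  Array("id", "name", "age"),
  Array(Types.LONG, Types.STRING, Types.INT)
)
val sink = new CsvBatchSink("/path/to/output.csv")

tEnv.registerTableSource("People", source)
tEnv.registerTableSink("PeopleOut", Array("name"), Array(Types.STRING), sink)

// Read from the source, transform, and write to the sink
tEnv.scan("People")
  .select('name)
  .insertInto("PeopleOut")
```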

## Capabilities

### Table Sources

Base interfaces for reading data from external systems into Flink tables.

```scala { .api }
/**
 * Base interface for table sources
 * @tparam T Type of records produced by the source
 */
trait TableSource[T] {
  /**
   * Gets the return type of the source
   * @returns Type information for produced records
   */
  def getReturnType: TypeInformation[T]

  /**
   * Gets the schema of the produced table
   * @returns TableSchema describing field names and types
   */
  def getTableSchema: TableSchema

  /**
   * Returns a string explanation of the source
   * @returns Description of the source for debugging
   */
  def explainSource(): String
}
```

### Batch Table Sources

Sources for batch processing that integrate with the DataSet API.

```scala { .api }
/**
 * Table source for batch processing
 * @tparam T Type of records produced by the source
 */
trait BatchTableSource[T] extends TableSource[T] {
  /**
   * Creates a DataSet from the source
   * @param execEnv Batch execution environment
   * @returns DataSet containing the source data
   */
  def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
}
```

**Usage Examples:**

```scala
// Custom CSV batch source
class CsvBatchSource(
  filePath: String,
  fieldNames: Array[String],
  fieldTypes: Array[TypeInformation[_]]
) extends BatchTableSource[Row] {

  override def getReturnType: TypeInformation[Row] = {
    Types.ROW(fieldNames, fieldTypes)
  }

  override def getTableSchema: TableSchema = {
    new TableSchema(fieldNames, fieldTypes)
  }

  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
    execEnv.readTextFile(filePath)
      .map(line => {
        // Fields are kept as strings; per-column type conversion is omitted for brevity
        val fields = line.split(",")
        Row.of(fields: _*)
      })
  }

  override def explainSource(): String = s"CsvBatchSource($filePath)"
}

// Register and use batch source
val csvSource = new CsvBatchSource(
  "/path/to/data.csv",
  Array("id", "name", "age"),
  Array(Types.LONG, Types.STRING, Types.INT)
)

tEnv.registerTableSource("CsvData", csvSource)
val table = tEnv.scan("CsvData")
```

### Stream Table Sources

Sources for stream processing that integrate with the DataStream API.

```scala { .api }
/**
 * Table source for stream processing
 * @tparam T Type of records produced by the source
 */
trait StreamTableSource[T] extends TableSource[T] {
  /**
   * Creates a DataStream from the source
   * @param execEnv Stream execution environment
   * @returns DataStream containing the source data
   */
  def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
}
```

**Usage Examples:**

```scala
// Custom Kafka-like stream source
class KafkaStreamSource(
  topic: String,
  fieldNames: Array[String],
  fieldTypes: Array[TypeInformation[_]]
) extends StreamTableSource[Row] {

  override def getReturnType: TypeInformation[Row] = {
    Types.ROW(fieldNames, fieldTypes)
  }

  override def getTableSchema: TableSchema = {
    new TableSchema(fieldNames, fieldTypes)
  }

  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    // Simulate a Kafka consumer with a simple source function
    execEnv.addSource(new SourceFunction[Row] {
      @volatile var running = true

      override def run(ctx: SourceContext[Row]): Unit = {
        while (running) {
          // Emit sample data
          ctx.collect(Row.of(System.currentTimeMillis(), "sample_data", 42))
          Thread.sleep(1000)
        }
      }

      override def cancel(): Unit = {
        running = false
      }
    })
  }

  override def explainSource(): String = s"KafkaStreamSource($topic)"
}

// Register and use stream source
val kafkaSource = new KafkaStreamSource(
  "events",
  Array("timestamp", "message", "value"),
  Array(Types.LONG, Types.STRING, Types.INT)
)

tEnv.registerTableSource("KafkaEvents", kafkaSource)
val eventTable = tEnv.scan("KafkaEvents")
```

### Advanced Source Capabilities

Enhanced source interfaces that support planner optimizations such as projection and filter pushdown, custom field mapping, and time attribute definitions.

```scala { .api }
/**
 * Source that supports projection pushdown
 */
trait ProjectableTableSource[T] extends TableSource[T] {
  /**
   * Creates a new source with projected fields
   * @param fields Array of projected field indices
   * @returns New source instance with projection applied
   */
  def projectFields(fields: Array[Int]): TableSource[T]

  /**
   * Checks if projection pushdown is supported
   * @returns True if projection is supported
   */
  def supportsProjection: Boolean = true
}

/**
 * Source that supports filter pushdown
 */
trait FilterableTableSource[T] extends TableSource[T] {
  /**
   * Creates a new source with pushed-down filters
   * @param predicates List of filter expressions
   * @returns New source instance with filters applied
   */
  def applyPredicate(predicates: java.util.List[Expression]): TableSource[T]

  /**
   * Checks if filter pushdown is supported
   * @returns True if filtering is supported
   */
  def supportsFiltering: Boolean = true
}

/**
 * Mix-in for sources with a custom mapping from physical to logical fields
 */
trait DefinedFieldMapping {
  /**
   * Defines mapping from physical to logical fields
   * @returns Map from logical field name to physical field name
   */
  def getFieldMapping: java.util.Map[String, String]
}

/**
 * Mix-in for sources that define rowtime attributes for event time
 */
trait DefinedRowtimeAttributes {
  /**
   * Gets rowtime attribute descriptors
   * @returns List of rowtime attribute descriptors
   */
  def getRowtimeAttributeDescriptors: java.util.List[RowtimeAttributeDescriptor]
}

/**
 * Mix-in for sources that define a processing time attribute
 */
trait DefinedProctimeAttribute {
  /**
   * Gets the processing time attribute name
   * @returns Processing time attribute name, null if none
   */
  def getProctimeAttribute: String
}
```

**Usage Examples:**

```scala
import scala.collection.JavaConverters._

// Advanced source with projection and filtering
class OptimizedCsvSource(
  filePath: String,
  fieldNames: Array[String],
  fieldTypes: Array[TypeInformation[_]]
) extends BatchTableSource[Row]
  with ProjectableTableSource[Row]
  with FilterableTableSource[Row] {

  private var projectedFields: Option[Array[Int]] = None
  private var filters: List[Expression] = List.empty

  override def projectFields(fields: Array[Int]): TableSource[Row] = {
    val newSource = new OptimizedCsvSource(filePath, fieldNames, fieldTypes)
    newSource.projectedFields = Some(fields)
    newSource.filters = this.filters
    newSource
  }

  override def applyPredicate(predicates: java.util.List[Expression]): TableSource[Row] = {
    val newSource = new OptimizedCsvSource(filePath, fieldNames, fieldTypes)
    newSource.projectedFields = this.projectedFields
    newSource.filters = predicates.asScala.toList
    newSource
  }

  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
    var dataSet = execEnv.readTextFile(filePath)
      .map(line => {
        val fields = line.split(",")
        Row.of(fields: _*)
      })

    // Apply filters if present
    filters.foreach { filter =>
      // Apply filter logic (simplified)
      dataSet = dataSet.filter(row => evaluateFilter(row, filter))
    }

    // Apply projection if present
    projectedFields match {
      case Some(fields) => dataSet.map(row => projectRow(row, fields))
      case None => dataSet
    }
  }

  private def evaluateFilter(row: Row, filter: Expression): Boolean = {
    // Simplified filter evaluation
    true
  }

  private def projectRow(row: Row, fields: Array[Int]): Row = {
    Row.of(fields.map(row.getField): _*)
  }

  // Other required methods...
  override def getReturnType: TypeInformation[Row] = Types.ROW(fieldNames, fieldTypes)
  override def getTableSchema: TableSchema = new TableSchema(fieldNames, fieldTypes)
  override def explainSource(): String = s"OptimizedCsvSource($filePath)"
}
```
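
The example above exercises projection and filter pushdown but not the time attribute traits. The sketch below shows a stream source that declares a rowtime and a processing time attribute; it assumes the bundled `ExistingField` timestamp extractor and `AscendingTimestamps` watermark strategy, and it glosses over schema details (the rowtime field must appear in the table schema with a timestamp-compatible type).

```scala
// Sketch only: declaring time attributes on a stream source.
// ExistingField / AscendingTimestamps are assumed extractor and watermark
// strategy implementations; schema wiring is simplified.
class TimedEventSource(
  fieldNames: Array[String],
  fieldTypes: Array[TypeInformation[_]]
) extends StreamTableSource[Row]
  with DefinedRowtimeAttributes
  with DefinedProctimeAttribute {

  override def getReturnType: TypeInformation[Row] = Types.ROW(fieldNames, fieldTypes)
  override def getTableSchema: TableSchema = new TableSchema(fieldNames, fieldTypes)
  override def explainSource(): String = "TimedEventSource"

  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    // Produce rows as in the KafkaStreamSource example above
    new KafkaStreamSource("events", fieldNames, fieldTypes).getDataStream(execEnv)
  }

  // "eventTime" is an existing field holding event timestamps; ascending watermarks
  override def getRowtimeAttributeDescriptors: java.util.List[RowtimeAttributeDescriptor] =
    java.util.Collections.singletonList(
      new RowtimeAttributeDescriptor("eventTime", new ExistingField("eventTime"), new AscendingTimestamps)
    )

  // Additional processing-time attribute appended to the table schema
  override def getProctimeAttribute: String = "procTime"
}
```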

### Table Sinks

Base interfaces for writing table results to external systems.

```scala { .api }
/**
 * Base interface for table sinks
 * @tparam T Type of records consumed by the sink
 */
trait TableSink[T] {
  /**
   * Gets the expected input type
   * @returns Type information for consumed records
   */
  def getOutputType: TypeInformation[T]

  /**
   * Configures the sink with field information
   * @param fieldNames Array of field names
   * @param fieldTypes Array of field types
   * @returns Configured sink instance
   */
  def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[T]
}
```

### Batch Table Sinks

Sinks for batch processing that integrate with the DataSet API.

```scala { .api }
/**
 * Table sink for batch processing
 * @tparam T Type of records consumed by the sink
 */
trait BatchTableSink[T] extends TableSink[T] {
  /**
   * Emits the DataSet to the sink
   * @param dataSet DataSet to write
   * @param execEnv Batch execution environment
   */
  def emitDataSet(dataSet: DataSet[T], execEnv: ExecutionEnvironment): Unit
}
```

**Usage Examples:**

```scala
// Custom CSV batch sink
class CsvBatchSink(outputPath: String) extends BatchTableSink[Row] {
  private var fieldNames: Array[String] = _
  private var fieldTypes: Array[TypeInformation[_]] = _

  override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = {
    val newSink = new CsvBatchSink(outputPath)
    newSink.fieldNames = fieldNames
    newSink.fieldTypes = fieldTypes
    newSink
  }

  override def getOutputType: TypeInformation[Row] = {
    Types.ROW(fieldNames, fieldTypes)
  }

  override def emitDataSet(dataSet: DataSet[Row], execEnv: ExecutionEnvironment): Unit = {
    dataSet
      .map(row => (0 until row.getArity).map(row.getField).mkString(","))
      .writeAsText(outputPath)
  }
}

// Register and use batch sink ("CsvData" was registered in the batch source example)
val fieldNames = Array("id", "name", "age")
val fieldTypes = Array[TypeInformation[_]](Types.LONG, Types.STRING, Types.INT)

val csvSink = new CsvBatchSink("/path/to/output.csv")
tEnv.registerTableSink("CsvOutput", fieldNames, fieldTypes, csvSink)

val table = tEnv.scan("CsvData")
table.insertInto("CsvOutput")
```

### Stream Table Sinks

Sinks for stream processing with different update semantics: append-only, retract, and upsert streams.

```scala { .api }
/**
 * Append-only stream sink for insert-only tables
 * @tparam T Type of records consumed by the sink
 */
trait AppendStreamTableSink[T] extends TableSink[T] {
  /**
   * Emits the DataStream to the sink
   * @param dataStream DataStream to write
   * @param execEnv Stream execution environment
   */
  def emitDataStream(dataStream: DataStream[T], execEnv: StreamExecutionEnvironment): Unit
}

/**
 * Retract stream sink for tables with updates and deletes
 * @tparam T Type of records consumed by the sink
 */
trait RetractStreamTableSink[T] extends TableSink[T] {
  /**
   * Emits the retract DataStream to the sink
   * @param dataStream DataStream of (Boolean, T) where the Boolean flags add vs. retract
   * @param execEnv Stream execution environment
   */
  def emitDataStream(dataStream: DataStream[(Boolean, T)], execEnv: StreamExecutionEnvironment): Unit
}

/**
 * Upsert stream sink for tables with primary keys
 * @tparam T Type of records consumed by the sink
 */
trait UpsertStreamTableSink[T] extends TableSink[T] {
  /**
   * Gets the primary key fields for upsert operations
   * @returns Array of primary key field names
   */
  def getKeys: Array[String]

  /**
   * Indicates whether the sink expects an upsert or a retract stream
   * @returns True for upsert stream, false for retract stream
   */
  def isUpsertMode: Boolean

  /**
   * Emits the upsert DataStream to the sink
   * @param dataStream DataStream of (Boolean, T) for upsert/delete operations
   * @param execEnv Stream execution environment
   */
  def emitDataStream(dataStream: DataStream[(Boolean, T)], execEnv: StreamExecutionEnvironment): Unit
}
```

**Usage Examples:**

```scala
// Append-only stream sink
class PrintAppendSink extends AppendStreamTableSink[Row] {
  private var fieldNames: Array[String] = _
  private var fieldTypes: Array[TypeInformation[_]] = _

  override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = {
    val newSink = new PrintAppendSink()
    newSink.fieldNames = fieldNames
    newSink.fieldTypes = fieldTypes
    newSink
  }

  override def getOutputType: TypeInformation[Row] = Types.ROW(fieldNames, fieldTypes)

  override def emitDataStream(dataStream: DataStream[Row], execEnv: StreamExecutionEnvironment): Unit = {
    dataStream.print()
  }
}

// Retract stream sink for aggregated results
class DatabaseRetractSink(jdbcUrl: String) extends RetractStreamTableSink[Row] {
  private var fieldNames: Array[String] = _
  private var fieldTypes: Array[TypeInformation[_]] = _

  override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = {
    val newSink = new DatabaseRetractSink(jdbcUrl)
    newSink.fieldNames = fieldNames
    newSink.fieldTypes = fieldTypes
    newSink
  }

  override def getOutputType: TypeInformation[Row] = Types.ROW(fieldNames, fieldTypes)

  override def emitDataStream(dataStream: DataStream[(Boolean, Row)], execEnv: StreamExecutionEnvironment): Unit = {
    dataStream.addSink(new SinkFunction[(Boolean, Row)] {
      override def invoke(value: (Boolean, Row)): Unit = {
        val (isAdd, row) = value
        if (isAdd) {
          // Insert or update row in database
          insertOrUpdate(row)
        } else {
          // Delete row from database
          delete(row)
        }
      }
    })
  }

  private def insertOrUpdate(row: Row): Unit = {
    // Database insertion/update logic
  }

  private def delete(row: Row): Unit = {
    // Database deletion logic
  }
}

// Upsert stream sink with primary key
class KafkaUpsertSink(topic: String, keyFields: Array[String]) extends UpsertStreamTableSink[Row] {
  private var fieldNames: Array[String] = _
  private var fieldTypes: Array[TypeInformation[_]] = _

  override def getKeys: Array[String] = keyFields
  override def isUpsertMode: Boolean = true

  override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = {
    val newSink = new KafkaUpsertSink(topic, keyFields)
    newSink.fieldNames = fieldNames
    newSink.fieldTypes = fieldTypes
    newSink
  }

  override def getOutputType: TypeInformation[Row] = Types.ROW(fieldNames, fieldTypes)

  override def emitDataStream(dataStream: DataStream[(Boolean, Row)], execEnv: StreamExecutionEnvironment): Unit = {
    dataStream.addSink(new SinkFunction[(Boolean, Row)] {
      override def invoke(value: (Boolean, Row)): Unit = {
        val (isUpsert, row) = value
        val key = extractKey(row)
        val message = if (isUpsert) serializeRow(row) else null // null payload signals a delete
        sendToKafka(topic, key, message)
      }
    })
  }

  private def extractKey(row: Row): String = {
    // Extract key fields from row
    keyFields.map(field => row.getField(fieldNames.indexOf(field))).mkString("|")
  }

  private def serializeRow(row: Row): String = {
    // Serialize row to JSON or other format
    (0 until row.getArity).map(row.getField).mkString(",")
  }

  private def sendToKafka(topic: String, key: String, message: String): Unit = {
    // Send to Kafka (producer logic omitted)
  }
}
```

### Source and Sink Registration

Methods for registering sources and sinks with the table environment.

```scala { .api }
// TableEnvironment methods for source/sink registration
def registerTableSource(name: String, tableSource: TableSource[_]): Unit
def registerTableSink(name: String, fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]], tableSink: TableSink[_]): Unit

// Creating tables from sources
def fromTableSource(source: TableSource[_]): Table
```

**Usage Examples:**

```scala
// Register multiple sources and sinks
val csvSource = new CsvBatchSource("/input/data.csv", Array("id", "name"), Array(Types.LONG, Types.STRING))
val kafkaSource = new KafkaStreamSource("events", Array("timestamp", "message"), Array(Types.LONG, Types.STRING))
val printSink = new PrintAppendSink()

tEnv.registerTableSource("CsvInput", csvSource)
tEnv.registerTableSource("KafkaInput", kafkaSource)
tEnv.registerTableSink("PrintOutput", Array("result"), Array(Types.STRING), printSink)

// Use registered sources and sinks
val csvTable = tEnv.scan("CsvInput")
val kafkaTable = tEnv.scan("KafkaInput")

// Project both tables to a single STRING column so the union is type-compatible
val result = csvTable.select('name).unionAll(kafkaTable.select('message))
result.insertInto("PrintOutput")
```
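
`fromTableSource` creates a `Table` directly from a source without registering it under a name first; a small sketch, reusing the `csvSource` from above:

```scala
// Create a Table straight from a source, bypassing catalog registration
val directTable = tEnv.fromTableSource(csvSource)
val names = directTable.select('name)
```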

## Types

```scala { .api }
trait TableSource[T]
trait BatchTableSource[T] extends TableSource[T]
trait StreamTableSource[T] extends TableSource[T]
trait ProjectableTableSource[T] extends TableSource[T]
trait FilterableTableSource[T] extends TableSource[T]
trait DefinedFieldMapping
trait DefinedRowtimeAttributes
trait DefinedProctimeAttribute

trait TableSink[T]
trait BatchTableSink[T] extends TableSink[T]
trait AppendStreamTableSink[T] extends TableSink[T]
trait RetractStreamTableSink[T] extends TableSink[T]
trait UpsertStreamTableSink[T] extends TableSink[T]

case class RowtimeAttributeDescriptor(attributeName: String, timestampExtractor: TimestampExtractor, watermarkStrategy: WatermarkStrategy)
```