
# Table API Integration

Comprehensive API reference for Table API integration classes in the Apache Flink Kafka Connector 0.8. These classes enable declarative SQL and Table API access to Kafka topics.

**Important Note**: All Table API classes are marked as `@Internal`, indicating they are not part of the stable public API and may change between versions.

## Kafka08TableSource

**Package:** `org.apache.flink.streaming.connectors.kafka`

**Annotations:** `@Internal`

**Extends:** `KafkaTableSourceBase`

**Description:** Kafka `StreamTableSource` for Kafka 0.8, providing Table API access to Kafka topics as streaming tables.

### Class Declaration

```java { .api }
@Internal
public class Kafka08TableSource extends KafkaTableSourceBase
```

### Constructors

#### Full Constructor

```java { .api }
/**
 * Creates a Kafka08TableSource with full configuration options.
 *
 * @param schema The table schema defining column names and types
 * @param proctimeAttribute Optional processing time attribute name
 * @param rowtimeAttributeDescriptors List of rowtime attribute descriptors for event time
 * @param fieldMapping Optional mapping from table fields to Kafka message fields
 * @param topic The Kafka topic name to read from
 * @param properties Kafka consumer properties
 * @param deserializationSchema Schema to deserialize Kafka messages to Row objects
 * @param startupMode How the consumer should start reading (EARLIEST, LATEST, GROUP_OFFSETS, SPECIFIC_OFFSETS)
 * @param specificStartupOffsets Map of partition to offset for SPECIFIC_OFFSETS startup mode
 */
public Kafka08TableSource(
    TableSchema schema,
    Optional<String> proctimeAttribute,
    List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
    Optional<Map<String, String>> fieldMapping,
    String topic,
    Properties properties,
    DeserializationSchema<Row> deserializationSchema,
    StartupMode startupMode,
    Map<KafkaTopicPartition, Long> specificStartupOffsets)
```
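
The `startupMode` and `specificStartupOffsets` parameters decide where reading begins in each partition. The plain-Java sketch below models that decision so it can run without Kafka or Flink.

Three parts of it are assumptions:

- `Mode`, the `"topic-partition"` string keys and `startPosition` are hypothetical stand-ins for `StartupMode` and `KafkaTopicPartition`.
- A partition missing from the specific-offsets map falls back to the group offset, and then to `auto.offset.reset`. This follows Flink's documented consumer behaviour rather than Flink's own code.
- The committed-offset argument is a simplification.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of how a startup mode picks where one partition starts (illustrative only). */
class StartupOffsetSketch {

    /** Hypothetical stand-in for Flink's StartupMode. */
    enum Mode { EARLIEST, LATEST, GROUP_OFFSETS, SPECIFIC_OFFSETS }

    /**
     * @param partitionKey    hypothetical "topic-partition" key, e.g. "transactions-0"
     * @param specificOffsets offset of the next record to read, per partition key
     * @param committedOffset offset committed for the consumer group, or null if none exists
     * @return a description of the start position
     */
    static String startPosition(Mode mode, String partitionKey,
                                Map<String, Long> specificOffsets, Long committedOffset) {
        switch (mode) {
            case EARLIEST:
                return "earliest";
            case LATEST:
                return "latest";
            case SPECIFIC_OFFSETS:
                Long specific = specificOffsets.get(partitionKey);
                if (specific != null) {
                    return "offset " + specific;
                }
                // Partitions missing from the map fall back to the group-offset behaviour
                return startPosition(Mode.GROUP_OFFSETS, partitionKey, specificOffsets, committedOffset);
            case GROUP_OFFSETS:
            default:
                // Without a committed offset, auto.offset.reset decides
                return committedOffset != null ? "offset " + committedOffset : "auto.offset.reset";
        }
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("transactions-0", 1000L);
        offsets.put("transactions-1", 2000L);

        System.out.println(startPosition(Mode.SPECIFIC_OFFSETS, "transactions-1", offsets, 42L));
        System.out.println(startPosition(Mode.SPECIFIC_OFFSETS, "transactions-2", offsets, 42L));
        System.out.println(startPosition(Mode.SPECIFIC_OFFSETS, "transactions-2", offsets, null));
    }
}
```

In this model, EARLIEST and LATEST never consult committed offsets. Only GROUP_OFFSETS, and the fallback path of SPECIFIC_OFFSETS, depend on the consumer group.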

#### Basic Constructor

```java { .api }
/**
 * Creates a basic Kafka08TableSource with minimal configuration.
 *
 * @param schema The table schema defining column names and types
 * @param topic The Kafka topic name to read from
 * @param properties Kafka consumer properties
 * @param deserializationSchema Schema to deserialize Kafka messages to Row objects
 */
public Kafka08TableSource(
    TableSchema schema,
    String topic,
    Properties properties,
    DeserializationSchema<Row> deserializationSchema)
```

### Protected Methods

```java { .api }
/**
 * Creates the underlying FlinkKafkaConsumer for this table source.
 *
 * @param topic The Kafka topic name
 * @param properties Kafka consumer properties
 * @param deserializationSchema The deserialization schema for Row objects
 * @return FlinkKafkaConsumerBase instance configured for Kafka 0.8
 */
protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(
    String topic,
    Properties properties,
    DeserializationSchema<Row> deserializationSchema)
```
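
`createKafkaConsumer` follows the template-method pattern: the shared base class owns the common setup and asks the version-specific subclass only for the consumer instance. The sketch below shows that shape in plain Java.

This is a hypothetical model, not Flink's code:

- All names are hypothetical.
- A `String` stands in for the consumer object.
- The step where the base class applies the startup mode to the returned consumer is an assumption about `KafkaTableSourceBase`, not a copy of its implementation.

```java
import java.util.Properties;

/** Hypothetical model of the shared base class: common setup lives here. */
abstract class TableSourceBaseSketch {
    private final String topic;
    private final Properties properties;
    private final String startupMode;

    TableSourceBaseSketch(String topic, Properties properties, String startupMode) {
        this.topic = topic;
        this.properties = properties;
        this.startupMode = startupMode;
    }

    /** Version-specific hook, playing the role of createKafkaConsumer(...). */
    protected abstract String createConsumer(String topic, Properties properties);

    /** Shared logic: obtain the consumer from the subclass, then apply common settings. */
    final String buildConsumer() {
        String consumer = createConsumer(topic, properties);
        return consumer + " starting from " + startupMode;
    }
}

/** Hypothetical stand-in for Kafka08TableSource: only consumer construction is version-specific. */
class Kafka08SourceSketch extends TableSourceBaseSketch {

    Kafka08SourceSketch(String topic, Properties properties, String startupMode) {
        super(topic, properties, startupMode);
    }

    @Override
    protected String createConsumer(String topic, Properties properties) {
        return "Kafka 0.8 consumer for " + topic
            + " (zookeeper.connect=" + properties.getProperty("zookeeper.connect") + ")";
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "localhost:2181");
        System.out.println(new Kafka08SourceSketch("users-topic", props, "GROUP_OFFSETS").buildConsumer());
    }
}
```

With this split, supporting another Kafka version means writing one more subclass that overrides the hook, while the shared setup stays in one place.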

### Usage Examples

#### Basic Table Source Setup

```java { .api }
import java.util.Properties;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSource;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.types.Row;

// Define table schema
TableSchema schema = TableSchema.builder()
    .field("user_id", Types.LONG())
    .field("user_name", Types.STRING())
    .field("user_email", Types.STRING())
    .field("registration_time", Types.SQL_TIMESTAMP())
    .build();

// Kafka 0.8 consumer properties: bootstrap.servers for partition metadata,
// zookeeper.connect and group.id for consumer-group offsets
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "table-api-group");

// JSON deserialization schema; the builder takes the row type, not the TableSchema itself
DeserializationSchema<Row> deserializer =
    new JsonRowDeserializationSchema.Builder(schema.toRowType())
        .build();

// Create table source
Kafka08TableSource tableSource = new Kafka08TableSource(
    schema,
    "users-topic",
    properties,
    deserializer
);
```

#### Advanced Table Source with Time Attributes

```java { .api }
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;
// Remaining imports and the `properties` object are as in the previous example

// Schema with processing time and event time
TableSchema schema = TableSchema.builder()
    .field("transaction_id", Types.STRING())
    .field("amount", Types.DECIMAL())
    .field("event_time", Types.SQL_TIMESTAMP())
    .field("proc_time", Types.SQL_TIMESTAMP())
    .build();

// Deserialized rows use the physical field names found in the JSON messages
DeserializationSchema<Row> deserializer = new JsonRowDeserializationSchema.Builder(
        Types.ROW(
            new String[] {"txn_id", "txn_amount", "timestamp"},
            new TypeInformation<?>[] {Types.STRING(), Types.DECIMAL(), Types.SQL_TIMESTAMP()}))
    .build();

// Define rowtime attribute for event time processing. Kafka 0.8 has no record timestamps,
// so event time is read from the "timestamp" payload field; watermarks lag by 5 seconds.
List<RowtimeAttributeDescriptor> rowtimeDescriptors = Collections.singletonList(
    new RowtimeAttributeDescriptor(
        "event_time",                          // rowtime attribute name
        new ExistingField("timestamp"),        // timestamp extractor
        new BoundedOutOfOrderTimestamps(5000L) // watermark strategy
    )
);

// Field mapping from table columns to physical fields. Time attributes are not mapped;
// "timestamp" is mapped so that the timestamp extractor can resolve it.
Map<String, String> fieldMapping = new HashMap<>();
fieldMapping.put("transaction_id", "txn_id");
fieldMapping.put("amount", "txn_amount");
fieldMapping.put("timestamp", "timestamp");

// Specific startup offsets (offset of the next record to read in each partition)
Map<KafkaTopicPartition, Long> startupOffsets = new HashMap<>();
startupOffsets.put(new KafkaTopicPartition("transactions", 0), 1000L);
startupOffsets.put(new KafkaTopicPartition("transactions", 1), 2000L);

// Create advanced table source
Kafka08TableSource tableSource = new Kafka08TableSource(
    schema,
    Optional.of("proc_time"),     // processing time attribute
    rowtimeDescriptors,           // rowtime attributes
    Optional.of(fieldMapping),    // field mapping
    "transactions",               // topic
    properties,                   // Kafka properties
    deserializer,                 // deserialization schema
    StartupMode.SPECIFIC_OFFSETS, // startup mode
    startupOffsets                // specific offsets
);
```
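
`BoundedOutOfOrderTimestamps(5000L)` produces watermarks that trail the largest event timestamp seen so far by the given delay. Events that arrive up to five seconds out of order are therefore still on time. The plain-Java model below reproduces that rule under a hypothetical class name. It is a sketch of the behaviour, not Flink's implementation.

```java
/** Plain-Java model of a bounded-out-of-orderness watermark (illustrative only). */
class BoundedLatenessWatermark {
    private final long delayMillis;
    private long maxTimestamp;

    BoundedLatenessWatermark(long delayMillis) {
        this.delayMillis = delayMillis;
        // Start low so the initial watermark is Long.MIN_VALUE without overflowing
        this.maxTimestamp = Long.MIN_VALUE + delayMillis;
    }

    /** Records an event timestamp, e.g. the value extracted from the "timestamp" field. */
    void observe(long timestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, timestampMillis);
    }

    /** Events with a timestamp at or below this value count as late. */
    long currentWatermark() {
        return maxTimestamp - delayMillis;
    }

    public static void main(String[] args) {
        BoundedLatenessWatermark wm = new BoundedLatenessWatermark(5000L);
        long[] eventTimes = {10_000L, 12_000L, 11_000L}; // 11_000 arrives out of order
        for (long t : eventTimes) {
            wm.observe(t);
        }
        System.out.println(wm.currentWatermark()); // 7000
    }
}
```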

## Kafka08TableSink

**Package:** `org.apache.flink.streaming.connectors.kafka`

**Annotations:** `@Internal`

**Extends:** `KafkaTableSinkBase`

**Description:** Kafka 0.8 table sink for writing Table API results to Kafka topics.

### Class Declaration

```java { .api }
@Internal
public class Kafka08TableSink extends KafkaTableSinkBase
```

### Constructor

```java { .api }
/**
 * Creates a Kafka08TableSink for writing table data to Kafka.
 *
 * @param schema The table schema defining the structure of rows to be written
 * @param topic The target Kafka topic name
 * @param properties Kafka producer properties
 * @param partitioner Optional custom partitioner for determining target partition
 * @param serializationSchema Schema to serialize Row objects to byte arrays
 */
public Kafka08TableSink(
    TableSchema schema,
    String topic,
    Properties properties,
    Optional<FlinkKafkaPartitioner<Row>> partitioner,
    SerializationSchema<Row> serializationSchema)
```

### Protected Methods

```java { .api }
/**
 * Creates the underlying FlinkKafkaProducer for this table sink.
 *
 * @param topic The Kafka topic name
 * @param properties Kafka producer properties
 * @param serializationSchema The serialization schema for Row objects
 * @param partitioner Optional custom partitioner
 * @return FlinkKafkaProducerBase instance configured for Kafka 0.8
 */
protected FlinkKafkaProducerBase<Row> createKafkaProducer(
    String topic,
    Properties properties,
    SerializationSchema<Row> serializationSchema,
    Optional<FlinkKafkaPartitioner<Row>> partitioner)
```

### Usage Examples

#### Basic Table Sink Setup

```java { .api }
import java.util.Optional;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.types.Row;

// Define output table schema
TableSchema outputSchema = TableSchema.builder()
    .field("result_id", Types.STRING())
    .field("computed_value", Types.DOUBLE())
    .field("processing_time", Types.SQL_TIMESTAMP())
    .build();

// Kafka producer properties: the 0.8 sink uses Kafka's Java producer,
// which locates brokers through bootstrap.servers
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");

// JSON serialization schema built from the row type of the output schema
SerializationSchema<Row> serializer =
    new JsonRowSerializationSchema.Builder(outputSchema.toRowType())
        .build();

// Create table sink
Kafka08TableSink tableSink = new Kafka08TableSink(
    outputSchema,
    "results-topic",
    producerProps,
    Optional.empty(), // No custom partitioner
    serializer
);
```

#### Table Sink with Custom Partitioner

```java { .api }
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Custom partitioner based on result_id. FlinkKafkaPartitioner is Serializable, so create it
// where the anonymous class does not capture a non-serializable enclosing instance.
FlinkKafkaPartitioner<Row> partitioner = new FlinkKafkaPartitioner<Row>() {
    @Override
    public int partition(Row record, byte[] key, byte[] value,
                         String targetTopic, int[] partitions) {
        // Hash the first field (result_id) and return one of the available partition ids
        String resultId = (String) record.getField(0);
        int hash = resultId == null ? 0 : resultId.hashCode();
        return partitions[Math.floorMod(hash, partitions.length)];
    }
};

Kafka08TableSink tableSink = new Kafka08TableSink(
    outputSchema,
    "partitioned-results",
    producerProps,
    Optional.of(partitioner), // Custom partitioner
    serializer
);
```
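
A partitioner must return one of the ids in `partitions`, not a position in that array, and the same `result_id` should always map to the same partition. That selection rule can be checked without Kafka.

`PartitionChooser` and `choose` are hypothetical names for this plain-Java sketch. The rule shown, the hash modulo the partition count, is one reasonable choice rather than anything Flink prescribes.

```java
import java.util.Arrays;

/** Plain-Java model of hash-based partition selection (illustrative only). */
class PartitionChooser {

    /** Returns one entry of {@code partitions}, chosen by hashing {@code key}. */
    static int choose(String key, int[] partitions) {
        int hash = key == null ? 0 : key.hashCode();
        // floorMod never yields a negative index, even for negative hash codes
        return partitions[Math.floorMod(hash, partitions.length)];
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2, 3};
        for (String id : Arrays.asList("result-1", "result-2", "result-1")) {
            System.out.println(id + " -> partition " + choose(id, partitions));
        }
    }
}
```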

## Kafka08TableSourceSinkFactory

**Package:** `org.apache.flink.streaming.connectors.kafka`

**Extends:** `KafkaTableSourceSinkFactoryBase`

**Description:** Factory for creating configured instances of Kafka08TableSource and Kafka08TableSink from descriptors.

### Class Declaration

```java { .api }
public class Kafka08TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase
```

### Protected Methods

```java { .api }
/**
 * Returns the Kafka version string for this factory.
 *
 * @return "0.8" indicating Kafka version 0.8 support
 */
protected String kafkaVersion()

/**
 * Indicates whether this Kafka version supports timestamps.
 * Kafka 0.8 does not support message timestamps.
 *
 * @return false, as Kafka 0.8 doesn't support timestamps
 */
protected boolean supportsKafkaTimestamps()

/**
 * Creates a KafkaTableSource instance with the provided configuration.
 *
 * @param schema The table schema
 * @param proctimeAttribute Processing time attribute name (optional)
 * @param rowtimeAttributeDescriptors Rowtime attribute descriptors
 * @param fieldMapping Mapping from table fields to fields of the deserialized row
 * @param topic The Kafka topic name
 * @param properties Kafka consumer properties
 * @param deserializationSchema Row deserialization schema
 * @param startupMode Consumer startup mode
 * @param specificStartupOffsets Specific startup offsets (if applicable)
 * @return Configured Kafka08TableSource instance
 */
protected KafkaTableSourceBase createKafkaTableSource(
    TableSchema schema,
    Optional<String> proctimeAttribute,
    List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
    Map<String, String> fieldMapping,
    String topic,
    Properties properties,
    DeserializationSchema<Row> deserializationSchema,
    StartupMode startupMode,
    Map<KafkaTopicPartition, Long> specificStartupOffsets)

/**
 * Creates a KafkaTableSink instance with the provided configuration.
 *
 * @param schema The table schema
 * @param topic The Kafka topic name
 * @param properties Kafka producer properties
 * @param partitioner Optional partitioner
 * @param serializationSchema Row serialization schema
 * @return Configured Kafka08TableSink instance
 */
protected KafkaTableSinkBase createKafkaTableSink(
    TableSchema schema,
    String topic,
    Properties properties,
    Optional<FlinkKafkaPartitioner<Row>> partitioner,
    SerializationSchema<Row> serializationSchema)
```

### Factory Usage Examples

#### Programmatic Factory Usage

`createKafkaTableSource` and `createKafkaTableSink` are `protected` hooks that `KafkaTableSourceSinkFactoryBase` invokes after it has validated the descriptor properties. Applications normally reach them indirectly, through descriptors or a `TableEnvironment`. The direct calls below only illustrate the arguments involved. Because the methods are protected, these calls compile only from code in the `org.apache.flink.streaming.connectors.kafka` package.

```java { .api }
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceSinkFactory;

Kafka08TableSourceSinkFactory factory = new Kafka08TableSourceSinkFactory();

// Create table source via factory
KafkaTableSourceBase source = factory.createKafkaTableSource(
    schema,                   // Table schema
    Optional.of("proc_time"), // Processing time attribute
    Collections.emptyList(),  // No rowtime attributes
    fieldMapping,             // Table field -> physical field, for all non-time columns
    "input-topic",            // Topic name
    consumerProperties,       // Kafka properties
    deserializationSchema,    // Deserialization schema
    StartupMode.EARLIEST,     // Startup mode
    Collections.emptyMap()    // No specific offsets
);

// Create table sink via factory
KafkaTableSinkBase sink = factory.createKafkaTableSink(
    outputSchema,             // Output table schema
    "output-topic",           // Topic name
    producerProperties,       // Kafka properties
    Optional.empty(),         // No custom partitioner
    serializationSchema       // Serialization schema
);
```
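
`supportsKafkaTimestamps()` acts as a capability flag. When it returns `false`, a rowtime attribute cannot take its value from the Kafka record timestamp and must be extracted from a payload field instead. The plain-Java sketch below models such a check.

The `TimestampSource` enum and `validateRowtime` method are hypothetical. The sketch assumes only that the shared factory base performs an equivalent validation; it does not reproduce Flink's validator.

```java
/** Plain-Java model of gating a rowtime definition on a capability flag (illustrative only). */
class RowtimeValidationSketch {

    /** Hypothetical: where a rowtime attribute takes its timestamps from. */
    enum TimestampSource { FROM_FIELD, FROM_KAFKA_RECORD }

    static void validateRowtime(TimestampSource source, boolean supportsKafkaTimestamps) {
        if (source == TimestampSource.FROM_KAFKA_RECORD && !supportsKafkaTimestamps) {
            throw new IllegalArgumentException(
                "Kafka record timestamps are not available for this Kafka version; "
                    + "extract the timestamp from a payload field instead.");
        }
    }

    public static void main(String[] args) {
        boolean kafka08 = false; // what the 0.8 factory reports
        validateRowtime(TimestampSource.FROM_FIELD, kafka08); // accepted
        try {
            validateRowtime(TimestampSource.FROM_KAFKA_RECORD, kafka08);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```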

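## Table API Integration Examples

### Complete Table API Workflow

```java { .api }
import java.util.Collections;
import java.util.Optional;
import java.util.Properties;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSink;
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSource;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.types.Row;

// Setup Table API environment
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .inStreamingMode()
    .build();
TableEnvironment tEnv = TableEnvironment.create(settings);

// Register Kafka source table
TableSchema sourceSchema = TableSchema.builder()
    .field("user_id", Types.LONG())
    .field("product_id", Types.STRING())
    .field("quantity", Types.INT())
    .field("price", Types.DECIMAL())
    .field("order_time", Types.SQL_TIMESTAMP())
    .field("proc_time", Types.SQL_TIMESTAMP())
    .build();

// The JSON messages carry every column except proc_time, which Flink generates
TypeInformation<Row> payloadType = Types.ROW(
    new String[] {"user_id", "product_id", "quantity", "price", "order_time"},
    new TypeInformation<?>[] {
        Types.LONG(), Types.STRING(), Types.INT(), Types.DECIMAL(), Types.SQL_TIMESTAMP()});

Properties sourceProps = new Properties();
sourceProps.setProperty("bootstrap.servers", "localhost:9092");
sourceProps.setProperty("zookeeper.connect", "localhost:2181");
sourceProps.setProperty("group.id", "analytics-group");

// The full constructor declares proc_time as a processing-time attribute;
// TUMBLE in the query below requires a time attribute
Kafka08TableSource source = new Kafka08TableSource(
    sourceSchema,
    Optional.of("proc_time"),
    Collections.emptyList(),
    Optional.empty(),
    "orders",
    sourceProps,
    new JsonRowDeserializationSchema.Builder(payloadType).build(),
    StartupMode.GROUP_OFFSETS,
    Collections.emptyMap()
);

tEnv.registerTableSource("Orders", source);

// Register Kafka sink table
TableSchema sinkSchema = TableSchema.builder()
    .field("product_id", Types.STRING())
    .field("total_quantity", Types.LONG())
    .field("total_revenue", Types.DECIMAL())
    .field("window_start", Types.SQL_TIMESTAMP())
    .field("window_end", Types.SQL_TIMESTAMP())
    .build();

Properties sinkProps = new Properties();
sinkProps.setProperty("bootstrap.servers", "localhost:9092");

Kafka08TableSink sink = new Kafka08TableSink(
    sinkSchema,
    "product-analytics",
    sinkProps,
    Optional.empty(),
    new JsonRowSerializationSchema.Builder(sinkSchema.toRowType()).build()
);

tEnv.registerTableSink("ProductAnalytics", sink);

// Hourly totals per product; SUM over an INT column yields INT, so cast to the BIGINT sink column
String sql =
    "INSERT INTO ProductAnalytics " +
    "SELECT " +
    "  product_id, " +
    "  CAST(SUM(quantity) AS BIGINT) AS total_quantity, " +
    "  SUM(quantity * price) AS total_revenue, " +
    "  TUMBLE_START(proc_time, INTERVAL '1' HOUR) AS window_start, " +
    "  TUMBLE_END(proc_time, INTERVAL '1' HOUR) AS window_end " +
    "FROM Orders " +
    "GROUP BY product_id, TUMBLE(proc_time, INTERVAL '1' HOUR)";

// sqlUpdate() registers the INSERT and execute() submits the job;
// executeSql() is not available in the Flink releases that ship this connector
tEnv.sqlUpdate(sql);
tEnv.execute("product-analytics");
```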
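
The query assigns each order to a one-hour tumbling window and sums quantity and revenue per product and window. The plain-Java sketch below reproduces that grouping for a few in-memory orders, so the window boundaries are easy to check.

Its assumptions:

- `Order`, `Totals`, `windowStart` and the sample values are hypothetical.
- Windows are aligned to the Unix epoch with no offset.
- Timestamps are plain epoch milliseconds.

```java
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of the hourly per-product aggregation in the query above (illustrative only). */
class TumblingWindowSketch {
    static final long HOUR_MS = 60L * 60L * 1000L;

    /** Hypothetical order record: product, quantity, unit price and time in epoch milliseconds. */
    static final class Order {
        final String productId;
        final int quantity;
        final BigDecimal price;
        final long timeMs;

        Order(String productId, int quantity, BigDecimal price, long timeMs) {
            this.productId = productId;
            this.quantity = quantity;
            this.price = price;
            this.timeMs = timeMs;
        }
    }

    /** Running totals for one product and window. */
    static final class Totals {
        long quantity;
        BigDecimal revenue = BigDecimal.ZERO;
    }

    /** Start of the epoch-aligned window of the given size that contains timeMs. */
    static long windowStart(long timeMs, long sizeMs) {
        return timeMs - Math.floorMod(timeMs, sizeMs);
    }

    /** Groups by (product, window start) and sums quantity and quantity * price. */
    static Map<String, Totals> aggregate(List<Order> orders) {
        Map<String, Totals> result = new TreeMap<>();
        for (Order o : orders) {
            String key = o.productId + "@" + windowStart(o.timeMs, HOUR_MS);
            Totals t = result.computeIfAbsent(key, k -> new Totals());
            t.quantity += o.quantity;
            t.revenue = t.revenue.add(o.price.multiply(BigDecimal.valueOf(o.quantity)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Order> orders = Arrays.asList(
            new Order("p1", 2, new BigDecimal("9.50"), 3_723_000L),  // 01:02:03 UTC
            new Order("p1", 1, new BigDecimal("9.50"), 7_199_999L),  // 01:59:59.999, same window
            new Order("p1", 3, new BigDecimal("9.50"), 7_200_000L)); // 02:00:00, next window
        for (Map.Entry<String, Totals> e : aggregate(orders).entrySet()) {
            System.out.println(e.getKey() + " quantity=" + e.getValue().quantity
                + " revenue=" + e.getValue().revenue);
        }
    }
}
```

The window end matches `TUMBLE_END`: the start plus one hour, exclusive. That is why the order at exactly 02:00:00 falls into the next window.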

### Dynamic Table Registration

```java { .api }
import org.apache.flink.table.descriptors.*;

// Register source using descriptors (alternative approach; tEnv as in the previous example)
tEnv.connect(
        new Kafka()
            .version("0.8")
            .topic("user-events")
            .property("bootstrap.servers", "localhost:9092")
            .property("zookeeper.connect", "localhost:2181")
            .property("group.id", "event-processors")
    )
    .withFormat(
        new Json()
            .failOnMissingField(false)
            .deriveSchema()
    )
    .withSchema(
        new Schema()
            .field("event_id", Types.STRING())
            .field("user_id", Types.LONG())
            .field("event_type", Types.STRING())
            .field("event_time", Types.SQL_TIMESTAMP())
            .field("proc_time", Types.SQL_TIMESTAMP()).proctime()
    )
    .createTemporaryTable("UserEvents");

// Register sink using descriptors (the producer locates brokers through bootstrap.servers)
tEnv.connect(
        new Kafka()
            .version("0.8")
            .topic("processed-events")
            .property("bootstrap.servers", "localhost:9092")
    )
    .withFormat(
        new Json()
    )
    .withSchema(
        new Schema()
            .field("user_id", Types.LONG())
            .field("event_count", Types.LONG())
            .field("processing_time", Types.SQL_TIMESTAMP())
    )
    .createTemporaryTable("ProcessedEvents");
```

## Serialization Schemas for Table API

### JSON Row Serialization

```java { .api }
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.types.Row;

// JSON deserialization for table source; the builder takes the row type, not the TableSchema
JsonRowDeserializationSchema.Builder deserBuilder =
    new JsonRowDeserializationSchema.Builder(sourceSchema.toRowType());

DeserializationSchema<Row> deserializer = deserBuilder
    .ignoreParseErrors() // Skip malformed JSON instead of failing the job
    .build();

// JSON serialization for table sink
JsonRowSerializationSchema.Builder serBuilder =
    new JsonRowSerializationSchema.Builder(sinkSchema.toRowType());

SerializationSchema<Row> serializer = serBuilder.build();
```
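
With `ignoreParseErrors()`, a message that cannot be parsed yields `null` instead of an exception, and the Kafka consumer skips `null` records rather than failing the job. This plain-Java sketch models that contract with integers instead of JSON rows. `parseOrNull` and `consume` are hypothetical helpers standing in for the deserializer and the consumer loop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of "return null on parse errors, skip null records" (illustrative only). */
class SkipMalformedSketch {

    /** Lenient deserializer: returns null instead of throwing on malformed input. */
    static Integer parseOrNull(String message) {
        try {
            return Integer.valueOf(message.trim());
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /** Consumer loop: null results are dropped, everything else is emitted. */
    static List<Integer> consume(List<String> messages) {
        List<Integer> emitted = new ArrayList<>();
        for (String m : messages) {
            Integer value = parseOrNull(m);
            if (value != null) {
                emitted.add(value);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(consume(Arrays.asList("1", "{not json", "3"))); // [1, 3]
    }
}
```

Skipping keeps the job running, but malformed messages disappear silently. Whether that trade-off is acceptable depends on the topic's data quality.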

### Custom Row Serialization

```java { .api }
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.types.Row;

// Custom deserialization schema for delimited text
public class CsvRowDeserializationSchema implements DeserializationSchema<Row> {
    // DeserializationSchema is Serializable, so keep only serializable type information
    private final TypeInformation<Row> rowType;
    private final TypeInformation<?>[] fieldTypes;
    private final String delimiter;

    public CsvRowDeserializationSchema(TableSchema schema, String delimiter) {
        this.rowType = schema.toRowType();
        this.fieldTypes = schema.getFieldTypes();
        this.delimiter = delimiter;
    }

    @Override
    public Row deserialize(byte[] message) throws IOException {
        String line = new String(message, StandardCharsets.UTF_8);
        // Quote the delimiter so characters such as "|" are not treated as regex syntax;
        // limit -1 keeps trailing empty fields
        String[] fields = line.split(Pattern.quote(delimiter), -1);

        Row row = new Row(fieldTypes.length);
        for (int i = 0; i < fields.length && i < fieldTypes.length; i++) {
            row.setField(i, convertField(fields[i], fieldTypes[i]));
        }
        return row;
    }

    @Override
    public boolean isEndOfStream(Row nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        return rowType;
    }

    private Object convertField(String field, TypeInformation<?> type) {
        // Empty values become NULL; otherwise convert based on the schema type
        if (field.isEmpty()) return null;
        if (type.equals(Types.STRING())) return field;
        if (type.equals(Types.LONG())) return Long.parseLong(field);
        if (type.equals(Types.INT())) return Integer.parseInt(field);
        // ... other type conversions
        return field;
    }
}
```
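
Two details of `String.split` matter for delimited messages. First, the argument is a regular expression, so a delimiter such as `|` must be quoted. Second, without a negative limit, trailing empty fields are dropped. The plain-Java sketch below, with hypothetical class and method names, shows both effects on a line whose last column is empty.

```java
import java.util.regex.Pattern;

/** Demonstrates delimiter handling for delimited text messages (illustrative only). */
class DelimiterSplitSketch {

    /** Splits on a literal delimiter and keeps trailing empty fields. */
    static String[] splitFields(String line, String delimiter) {
        return line.split(Pattern.quote(delimiter), -1);
    }

    public static void main(String[] args) {
        String line = "42|alice|";
        // Unquoted "|" is a regex that matches the empty string, so the line falls apart into characters
        System.out.println(line.split("|").length);
        // Quoted but without a limit: the trailing empty field is dropped
        System.out.println(line.split(Pattern.quote("|")).length);
        // Quoted with limit -1: three fields, the last one empty
        System.out.println(splitFields(line, "|").length);
    }
}
```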

## Configuration and Limitations

### Kafka 0.8 Table API Limitations

1. **No Timestamp Support**: Kafka 0.8 messages carry no timestamps, so rowtime attributes cannot be based on Kafka record timestamps
2. **Internal API**: All classes are marked `@Internal` and may change between versions
3. **Limited Watermark Strategies**: Event time and watermarks must be derived from a field in the message payload (for example with an `ExistingField` timestamp extractor), because Kafka 0.8 provides no record-level time metadata
4. **ZooKeeper Dependency**: The consumer requires `zookeeper.connect`, because Kafka 0.8 stores consumer group offsets in ZooKeeper

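### Recommended Configuration

```java { .api }
import java.util.Properties;

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Source configuration for reliability
Properties sourceProps = new Properties();
sourceProps.setProperty("bootstrap.servers", "broker1:9092,broker2:9092");
sourceProps.setProperty("zookeeper.connect", "zk1:2181,zk2:2181,zk3:2181");
sourceProps.setProperty("group.id", "table-api-consumer");
sourceProps.setProperty("auto.offset.reset", "earliest"); // Used when the group has no valid committed offset
sourceProps.setProperty("auto.commit.enable", "false");   // Flink commits offsets on completed checkpoints

// Sink configuration for reliability (settings of Kafka's Java producer)
Properties sinkProps = new Properties();
sinkProps.setProperty("bootstrap.servers", "broker1:9092,broker2:9092");
sinkProps.setProperty("acks", "1");
sinkProps.setProperty("retries", "3");

// Enable checkpointing: exactly-once state on the source side only;
// the Kafka 0.8 sink does not provide exactly-once delivery.
// Build the table environment on this env (e.g. StreamTableEnvironment.create(env)).
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
```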

### Migration Considerations

For production Table API usage, consider upgrading to a newer Kafka connector. Depending on the Kafka version, newer connectors provide:

1. **Stable Public API**: Non-internal classes with stability guarantees
2. **Message Timestamp Support**: Kafka 0.10+ records carry timestamps that rowtime attributes can use directly
3. **Improved Reliability**: Exactly-once semantics for both source and sink (the sink side requires Kafka 0.11+ transactions)
4. **Enhanced Performance**: Better resource utilization and throughput