# Table API Integration

Table sources and sinks for SQL-layer integration, supporting JSON and Avro data formats with automatic schema inference, connector descriptors, and streaming table operations. This enables declarative SQL queries over Kafka streams.

## Capabilities

### KafkaTableSource

Abstract base class for Kafka table sources providing streaming table functionality with support for projection pushdown, filter pushdown, and watermark extraction.

```java { .api }
public abstract class KafkaTableSource implements StreamTableSource<Row>,
        DefinedProctimeAttribute, DefinedRowtimeAttributes {

    // Abstract method implemented by version-specific concrete subclasses
    protected abstract FlinkKafkaConsumerBase<Row> createKafkaConsumer(
            String topic,
            Properties properties,
            DeserializationSchema<Row> deserializationSchema
    );
}
```

**Key Interfaces:**

- `StreamTableSource<Row>` - Provides a streaming data source for the Table API
- `DefinedProctimeAttribute` - Supports processing-time attribute definition
- `DefinedRowtimeAttributes` - Supports event-time attribute definition
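
A concrete subclass is registered with a `StreamTableEnvironment` and then queried with SQL. A minimal sketch, assuming the legacy `registerTableSource` API and an already-constructed concrete source (the table and column names are illustrative):

```java
// Obtain a table environment for the streaming job.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// "source" is a concrete KafkaTableSource subclass (e.g. a JSON or Avro source, see below).
// The source contributes the schema, time attributes, and the underlying Kafka consumer.
tEnv.registerTableSource("user_events", source);

// Query the registered table with SQL.
Table result = tEnv.sqlQuery("SELECT user_id, action FROM user_events");
```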

### KafkaTableSink

Abstract base class for Kafka table sinks providing streaming table output functionality with partitioning support.

```java { .api }
public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {

    // Abstract method implemented by version-specific concrete subclasses
    protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
            String topic,
            Properties properties,
            SerializationSchema<Row> serializationSchema,
            FlinkKafkaPartitioner<Row> partitioner
    );
}
```

**Key Interface:**

- `AppendStreamTableSink<Row>` - Supports append-only table sink operations
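
A concrete sink is wired up symmetrically: register it under a table name and append a query result to it. A minimal sketch, assuming the legacy `registerTableSink`/`insertInto` API (the field names and types are illustrative):

```java
// "sink" is a concrete KafkaTableSink subclass (e.g. a JSON sink, see below).
tEnv.registerTableSink(
    "output_events",
    new String[] {"user_id", "action"},
    new TypeInformation<?>[] {Types.STRING, Types.STRING},
    sink);

// Append-only query results can be written straight into the sink table.
tEnv.sqlQuery("SELECT user_id, action FROM user_events")
    .insertInto("output_events");
```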

### JSON Table Sources

#### KafkaJsonTableSource

Table source for JSON-formatted Kafka messages with field mapping support and schema inference.

```java { .api }
public abstract class KafkaJsonTableSource extends KafkaTableSource implements DefinedFieldMapping {

    public KafkaJsonTableSource(
            TableSchema schema,
            Optional<String> proctimeAttribute,
            List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
            Optional<Map<String, String>> fieldMapping,
            String topic,
            Properties properties,
            DeserializationSchema<Row> deserializationSchema,
            boolean failOnMissingField,
            boolean ignoreParseErrors
    );
}
```

**Parameters:**

- `schema` - Table schema defining column names and types
- `proctimeAttribute` - Optional processing-time attribute name
- `rowtimeAttributeDescriptors` - Event-time attribute descriptors
- `fieldMapping` - Optional mapping from table fields to JSON fields
- `topic` - Kafka topic to consume from
- `properties` - Kafka consumer properties
- `deserializationSchema` - Row deserialization schema
- `failOnMissingField` - Whether to fail on missing JSON fields
- `ignoreParseErrors` - Whether to skip records with parse errors

**Usage Example:**

```java
// Define the table schema
TableSchema schema = TableSchema.builder()
    .field("user_id", DataTypes.STRING())
    .field("action", DataTypes.STRING())
    .field("timestamp", DataTypes.TIMESTAMP(3))
    .field("proctime", DataTypes.TIMESTAMP(3))
    .build();

// Create the JSON table source (MyKafkaJsonTableSource is a concrete, version-specific subclass)
KafkaJsonTableSource source = new MyKafkaJsonTableSource(
    schema,
    Optional.of("proctime"),
    Collections.emptyList(),
    Optional.empty(),
    "user-events",
    kafkaProperties,
    new JsonRowDeserializationSchema(schema.toRowType()),
    false, // Don't fail on missing fields
    true   // Skip parse errors
);
```

#### KafkaJsonTableSourceFactory

Factory for creating KafkaJsonTableSource instances from table descriptors or SQL DDL definitions.

```java { .api }
public abstract class KafkaJsonTableSourceFactory implements TableSourceFactory<Row> {

    public Map<String, String> requiredContext();
    public List<String> supportedProperties();
    public TableSource<Row> createTableSource(Map<String, String> properties);
}
```
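
For programmatic use, a factory consumes the same flat property map that the descriptor API and DDL are translated into. A minimal sketch, assuming a hypothetical concrete factory (`MyKafkaJsonTableSourceFactory`) and illustrative property keys:

```java
// Build the flat property map a table source factory consumes.
// Key names follow the legacy "connector.*" / "format.*" namespaces and are illustrative.
Map<String, String> properties = new HashMap<>();
properties.put("connector.type", "kafka");
properties.put("connector.topic", "user-events");
properties.put("connector.properties.bootstrap.servers", "localhost:9092");
properties.put("format.type", "json");

// Create the table source from the property map.
TableSource<Row> tableSource = new MyKafkaJsonTableSourceFactory().createTableSource(properties);
```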

**SQL DDL Example:**

```sql
CREATE TABLE user_events (
    user_id STRING,
    action STRING,
    event_time TIMESTAMP(3),
    proctime AS PROCTIME(),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'user-events',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'test-group',
    'format' = 'json',
    'scan.startup.mode' = 'earliest-offset'
);
```

### JSON Table Sinks

#### KafkaJsonTableSink

Table sink for writing JSON-formatted messages to Kafka with optional partitioning.

```java { .api }
public abstract class KafkaJsonTableSink extends KafkaTableSink {

    public KafkaJsonTableSink(
            TableSchema schema,
            String topic,
            Properties properties,
            Optional<FlinkKafkaPartitioner<Row>> partitioner,
            SerializationSchema<Row> serializationSchema
    );
}
```

**Usage Example:**

```java
// Create JSON table sink
KafkaJsonTableSink sink = new MyKafkaJsonTableSink(
    schema,
    "output-events",
    kafkaProperties,
    Optional.of(new FlinkFixedPartitioner<>()),
    new JsonRowSerializationSchema(schema.toRowType())
);
```

### Avro Table Sources

#### KafkaAvroTableSource

Table source for Avro-formatted Kafka messages with field mapping support and Schema Registry integration.

```java { .api }
public abstract class KafkaAvroTableSource extends KafkaTableSource implements DefinedFieldMapping {

    public KafkaAvroTableSource(
            TableSchema schema,
            Optional<String> proctimeAttribute,
            List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
            Optional<Map<String, String>> fieldMapping,
            String topic,
            Properties properties,
            DeserializationSchema<Row> deserializationSchema
    );
}
```

**Usage Example:**

```java
// Properties for Avro with Schema Registry
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("schema.registry.url", "http://localhost:8081");

// Create Avro table source
KafkaAvroTableSource source = new MyKafkaAvroTableSource(
    schema,
    Optional.empty(),
    Collections.emptyList(),
    Optional.empty(),
    "avro-events",
    props,
    new AvroRowDeserializationSchema(avroSchema)
);
```

## Table Descriptors

### Kafka Connector Descriptor

Programmatic configuration for Kafka table sources and sinks.

```java { .api }
public class Kafka extends ConnectorDescriptor {
    public Kafka();
    public Kafka version(String version);
    public Kafka topic(String topic);
    public Kafka properties(Properties properties);
    public Kafka property(String key, String value);
    public Kafka startFromEarliest();
    public Kafka startFromLatest();
    public Kafka startFromGroupOffsets();
    public Kafka startFromSpecificOffsets(Map<Integer, Long> specificOffsets);
}
```

**Usage Example:**

```java
// Create the table environment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Define a source using the descriptor API
tEnv.connect(
        new Kafka()
            .version("universal")
            .topic("input-topic")
            .property("bootstrap.servers", "localhost:9092")
            .property("group.id", "test-group")
            .startFromEarliest()
    )
    .withFormat(new Json().failOnMissingField(false))
    .withSchema(new Schema()
        .field("user_id", DataTypes.STRING())
        .field("action", DataTypes.STRING())
        .field("timestamp", DataTypes.TIMESTAMP(3))
        .field("proctime", DataTypes.TIMESTAMP(3)).proctime()
        .field("rowtime", DataTypes.TIMESTAMP(3))
            .rowtime(new Rowtime()
                .timestampsFromField("timestamp")
                .watermarksPeriodicBounded(5000))
    )
    .createTemporaryTable("user_events");
```

### KafkaValidator

Validator for Kafka table descriptors ensuring proper configuration.

```java { .api }
public class KafkaValidator extends ConnectorDescriptorValidator {
    public static final String CONNECTOR_TYPE_VALUE_KAFKA = "kafka";
    public static final String CONNECTOR_VERSION = "connector.version";
    public static final String CONNECTOR_TOPIC = "connector.topic";
    public static final String CONNECTOR_PROPERTIES = "connector.properties";
    public static final String CONNECTOR_STARTUP_MODE = "connector.startup-mode";
    public static final String CONNECTOR_SPECIFIC_OFFSETS = "connector.specific-offsets";
}
```

## Time Attributes and Watermarks

### Processing Time

Define a processing-time attribute for time-based operations:

```java
// In a table source constructor
Optional<String> proctimeAttribute = Optional.of("proctime");

// Equivalent SQL DDL: proctime AS PROCTIME()
```

### Event Time and Watermarks

Define event-time attributes with watermark strategies:

```java
// Rowtime descriptor with a watermark strategy
List<RowtimeAttributeDescriptor> rowtimeAttributes = Arrays.asList(
    new RowtimeAttributeDescriptor(
        "rowtime",
        new ExistingField("timestamp"),
        new BoundedOutOfOrderTimestamps(5000) // 5 seconds of allowed out-of-orderness
    )
);

// Equivalent SQL DDL:
// WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
```

## Advanced Usage Examples

### Complex Query with Windowing

```sql
-- Create source table
CREATE TABLE user_events (
    user_id STRING,
    action STRING,
    amount DECIMAL(10,2),
    event_time TIMESTAMP(3),
    proctime AS PROCTIME(),
    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'user-events',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

-- Create sink table
CREATE TABLE hourly_stats (
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3),
    user_count BIGINT,
    total_amount DECIMAL(10,2)
) WITH (
    'connector' = 'kafka',
    'topic' = 'hourly-stats',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

-- Windowed aggregation query
INSERT INTO hourly_stats
SELECT
    TUMBLE_START(event_time, INTERVAL '1' HOUR) AS window_start,
    TUMBLE_END(event_time, INTERVAL '1' HOUR) AS window_end,
    COUNT(DISTINCT user_id) AS user_count,
    SUM(amount) AS total_amount
FROM user_events
GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR);
```
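
These statements can also be submitted from Java. A minimal sketch, assuming a `StreamTableEnvironment` named `tEnv`, that `executeSql` is available in the Flink version in use (older releases used `sqlUpdate`), and that the hypothetical `userEventsDdl`/`hourlyStatsDdl` strings hold the CREATE TABLE statements above:

```java
// Register the source and sink tables defined by the CREATE TABLE statements above.
tEnv.executeSql(userEventsDdl);   // hypothetical String with "CREATE TABLE user_events ..."
tEnv.executeSql(hourlyStatsDdl);  // hypothetical String with "CREATE TABLE hourly_stats ..."

// Submitting the INSERT INTO starts a continuous streaming job that keeps hourly_stats updated.
tEnv.executeSql(
    "INSERT INTO hourly_stats "
        + "SELECT "
        + "  TUMBLE_START(event_time, INTERVAL '1' HOUR), "
        + "  TUMBLE_END(event_time, INTERVAL '1' HOUR), "
        + "  COUNT(DISTINCT user_id), "
        + "  SUM(amount) "
        + "FROM user_events "
        + "GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR)");
```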

### Join with Lookup Table

```sql
-- Kafka stream
CREATE TABLE orders (
    order_id STRING,
    user_id STRING,
    product_id STRING,
    quantity INT,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

-- Enriched output
CREATE TABLE enriched_orders (
    order_id STRING,
    user_id STRING,
    product_name STRING,
    quantity INT,
    total_price DECIMAL(10,2),
    order_time TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'enriched-orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

-- Join with product catalog (assuming a JDBC lookup table)
INSERT INTO enriched_orders
SELECT
    o.order_id,
    o.user_id,
    p.product_name,
    o.quantity,
    o.quantity * p.price AS total_price,
    o.order_time
FROM orders o
JOIN product_catalog FOR SYSTEM_TIME AS OF o.order_time AS p
    ON o.product_id = p.product_id;
```

## Configuration Best Practices

### Consumer Configuration

```java
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "localhost:9092");
consumerProps.setProperty("group.id", "flink-table-consumer");
consumerProps.setProperty("auto.offset.reset", "earliest");
consumerProps.setProperty("enable.auto.commit", "false"); // Managed by Flink
consumerProps.setProperty("max.poll.records", "500");
```

### Producer Configuration

```java
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");
producerProps.setProperty("transaction.timeout.ms", "900000");
producerProps.setProperty("enable.idempotence", "true");
producerProps.setProperty("acks", "all");
```

### Schema Registry Integration

```java
Properties avroProps = new Properties();
avroProps.setProperty("bootstrap.servers", "localhost:9092");
avroProps.setProperty("schema.registry.url", "http://localhost:8081");
avroProps.setProperty("specific.avro.reader", "true");
```