or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md · index.md · serialization.md · streaming-consumer.md · streaming-producer.md · table-api.md

docs/table-api.md

# Table/SQL API Integration

The Flink Kafka 0.11 connector provides comprehensive Table API and SQL integration through factory classes that enable declarative table definitions and seamless integration with Flink's SQL engine.

## Capabilities

### Dynamic Table Factory

Modern factory implementation for SQL DDL support and dynamic table creation.

```java { .api }
/**
 * Factory for creating dynamic table sources and sinks for Kafka 0.11.x
 * Supports the new Dynamic Table API introduced in Flink 1.11+
 */
class Kafka011DynamicTableFactory extends KafkaDynamicTableFactoryBase {
    /**
     * Factory identifier used in SQL DDL CREATE TABLE statements
     * @return "kafka-0.11" identifier for connector specification
     */
    String factoryIdentifier();
}
```

**Usage Examples:**

```sql
-- SQL DDL using the kafka-0.11 connector identifier
CREATE TABLE user_events (
    user_id BIGINT,
    event_type STRING,
    event_data STRING,
    event_timestamp TIMESTAMP(3),
    WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka-0.11',
    'topic' = 'user-events',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'flink-consumer-group',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

-- Create sink table
CREATE TABLE processed_events (
    user_id BIGINT,
    event_count BIGINT,
    processing_time TIMESTAMP(3)
) WITH (
    'connector' = 'kafka-0.11',
    'topic' = 'processed-events',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.transaction.timeout.ms' = '900000',
    'sink.semantic' = 'exactly-once',
    'format' = 'json'
);
```

### Dynamic Table Source

Kafka table source implementation for the modern Dynamic Table API.

```java { .api }
/**
 * Dynamic table source for Kafka 0.11.x supporting advanced features
 * like watermark generation and projection pushdown
 */
@Internal
class Kafka011DynamicSource extends KafkaDynamicSourceBase {
    /**
     * Constructor for dynamic table source
     * @param outputDataType the output data type of the source
     * @param topic the Kafka topic name
     * @param properties Kafka consumer properties
     * @param decodingFormat format for deserializing records
     * @param startupMode how to start consuming (earliest, latest, etc.)
     * @param specificStartupOffsets specific partition offsets for startup
     * @param startupTimestampMillis timestamp for timestamp-based startup
     */
    Kafka011DynamicSource(
        DataType outputDataType,
        String topic,
        Properties properties,
        DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
        StartupMode startupMode,
        Map<KafkaTopicPartition, Long> specificStartupOffsets,
        long startupTimestampMillis
    );

    /**
     * Create a copy of this source for runtime instantiation
     * @return copied source instance
     */
    DynamicTableSource copy();

    /**
     * Summary string for debugging and logging
     * @return "Kafka-0.11" description
     */
    String asSummaryString();
}
```

### Dynamic Table Sink

Kafka table sink implementation for the modern Dynamic Table API.

```java { .api }
/**
 * Dynamic table sink for Kafka 0.11.x supporting transactional writes
 * and exactly-once semantics for table operations
 */
@Internal
class Kafka011DynamicSink extends KafkaDynamicSinkBase {
    /**
     * Constructor for dynamic table sink
     * @param consumedDataType the data type consumed by the sink
     * @param topic the target Kafka topic name
     * @param properties Kafka producer properties
     * @param partitioner optional custom partitioner for records
     * @param encodingFormat format for serializing records
     */
    Kafka011DynamicSink(
        DataType consumedDataType,
        String topic,
        Properties properties,
        Optional<FlinkKafkaPartitioner<RowData>> partitioner,
        EncodingFormat<SerializationSchema<RowData>> encodingFormat
    );

    /**
     * Create a copy of this sink for runtime instantiation
     * @return copied sink instance
     */
    DynamicTableSink copy();

    /**
     * Summary string for debugging and logging
     * @return "Kafka 0.11 table sink" description
     */
    String asSummaryString();
}
```

### Legacy Table API Support

Legacy factory and table implementations for backward compatibility with older Flink versions.

```java { .api }
/**
 * Legacy table source for Kafka 0.11.x (pre-1.11 Table API)
 * Maintained for backward compatibility
 */
@Internal
class Kafka011TableSource extends KafkaTableSourceBase {
    // Full constructor with all table configuration options
    Kafka011TableSource(
        TableSchema schema,
        Optional<String> proctimeAttribute,
        List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
        Optional<Map<String, String>> fieldMapping,
        String topic,
        Properties properties,
        DeserializationSchema<Row> deserializationSchema,
        StartupMode startupMode,
        Map<KafkaTopicPartition, Long> specificStartupOffsets,
        long startupTimestampMillis
    );

    // Simplified constructor for basic use cases
    Kafka011TableSource(
        TableSchema schema,
        String topic,
        Properties properties,
        DeserializationSchema<Row> deserializationSchema
    );
}

/**
 * Legacy table sink for Kafka 0.11.x (pre-1.11 Table API)
 * Maintained for backward compatibility
 */
@Internal
class Kafka011TableSink extends KafkaTableSinkBase {
    /**
     * Constructor for legacy table sink
     * @param schema table schema definition
     * @param topic target Kafka topic name
     * @param properties Kafka producer properties
     * @param partitioner optional custom partitioner
     * @param serializationSchema schema for serializing rows
     */
    Kafka011TableSink(
        TableSchema schema,
        String topic,
        Properties properties,
        Optional<FlinkKafkaPartitioner<Row>> partitioner,
        SerializationSchema<Row> serializationSchema
    );
}

/**
 * Legacy factory for creating table sources and sinks
 * Maintained for backward compatibility with older Flink versions
 */
class Kafka011TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {
    // Factory methods inherited from base class
    // Used by Table API environment for table registration
}
```

**Usage Examples:**

```java
// Programmatic table registration (legacy approach)
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "table-consumer");

// Register source table
Kafka011TableSource source = new Kafka011TableSource(
    TableSchema.builder()
        .field("user_id", DataTypes.BIGINT())
        .field("event_data", DataTypes.STRING())
        .field("event_time", DataTypes.TIMESTAMP(3))
        .build(),
    "input-topic",
    properties,
    new JsonDeserializationSchema()
);

tableEnv.registerTableSource("kafka_source", source);

// Register sink table
Kafka011TableSink sink = new Kafka011TableSink(
    TableSchema.builder()
        .field("result", DataTypes.STRING())
        .field("count", DataTypes.BIGINT())
        .build(),
    "output-topic",
    properties,
    Optional.empty(),
    new JsonSerializationSchema()
);

tableEnv.registerTableSink("kafka_sink", sink);
```

## Table Configuration Options

SQL DDL configuration options for Kafka connector tables.

### Core Connection Options

```java { .api }
// Required configuration options
'connector' = 'kafka-0.11'                    // Connector identifier
'topic' = 'topic-name'                        // Kafka topic name
'properties.bootstrap.servers' = 'host:port'  // Kafka broker addresses
```

### Consumer Configuration Options

```java { .api }
// Consumer-specific options for source tables
'properties.group.id' = 'consumer-group'      // Consumer group ID
'scan.startup.mode' = 'mode'                  // Startup mode: earliest-offset, latest-offset, group-offsets, specific-offsets, timestamp
'scan.startup.specific-offsets' = 'offsets'   // Partition-specific offsets (partition:offset,partition:offset,...)
'scan.startup.timestamp-millis' = 'timestamp' // Timestamp for timestamp mode
```

### Producer Configuration Options

```java { .api }
// Producer-specific options for sink tables
'sink.partitioner' = 'partitioner'            // Partitioner: fixed, round-robin, or custom class name
'sink.semantic' = 'semantic'                  // Delivery semantic: exactly-once, at-least-once, none
'properties.transaction.timeout.ms' = 'timeout' // Transaction timeout for exactly-once
```

**Complete SQL DDL Examples:**

```sql
-- Source table with watermarks and specific startup configuration
CREATE TABLE orders_source (
    order_id BIGINT,
    customer_id BIGINT,
    product_id BIGINT,
    quantity INT,
    price DECIMAL(10,2),
    order_timestamp TIMESTAMP(3),
    WATERMARK FOR order_timestamp AS order_timestamp - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kafka-0.11',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'kafka-cluster:9092',
    'properties.group.id' = 'orders-processor',
    'properties.auto.offset.reset' = 'earliest',
    'scan.startup.mode' = 'timestamp',
    'scan.startup.timestamp-millis' = '1609459200000',
    'format' = 'avro-confluent',
    'avro-confluent.url' = 'http://schema-registry:8081'
);

-- Sink table with exactly-once semantics
CREATE TABLE order_aggregates_sink (
    customer_id BIGINT,
    total_orders BIGINT,
    total_amount DECIMAL(15,2),
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3)
) WITH (
    'connector' = 'kafka-0.11',
    'topic' = 'order-aggregates',
    'properties.bootstrap.servers' = 'kafka-cluster:9092',
    'properties.transaction.timeout.ms' = '900000',
    'sink.semantic' = 'exactly-once',
    'sink.partitioner' = 'fixed',
    'format' = 'json'
);

-- Use tables in SQL query
INSERT INTO order_aggregates_sink
SELECT
    customer_id,
    COUNT(*) as total_orders,
    SUM(price * quantity) as total_amount,
    TUMBLE_START(order_timestamp, INTERVAL '1' HOUR) as window_start,
    TUMBLE_END(order_timestamp, INTERVAL '1' HOUR) as window_end
FROM orders_source
GROUP BY customer_id, TUMBLE(order_timestamp, INTERVAL '1' HOUR);
```

## Service Loader Integration

The connector registers its factories through Java's Service Loader mechanism for automatic discovery.

```java { .api }
// Service registration files:
// META-INF/services/org.apache.flink.table.factories.Factory
// -> org.apache.flink.streaming.connectors.kafka.table.Kafka011DynamicTableFactory
//
// META-INF/services/org.apache.flink.table.factories.TableFactory
// -> org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceSinkFactory
```

This enables automatic discovery and instantiation of the Kafka connector when using SQL DDL or programmatic table registration.