# Table API Integration

The Flink Kafka 0.10 connector provides comprehensive integration with Flink's Table API and SQL, enabling declarative stream processing with Kafka sources and sinks. It supports both legacy table factories and the new dynamic table API.

## Capabilities

### Dynamic Table Factory

Modern table factory implementation for creating Kafka sources and sinks through SQL DDL and Table API.

```java { .api }
/**
 * Dynamic table factory for Kafka 0.10.x integration
 */
public class Kafka010DynamicTableFactory extends KafkaDynamicTableFactoryBase {
    /** Connector identifier used in SQL DDL */
    public static final String IDENTIFIER = "kafka-0.10";
}

/**
 * Dynamic table source implementation for Kafka 0.10.x
 */
public class Kafka010DynamicSource extends KafkaDynamicSourceBase {
    // Inherits all functionality from base dynamic source
}

/**
 * Dynamic table sink implementation for Kafka 0.10.x
 */
public class Kafka010DynamicSink extends KafkaDynamicSinkBase {
    // Inherits all functionality from base dynamic sink
}
```

**Usage Examples:**

```sql
-- Create Kafka source table
CREATE TABLE kafka_source (
    user_id BIGINT,
    event_name STRING,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka-0.10',
    'topic' = 'user-events',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'my-consumer-group',
    'format' = 'json',
    'scan.startup.mode' = 'earliest-offset'
);

-- Create Kafka sink table
CREATE TABLE kafka_sink (
    user_id BIGINT,
    aggregated_count BIGINT,
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3)
) WITH (
    'connector' = 'kafka-0.10',
    'topic' = 'user-aggregates',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json',
    'sink.partitioner' = 'round-robin'
);

-- Query using the tables
INSERT INTO kafka_sink
SELECT
    user_id,
    COUNT(*) AS aggregated_count,
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
    TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS window_end
FROM kafka_source
GROUP BY user_id, TUMBLE(event_time, INTERVAL '1' MINUTE);
```

```java
// Table API usage
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tableEnv =
    TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

// Create source table descriptor
TableDescriptor sourceDescriptor = TableDescriptor.forConnector("kafka-0.10")
    .schema(Schema.newBuilder()
        .column("user_id", DataTypes.BIGINT())
        .column("event_name", DataTypes.STRING())
        .column("event_time", DataTypes.TIMESTAMP(3))
        .watermark("event_time", "event_time - INTERVAL '5' SECOND")
        .build())
    .option("topic", "user-events")
    .option("properties.bootstrap.servers", "localhost:9092")
    .option("properties.group.id", "my-consumer-group")
    .option("format", "json")
    .option("scan.startup.mode", "earliest-offset")
    .build();

tableEnv.createTemporaryTable("kafka_source", sourceDescriptor);
```

### Legacy Table Factory

Legacy table factory implementation for backward compatibility with older Table API versions.

```java { .api }
/**
 * Legacy table factory for creating Kafka 0.10.x sources and sinks
 */
public class Kafka010TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {
    /**
     * Returns the Kafka version supported by this factory
     * @return Kafka version string "0.10"
     */
    protected String kafkaVersion();

    /**
     * Indicates whether this connector supports Kafka timestamps
     * @return true, as Kafka 0.10.x supports timestamps
     */
    protected boolean supportsKafkaTimestamps();
}

/**
 * Legacy table source implementation for consuming from Kafka 0.10.x in Table API
 */
public class Kafka010TableSource extends KafkaTableSourceBase {
    // Internal implementation - extends base Kafka table source
}

/**
 * Legacy table sink implementation for writing to Kafka 0.10.x in Table API
 */
public class Kafka010TableSink extends KafkaTableSinkBase {
    // Internal implementation - extends base Kafka table sink
}
```

**Usage Examples:**

```java
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Schema;

// Legacy descriptor-based approach
TableEnvironment tableEnv = TableEnvironment.create(...);

tableEnv.connect(
        new Kafka()
            .version("0.10")
            .topic("user-events")
            .property("bootstrap.servers", "localhost:9092")
            .property("group.id", "my-consumer-group")
            .startFromEarliest()
    )
    .withFormat(new Json())
    .withSchema(new Schema()
        .field("user_id", DataTypes.BIGINT())
        .field("event_name", DataTypes.STRING())
        .field("event_time", DataTypes.TIMESTAMP(3))
    )
    .createTemporaryTable("legacy_kafka_source");
```

## Configuration Options

### Source Configuration

**Required Options:**

- `connector`: Must be "kafka-0.10"
- `topic`: Kafka topic name (or list of topics separated by semicolon)
- `properties.bootstrap.servers`: Kafka broker addresses

**Startup Mode Options:**

- `scan.startup.mode`: How to start consuming ("earliest-offset", "latest-offset", "group-offsets", "timestamp", "specific-offsets")
- `scan.startup.timestamp-millis`: Start timestamp when mode is "timestamp"
- `scan.startup.specific-offsets`: Specific partition offsets when mode is "specific-offsets"

**Consumer Properties:**

- `properties.group.id`: Consumer group ID
- `properties.auto.offset.reset`: Offset reset behavior ("earliest", "latest")
- `properties.flink.poll-timeout`: Polling timeout in milliseconds

**Pattern Subscription:**

- `topic-pattern`: Regular expression pattern for topic subscription instead of specific topics

### Sink Configuration

**Required Options:**

- `connector`: Must be "kafka-0.10"
- `topic`: Target Kafka topic name
- `properties.bootstrap.servers`: Kafka broker addresses

**Partitioning Options:**

- `sink.partitioner`: Partitioning strategy ("default", "round-robin", "custom")
- `sink.partitioner-class`: Custom partitioner class when using "custom"

**Producer Properties:**

- `properties.acks`: Acknowledgment mode ("all", "1", "0")
- `properties.retries`: Number of retries for failed sends
- `properties.enable.idempotence`: Enable the idempotent producer (note: idempotence was introduced in Kafka 0.11 and is not supported by the 0.10 producer; the 0.10 sink is at-least-once)

**Timestamp Options:**

- `sink.timestamp-field`: Field to use as Kafka record timestamp
- `sink.timestamp-format`: Timestamp format specification

### Format Integration

The connector works with various format specifications:

```sql
-- JSON format
CREATE TABLE kafka_json_source (...) WITH (
    'connector' = 'kafka-0.10',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true',
    'json.timestamp-format.standard' = 'ISO-8601'
);

-- Avro format
CREATE TABLE kafka_avro_source (...) WITH (
    'connector' = 'kafka-0.10',
    'format' = 'avro',
    'avro.schema-registry.url' = 'http://localhost:8081'
);

-- CSV format
CREATE TABLE kafka_csv_source (...) WITH (
    'connector' = 'kafka-0.10',
    'format' = 'csv',
    'csv.field-delimiter' = ',',
    'csv.ignore-parse-errors' = 'true'
);
```

## Watermark Strategies

Configure watermark generation for event time processing:

```sql
-- Bounded out-of-orderness watermarks
CREATE TABLE kafka_source (
    user_id BIGINT,
    event_name STRING,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kafka-0.10',
    'topic' = 'events'
);

-- Ascending timestamps watermarks
CREATE TABLE kafka_ordered_source (
    user_id BIGINT,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time
) WITH (
    'connector' = 'kafka-0.10',
    'topic' = 'ordered-events'
);
```

## Multiple Topics and Pattern Subscription

### Multiple Topics

```sql
-- Multiple specific topics
CREATE TABLE multi_topic_source (...) WITH (
    'connector' = 'kafka-0.10',
    'topic' = 'topic1;topic2;topic3',
    'properties.bootstrap.servers' = 'localhost:9092'
);
```

### Pattern-Based Subscription

```sql
-- Pattern-based topic subscription
CREATE TABLE pattern_source (...) WITH (
    'connector' = 'kafka-0.10',
    'topic-pattern' = 'logs-.*',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.flink.partition-discovery.interval-millis' = '30000'
);
```

## Exactly-Once Semantics

Configure stronger delivery guarantees for both sources and sinks.

> **Note:** Transactional reads (`isolation.level = 'read_committed'`) and the idempotent
> producer were introduced in Kafka 0.11 (KIP-98). Against 0.10.x brokers and clients the
> connector's sink provides at-least-once semantics; the settings below take effect only
> when running against Kafka 0.11+.

### Source Configuration

```sql
CREATE TABLE exactly_once_source (...) WITH (
    'connector' = 'kafka-0.10',
    'properties.isolation.level' = 'read_committed',
    'properties.enable.auto.commit' = 'false'
);
```

### Sink Configuration

```sql
CREATE TABLE exactly_once_sink (...) WITH (
    'connector' = 'kafka-0.10',
    'properties.acks' = 'all',
    'properties.retries' = '3',
    'properties.enable.idempotence' = 'true',
    'properties.max.in.flight.requests.per.connection' = '1'
);
```

## Service Provider Registration

The connector automatically registers itself with Flink's service provider mechanism:

**META-INF/services/org.apache.flink.table.factories.Factory:**

- `org.apache.flink.streaming.connectors.kafka.table.Kafka010DynamicTableFactory`

**META-INF/services/org.apache.flink.table.factories.TableFactory:**

- `org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory`

## Migration from Legacy to Dynamic API

When migrating from the legacy descriptor-based API to modern SQL DDL:

**Legacy (Deprecated):**

```java
tableEnv.connect(new Kafka().version("0.10").topic("my-topic"))
    .withFormat(new Json())
    .withSchema(new Schema().field("id", DataTypes.BIGINT()))
    .createTemporaryTable("my_table");
```

**Modern (Recommended):**

```sql
CREATE TABLE my_table (
    id BIGINT
) WITH (
    'connector' = 'kafka-0.10',
    'topic' = 'my-topic',
    'format' = 'json'
);
```

## Error Handling

Table API integration provides several error handling strategies:

- **Parse Errors**: Configure format options to ignore or fail on parse errors
- **Serialization Errors**: Log-only mode for non-critical failures
- **Connection Errors**: Automatic retry mechanisms with exponential backoff
- **Schema Evolution**: Support for schema registry integration with Avro/JSON formats