# Table API Integration

Table API and SQL integration for Kafka 0.9. Table sources and sinks are created through a factory from declarative configuration, and every table carries an explicit schema.

## Capabilities

### Kafka09TableSourceSinkFactory

Factory class for creating Kafka 0.9 table sources and sinks in the Table API ecosystem.

```java { .api }
/**
 * Factory for creating configured instances of Kafka 0.9 table sources and sinks.
 * Extends the base Kafka table factory with version-specific implementations.
 */
public class Kafka09TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {

    /**
     * Returns the Kafka version identifier for this factory.
     *
     * @return "0.9" as the version string
     */
    @Override
    protected String kafkaVersion();

    /**
     * Indicates whether this version supports Kafka timestamps.
     *
     * @return false, as Kafka 0.9 does not support message timestamps
     */
    @Override
    protected boolean supportsKafkaTimestamps();

    /**
     * Creates a Kafka 0.9 table source with the provided configuration.
     *
     * @param schema Schema of the produced table
     * @param proctimeAttribute Field name of the processing time attribute
     * @param rowtimeAttributeDescriptors Descriptors for rowtime attributes
     * @param fieldMapping Mapping for table schema fields to physical returned type fields
     * @param topic Kafka topic to consume
     * @param properties Properties for the Kafka consumer
     * @param deserializationSchema Deserialization schema for decoding records from Kafka
     * @param startupMode Startup mode for the contained consumer
     * @param specificStartupOffsets Specific startup offsets (when using SPECIFIC_OFFSETS mode)
     * @return Configured Kafka09TableSource instance
     */
    @Override
    protected KafkaTableSourceBase createKafkaTableSource(
        TableSchema schema,
        Optional<String> proctimeAttribute,
        List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
        Map<String, String> fieldMapping,
        String topic,
        Properties properties,
        DeserializationSchema<Row> deserializationSchema,
        StartupMode startupMode,
        Map<KafkaTopicPartition, Long> specificStartupOffsets);

    /**
     * Creates a Kafka 0.9 table sink with the provided configuration.
     *
     * @param schema Schema of the table to be written
     * @param topic Target Kafka topic
     * @param properties Properties for the Kafka producer
     * @param partitioner Optional custom partitioner for message distribution
     * @param serializationSchema Serialization schema for encoding records to Kafka
     * @return Configured Kafka09TableSink instance
     */
    @Override
    protected KafkaTableSinkBase createKafkaTableSink(
        TableSchema schema,
        String topic,
        Properties properties,
        Optional<FlinkKafkaPartitioner<Row>> partitioner,
        SerializationSchema<Row> serializationSchema);
}
```

### Kafka09TableSource

Table source implementation for consuming Kafka 0.9 data in Table API queries.

```java { .api }
/**
 * Kafka table source for Kafka 0.9 - internal implementation.
 * Provides streaming table source capabilities for Table API and SQL.
 */
@Internal
public class Kafka09TableSource extends KafkaTableSourceBase {

    /**
     * Creates a Kafka 0.9 table source with full configuration options.
     *
     * @param schema Schema of the produced table
     * @param proctimeAttribute Field name of the processing time attribute
     * @param rowtimeAttributeDescriptors Descriptors for rowtime attributes
     * @param fieldMapping Optional mapping for table schema fields to physical type fields
     * @param topic Kafka topic to consume
     * @param properties Properties for the Kafka consumer
     * @param deserializationSchema Deserialization schema for decoding records
     * @param startupMode Startup mode for the consumer
     * @param specificStartupOffsets Specific startup offsets for SPECIFIC_OFFSETS mode
     */
    public Kafka09TableSource(
        TableSchema schema,
        Optional<String> proctimeAttribute,
        List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
        Optional<Map<String, String>> fieldMapping,
        String topic,
        Properties properties,
        DeserializationSchema<Row> deserializationSchema,
        StartupMode startupMode,
        Map<KafkaTopicPartition, Long> specificStartupOffsets);

    /**
     * Creates a simple Kafka 0.9 table source with basic configuration.
     *
     * @param schema Schema of the produced table
     * @param topic Kafka topic to consume
     * @param properties Properties for the Kafka consumer
     * @param deserializationSchema Deserialization schema for decoding records
     */
    public Kafka09TableSource(
        TableSchema schema,
        String topic,
        Properties properties,
        DeserializationSchema<Row> deserializationSchema);
}
```

### Kafka09TableSink

Table sink implementation for writing to Kafka 0.9 topics from Table API queries.

```java { .api }
/**
 * Kafka table sink for Kafka 0.9 - internal implementation.
 * Provides streaming table sink capabilities for Table API and SQL.
 */
@Internal
public class Kafka09TableSink extends KafkaTableSinkBase {

    /**
     * Creates a Kafka 0.9 table sink with the provided configuration.
     *
     * @param schema Schema of the table to be written
     * @param topic Target Kafka topic
     * @param properties Properties for the Kafka producer
     * @param partitioner Optional custom partitioner for message distribution
     * @param serializationSchema Serialization schema for encoding records
     */
    public Kafka09TableSink(
        TableSchema schema,
        String topic,
        Properties properties,
        Optional<FlinkKafkaPartitioner<Row>> partitioner,
        SerializationSchema<Row> serializationSchema);
}
```

## Usage Examples

### Table API with Kafka Source

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Table;

// Create table environment
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Create Kafka source table using DDL
tableEnv.executeSql(
    "CREATE TABLE kafka_source (" +
    "  user_id STRING," +
    "  event_time TIMESTAMP(3)," +
    "  event_type STRING," +
    "  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
    ") WITH (" +
    "  'connector' = 'kafka-0.9'," +
    "  'topic' = 'user-events'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'properties.group.id' = 'table-consumer-group'," +
    "  'format' = 'json'" +
    ")"
);

// Query the Kafka source
Table result = tableEnv.sqlQuery(
    "SELECT user_id, COUNT(*) as event_count " +
    "FROM kafka_source " +
    "WHERE event_type = 'login' " +
    "GROUP BY user_id"
);
```

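### Table API with Kafka Sink

```java
// Create Kafka sink table
tableEnv.executeSql(
    "CREATE TABLE kafka_sink (" +
    "  user_id STRING," +
    "  event_count BIGINT" +
    ") WITH (" +
    "  'connector' = 'kafka-0.9'," +
    "  'topic' = 'processed-events'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'json'" +
    ")"
);

// The Kafka sink is append-only, so it cannot consume the continuously
// updating counts of a plain GROUP BY. A windowed aggregation emits each
// result once, when its window closes (reuses kafka_source from above).
Table windowedCounts = tableEnv.sqlQuery(
    "SELECT user_id, COUNT(*) AS event_count " +
    "FROM kafka_source " +
    "WHERE event_type = 'login' " +
    "GROUP BY user_id, TUMBLE(event_time, INTERVAL '1' MINUTE)"
);

// Insert query results into Kafka
windowedCounts.executeInsert("kafka_sink");
```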

### Programmatic Table Source Creation

```java
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.streaming.connectors.kafka.Kafka09TableSource;
import org.apache.flink.types.Row;
import java.util.Properties;

// Define table schema
TableSchema schema = TableSchema.builder()
    .field("user_id", DataTypes.STRING())
    .field("timestamp", DataTypes.TIMESTAMP(3))
    .field("action", DataTypes.STRING())
    .field("value", DataTypes.DOUBLE())
    .build();

// Configure Kafka properties
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "programmatic-consumer");

// Create JSON deserializer
DeserializationSchema<Row> deserializer = new JsonRowDeserializationSchema.Builder(
    schema.toRowType()
).build();

// Create Kafka table source
Kafka09TableSource kafkaSource = new Kafka09TableSource(
    schema,
    "user-actions",
    properties,
    deserializer
);

// Register as table (tableEnv is the TableEnvironment created earlier)
tableEnv.registerTableSource("user_actions", kafkaSource);

// Use in SQL; `value` is a reserved keyword and must be quoted with backticks
Table queryResult = tableEnv.sqlQuery(
    "SELECT user_id, SUM(`value`) AS total_value " +
    "FROM user_actions " +
    "WHERE action = 'purchase' " +
    "GROUP BY user_id"
);
```

### Complex Table Processing Pipeline

```java
// Multiple Kafka sources and sinks
tableEnv.executeSql(
    "CREATE TABLE orders (" +
    "  order_id STRING," +
    "  customer_id STRING," +
    "  product_id STRING," +
    "  quantity INT," +
    "  price DECIMAL(10,2)," +
    "  order_time TIMESTAMP(3)," +
    "  WATERMARK FOR order_time AS order_time - INTERVAL '1' MINUTE" +
    ") WITH (" +
    "  'connector' = 'kafka-0.9'," +
    "  'topic' = 'orders'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'properties.group.id' = 'order-processor'," +
    "  'format' = 'json'" +
    ")"
);

tableEnv.executeSql(
    "CREATE TABLE order_summary (" +
    "  window_start TIMESTAMP(3)," +
    "  window_end TIMESTAMP(3)," +
    "  total_orders BIGINT," +
    "  total_revenue DECIMAL(12,2)," +
    "  avg_order_value DECIMAL(10,2)" +
    ") WITH (" +
    "  'connector' = 'kafka-0.9'," +
    "  'topic' = 'order-summary'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'json'" +
    ")"
);

// Windowed aggregation query; the aggregates are cast explicitly so that
// their types match the DECIMAL columns declared in order_summary
tableEnv.executeSql(
    "INSERT INTO order_summary " +
    "SELECT " +
    "  TUMBLE_START(order_time, INTERVAL '1' HOUR) AS window_start," +
    "  TUMBLE_END(order_time, INTERVAL '1' HOUR) AS window_end," +
    "  COUNT(*) AS total_orders," +
    "  CAST(SUM(quantity * price) AS DECIMAL(12,2)) AS total_revenue," +
    "  CAST(AVG(quantity * price) AS DECIMAL(10,2)) AS avg_order_value " +
    "FROM orders " +
    "GROUP BY TUMBLE(order_time, INTERVAL '1' HOUR)"
);
```
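
The hourly grouping in the query above assigns every order to exactly one fixed-size window. As a rough illustration of what `TUMBLE_START` and `TUMBLE_END` compute, the following plain-Java sketch aligns a timestamp to epoch-based window boundaries. The class `TumblingWindowSketch` and its methods are hypothetical helpers written for this page, not Flink APIs, and the sketch assumes windows without an offset.

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative helper, not part of Flink: mirrors epoch-aligned tumbling windows.
final class TumblingWindowSketch {

    // Start (inclusive) of the window that contains eventTime.
    static Instant windowStart(Instant eventTime, Duration size) {
        long ts = eventTime.toEpochMilli();
        long sizeMs = size.toMillis();
        // floorMod keeps the alignment correct for timestamps before the epoch.
        return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMs));
    }

    // End (exclusive) of the same window.
    static Instant windowEnd(Instant eventTime, Duration size) {
        return windowStart(eventTime, size).plus(size);
    }

    public static void main(String[] args) {
        Instant orderTime = Instant.parse("2020-01-01T10:42:17Z");
        Duration oneHour = Duration.ofHours(1);
        System.out.println(windowStart(orderTime, oneHour) + " .. " + windowEnd(orderTime, oneHour));
    }
}
```

An order placed at 10:42:17 therefore lands in the window from 10:00:00 (inclusive) to 11:00:00 (exclusive), and its row is emitted once the watermark passes the window end.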

### Custom Format Integration

```java
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.connectors.kafka.Kafka09TableSource;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Custom CSV-like format deserializer
DeserializationSchema<Row> csvDeserializer = new DeserializationSchema<Row>() {
    @Override
    public Row deserialize(byte[] message) throws IOException {
        // Decode with an explicit charset instead of the platform default
        String line = new String(message, StandardCharsets.UTF_8);
        String[] fields = line.split(",");

        Row row = new Row(3);
        row.setField(0, fields[0]); // id
        row.setField(1, fields[1]); // name
        row.setField(2, Double.parseDouble(fields[2])); // value
        return row;
    }

    @Override
    public boolean isEndOfStream(Row nextElement) {
        // A Kafka topic is unbounded, so the stream never ends
        return false;
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        return Types.ROW(Types.STRING, Types.STRING, Types.DOUBLE);
    }
};

TableSchema csvSchema = TableSchema.builder()
    .field("id", DataTypes.STRING())
    .field("name", DataTypes.STRING())
    .field("value", DataTypes.DOUBLE())
    .build();

// properties is the consumer configuration from the previous example
Kafka09TableSource csvSource = new Kafka09TableSource(
    csvSchema,
    "csv-data",
    properties,
    csvDeserializer
);
```
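
The deserializer above throws on records with too few fields or a non-numeric value, which would fail the job on the first malformed message. One possible way to harden the parsing step is sketched below in plain Java, without Flink dependencies so that it can be tested in isolation. `CsvRecordParser` and `CsvRecord` are hypothetical names introduced for this illustration; a real `DeserializationSchema` could call such a parser and decide how to handle an empty result.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Illustrative parser for "id,name,value" records; not part of the connector.
final class CsvRecordParser {

    // Immutable holder for one parsed record.
    static final class CsvRecord {
        final String id;
        final String name;
        final double value;

        CsvRecord(String id, String name, double value) {
            this.id = id;
            this.name = name;
            this.value = value;
        }
    }

    // Returns the parsed record, or Optional.empty() if the message is malformed.
    static Optional<CsvRecord> parse(byte[] message) {
        if (message == null) {
            return Optional.empty();
        }
        String line = new String(message, StandardCharsets.UTF_8).trim();
        // Limit -1 keeps trailing empty fields so the field count is exact.
        String[] fields = line.split(",", -1);
        if (fields.length != 3) {
            return Optional.empty();
        }
        try {
            return Optional.of(new CsvRecord(fields[0], fields[1], Double.parseDouble(fields[2])));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        byte[] good = "42,alice,3.5".getBytes(StandardCharsets.UTF_8);
        byte[] bad = "42,alice,not-a-number".getBytes(StandardCharsets.UTF_8);
        System.out.println(parse(good).isPresent());
        System.out.println(parse(bad).isPresent());
    }
}
```

Keeping the parsing logic separate from the Flink interface makes the malformed-record policy (skip, log, or fail) an explicit decision rather than a side effect of an uncaught exception.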

## Configuration Options

### Connector Properties

```sql
-- Basic Kafka 0.9 connector configuration
'connector' = 'kafka-0.9'
'topic' = 'my-topic'                                 -- Required: Kafka topic name
'properties.bootstrap.servers' = 'localhost:9092'    -- Required: Kafka broker addresses
'properties.group.id' = 'my-consumer-group'          -- Required for sources: Consumer group ID

-- Consumer-specific properties (sources)
'properties.auto.offset.reset' = 'earliest'          -- latest, earliest, none
'properties.enable.auto.commit' = 'false'            -- With checkpointing enabled, Flink commits offsets itself
'properties.fetch.min.bytes' = '1024'                -- Minimum fetch size in bytes
'properties.max.partition.fetch.bytes' = '1048576'   -- Maximum per-partition fetch size in bytes

-- Producer-specific properties (sinks)
'properties.acks' = '1'                              -- 0, 1, all
'properties.retries' = '3'                           -- Retry count
'properties.batch.size' = '16384'                    -- Batch size in bytes
'properties.linger.ms' = '5'                         -- Batch linger time in milliseconds
'properties.compression.type' = 'snappy'             -- none, gzip, snappy, lz4

-- Format configuration
'format' = 'json'                                    -- json, csv, avro, etc.
'json.fail-on-missing-field' = 'false'               -- Handle missing JSON fields
'json.ignore-parse-errors' = 'true'                  -- Skip malformed records
```
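
The `properties.`-prefixed options above carry the same keys that the programmatic example sets on `java.util.Properties` (`bootstrap.servers`, `group.id`). The sketch below shows one way such options could be turned into Kafka client properties by stripping the prefix. This is an assumption about the mapping made for illustration; the factory's actual implementation is not shown on this page, and `KafkaOptionSketch` is a hypothetical helper.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Illustrative only: derives Kafka client properties from table options.
final class KafkaOptionSketch {

    private static final String PREFIX = "properties.";

    // Copies every "properties.*" option into a Properties object without the prefix.
    static Properties toKafkaProperties(Map<String, String> tableOptions) {
        Properties kafkaProperties = new Properties();
        for (Map.Entry<String, String> option : tableOptions.entrySet()) {
            String key = option.getKey();
            if (key.startsWith(PREFIX)) {
                kafkaProperties.setProperty(key.substring(PREFIX.length()), option.getValue());
            }
        }
        return kafkaProperties;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "kafka-0.9");
        options.put("topic", "my-topic");
        options.put("properties.bootstrap.servers", "localhost:9092");
        options.put("properties.group.id", "my-consumer-group");
        options.put("format", "json");

        // Only the two prefixed options end up in the client configuration.
        System.out.println(toKafkaProperties(options));
    }
}
```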

### Startup Modes

```sql
-- Start from earliest available offset
'scan.startup.mode' = 'earliest-offset'

-- Start from latest available offset
'scan.startup.mode' = 'latest-offset'

-- Start from consumer group's committed offset
'scan.startup.mode' = 'group-offsets'

-- Start from an explicitly specified offset for each partition
'scan.startup.mode' = 'specific-offsets'
'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
```
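
The specific-offsets value packs one `partition:<id>,offset:<n>` pair per partition, separated by semicolons. To make the format concrete, here is a small plain-Java sketch that parses such a string into a map. `SpecificOffsetsSketch` is a hypothetical helper, not the connector's own parser; the factory signature above uses `Map<KafkaTopicPartition, Long>`, whereas this sketch keys by partition number only to stay free of Flink dependencies.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative parser for "partition:0,offset:42;partition:1,offset:300".
final class SpecificOffsetsSketch {

    private static final String PARTITION = "partition:";
    private static final String OFFSET = "offset:";

    // Returns partition number -> start offset, in the order given.
    static Map<Integer, Long> parse(String spec) {
        Map<Integer, Long> offsets = new LinkedHashMap<>();
        for (String entry : spec.split(";")) {
            String[] parts = entry.trim().split(",");
            if (parts.length != 2
                    || !parts[0].trim().startsWith(PARTITION)
                    || !parts[1].trim().startsWith(OFFSET)) {
                throw new IllegalArgumentException("Malformed entry: '" + entry + "'");
            }
            int partition = Integer.parseInt(parts[0].trim().substring(PARTITION.length()).trim());
            long offset = Long.parseLong(parts[1].trim().substring(OFFSET.length()).trim());
            offsets.put(partition, offset);
        }
        return offsets;
    }

    public static void main(String[] args) {
        System.out.println(parse("partition:0,offset:42;partition:1,offset:300"));
    }
}
```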

## Limitations and Considerations

### Kafka 0.9 Specific Limitations

- **No timestamp support**: Kafka 0.9 messages carry no timestamps, so watermarks must be derived from a field in the message payload
- **Limited offset management**: Fewer startup options than newer versions; for example, starting from a timestamp requires Kafka 0.10 or later
- **No exactly-once semantics**: The producer supports neither transactions nor idempotence, so sinks provide at-least-once delivery at best
- **Basic consumer features**: Some consumer configuration options of newer clients are not available
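
Because event time has to come from the payload, a declaration such as `WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND` effectively tracks the highest payload timestamp seen so far and subtracts a fixed delay. The following plain-Java sketch illustrates that idea under simplifying assumptions (a single input, no periodic emission). `BoundedOutOfOrdernessSketch` is a hypothetical class written for this page, not a Flink API.

```java
// Illustrative watermark tracker: watermark = highest event time seen - allowed delay.
final class BoundedOutOfOrdernessSketch {

    private final long maxDelayMs;
    private long maxSeenTimestampMs = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxDelayMs) {
        this.maxDelayMs = maxDelayMs;
    }

    // Records an event time taken from the message payload and returns the watermark.
    long onEvent(long eventTimeMs) {
        maxSeenTimestampMs = Math.max(maxSeenTimestampMs, eventTimeMs);
        return currentWatermark();
    }

    // Before any event arrives there is no meaningful watermark yet.
    long currentWatermark() {
        return maxSeenTimestampMs == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxSeenTimestampMs - maxDelayMs;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch tracker = new BoundedOutOfOrdernessSketch(5_000L);
        System.out.println(tracker.onEvent(10_000L)); // first event
        System.out.println(tracker.onEvent(8_000L));  // late event does not move the watermark back
        System.out.println(tracker.onEvent(12_000L)); // newer event advances it
    }
}
```

The watermark never moves backwards: an event that arrives out of order leaves it unchanged, and events older than the current watermark are considered late.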

### Table API Integration Notes

- Sources and sinks are normally created through `Kafka09TableSourceSinkFactory`; `Kafka09TableSource` and `Kafka09TableSink` are marked `@Internal`
- The schema must be defined explicitly, either in DDL or programmatically
- Event time processing requires a time attribute declared on a payload field, for example with a `WATERMARK` clause
- Custom formats require implementing the matching `SerializationSchema` or `DeserializationSchema`

### Performance Considerations

- Tune producer `batch.size` and `linger.ms` to trade latency against throughput
- Configure consumer fetch sizes (`fetch.min.bytes`, `max.partition.fetch.bytes`) based on message volume
- Align source parallelism with the topic's partition count; subtasks beyond the number of partitions receive no data
- Monitor consumer lag and producer throughput metrics
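
To make the partition-count versus parallelism trade-off concrete, the sketch below distributes partitions over source subtasks and counts how many subtasks stay idle. It assumes a plain modulo assignment for illustration; the connector's actual assignment may start from a different subtask index. `PartitionAssignmentSketch` is a hypothetical helper, not a Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: models which subtask reads which partition.
final class PartitionAssignmentSketch {

    // Returns, for each subtask index, the list of partitions it would read.
    static List<List<Integer>> assign(int partitionCount, int parallelism) {
        if (partitionCount < 0 || parallelism <= 0) {
            throw new IllegalArgumentException("partitionCount must be >= 0 and parallelism > 0");
        }
        List<List<Integer>> bySubtask = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            bySubtask.add(new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            // Assumed round-robin rule: partition p goes to subtask p mod parallelism.
            bySubtask.get(partition % parallelism).add(partition);
        }
        return bySubtask;
    }

    // Counts subtasks that received no partition at all.
    static long idleSubtasks(int partitionCount, int parallelism) {
        return assign(partitionCount, parallelism).stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        System.out.println(assign(8, 3));       // uneven load: two subtasks read three partitions
        System.out.println(idleSubtasks(3, 4)); // more subtasks than partitions leaves one idle
    }
}
```

With 8 partitions and a parallelism of 3 the load is uneven, and with 3 partitions and a parallelism of 4 one subtask does no work, which is why a parallelism that divides the partition count evenly tends to give the most balanced throughput.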