# Table API Integration

Comprehensive API reference for the Table API integration classes in the Apache Flink Kafka 0.8 connector. These classes enable declarative SQL and Table API access to Kafka topics.

**Important Note**: All Table API classes are marked `@Internal`, indicating they are not part of the stable public API and may change between versions.

## Kafka08TableSource

**Package:** `org.apache.flink.streaming.connectors.kafka`
**Annotations:** `@Internal`
**Extends:** `KafkaTableSourceBase`
**Description:** Kafka `StreamTableSource` for Kafka 0.8, providing Table API access to Kafka topics as streaming tables.

### Class Declaration

```java { .api }
@Internal
public class Kafka08TableSource extends KafkaTableSourceBase
```

### Constructors

#### Full Constructor

```java { .api }
/**
 * Creates a Kafka08TableSource with full configuration options.
 *
 * @param schema The table schema defining column names and types
 * @param proctimeAttribute Optional processing time attribute name
 * @param rowtimeAttributeDescriptors List of rowtime attribute descriptors for event time
 * @param fieldMapping Optional mapping from table fields to Kafka message fields
 * @param topic The Kafka topic name to read from
 * @param properties Kafka consumer properties
 * @param deserializationSchema Schema to deserialize Kafka messages to Row objects
 * @param startupMode How the consumer should start reading (EARLIEST, LATEST, GROUP_OFFSETS, SPECIFIC_OFFSETS)
 * @param specificStartupOffsets Map of partition to offset for SPECIFIC_OFFSETS startup mode
 */
public Kafka08TableSource(
    TableSchema schema,
    Optional<String> proctimeAttribute,
    List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
    Optional<Map<String, String>> fieldMapping,
    String topic,
    Properties properties,
    DeserializationSchema<Row> deserializationSchema,
    StartupMode startupMode,
    Map<KafkaTopicPartition, Long> specificStartupOffsets)
```

#### Basic Constructor

```java { .api }
/**
 * Creates a basic Kafka08TableSource with minimal configuration.
 *
 * @param schema The table schema defining column names and types
 * @param topic The Kafka topic name to read from
 * @param properties Kafka consumer properties
 * @param deserializationSchema Schema to deserialize Kafka messages to Row objects
 */
public Kafka08TableSource(
    TableSchema schema,
    String topic,
    Properties properties,
    DeserializationSchema<Row> deserializationSchema)
```

### Protected Methods

```java { .api }
/**
 * Creates the underlying FlinkKafkaConsumer for this table source.
 *
 * @param topic The Kafka topic name
 * @param properties Kafka consumer properties
 * @param deserializationSchema The deserialization schema for Row objects
 * @return FlinkKafkaConsumerBase instance configured for Kafka 0.8
 */
protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(
    String topic,
    Properties properties,
    DeserializationSchema<Row> deserializationSchema)
```

### Usage Examples

#### Basic Table Source Setup

```java { .api }
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSource;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.types.Row;

import java.util.Properties;

// Define table schema
TableSchema schema = TableSchema.builder()
    .field("user_id", Types.LONG())
    .field("user_name", Types.STRING())
    .field("user_email", Types.STRING())
    .field("registration_time", Types.SQL_TIMESTAMP())
    .build();

// Kafka properties (Kafka 0.8 consumers require a ZooKeeper connection)
Properties properties = new Properties();
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "table-api-group");

// JSON deserialization schema; the Builder expects the row type
// information derived from the table schema
DeserializationSchema<Row> deserializer =
    new JsonRowDeserializationSchema.Builder(schema.toRowType()).build();

// Create table source
Kafka08TableSource tableSource = new Kafka08TableSource(
    schema,
    "users-topic",
    properties,
    deserializer
);
```

#### Advanced Table Source with Time Attributes

```java { .api }
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Schema with processing time and event time
TableSchema schema = TableSchema.builder()
    .field("transaction_id", Types.STRING())
    .field("amount", Types.DECIMAL())
    .field("event_time", Types.SQL_TIMESTAMP())
    .field("proc_time", Types.SQL_TIMESTAMP())
    .build();

// Define rowtime attribute for event time processing
List<RowtimeAttributeDescriptor> rowtimeDescriptors = Arrays.asList(
    new RowtimeAttributeDescriptor(
        "event_time",                           // rowtime attribute name
        new ExistingField("event_time"),        // extract the timestamp from the field itself
        new BoundedOutOfOrderTimestamps(5000L)  // watermarks with 5s of out-of-orderness
    )
);

// Field mapping from Kafka message fields to table columns
Map<String, String> fieldMapping = new HashMap<>();
fieldMapping.put("transaction_id", "txn_id");
fieldMapping.put("amount", "txn_amount");
fieldMapping.put("event_time", "timestamp");

// Specific startup offsets
Map<KafkaTopicPartition, Long> startupOffsets = new HashMap<>();
startupOffsets.put(new KafkaTopicPartition("transactions", 0), 1000L);
startupOffsets.put(new KafkaTopicPartition("transactions", 1), 2000L);

// Create advanced table source
Kafka08TableSource tableSource = new Kafka08TableSource(
    schema,
    Optional.of("proc_time"),      // processing time attribute
    rowtimeDescriptors,            // rowtime attributes
    Optional.of(fieldMapping),     // field mapping
    "transactions",                // topic
    properties,                    // Kafka properties
    deserializer,                  // deserialization schema
    StartupMode.SPECIFIC_OFFSETS,  // startup mode
    startupOffsets                 // specific offsets
);
```

## Kafka08TableSink

**Package:** `org.apache.flink.streaming.connectors.kafka`
**Annotations:** `@Internal`
**Extends:** `KafkaTableSinkBase`
**Description:** Kafka 0.8 table sink for writing Table API results to Kafka topics.

### Class Declaration

```java { .api }
@Internal
public class Kafka08TableSink extends KafkaTableSinkBase
```

### Constructor

```java { .api }
/**
 * Creates a Kafka08TableSink for writing table data to Kafka.
 *
 * @param schema The table schema defining the structure of rows to be written
 * @param topic The target Kafka topic name
 * @param properties Kafka producer properties
 * @param partitioner Optional custom partitioner for determining the target partition
 * @param serializationSchema Schema to serialize Row objects to byte arrays
 */
public Kafka08TableSink(
    TableSchema schema,
    String topic,
    Properties properties,
    Optional<FlinkKafkaPartitioner<Row>> partitioner,
    SerializationSchema<Row> serializationSchema)
```

### Protected Methods

```java { .api }
/**
 * Creates the underlying FlinkKafkaProducer for this table sink.
 *
 * @param topic The Kafka topic name
 * @param properties Kafka producer properties
 * @param serializationSchema The serialization schema for Row objects
 * @param partitioner Optional custom partitioner
 * @return FlinkKafkaProducerBase instance configured for Kafka 0.8
 */
protected FlinkKafkaProducerBase<Row> createKafkaProducer(
    String topic,
    Properties properties,
    SerializationSchema<Row> serializationSchema,
    Optional<FlinkKafkaPartitioner<Row>> partitioner)
```

### Usage Examples

#### Basic Table Sink Setup

```java { .api }
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSink;

import java.util.Optional;
import java.util.Properties;

// Define output table schema
TableSchema outputSchema = TableSchema.builder()
    .field("result_id", Types.STRING())
    .field("computed_value", Types.DOUBLE())
    .field("processing_time", Types.SQL_TIMESTAMP())
    .build();

// Kafka producer properties (the 0.8 producer uses metadata.broker.list)
Properties producerProps = new Properties();
producerProps.setProperty("metadata.broker.list", "localhost:9092");

// JSON serialization schema; the Builder expects the row type information
SerializationSchema<Row> serializer =
    new JsonRowSerializationSchema.Builder(outputSchema.toRowType()).build();

// Create table sink
Kafka08TableSink tableSink = new Kafka08TableSink(
    outputSchema,
    "results-topic",
    producerProps,
    Optional.empty(),  // no custom partitioner
    serializer
);
```

#### Table Sink with Custom Partitioner

```java { .api }
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Custom partitioner based on the result ID
FlinkKafkaPartitioner<Row> partitioner = new FlinkKafkaPartitioner<Row>() {
    @Override
    public int partition(Row record, byte[] key, byte[] value,
                         String targetTopic, int[] partitions) {
        // Pick a partition from the available partition IDs based on the
        // first field (result_id); mask the hash to keep it non-negative
        String resultId = (String) record.getField(0);
        int index = (resultId.hashCode() & Integer.MAX_VALUE) % partitions.length;
        return partitions[index];
    }
};

Kafka08TableSink tableSink = new Kafka08TableSink(
    outputSchema,
    "partitioned-results",
    producerProps,
    Optional.of(partitioner),  // custom partitioner
    serializer
);
```

## Kafka08TableSourceSinkFactory

**Package:** `org.apache.flink.streaming.connectors.kafka`
**Extends:** `KafkaTableSourceSinkFactoryBase`
**Description:** Factory for creating configured instances of `Kafka08TableSource` and `Kafka08TableSink` from connector descriptors.

### Class Declaration

```java { .api }
public class Kafka08TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase
```
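
Rather than being instantiated directly, the factory is normally discovered through Java's `ServiceLoader` mechanism: the connector JAR lists the factory class in a service file, and Flink's table factory service matches it against the `connector.*` properties generated by descriptors. A minimal sketch of that registration entry (standard Flink table factory mechanism; file contents shown for illustration):

```
# META-INF/services/org.apache.flink.table.factories.TableFactory
org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceSinkFactory
```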

### Protected Methods

```java { .api }
/**
 * Returns the Kafka version string for this factory.
 *
 * @return "0.8" indicating Kafka version 0.8 support
 */
protected String kafkaVersion()

/**
 * Indicates whether this Kafka version supports timestamps.
 * Kafka 0.8 does not support message timestamps.
 *
 * @return false, as Kafka 0.8 doesn't support timestamps
 */
protected boolean supportsKafkaTimestamps()

/**
 * Creates a KafkaTableSource instance with the provided configuration.
 * The parameter order mirrors the full Kafka08TableSource constructor.
 *
 * @param schema The table schema
 * @param proctimeAttribute Processing time attribute name (optional)
 * @param rowtimeAttributeDescriptors Rowtime attribute descriptors
 * @param fieldMapping Field mapping configuration (optional)
 * @param topic The Kafka topic name
 * @param properties Kafka consumer properties
 * @param deserializationSchema Row deserialization schema
 * @param startupMode Consumer startup mode
 * @param specificStartupOffsets Specific startup offsets (if applicable)
 * @return Configured Kafka08TableSource instance
 */
protected KafkaTableSourceBase createKafkaTableSource(
    TableSchema schema,
    Optional<String> proctimeAttribute,
    List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
    Optional<Map<String, String>> fieldMapping,
    String topic,
    Properties properties,
    DeserializationSchema<Row> deserializationSchema,
    StartupMode startupMode,
    Map<KafkaTopicPartition, Long> specificStartupOffsets)

/**
 * Creates a KafkaTableSink instance with the provided configuration.
 *
 * @param schema The table schema
 * @param topic The Kafka topic name
 * @param properties Kafka producer properties
 * @param partitioner Optional partitioner
 * @param serializationSchema Row serialization schema
 * @return Configured Kafka08TableSink instance
 */
protected KafkaTableSinkBase createKafkaTableSink(
    TableSchema schema,
    String topic,
    Properties properties,
    Optional<FlinkKafkaPartitioner<Row>> partitioner,
    SerializationSchema<Row> serializationSchema)
```

### Factory Usage Examples

#### Programmatic Factory Usage

```java { .api }
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceSinkFactory;

import java.util.Collections;

// NOTE: createKafkaTableSource/createKafkaTableSink are protected, so this
// code must live in the org.apache.flink.streaming.connectors.kafka package
// or a subclass. In normal use the factory is invoked indirectly through
// descriptors and the table factory service.
Kafka08TableSourceSinkFactory factory = new Kafka08TableSourceSinkFactory();

// Create table source via factory
KafkaTableSourceBase source = factory.createKafkaTableSource(
    schema,                     // table schema
    Optional.of("proc_time"),   // processing time attribute
    Collections.emptyList(),    // no rowtime attributes
    Optional.empty(),           // no field mapping
    "input-topic",              // topic name
    consumerProperties,         // Kafka properties
    deserializationSchema,      // deserialization schema
    StartupMode.EARLIEST,       // startup mode
    Collections.emptyMap()      // no specific offsets
);

// Create table sink via factory
KafkaTableSinkBase sink = factory.createKafkaTableSink(
    outputSchema,               // output table schema
    "output-topic",             // topic name
    producerProperties,         // Kafka properties
    Optional.empty(),           // no custom partitioner
    serializationSchema         // serialization schema
);
```

## Table API Integration Examples

### Complete Table API Workflow

```java { .api }
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

// Set up the Table API environment (Flink 1.7-era API, contemporary
// with the Kafka 0.8 connector)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

// Register Kafka source table
TableSchema sourceSchema = TableSchema.builder()
    .field("user_id", Types.LONG())
    .field("product_id", Types.STRING())
    .field("quantity", Types.INT())
    .field("price", Types.DECIMAL())
    .field("order_time", Types.SQL_TIMESTAMP())
    .field("proc_time", Types.SQL_TIMESTAMP())
    .build();

Properties sourceProps = new Properties();
sourceProps.setProperty("zookeeper.connect", "localhost:2181");
sourceProps.setProperty("group.id", "analytics-group");

// Use the full constructor so that proc_time is declared as a processing
// time attribute (required for the TUMBLE window in the query below)
Kafka08TableSource source = new Kafka08TableSource(
    sourceSchema,
    Optional.of("proc_time"),
    Collections.emptyList(),
    Optional.empty(),
    "orders",
    sourceProps,
    new JsonRowDeserializationSchema.Builder(sourceSchema.toRowType()).build(),
    StartupMode.GROUP_OFFSETS,
    Collections.emptyMap()
);

tEnv.registerTableSource("Orders", source);

// Register Kafka sink table
TableSchema sinkSchema = TableSchema.builder()
    .field("product_id", Types.STRING())
    .field("total_quantity", Types.LONG())
    .field("total_revenue", Types.DECIMAL())
    .field("window_start", Types.SQL_TIMESTAMP())
    .field("window_end", Types.SQL_TIMESTAMP())
    .build();

Properties sinkProps = new Properties();
sinkProps.setProperty("metadata.broker.list", "localhost:9092");

Kafka08TableSink sink = new Kafka08TableSink(
    sinkSchema,
    "product-analytics",
    sinkProps,
    Optional.empty(),
    new JsonRowSerializationSchema.Builder(sinkSchema.toRowType()).build()
);

tEnv.registerTableSink(
    "ProductAnalytics",
    sinkSchema.getFieldNames(),
    sinkSchema.getFieldTypes(),
    sink);

// Build the SQL statement (text blocks are not available on the
// Java 8 runtimes used with this connector, so concatenate)
String sql =
    "INSERT INTO ProductAnalytics " +
    "SELECT " +
    "  product_id, " +
    "  SUM(quantity) AS total_quantity, " +
    "  SUM(quantity * price) AS total_revenue, " +
    "  TUMBLE_START(proc_time, INTERVAL '1' HOUR) AS window_start, " +
    "  TUMBLE_END(proc_time, INTERVAL '1' HOUR) AS window_end " +
    "FROM Orders " +
    "GROUP BY " +
    "  product_id, " +
    "  TUMBLE(proc_time, INTERVAL '1' HOUR)";

tEnv.sqlUpdate(sql);
env.execute("product-analytics");
```

### Dynamic Table Registration

```java { .api }
import org.apache.flink.table.descriptors.*;

// Register source using descriptors (alternative approach); streaming
// tables must declare an update mode before registration
tEnv.connect(
        new Kafka()
            .version("0.8")
            .topic("user-events")
            .property("zookeeper.connect", "localhost:2181")
            .property("group.id", "event-processors")
    )
    .withFormat(
        new Json()
            .failOnMissingField(false)
            .deriveSchema()
    )
    .withSchema(
        new Schema()
            .field("event_id", Types.STRING())
            .field("user_id", Types.LONG())
            .field("event_type", Types.STRING())
            .field("event_time", Types.SQL_TIMESTAMP())
            .field("proc_time", Types.SQL_TIMESTAMP()).proctime()
    )
    .inAppendMode()
    .registerTableSource("UserEvents");

// Register sink using descriptors
tEnv.connect(
        new Kafka()
            .version("0.8")
            .topic("processed-events")
            .property("metadata.broker.list", "localhost:9092")
    )
    .withFormat(
        new Json().deriveSchema()
    )
    .withSchema(
        new Schema()
            .field("user_id", Types.LONG())
            .field("event_count", Types.LONG())
            .field("processing_time", Types.SQL_TIMESTAMP())
    )
    .inAppendMode()
    .registerTableSink("ProcessedEvents");
```

## Serialization Schemas for Table API

### JSON Row Serialization

```java { .api }
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.formats.json.JsonRowSerializationSchema;

// JSON deserialization for a table source; the builders take the
// row type information derived from the table schema
JsonRowDeserializationSchema.Builder deserBuilder =
    new JsonRowDeserializationSchema.Builder(sourceSchema.toRowType());

DeserializationSchema<Row> deserializer = deserBuilder
    .ignoreParseErrors()  // skip malformed JSON (newer flink-json versions)
    .build();

// JSON serialization for a table sink
JsonRowSerializationSchema.Builder serBuilder =
    new JsonRowSerializationSchema.Builder(sinkSchema.toRowType());

SerializationSchema<Row> serializer = serBuilder.build();
```

### Custom Row Serialization

```java { .api }
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.types.Row;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Custom deserialization schema for delimiter-separated messages
public class CsvRowDeserializationSchema implements DeserializationSchema<Row> {
    private final TableSchema schema;
    private final String delimiter;

    public CsvRowDeserializationSchema(TableSchema schema, String delimiter) {
        this.schema = schema;
        this.delimiter = delimiter;
    }

    @Override
    public Row deserialize(byte[] message) throws IOException {
        String line = new String(message, StandardCharsets.UTF_8);
        String[] fields = line.split(delimiter);

        Row row = new Row(schema.getFieldCount());
        for (int i = 0; i < fields.length && i < schema.getFieldCount(); i++) {
            row.setField(i, convertField(fields[i], schema.getFieldTypes()[i]));
        }
        return row;
    }

    @Override
    public boolean isEndOfStream(Row nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        return schema.toRowType();
    }

    private Object convertField(String field, TypeInformation<?> type) {
        // Type conversion based on the schema's field types
        if (Types.STRING().equals(type)) return field;
        if (Types.LONG().equals(type)) return Long.parseLong(field);
        if (Types.INT().equals(type)) return Integer.parseInt(field);
        // ... other type conversions
        return field;
    }
}
```
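
Once defined, the custom schema plugs into the source like any other `DeserializationSchema<Row>`. A brief sketch of the wiring (topic name illustrative; `schema` and `properties` as defined in the earlier examples):

```java { .api }
// Hypothetical wiring of the custom CSV schema into a table source
Kafka08TableSource csvSource = new Kafka08TableSource(
    schema,                                       // a TableSchema as defined earlier
    "csv-topic",                                  // illustrative topic name
    properties,                                   // ZooKeeper-based consumer properties
    new CsvRowDeserializationSchema(schema, ",")  // comma-delimited messages
);
```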

## Configuration and Limitations

### Kafka 0.8 Table API Limitations

1. **No Timestamp Support**: Kafka 0.8 messages carry no broker-side timestamps, so rowtime attributes must be extracted from the message payload (see the sketch after this list)
2. **Internal API**: All classes are marked `@Internal` and may change between versions
3. **Limited Watermark Strategies**: Restricted by Kafka 0.8's metadata capabilities
4. **ZooKeeper Dependency**: Requires ZooKeeper configuration for consumer operations
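
Because there is no message timestamp to fall back on, event time has to come from a field of the record itself. A minimal sketch under that constraint, reusing the payload-based extractor from the advanced source example above:

```java { .api }
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;

// Rowtime derived from a payload field ("event_time" here), since
// Kafka 0.8 provides no broker-side message timestamps
RowtimeAttributeDescriptor rowtime = new RowtimeAttributeDescriptor(
    "event_time",
    new ExistingField("event_time"),
    new BoundedOutOfOrderTimestamps(5000L));
```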

### Recommended Configuration

```java { .api }
import org.apache.flink.streaming.api.CheckpointingMode;

// Source configuration for reliability
Properties sourceProps = new Properties();
sourceProps.setProperty("zookeeper.connect", "zk1:2181,zk2:2181,zk3:2181");
sourceProps.setProperty("group.id", "table-api-consumer");
// Kafka 0.8 uses "smallest"/"largest" rather than "earliest"/"latest"
sourceProps.setProperty("auto.offset.reset", "smallest");
sourceProps.setProperty("auto.commit.enable", "false"); // offsets managed by Flink

// Sink configuration for reliability
Properties sinkProps = new Properties();
sinkProps.setProperty("metadata.broker.list", "broker1:9092,broker2:9092");
sinkProps.setProperty("request.required.acks", "1");
sinkProps.setProperty("message.send.max.retries", "3");

// Enable checkpointing for exactly-once processing (source side only;
// the 0.8 producer offers at-least-once at best)
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
```

### Migration Considerations

For production Table API usage, consider upgrading to the Kafka 0.9+ connectors (as sketched below), which provide:

1. **Stable Public API**: Non-internal classes with stability guarantees
2. **Message Timestamp Support**: Better rowtime attribute support (Kafka 0.10+)
3. **Improved Reliability**: Exactly-once sink semantics with the transactional producer of the Kafka 0.11+ connectors
4. **Enhanced Performance**: Better resource utilization and throughput
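
The newer table sources mirror this API, so migration is largely a class and property swap. A hedged sketch of the equivalent source on the 0.9 connector, assuming the analogous `Kafka09TableSource` class from the same package and the new consumer's `bootstrap.servers` setting:

```java { .api }
import org.apache.flink.streaming.connectors.kafka.Kafka09TableSource;

// The 0.9+ consumer talks to brokers directly; no ZooKeeper required
Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker1:9092,broker2:9092");
props.setProperty("group.id", "table-api-consumer");

// Same constructor shape as Kafka08TableSource
Kafka09TableSource source = new Kafka09TableSource(
    schema,
    "users-topic",
    props,
    new JsonRowDeserializationSchema.Builder(schema.toRowType()).build()
);
```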