# Table API Integration

The Flink Kafka 0.10 connector provides comprehensive integration with Flink's Table API and SQL, enabling declarative stream processing with Kafka sources and sinks. It supports both legacy table factories and the new dynamic table API.

## Capabilities

### Dynamic Table Factory

Modern table factory implementation for creating Kafka sources and sinks through SQL DDL and Table API.

```java { .api }
/**
 * Dynamic table factory for Kafka 0.10.x integration
 */
public class Kafka010DynamicTableFactory extends KafkaDynamicTableFactoryBase {
    /** Connector identifier used in SQL DDL */
    public static final String IDENTIFIER = "kafka-0.10";
}

/**
 * Dynamic table source implementation for Kafka 0.10.x
 */
public class Kafka010DynamicSource extends KafkaDynamicSourceBase {
    // Inherits all functionality from base dynamic source
}

/**
 * Dynamic table sink implementation for Kafka 0.10.x
 */
public class Kafka010DynamicSink extends KafkaDynamicSinkBase {
    // Inherits all functionality from base dynamic sink
}
```
**Usage Examples:**

```sql
-- Create Kafka source table
CREATE TABLE kafka_source (
    user_id BIGINT,
    event_name STRING,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka-0.10',
    'topic' = 'user-events',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'my-consumer-group',
    'format' = 'json',
    'scan.startup.mode' = 'earliest-offset'
);

-- Create Kafka sink table
CREATE TABLE kafka_sink (
    user_id BIGINT,
    aggregated_count BIGINT,
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3)
) WITH (
    'connector' = 'kafka-0.10',
    'topic' = 'user-aggregates',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json',
    'sink.partitioner' = 'round-robin'
);

-- Query using the tables
INSERT INTO kafka_sink
SELECT
    user_id,
    COUNT(*) AS aggregated_count,
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
    TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS window_end
FROM kafka_source
GROUP BY user_id, TUMBLE(event_time, INTERVAL '1' MINUTE);
```
76
77
```java
78
// Table API usage
79
import org.apache.flink.table.api.EnvironmentSettings;
80
import org.apache.flink.table.api.TableEnvironment;
81
82
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
83
84
// Create source table descriptor
85
TableDescriptor sourceDescriptor = TableDescriptor.forConnector("kafka-0.10")
86
.schema(Schema.newBuilder()
87
.column("user_id", DataTypes.BIGINT())
88
.column("event_name", DataTypes.STRING())
89
.column("event_time", DataTypes.TIMESTAMP(3))
90
.watermark("event_time", "event_time - INTERVAL '5' SECOND")
91
.build())
92
.option("topic", "user-events")
93
.option("properties.bootstrap.servers", "localhost:9092")
94
.option("properties.group.id", "my-consumer-group")
95
.option("format", "json")
96
.option("scan.startup.mode", "earliest-offset")
97
.build();
98
99
tableEnv.createTemporaryTable("kafka_source", sourceDescriptor);
100
```
101
102
### Legacy Table Factory
103
104
Legacy table factory implementation for backward compatibility with older Table API versions.
105
106
```java { .api }
107
/**
108
* Legacy table factory for creating Kafka 0.10.x sources and sinks
109
*/
110
public class Kafka010TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {
111
/**
112
* Returns the Kafka version supported by this factory
113
* @return Kafka version string "0.10"
114
*/
115
protected String kafkaVersion();
116
117
/**
118
* Indicates whether this connector supports Kafka timestamps
119
* @return true, as Kafka 0.10.x supports timestamps
120
*/
121
protected boolean supportsKafkaTimestamps();
122
}
123
124
/**
125
* Legacy table source implementation for consuming from Kafka 0.10.x in Table API
126
*/
127
public class Kafka010TableSource extends KafkaTableSourceBase {
128
// Internal implementation - extends base Kafka table source
129
}
130
131
/**
132
* Legacy table sink implementation for writing to Kafka 0.10.x in Table API
133
*/
134
public class Kafka010TableSink extends KafkaTableSinkBase {
135
// Internal implementation - extends base Kafka table sink
136
}
137
```
**Usage Examples:**

```java
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Schema;

// Legacy descriptor-based approach
TableEnvironment tableEnv = TableEnvironment.create(...);

tableEnv.connect(
        new Kafka()
            .version("0.10")
            .topic("user-events")
            .property("bootstrap.servers", "localhost:9092")
            .property("group.id", "my-consumer-group")
            .startFromEarliest()
    )
    .withFormat(new Json())
    .withSchema(new Schema()
        .field("user_id", DataTypes.BIGINT())
        .field("event_name", DataTypes.STRING())
        .field("event_time", DataTypes.TIMESTAMP(3))
    )
    .createTemporaryTable("legacy_kafka_source");
```
166
167
## Configuration Options
168
169
### Source Configuration
170
171
**Required Options:**
172
- `connector`: Must be "kafka-0.10"
173
- `topic`: Kafka topic name (or list of topics separated by semicolon)
174
- `properties.bootstrap.servers`: Kafka broker addresses
175
176
**Startup Mode Options:**
177
- `scan.startup.mode`: How to start consuming ("earliest-offset", "latest-offset", "group-offsets", "timestamp", "specific-offsets")
178
- `scan.startup.timestamp-millis`: Start timestamp when mode is "timestamp"
179
- `scan.startup.specific-offsets`: Specific partition offsets when mode is "specific-offsets"
180
181
**Consumer Properties:**
182
- `properties.group.id`: Consumer group ID
183
- `properties.auto.offset.reset`: Offset reset behavior ("earliest", "latest")
184
- `properties.flink.poll-timeout`: Polling timeout in milliseconds
185
186
**Pattern Subscription:**
187
- `topic-pattern`: Regular expression pattern for topic subscription instead of specific topics
188
189
### Sink Configuration
190
191
**Required Options:**
192
- `connector`: Must be "kafka-0.10"
193
- `topic`: Target Kafka topic name
194
- `properties.bootstrap.servers`: Kafka broker addresses
195
196
**Partitioning Options:**
197
- `sink.partitioner`: Partitioning strategy ("default", "round-robin", "custom")
198
- `sink.partitioner-class`: Custom partitioner class when using "custom"
199
200
**Producer Properties:**
201
- `properties.acks`: Acknowledgment mode ("all", "1", "0")
202
- `properties.retries`: Number of retries for failed sends
203
- `properties.enable.idempotence`: Enable idempotent producer for exactly-once
204
205
**Timestamp Options:**
206
- `sink.timestamp-field`: Field to use as Kafka record timestamp
207
- `sink.timestamp-format`: Timestamp format specification
208
### Format Integration

The connector works with various format specifications:

```sql
-- JSON format
CREATE TABLE kafka_json_source (...) WITH (
    'connector' = 'kafka-0.10',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true',
    'json.timestamp-format.standard' = 'ISO-8601'
);

-- Avro format
CREATE TABLE kafka_avro_source (...) WITH (
    'connector' = 'kafka-0.10',
    'format' = 'avro',
    'avro.schema-registry.url' = 'http://localhost:8081'
);

-- CSV format
CREATE TABLE kafka_csv_source (...) WITH (
    'connector' = 'kafka-0.10',
    'format' = 'csv',
    'csv.field-delimiter' = ',',
    'csv.ignore-parse-errors' = 'true'
);
```
237
238
## Watermark Strategies
239
240
Configure watermark generation for event time processing:
241
242
```sql
243
-- Bounded out-of-orderness watermarks
244
CREATE TABLE kafka_source (
245
user_id BIGINT,
246
event_name STRING,
247
event_time TIMESTAMP(3),
248
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
249
) WITH (
250
'connector' = 'kafka-0.10',
251
'topic' = 'events'
252
);
253
254
-- Ascending timestamps watermarks
255
CREATE TABLE kafka_ordered_source (
256
user_id BIGINT,
257
event_time TIMESTAMP(3),
258
WATERMARK FOR event_time AS event_time
259
) WITH (
260
'connector' = 'kafka-0.10',
261
'topic' = 'ordered-events'
262
);
263
```
264
265
## Multiple Topics and Pattern Subscription
266
267
### Multiple Topics
268
269
```sql
270
-- Multiple specific topics
271
CREATE TABLE multi_topic_source (...) WITH (
272
'connector' = 'kafka-0.10',
273
'topic' = 'topic1;topic2;topic3',
274
'properties.bootstrap.servers' = 'localhost:9092'
275
);
276
```
277
278
### Pattern-Based Subscription
279
280
```sql
281
-- Pattern-based topic subscription
282
CREATE TABLE pattern_source (...) WITH (
283
'connector' = 'kafka-0.10',
284
'topic-pattern' = 'logs-.*',
285
'properties.bootstrap.servers' = 'localhost:9092',
286
'properties.flink.partition-discovery.interval-millis' = '30000'
287
);
288
```
289
290
## Exactly-Once Semantics
291
292
Configure exactly-once processing for both sources and sinks:
293
294
### Source Configuration
295
296
```sql
297
CREATE TABLE exactly_once_source (...) WITH (
298
'connector' = 'kafka-0.10',
299
'properties.isolation.level' = 'read_committed',
300
'properties.enable.auto.commit' = 'false'
301
);
302
```
303
304
### Sink Configuration
305
306
```sql
307
CREATE TABLE exactly_once_sink (...) WITH (
308
'connector' = 'kafka-0.10',
309
'properties.acks' = 'all',
310
'properties.retries' = '3',
311
'properties.enable.idempotence' = 'true',
312
'properties.max.in.flight.requests.per.connection' = '1'
313
);
314
```
315
316
## Service Provider Registration
317
318
The connector automatically registers itself with Flink's service provider mechanism:
319
320
**META-INF/services/org.apache.flink.table.factories.Factory:**
321
- `org.apache.flink.streaming.connectors.kafka.table.Kafka010DynamicTableFactory`
322
323
**META-INF/services/org.apache.flink.table.factories.TableFactory:**
324
- `org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory`
325
326
## Migration from Legacy to Dynamic API
327
328
When migrating from legacy descriptor-based API to modern SQL DDL:
329
330
**Legacy (Deprecated):**
331
```java
332
tableEnv.connect(new Kafka().version("0.10").topic("my-topic"))
333
.withFormat(new Json())
334
.withSchema(new Schema().field("id", DataTypes.BIGINT()))
335
.createTemporaryTable("my_table");
336
```
337
338
**Modern (Recommended):**
339
```sql
340
CREATE TABLE my_table (
341
id BIGINT
342
) WITH (
343
'connector' = 'kafka-0.10',
344
'topic' = 'my-topic',
345
'format' = 'json'
346
);
347
```
348
349
## Error Handling
350
351
Table API integration provides several error handling strategies:
352
353
- **Parse Errors**: Configure format options to ignore or fail on parse errors
354
- **Serialization Errors**: Log-only mode for non-critical failures
355
- **Connection Errors**: Automatic retry mechanisms with exponential backoff
356
- **Schema Evolution**: Support for schema registry integration with Avro/JSON formats