0
# Table/SQL API Integration
1
2
The Flink Kafka 0.11 connector integrates with the Table API and SQL through factory classes, so Kafka-backed tables can be declared in SQL DDL or registered programmatically and then queried by Flink's SQL engine.
3
4
## Capabilities
5
6
### Dynamic Table Factory
7
8
Modern factory implementation for SQL DDL support and dynamic table creation.
9
10
```java { .api }
11
/**
12
* Factory for creating dynamic table sources and sinks for Kafka 0.11.x
13
* Supports the new Dynamic Table API introduced in Flink 1.11+
14
*/
15
class Kafka011DynamicTableFactory extends KafkaDynamicTableFactoryBase {
16
/**
17
* Factory identifier used in SQL DDL CREATE TABLE statements
18
* @return "kafka-0.11" identifier for connector specification
19
*/
20
String factoryIdentifier();
21
}
22
```
23
24
**Usage Examples:**
25
26
```sql
27
-- SQL DDL using the kafka-0.11 connector identifier
28
CREATE TABLE user_events (
29
user_id BIGINT,
30
event_type STRING,
31
event_data STRING,
32
event_timestamp TIMESTAMP(3),
33
WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '5' SECOND
34
) WITH (
35
'connector' = 'kafka-0.11',
36
'topic' = 'user-events',
37
'properties.bootstrap.servers' = 'localhost:9092',
38
'properties.group.id' = 'flink-consumer-group',
39
'scan.startup.mode' = 'earliest-offset',
40
'format' = 'json'
41
);
42
43
-- Create sink table
44
CREATE TABLE processed_events (
45
user_id BIGINT,
46
event_count BIGINT,
47
processing_time TIMESTAMP(3)
48
) WITH (
49
'connector' = 'kafka-0.11',
50
'topic' = 'processed-events',
51
'properties.bootstrap.servers' = 'localhost:9092',
52
'properties.transaction.timeout.ms' = '900000',
53
'sink.semantic' = 'exactly-once',
54
'format' = 'json'
55
);
56
```
57
58
### Dynamic Table Source
59
60
Kafka table source implementation for the modern Dynamic Table API.
61
62
```java { .api }
63
/**
64
* Dynamic table source for Kafka 0.11.x supporting advanced features
65
* like watermark generation and projection pushdown
66
*/
67
@Internal
68
class Kafka011DynamicSource extends KafkaDynamicSourceBase {
69
/**
70
* Constructor for dynamic table source
71
* @param outputDataType the output data type of the source
72
* @param topic the Kafka topic name
73
* @param properties Kafka consumer properties
74
* @param decodingFormat format for deserializing records
75
* @param startupMode how to start consuming (earliest, latest, etc.)
76
* @param specificStartupOffsets specific partition offsets for startup
77
* @param startupTimestampMillis timestamp for timestamp-based startup
78
*/
79
Kafka011DynamicSource(
80
DataType outputDataType,
81
String topic,
82
Properties properties,
83
DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
84
StartupMode startupMode,
85
Map<KafkaTopicPartition, Long> specificStartupOffsets,
86
long startupTimestampMillis
87
);
88
89
/**
90
* Create a copy of this source; the planner calls this during planning
91
* @return copied source instance
92
*/
93
DynamicTableSource copy();
94
95
/**
96
* Summary string for debugging and logging
97
* @return "Kafka-0.11" description
98
*/
99
String asSummaryString();
100
}
101
```
102
103
### Dynamic Table Sink
104
105
Kafka table sink implementation for the modern Dynamic Table API.
106
107
```java { .api }
108
/**
109
* Dynamic table sink for Kafka 0.11.x supporting transactional writes
110
* and exactly-once semantics for table operations
111
*/
112
@Internal
113
class Kafka011DynamicSink extends KafkaDynamicSinkBase {
114
/**
115
* Constructor for dynamic table sink
116
* @param consumedDataType the data type consumed by the sink
117
* @param topic the target Kafka topic name
118
* @param properties Kafka producer properties
119
* @param partitioner optional custom partitioner for records
120
* @param encodingFormat format for serializing records
121
*/
122
Kafka011DynamicSink(
123
DataType consumedDataType,
124
String topic,
125
Properties properties,
126
Optional<FlinkKafkaPartitioner<RowData>> partitioner,
127
EncodingFormat<SerializationSchema<RowData>> encodingFormat
128
);
129
130
/**
131
* Create a copy of this sink; the planner calls this during planning
132
* @return copied sink instance
133
*/
134
DynamicTableSink copy();
135
136
/**
137
* Summary string for debugging and logging
138
* @return "Kafka 0.11 table sink" description
139
*/
140
String asSummaryString();
141
}
142
```
143
144
### Legacy Table API Support
145
146
Legacy factory and table implementations for backward compatibility with older Flink versions.
147
148
```java { .api }
149
/**
150
* Legacy table source for Kafka 0.11.x (pre-1.11 Table API)
151
* Maintained for backward compatibility
152
*/
153
@Internal
154
class Kafka011TableSource extends KafkaTableSourceBase {
155
// Full constructor with all table configuration options
156
Kafka011TableSource(
157
TableSchema schema,
158
Optional<String> proctimeAttribute,
159
List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
160
Optional<Map<String, String>> fieldMapping,
161
String topic,
162
Properties properties,
163
DeserializationSchema<Row> deserializationSchema,
164
StartupMode startupMode,
165
Map<KafkaTopicPartition, Long> specificStartupOffsets,
166
long startupTimestampMillis
167
);
168
169
// Simplified constructor for basic use cases
170
Kafka011TableSource(
171
TableSchema schema,
172
String topic,
173
Properties properties,
174
DeserializationSchema<Row> deserializationSchema
175
);
176
}
177
178
/**
179
* Legacy table sink for Kafka 0.11.x (pre-1.11 Table API)
180
* Maintained for backward compatibility
181
*/
182
@Internal
183
class Kafka011TableSink extends KafkaTableSinkBase {
184
/**
185
* Constructor for legacy table sink
186
* @param schema table schema definition
187
* @param topic target Kafka topic name
188
* @param properties Kafka producer properties
189
* @param partitioner optional custom partitioner
190
* @param serializationSchema schema for serializing rows
191
*/
192
Kafka011TableSink(
193
TableSchema schema,
194
String topic,
195
Properties properties,
196
Optional<FlinkKafkaPartitioner<Row>> partitioner,
197
SerializationSchema<Row> serializationSchema
198
);
199
}
200
201
/**
202
* Legacy factory for creating table sources and sinks
203
* Maintained for backward compatibility with older Flink versions
204
*/
205
class Kafka011TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {
206
// Factory methods inherited from base class
207
// Used by Table API environment for table registration
208
}
209
```
210
211
**Usage Examples:**
212
213
```java
214
// Programmatic table registration (legacy approach)
215
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
216
217
Properties properties = new Properties();
218
properties.setProperty("bootstrap.servers", "localhost:9092");
219
properties.setProperty("group.id", "table-consumer");
220
221
// Register source table
222
Kafka011TableSource source = new Kafka011TableSource(
223
TableSchema.builder()
224
.field("user_id", DataTypes.BIGINT())
225
.field("event_data", DataTypes.STRING())
226
.field("event_time", DataTypes.TIMESTAMP(3))
227
.build(),
228
"input-topic",
229
properties,
230
new JsonDeserializationSchema()
231
);
232
233
tableEnv.registerTableSource("kafka_source", source);
234
235
// Register sink table
236
Kafka011TableSink sink = new Kafka011TableSink(
237
TableSchema.builder()
238
.field("result", DataTypes.STRING())
239
.field("count", DataTypes.BIGINT())
240
.build(),
241
"output-topic",
242
properties,
243
Optional.empty(),
244
new JsonSerializationSchema()
245
);
246
247
tableEnv.registerTableSink("kafka_sink", sink);
248
```
249
250
## Table Configuration Options
251
252
SQL DDL configuration options for Kafka connector tables.
253
254
### Core Connection Options
255
256
```java { .api }
257
// Required configuration options (a 'format' must also be specified, as in the DDL examples)
258
'connector' = 'kafka-0.11' // Connector identifier
259
'topic' = 'topic-name' // Kafka topic name
260
'properties.bootstrap.servers' = 'host:port' // Kafka broker addresses
261
```
262
263
### Consumer Configuration Options
264
265
```java { .api }
266
// Consumer-specific options for source tables
267
'properties.group.id' = 'consumer-group' // Consumer group ID
268
'scan.startup.mode' = 'mode' // Startup mode: earliest-offset, latest-offset, group-offsets, specific-offsets, timestamp
269
'scan.startup.specific-offsets' = 'offsets' // Per-partition offsets for specific-offsets mode, e.g. 'partition:0,offset:42;partition:1,offset:300'
270
'scan.startup.timestamp-millis' = 'timestamp' // Start timestamp in epoch milliseconds, used when scan.startup.mode is 'timestamp'
271
```
272
273
### Producer Configuration Options
274
275
```java { .api }
276
// Producer-specific options for sink tables
277
'sink.partitioner' = 'partitioner' // Partitioner: fixed, round-robin, or custom class name
278
'sink.semantic' = 'semantic' // Delivery semantic: exactly-once, at-least-once, none
279
'properties.transaction.timeout.ms' = 'timeout' // Producer transaction timeout in ms for exactly-once; must not exceed the broker's transaction.max.timeout.ms
280
```
281
282
**Complete SQL DDL Examples:**
283
284
```sql
285
-- Source table with watermarks and specific startup configuration
286
CREATE TABLE orders_source (
287
order_id BIGINT,
288
customer_id BIGINT,
289
product_id BIGINT,
290
quantity INT,
291
price DECIMAL(10,2),
292
order_timestamp TIMESTAMP(3),
293
WATERMARK FOR order_timestamp AS order_timestamp - INTERVAL '10' SECOND
294
) WITH (
295
'connector' = 'kafka-0.11',
296
'topic' = 'orders',
297
'properties.bootstrap.servers' = 'kafka-cluster:9092',
298
'properties.group.id' = 'orders-processor',
299
'properties.auto.offset.reset' = 'earliest',
300
'scan.startup.mode' = 'timestamp',
301
'scan.startup.timestamp-millis' = '1609459200000',
302
'format' = 'avro-confluent',
303
'avro-confluent.url' = 'http://schema-registry:8081'
304
);
305
306
-- Sink table with exactly-once semantics
307
CREATE TABLE order_aggregates_sink (
308
customer_id BIGINT,
309
total_orders BIGINT,
310
total_amount DECIMAL(15,2),
311
window_start TIMESTAMP(3),
312
window_end TIMESTAMP(3)
313
) WITH (
314
'connector' = 'kafka-0.11',
315
'topic' = 'order-aggregates',
316
'properties.bootstrap.servers' = 'kafka-cluster:9092',
317
'properties.transaction.timeout.ms' = '900000',
318
'sink.semantic' = 'exactly-once',
319
'sink.partitioner' = 'fixed',
320
'format' = 'json'
321
);
322
323
-- Use tables in SQL query
324
INSERT INTO order_aggregates_sink
325
SELECT
326
customer_id,
327
COUNT(*) as total_orders,
328
SUM(price * quantity) as total_amount,
329
TUMBLE_START(order_timestamp, INTERVAL '1' HOUR) as window_start,
330
TUMBLE_END(order_timestamp, INTERVAL '1' HOUR) as window_end
331
FROM orders_source
332
GROUP BY customer_id, TUMBLE(order_timestamp, INTERVAL '1' HOUR);
333
```
334
335
## Service Loader Integration
336
337
The connector registers its factories through Java's `ServiceLoader` (SPI) mechanism so that Flink can discover them automatically.
338
339
```java { .api }
340
// Service registration files:
341
// META-INF/services/org.apache.flink.table.factories.Factory
342
// -> org.apache.flink.streaming.connectors.kafka.table.Kafka011DynamicTableFactory
343
//
344
// META-INF/services/org.apache.flink.table.factories.TableFactory
345
// -> org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceSinkFactory
346
```
347
348
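This enables automatic discovery and instantiation of the Kafka connector factories when a table is declared through SQL DDL or a connector descriptor; sources and sinks constructed directly in code are registered as-is and do not go through factory discovery.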