# Table API Integration

Table sources and sinks that integrate Kafka with the SQL layer, supporting multiple data formats (JSON, Avro) together with connector descriptors, schema definition, and streaming table operations. This enables declarative SQL queries over Kafka streams.

## Capabilities

### KafkaTableSource

Abstract base class for Kafka table sources, providing streaming table functionality with support for processing-time attributes, event-time (rowtime) attributes, and watermark extraction.

```java { .api }
public abstract class KafkaTableSource implements StreamTableSource<Row>,
        DefinedProctimeAttribute, DefinedRowtimeAttributes {

    // Abstract method implemented by version-specific subclasses
    protected abstract FlinkKafkaConsumerBase<Row> createKafkaConsumer(
        String topic,
        Properties properties,
        DeserializationSchema<Row> deserializationSchema
    );
}
```

**Key Interfaces:**
- `StreamTableSource<Row>` - Provides a streaming data source for the Table API
- `DefinedProctimeAttribute` - Supports processing-time attribute definition
- `DefinedRowtimeAttributes` - Supports event-time attribute definition
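
The following is a minimal sketch of a version-specific subclass, assuming the universal `FlinkKafkaConsumer` and a hypothetical class name `MyKafkaTableSource` (neither is part of this API); constructor wiring is omitted since it mirrors the base-class constructor:

```java
// Hypothetical subclass for illustration; the class name and consumer
// version are assumptions, not part of the shipped API.
public class MyKafkaTableSource extends KafkaTableSource {

    // Constructor(s) delegating to the KafkaTableSource constructor omitted.

    @Override
    protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(
            String topic,
            Properties properties,
            DeserializationSchema<Row> deserializationSchema) {
        // Bind the version-specific consumer; the base class wires it
        // into the streaming table source.
        return new FlinkKafkaConsumer<>(topic, deserializationSchema, properties);
    }
}
```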

### KafkaTableSink

Abstract base class for Kafka table sinks, providing streaming table output with support for custom partitioning.

```java { .api }
public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {

    // Abstract method implemented by version-specific subclasses
    protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
        String topic,
        Properties properties,
        SerializationSchema<Row> serializationSchema,
        FlinkKafkaPartitioner<Row> partitioner
    );
}
```

**Key Interface:**
- `AppendStreamTableSink<Row>` - Supports append-only table sink operations
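
A matching sketch for the sink side, assuming `FlinkKafkaProducer010` as the `FlinkKafkaProducerBase` subclass and a hypothetical class name `MyKafkaTableSink`; constructor wiring is again omitted:

```java
// Hypothetical subclass for illustration; the class name and producer
// version are assumptions, not part of the shipped API.
public class MyKafkaTableSink extends KafkaTableSink {

    // Constructor(s) delegating to the KafkaTableSink constructor omitted.

    @Override
    protected FlinkKafkaProducerBase<Row> createKafkaProducer(
            String topic,
            Properties properties,
            SerializationSchema<Row> serializationSchema,
            FlinkKafkaPartitioner<Row> partitioner) {
        // Bind a version-specific producer that honors the custom partitioner.
        return new FlinkKafkaProducer010<>(topic, serializationSchema, properties, partitioner);
    }
}
```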

### JSON Table Sources

#### KafkaJsonTableSource

Table source for JSON-encoded Kafka messages with support for mapping table fields to JSON fields.

```java { .api }
public abstract class KafkaJsonTableSource extends KafkaTableSource implements DefinedFieldMapping {

    public KafkaJsonTableSource(
        TableSchema schema,
        Optional<String> proctimeAttribute,
        List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
        Optional<Map<String, String>> fieldMapping,
        String topic,
        Properties properties,
        DeserializationSchema<Row> deserializationSchema,
        boolean failOnMissingField,
        boolean ignoreParseErrors
    );
}
```

**Parameters:**
- `schema` - Table schema defining column names and types
- `proctimeAttribute` - Optional processing-time attribute name
- `rowtimeAttributeDescriptors` - Event-time attribute descriptors
- `fieldMapping` - Optional mapping from table fields to JSON fields
- `topic` - Kafka topic to consume from
- `properties` - Kafka consumer properties
- `deserializationSchema` - Row deserialization schema
- `failOnMissingField` - Whether to fail on missing JSON fields
- `ignoreParseErrors` - Whether to skip records with parse errors

**Usage Example:**

```java
// Kafka consumer properties (values are illustrative)
Properties kafkaProperties = new Properties();
kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");
kafkaProperties.setProperty("group.id", "table-api-demo");

// Define table schema
TableSchema schema = TableSchema.builder()
    .field("user_id", DataTypes.STRING())
    .field("action", DataTypes.STRING())
    .field("timestamp", DataTypes.TIMESTAMP(3))
    .field("proctime", DataTypes.TIMESTAMP(3))
    .build();

// Create JSON table source (MyKafkaJsonTableSource stands in for a concrete subclass)
KafkaJsonTableSource source = new MyKafkaJsonTableSource(
    schema,
    Optional.of("proctime"),     // processing-time attribute
    Collections.emptyList(),     // no rowtime attributes
    Optional.empty(),            // no field mapping
    "user-events",
    kafkaProperties,
    new JsonRowDeserializationSchema(schema.toRowType()),
    false,                       // don't fail on missing fields
    true                         // skip parse errors
);
```

#### KafkaJsonTableSourceFactory

Factory for creating KafkaJsonTableSource instances from connector properties, as produced by table descriptors or SQL DDL.

```java { .api }
public abstract class KafkaJsonTableSourceFactory implements TableSourceFactory<Row> {

    public Map<String, String> requiredContext();
    public List<String> supportedProperties();
    public TableSource<Row> createTableSource(Map<String, String> properties);
}
```
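
As a sketch of how such a factory is resolved at runtime, assuming the legacy `TableFactoryService` discovery mechanism and illustrative property values:

```java
// Property map as it would be derived from a descriptor or DDL statement;
// keys follow the legacy "connector.*"/"format.*" convention, values are illustrative.
Map<String, String> properties = new HashMap<>();
properties.put("connector.type", "kafka");
properties.put("connector.topic", "user-events");
properties.put("format.type", "json");

// Find a matching factory on the classpath and build the table source.
TableSource<Row> source = TableFactoryService
    .find(TableSourceFactory.class, properties)
    .createTableSource(properties);
```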

**SQL DDL Example:**

```sql
CREATE TABLE user_events (
  user_id STRING,
  action STRING,
  event_time TIMESTAMP(3),
  proctime AS PROCTIME(),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'user-events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'test-group',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset'
);
```

### JSON Table Sinks

#### KafkaJsonTableSink

Table sink for writing JSON-formatted messages to Kafka with optional partitioning.

```java { .api }
public abstract class KafkaJsonTableSink extends KafkaTableSink {

    public KafkaJsonTableSink(
        TableSchema schema,
        String topic,
        Properties properties,
        Optional<FlinkKafkaPartitioner<Row>> partitioner,
        SerializationSchema<Row> serializationSchema
    );
}
```

**Usage Example:**

```java
// Create JSON table sink (MyKafkaJsonTableSink stands in for a concrete subclass;
// schema and kafkaProperties are reused from the source example above)
KafkaJsonTableSink sink = new MyKafkaJsonTableSink(
    schema,
    "output-events",
    kafkaProperties,
    Optional.of(new FlinkFixedPartitioner<>()),  // fixed subtask-to-partition mapping
    new JsonRowSerializationSchema(schema.toRowType())
);
```

### Avro Table Sources

#### KafkaAvroTableSource

Table source for Avro-formatted Kafka messages with field mapping support and schema registry integration.

```java { .api }
public abstract class KafkaAvroTableSource extends KafkaTableSource implements DefinedFieldMapping {

    public KafkaAvroTableSource(
        TableSchema schema,
        Optional<String> proctimeAttribute,
        List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
        Optional<Map<String, String>> fieldMapping,
        String topic,
        Properties properties,
        DeserializationSchema<Row> deserializationSchema
    );
}
```

**Usage Example:**

```java
// Properties for Avro with Schema Registry
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("schema.registry.url", "http://localhost:8081");

// Create Avro table source (MyKafkaAvroTableSource stands in for a concrete
// subclass; avroSchema is the Avro schema string, assumed defined elsewhere)
KafkaAvroTableSource source = new MyKafkaAvroTableSource(
    schema,
    Optional.empty(),        // no processing-time attribute
    Collections.emptyList(), // no rowtime attributes
    Optional.empty(),        // no field mapping
    "avro-events",
    props,
    new AvroRowDeserializationSchema(avroSchema)
);
```

## Table Descriptors

### Kafka Connector Descriptor

Programmatic configuration for Kafka table sources and sinks.

```java { .api }
public class Kafka extends ConnectorDescriptor {
    public Kafka();
    public Kafka version(String version);
    public Kafka topic(String topic);
    public Kafka properties(Properties properties);
    public Kafka property(String key, String value);
    public Kafka startFromEarliest();
    public Kafka startFromLatest();
    public Kafka startFromGroupOffsets();
    public Kafka startFromSpecificOffsets(Map<Integer, Long> specificOffsets);
}
```

**Usage Example:**

```java
// Create environments
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Define source using the descriptor API
tEnv.connect(
        new Kafka()
            .version("universal")
            .topic("input-topic")
            .property("bootstrap.servers", "localhost:9092")
            .property("group.id", "test-group")
            .startFromEarliest()
    )
    .withFormat(new Json().failOnMissingField(false))
    .withSchema(new Schema()
        .field("user_id", DataTypes.STRING())
        .field("action", DataTypes.STRING())
        .field("timestamp", DataTypes.TIMESTAMP(3))
        .field("proctime", DataTypes.TIMESTAMP(3)).proctime()
        .field("rowtime", DataTypes.TIMESTAMP(3)).rowtime(new Rowtime()
            .timestampsFromField("timestamp")
            .watermarksPeriodicBounded(5000))
    )
    .createTemporaryTable("user_events");
```
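
For resuming from known positions, `startFromSpecificOffsets` takes a partition-to-offset map; the offsets below are illustrative:

```java
// Kafka partition -> offset to start reading from
Map<Integer, Long> specificOffsets = new HashMap<>();
specificOffsets.put(0, 42L);  // partition 0 starts at offset 42
specificOffsets.put(1, 17L);  // partition 1 starts at offset 17

Kafka kafka = new Kafka()
    .version("universal")
    .topic("input-topic")
    .property("bootstrap.servers", "localhost:9092")
    .startFromSpecificOffsets(specificOffsets);
```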

### KafkaValidator

Validator for Kafka table descriptors ensuring proper configuration.

```java { .api }
public class KafkaValidator extends ConnectorDescriptorValidator {
    public static final String CONNECTOR_TYPE_VALUE_KAFKA = "kafka";
    public static final String CONNECTOR_VERSION = "connector.version";
    public static final String CONNECTOR_TOPIC = "connector.topic";
    public static final String CONNECTOR_PROPERTIES = "connector.properties";
    public static final String CONNECTOR_STARTUP_MODE = "connector.startup-mode";
    public static final String CONNECTOR_SPECIFIC_OFFSETS = "connector.specific-offsets";
}
```
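
Inside a factory, validation typically runs before the table source is constructed; a minimal sketch, assuming the legacy `DescriptorProperties` helper and a `properties` map as in the factory example above:

```java
// Wrap the raw property map and validate it against the Kafka constraints
// (connector type, topic, startup mode, ...); throws on invalid configuration.
DescriptorProperties descriptorProperties = new DescriptorProperties(true);
descriptorProperties.putProperties(properties);
new KafkaValidator().validate(descriptorProperties);
```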

## Time Attributes and Watermarks

### Processing Time

Define a processing-time attribute for time-based operations:

```java
// In the table source constructor
Optional<String> proctimeAttribute = Optional.of("proctime");
```

```sql
-- In SQL DDL
proctime AS PROCTIME()
```

### Event Time and Watermarks

Define event-time attributes with watermark strategies:

```java
// Rowtime descriptor: the "rowtime" attribute is backed by the existing
// "timestamp" field with a bounded out-of-orderness watermark strategy
List<RowtimeAttributeDescriptor> rowtimeAttributes = Arrays.asList(
    new RowtimeAttributeDescriptor(
        "rowtime",
        new ExistingField("timestamp"),
        new BoundedOutOfOrderTimestamps(5000)  // 5 seconds of out-of-orderness
    )
);
```

```sql
-- In SQL DDL
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
```

## Advanced Usage Examples

### Complex Query with Windowing

```sql
-- Create source table
CREATE TABLE user_events (
  user_id STRING,
  action STRING,
  amount DECIMAL(10, 2),
  event_time TIMESTAMP(3),
  proctime AS PROCTIME(),
  WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'user-events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

-- Create sink table
CREATE TABLE hourly_stats (
  window_start TIMESTAMP(3),
  window_end TIMESTAMP(3),
  user_count BIGINT,
  total_amount DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'hourly-stats',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

-- Windowed aggregation query
INSERT INTO hourly_stats
SELECT
  TUMBLE_START(event_time, INTERVAL '1' HOUR) AS window_start,
  TUMBLE_END(event_time, INTERVAL '1' HOUR) AS window_end,
  COUNT(DISTINCT user_id) AS user_count,
  SUM(amount) AS total_amount
FROM user_events
GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR);
```

### Join with Lookup Table

```sql
-- Kafka stream; proctime is needed for the lookup join below
CREATE TABLE orders (
  order_id STRING,
  user_id STRING,
  product_id STRING,
  quantity INT,
  order_time TIMESTAMP(3),
  proctime AS PROCTIME(),
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

-- Enriched output
CREATE TABLE enriched_orders (
  order_id STRING,
  user_id STRING,
  product_name STRING,
  quantity INT,
  total_price DECIMAL(10, 2),
  order_time TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'enriched-orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

-- Join with product catalog (assuming a JDBC lookup table; lookup joins
-- resolve against processing time, hence FOR SYSTEM_TIME AS OF o.proctime)
INSERT INTO enriched_orders
SELECT
  o.order_id,
  o.user_id,
  p.product_name,
  o.quantity,
  o.quantity * p.price AS total_price,
  o.order_time
FROM orders o
JOIN product_catalog FOR SYSTEM_TIME AS OF o.proctime AS p
  ON o.product_id = p.product_id;
```
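
The `product_catalog` lookup table is assumed above; a sketch of its DDL, assuming the JDBC connector and an illustrative MySQL endpoint:

```sql
-- Hypothetical JDBC-backed lookup table; connection options are illustrative.
CREATE TABLE product_catalog (
  product_id STRING,
  product_name STRING,
  price DECIMAL(10, 2)
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/shop',
  'table-name' = 'product_catalog'
);
```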

## Configuration Best Practices

### Consumer Configuration

```java
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "localhost:9092");
consumerProps.setProperty("group.id", "flink-table-consumer");
consumerProps.setProperty("auto.offset.reset", "earliest");
consumerProps.setProperty("enable.auto.commit", "false"); // offsets managed by Flink
consumerProps.setProperty("max.poll.records", "500");
```

### Producer Configuration

```java
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");
producerProps.setProperty("transaction.timeout.ms", "900000"); // 15 minutes, for transactional writes
producerProps.setProperty("enable.idempotence", "true");
producerProps.setProperty("acks", "all");
```

### Schema Registry Integration

```java
Properties avroProps = new Properties();
avroProps.setProperty("bootstrap.servers", "localhost:9092");
avroProps.setProperty("schema.registry.url", "http://localhost:8081");
avroProps.setProperty("specific.avro.reader", "true");
```