# Table API Integration

The Flink Kafka 0.8 connector provides Table API integration for using Kafka topics as table sources and sinks in SQL queries and Table API operations.

## Capabilities

### Kafka08TableSource

Generic table source for consuming Kafka topics with custom deserialization schemas.

```java { .api }
/**
 * Table source for Kafka 0.8.x topics with custom deserialization
 */
public class Kafka08TableSource extends KafkaTableSource {
    /**
     * Creates a Kafka table source with custom deserialization schema
     * @param topic Kafka topic name
     * @param properties Kafka consumer properties
     * @param deserializationSchema Schema for deserializing Row objects
     * @param typeInfo Type information for the resulting table
     */
    public Kafka08TableSource(
        String topic,
        Properties properties,
        DeserializationSchema<Row> deserializationSchema,
        TypeInformation<Row> typeInfo
    );
}
```
**Usage Example:**

```java
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.types.Row;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

import java.util.Properties;

// Configure Kafka properties
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("group.id", "table-consumer");

// Define table schema
TypeInformation<?>[] fieldTypes = {Types.STRING, Types.INT, Types.DOUBLE};
String[] fieldNames = {"name", "age", "salary"};
RowTypeInfo typeInfo = new RowTypeInfo(fieldTypes, fieldNames);

// Create custom deserialization schema
DeserializationSchema<Row> deserializer = new MyCustomRowDeserializer(typeInfo);

// Create table source
Kafka08TableSource tableSource = new Kafka08TableSource(
    "employees",
    props,
    deserializer,
    typeInfo
);

// Register as table
TableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
tableEnv.registerTableSource("employee_stream", tableSource);

// Use in SQL
Table result = tableEnv.sqlQuery("SELECT name, age FROM employee_stream WHERE salary > 50000");
```
### Kafka08JsonTableSource

Specialized table source for consuming JSON-formatted Kafka messages.

```java { .api }
/**
 * Table source for JSON-formatted Kafka 0.8.x topics
 */
public class Kafka08JsonTableSource extends KafkaJsonTableSource {
    /**
     * Creates a JSON table source for Kafka topics
     * @param topic Kafka topic name
     * @param properties Kafka consumer properties
     * @param typeInfo Type information describing the JSON structure
     */
    public Kafka08JsonTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo);
}
```

**Usage Example:**

```java
import org.apache.flink.streaming.connectors.kafka.Kafka08JsonTableSource;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

// Define JSON schema
TypeInformation<?>[] fieldTypes = {
    Types.STRING, // user_id
    Types.STRING, // event_type
    Types.LONG,   // timestamp
    Types.DOUBLE  // value
};
String[] fieldNames = {"user_id", "event_type", "timestamp", "value"};
RowTypeInfo jsonTypeInfo = new RowTypeInfo(fieldTypes, fieldNames);

// Create JSON table source
Kafka08JsonTableSource jsonSource = new Kafka08JsonTableSource(
    "user-events",
    props,
    jsonTypeInfo
);

tableEnv.registerTableSource("events", jsonSource);

// Query JSON data
Table eventStats = tableEnv.sqlQuery(
    "SELECT event_type, COUNT(*) as event_count, AVG(value) as avg_value " +
    "FROM events " +
    "GROUP BY event_type"
);
```
### Kafka08AvroTableSource

Specialized table source for consuming Avro-formatted Kafka messages.

```java { .api }
/**
 * Table source for Avro-formatted Kafka 0.8.x topics
 */
public class Kafka08AvroTableSource extends KafkaAvroTableSource {
    /**
     * Creates an Avro table source for Kafka topics
     * @param topic Kafka topic name
     * @param properties Kafka consumer properties
     * @param record Avro record class extending SpecificRecordBase
     */
    public Kafka08AvroTableSource(
        String topic,
        Properties properties,
        Class<? extends SpecificRecordBase> record
    );
}
```

**Usage Example:**

```java
import org.apache.flink.streaming.connectors.kafka.Kafka08AvroTableSource;
import org.apache.avro.specific.SpecificRecordBase;

// Assume you have an Avro-generated class
public class UserEvent extends SpecificRecordBase {
    // Avro-generated fields and methods
}

// Create Avro table source
Kafka08AvroTableSource avroSource = new Kafka08AvroTableSource(
    "user-events-avro",
    props,
    UserEvent.class
);

tableEnv.registerTableSource("avro_events", avroSource);

// Query Avro data (field names from Avro schema)
Table avroResults = tableEnv.sqlQuery(
    "SELECT userId, eventType, COUNT(*) " +
    "FROM avro_events " +
    "WHERE timestamp > UNIX_TIMESTAMP() - 3600 " +
    "GROUP BY userId, eventType"
);
```
### Kafka08JsonTableSink

Table sink for writing JSON-formatted data to Kafka topics.

```java { .api }
/**
 * Table sink for JSON-formatted Kafka 0.8.x topics
 */
public class Kafka08JsonTableSink extends KafkaJsonTableSink {
    /**
     * Creates a JSON table sink for Kafka topics
     * @param topic Kafka topic name
     * @param properties Kafka producer properties
     * @param partitioner Custom partitioner for message distribution
     */
    public Kafka08JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner);

    /**
     * @deprecated Use FlinkKafkaPartitioner instead of KafkaPartitioner
     */
    @Deprecated
    public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner);
}
```

**Usage Example:**

```java
import org.apache.flink.streaming.connectors.kafka.Kafka08JsonTableSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;

// Configure producer properties
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");

// Create JSON table sink
Kafka08JsonTableSink jsonSink = new Kafka08JsonTableSink(
    "output-events",
    producerProps,
    new FlinkFixedPartitioner<>()
);

// Register as sink
tableEnv.registerTableSink("json_output", jsonSink);

// Write query results to Kafka
Table processedData = tableEnv.sqlQuery(
    "SELECT user_id, event_type, COUNT(*) as count " +
    "FROM events " +
    "GROUP BY user_id, event_type"
);

processedData.insertInto("json_output");
```
## Schema Definition

Define table schemas for different data formats:

### JSON Schema Definition

```java
// For JSON data like: {"id": "123", "name": "John", "score": 95.5}
TypeInformation<?>[] types = {Types.STRING, Types.STRING, Types.DOUBLE};
String[] names = {"id", "name", "score"};
RowTypeInfo jsonSchema = new RowTypeInfo(types, names);
```

### Complex Schema Definition

```java
// For nested JSON structures
TypeInformation<?>[] outerTypes = {
    Types.STRING,                       // user_id
    Types.ROW(Types.STRING, Types.INT), // profile (name, age)
    Types.OBJECT_ARRAY(Types.STRING)    // tags array
};
String[] outerNames = {"user_id", "profile", "tags"};
RowTypeInfo complexSchema = new RowTypeInfo(outerTypes, outerNames);
```
## SQL Integration

Use Kafka tables in SQL queries:

```sql
-- Create a view from Kafka source
CREATE VIEW user_events AS
SELECT
    user_id,
    event_type,
    CAST(event_timestamp AS TIMESTAMP) as event_time,
    value
FROM kafka_source;

-- Window aggregation
SELECT
    TUMBLE_START(event_time, INTERVAL '1' HOUR) as window_start,
    event_type,
    COUNT(*) as event_count,
    AVG(value) as avg_value
FROM user_events
GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR), event_type;

-- Insert results into output topic
INSERT INTO kafka_sink
SELECT user_id, event_type, COUNT(*) as count
FROM user_events
WHERE event_time > CURRENT_TIMESTAMP - INTERVAL '1' DAY
GROUP BY user_id, event_type;
```
## Configuration

### Consumer Properties for Table Sources

- **bootstrap.servers**: Kafka broker addresses
- **zookeeper.connect**: ZooKeeper connection string
- **group.id**: Consumer group for table source
- **auto.offset.reset**: Starting position for new consumer groups

### Producer Properties for Table Sinks

- **bootstrap.servers**: Kafka broker addresses
- **batch.size**: Batching configuration for performance
- **compression.type**: Message compression algorithm

### Format-Specific Configuration

```java
// JSON format properties
Properties jsonProps = new Properties();
jsonProps.setProperty("bootstrap.servers", "localhost:9092");
jsonProps.setProperty("group.id", "json-table-consumer");
jsonProps.setProperty("json.fail-on-missing-field", "false"); // Ignore missing fields

// Avro format properties
Properties avroProps = new Properties();
avroProps.setProperty("bootstrap.servers", "localhost:9092");
avroProps.setProperty("group.id", "avro-table-consumer");
avroProps.setProperty("schema.registry.url", "http://localhost:8081"); // If using schema registry
```
## Error Handling

Handle errors in table operations:

- **Deserialization errors**: Configure fail-on-error behavior
- **Schema evolution**: Handle missing or additional fields
- **Connection failures**: Implement retry logic at application level

```java
// Error handling configuration
Properties errorHandlingProps = new Properties();
errorHandlingProps.setProperty("bootstrap.servers", "localhost:9092");
errorHandlingProps.setProperty("group.id", "error-handling-consumer");
errorHandlingProps.setProperty("json.ignore-parse-errors", "true"); // Skip malformed JSON
errorHandlingProps.setProperty("consumer.max.poll.records", "100"); // Limit batch size
```