# Table API Integration

Table API and SQL integration for declarative stream processing with Kafka 0.9 sources and sinks, built on factory-based configuration and schema-aware data processing.

## Capabilities

### Kafka09TableSourceSinkFactory

Factory class for creating Kafka 0.9 table sources and sinks in the Table API ecosystem.

```java { .api }
/**
 * Factory for creating configured instances of Kafka 0.9 table sources and sinks.
 * Extends the base Kafka table factory with version-specific implementations.
 */
public class Kafka09TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {

    /**
     * Returns the Kafka version identifier for this factory.
     *
     * @return "0.9" as the version string
     */
    @Override
    protected String kafkaVersion();

    /**
     * Indicates whether this version supports Kafka timestamps.
     *
     * @return false, as Kafka 0.9 does not support message timestamps
     */
    @Override
    protected boolean supportsKafkaTimestamps();

    /**
     * Creates a Kafka 0.9 table source with the provided configuration.
     *
     * @param schema Schema of the produced table
     * @param proctimeAttribute Field name of the processing time attribute
     * @param rowtimeAttributeDescriptors Descriptors for rowtime attributes
     * @param fieldMapping Mapping from table schema fields to fields of the physical return type
     * @param topic Kafka topic to consume
     * @param properties Properties for the Kafka consumer
     * @param deserializationSchema Deserialization schema for decoding records from Kafka
     * @param startupMode Startup mode for the contained consumer
     * @param specificStartupOffsets Specific startup offsets (when using SPECIFIC_OFFSETS mode)
     * @return Configured Kafka09TableSource instance
     */
    @Override
    protected KafkaTableSourceBase createKafkaTableSource(
        TableSchema schema,
        Optional<String> proctimeAttribute,
        List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
        Map<String, String> fieldMapping,
        String topic,
        Properties properties,
        DeserializationSchema<Row> deserializationSchema,
        StartupMode startupMode,
        Map<KafkaTopicPartition, Long> specificStartupOffsets);

    /**
     * Creates a Kafka 0.9 table sink with the provided configuration.
     *
     * @param schema Schema of the table to be written
     * @param topic Target Kafka topic
     * @param properties Properties for the Kafka producer
     * @param partitioner Optional custom partitioner for message distribution
     * @param serializationSchema Serialization schema for encoding records to Kafka
     * @return Configured Kafka09TableSink instance
     */
    @Override
    protected KafkaTableSinkBase createKafkaTableSink(
        TableSchema schema,
        String topic,
        Properties properties,
        Optional<FlinkKafkaPartitioner<Row>> partitioner,
        SerializationSchema<Row> serializationSchema);
}
```

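The factory is looked up through Flink's table factory discovery, so it is normally exercised indirectly rather than instantiated by hand. A minimal sketch using the legacy descriptor API, assuming the Kafka 0.9 connector and a JSON format are on the classpath (exact descriptor methods vary across Flink releases):

```java
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

// The factory service matches version("0.9") to Kafka09TableSourceSinkFactory.
tableEnv.connect(
        new Kafka()
            .version("0.9")
            .topic("user-events")
            .property("bootstrap.servers", "localhost:9092")
            .property("group.id", "descriptor-consumer")
            .startFromEarliest())
    .withFormat(new Json().failOnMissingField(false))
    .withSchema(
        new Schema()
            .field("user_id", DataTypes.STRING())
            .field("event_type", DataTypes.STRING()))
    .createTemporaryTable("user_events");
```
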
### Kafka09TableSource

Table source implementation for consuming Kafka 0.9 data in Table API queries.

```java { .api }
/**
 * Kafka table source for Kafka 0.9 - internal implementation.
 * Provides streaming table source capabilities for Table API and SQL.
 */
@Internal
public class Kafka09TableSource extends KafkaTableSourceBase {

    /**
     * Creates a Kafka 0.9 table source with full configuration options.
     *
     * @param schema Schema of the produced table
     * @param proctimeAttribute Field name of the processing time attribute
     * @param rowtimeAttributeDescriptors Descriptors for rowtime attributes
     * @param fieldMapping Optional mapping from table schema fields to physical type fields
     * @param topic Kafka topic to consume
     * @param properties Properties for the Kafka consumer
     * @param deserializationSchema Deserialization schema for decoding records
     * @param startupMode Startup mode for the consumer
     * @param specificStartupOffsets Specific startup offsets for SPECIFIC_OFFSETS mode
     */
    public Kafka09TableSource(
        TableSchema schema,
        Optional<String> proctimeAttribute,
        List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
        Optional<Map<String, String>> fieldMapping,
        String topic,
        Properties properties,
        DeserializationSchema<Row> deserializationSchema,
        StartupMode startupMode,
        Map<KafkaTopicPartition, Long> specificStartupOffsets);

    /**
     * Creates a simple Kafka 0.9 table source with basic configuration.
     *
     * @param schema Schema of the produced table
     * @param topic Kafka topic to consume
     * @param properties Properties for the Kafka consumer
     * @param deserializationSchema Deserialization schema for decoding records
     */
    public Kafka09TableSource(
        TableSchema schema,
        String topic,
        Properties properties,
        DeserializationSchema<Row> deserializationSchema);
}
```

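Beyond the simple constructor, the full constructor wires in time attributes and startup behavior directly. A sketch, assuming `schema` declares a `proc_time` processing-time field and that `properties` and `deserializationSchema` are defined as in the usage examples below:

```java
import java.util.Collections;
import java.util.Optional;

import org.apache.flink.streaming.connectors.kafka.config.StartupMode;

// Full-constructor variant: processing-time attribute, no rowtime
// attributes or field mapping, start from committed group offsets.
Kafka09TableSource source = new Kafka09TableSource(
    schema,                          // TableSchema that includes proc_time
    Optional.of("proc_time"),        // processing-time attribute
    Collections.emptyList(),         // no rowtime attributes
    Optional.empty(),                // no field mapping
    "user-events",
    properties,
    deserializationSchema,
    StartupMode.GROUP_OFFSETS,
    Collections.emptyMap());         // only read in SPECIFIC_OFFSETS mode
```
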
### Kafka09TableSink

Table sink implementation for writing to Kafka 0.9 topics from Table API queries.

```java { .api }
/**
 * Kafka table sink for Kafka 0.9 - internal implementation.
 * Provides streaming table sink capabilities for Table API and SQL.
 */
@Internal
public class Kafka09TableSink extends KafkaTableSinkBase {

    /**
     * Creates a Kafka 0.9 table sink with the provided configuration.
     *
     * @param schema Schema of the table to be written
     * @param topic Target Kafka topic
     * @param properties Properties for the Kafka producer
     * @param partitioner Optional custom partitioner for message distribution
     * @param serializationSchema Serialization schema for encoding records
     */
    public Kafka09TableSink(
        TableSchema schema,
        String topic,
        Properties properties,
        Optional<FlinkKafkaPartitioner<Row>> partitioner,
        SerializationSchema<Row> serializationSchema);
}
```

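A minimal construction-and-registration sketch, assuming a JSON row serializer and the connector's fixed partitioner (each sink subtask writes to one Kafka partition); `schema` and `tableEnv` follow the usage examples below:

```java
import java.util.Optional;
import java.util.Properties;

import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.types.Row;

Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");

Kafka09TableSink sink = new Kafka09TableSink(
    schema,
    "processed-events",
    producerProps,
    Optional.of(new FlinkFixedPartitioner<Row>()),
    new JsonRowSerializationSchema.Builder(schema.toRowType()).build());

// Legacy registration API; newer releases use DDL instead.
tableEnv.registerTableSink("kafka_sink_table", sink);
```
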
## Usage Examples

### Table API with Kafka Source

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

// Create table environment
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Create Kafka source table using DDL
tableEnv.executeSql(
    "CREATE TABLE kafka_source (" +
    "  user_id STRING," +
    "  event_time TIMESTAMP(3)," +
    "  event_type STRING," +
    "  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
    ") WITH (" +
    "  'connector' = 'kafka-0.9'," +
    "  'topic' = 'user-events'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'properties.group.id' = 'table-consumer-group'," +
    "  'format' = 'json'" +
    ")"
);

// Query the Kafka source
Table result = tableEnv.sqlQuery(
    "SELECT user_id, COUNT(*) as event_count " +
    "FROM kafka_source " +
    "WHERE event_type = 'login' " +
    "GROUP BY user_id"
);
```

### Table API with Kafka Sink

```java
// Create Kafka sink table
tableEnv.executeSql(
    "CREATE TABLE kafka_sink (" +
    "  user_id STRING," +
    "  event_count BIGINT" +
    ") WITH (" +
    "  'connector' = 'kafka-0.9'," +
    "  'topic' = 'processed-events'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'json'" +
    ")"
);

// Insert query results into Kafka
result.executeInsert("kafka_sink");
```

### Programmatic Table Source Creation

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.Kafka09TableSource;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;

// Define table schema
TableSchema schema = TableSchema.builder()
    .field("user_id", DataTypes.STRING())
    .field("timestamp", DataTypes.TIMESTAMP(3))
    .field("action", DataTypes.STRING())
    .field("value", DataTypes.DOUBLE())
    .build();

// Configure Kafka properties
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "programmatic-consumer");

// Create JSON deserializer
DeserializationSchema<Row> deserializer = new JsonRowDeserializationSchema.Builder(
    schema.toRowType()
).build();

// Create Kafka table source
Kafka09TableSource kafkaSource = new Kafka09TableSource(
    schema,
    "user-actions",
    properties,
    deserializer
);

// Register as table
tableEnv.registerTableSource("user_actions", kafkaSource);

// Use in SQL ('value' is a reserved word, so it is escaped with backticks)
Table queryResult = tableEnv.sqlQuery(
    "SELECT user_id, SUM(`value`) as total_value " +
    "FROM user_actions " +
    "WHERE action = 'purchase' " +
    "GROUP BY user_id"
);
```

### Complex Table Processing Pipeline

```java
// Multiple Kafka sources and sinks
tableEnv.executeSql(
    "CREATE TABLE orders (" +
    "  order_id STRING," +
    "  customer_id STRING," +
    "  product_id STRING," +
    "  quantity INT," +
    "  price DECIMAL(10,2)," +
    "  order_time TIMESTAMP(3)," +
    "  WATERMARK FOR order_time AS order_time - INTERVAL '1' MINUTE" +
    ") WITH (" +
    "  'connector' = 'kafka-0.9'," +
    "  'topic' = 'orders'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'properties.group.id' = 'order-processor'," +
    "  'format' = 'json'" +
    ")"
);

tableEnv.executeSql(
    "CREATE TABLE order_summary (" +
    "  window_start TIMESTAMP(3)," +
    "  window_end TIMESTAMP(3)," +
    "  total_orders BIGINT," +
    "  total_revenue DECIMAL(12,2)," +
    "  avg_order_value DECIMAL(10,2)" +
    ") WITH (" +
    "  'connector' = 'kafka-0.9'," +
    "  'topic' = 'order-summary'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'json'" +
    ")"
);

// Windowed aggregation query
tableEnv.executeSql(
    "INSERT INTO order_summary " +
    "SELECT " +
    "  TUMBLE_START(order_time, INTERVAL '1' HOUR) as window_start," +
    "  TUMBLE_END(order_time, INTERVAL '1' HOUR) as window_end," +
    "  COUNT(*) as total_orders," +
    "  SUM(quantity * price) as total_revenue," +
    "  AVG(quantity * price) as avg_order_value " +
    "FROM orders " +
    "GROUP BY TUMBLE(order_time, INTERVAL '1' HOUR)"
);
```

### Custom Format Integration

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

// Custom CSV-like format deserializer
DeserializationSchema<Row> csvDeserializer = new DeserializationSchema<Row>() {
    @Override
    public Row deserialize(byte[] message) throws IOException {
        String line = new String(message, StandardCharsets.UTF_8);
        String[] fields = line.split(",");

        Row row = new Row(3);
        row.setField(0, fields[0]); // id
        row.setField(1, fields[1]); // name
        row.setField(2, Double.parseDouble(fields[2])); // value
        return row;
    }

    @Override
    public boolean isEndOfStream(Row nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        return Types.ROW(Types.STRING, Types.STRING, Types.DOUBLE);
    }
};

TableSchema csvSchema = TableSchema.builder()
    .field("id", DataTypes.STRING())
    .field("name", DataTypes.STRING())
    .field("value", DataTypes.DOUBLE())
    .build();

Kafka09TableSource csvSource = new Kafka09TableSource(
    csvSchema,
    "csv-data",
    properties,
    csvDeserializer
);
```

## Configuration Options

### Connector Properties

```sql
-- Basic Kafka 0.9 connector configuration
'connector' = 'kafka-0.9'
'topic' = 'my-topic'                                  -- Required: Kafka topic name
'properties.bootstrap.servers' = 'localhost:9092'     -- Required: Kafka broker addresses
'properties.group.id' = 'my-consumer-group'           -- Required for sources: Consumer group ID

-- Consumer-specific properties (sources)
'properties.auto.offset.reset' = 'earliest'           -- latest, earliest, none
'properties.enable.auto.commit' = 'false'             -- Flink manages offset commits
'properties.fetch.min.bytes' = '1024'                 -- Minimum fetch size
'properties.max.partition.fetch.bytes' = '1048576'    -- Maximum per-partition fetch

-- Producer-specific properties (sinks)
'properties.acks' = '1'                               -- 0, 1, all
'properties.retries' = '3'                            -- Retry count
'properties.batch.size' = '16384'                     -- Batch size in bytes
'properties.linger.ms' = '5'                          -- Batch linger time
'properties.compression.type' = 'snappy'              -- none, gzip, snappy, lz4

-- Format configuration
'format' = 'json'                                     -- json, csv, avro, etc.
'json.fail-on-missing-field' = 'false'                -- Handle missing JSON fields
'json.ignore-parse-errors' = 'true'                   -- Skip malformed records
```

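For the programmatic path, the same settings are passed as plain Kafka client properties: the DDL key `'properties.x'` corresponds to client property `x`. A small illustrative sketch:

```java
import java.util.Properties;

// 'properties.auto.offset.reset' in DDL becomes 'auto.offset.reset' here.
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "localhost:9092");
consumerProps.setProperty("group.id", "my-consumer-group");
consumerProps.setProperty("auto.offset.reset", "earliest");
consumerProps.setProperty("fetch.min.bytes", "1024");
```
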
### Startup Modes

```sql
-- Start from earliest available offset
'scan.startup.mode' = 'earliest-offset'

-- Start from latest available offset
'scan.startup.mode' = 'latest-offset'

-- Start from the consumer group's committed offsets
'scan.startup.mode' = 'group-offsets'

-- Start from specific per-partition offsets
-- (timestamp-based startup is not available in Kafka 0.9)
'scan.startup.mode' = 'specific-offsets'
'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
```

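Programmatically, specific-offsets startup corresponds to `StartupMode.SPECIFIC_OFFSETS` plus a per-partition offset map passed to the full `Kafka09TableSource` constructor. A sketch mirroring the DDL above (topic name is illustrative):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

// Equivalent of 'partition:0,offset:42;partition:1,offset:300'
Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
specificOffsets.put(new KafkaTopicPartition("user-events", 0), 42L);
specificOffsets.put(new KafkaTopicPartition("user-events", 1), 300L);
// Pass StartupMode.SPECIFIC_OFFSETS and specificOffsets to the
// full constructor shown in the Kafka09TableSource section.
```
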
413
414
## Limitations and Considerations
415
416
### Kafka 0.9 Specific Limitations
417
418
- **No timestamp support**: Cannot use message timestamps for watermark generation
- **Limited offset management**: Fewer startup mode options compared to newer versions
- **No exactly-once semantics**: Producer doesn't support transactions or idempotence
- **Basic consumer features**: Missing some advanced consumer configuration options

423
### Table API Integration Notes
424
425
- Sources and sinks are created through the factory pattern
- Schema must be explicitly defined in DDL or programmatically
- Time attributes require careful configuration for event time processing (see the sketch after this list)
- Custom formats require implementing appropriate serialization/deserialization schemas

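For event time, a rowtime attribute can be declared on an existing payload field together with a watermark strategy. A sketch using the legacy table source descriptors, assuming an `event_time` field and a 5-second out-of-orderness bound:

```java
import java.util.Collections;
import java.util.List;

import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;

// Read timestamps from the 'event_time' payload field and emit
// watermarks that lag the maximum seen timestamp by 5 seconds.
List<RowtimeAttributeDescriptor> rowtimeAttributes =
    Collections.singletonList(
        new RowtimeAttributeDescriptor(
            "event_time",
            new ExistingField("event_time"),
            new BoundedOutOfOrderTimestamps(5000L)));
```
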
### Performance Considerations

- Use appropriate batch sizes and linger times for producers (see the sketch after this list)
- Configure consumer fetch sizes based on message volume
- Consider partition count versus parallelism for optimal throughput
- Monitor consumer lag and producer throughput metrics

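As a concrete starting point for producer tuning (values are illustrative and should be validated against the actual workload):

```java
import java.util.Properties;

Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");
// Larger batches with a short linger trade a little latency for throughput.
producerProps.setProperty("batch.size", "32768");
producerProps.setProperty("linger.ms", "10");
producerProps.setProperty("compression.type", "snappy");
producerProps.setProperty("acks", "1");
```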