Table sources and sinks for SQL layer integration supporting various data formats (JSON, Avro) with automatic schema inference, connector descriptors, and streaming table operations. This enables declarative SQL queries over Kafka streams.
Abstract base class for Kafka table sources providing streaming table functionality with support for processing time attributes, event time (rowtime) attributes with watermark extraction, and field mapping in its format-specific subclasses.
public abstract class KafkaTableSource implements StreamTableSource<Row>,
DefinedProctimeAttribute, DefinedRowtimeAttributes {
// Abstract method implemented by version-specific concrete subclasses
protected abstract FlinkKafkaConsumerBase<Row> createKafkaConsumer(
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema
);
}

Key Interfaces:
StreamTableSource<Row> - Provides a streaming data source for the Table API
DefinedProctimeAttribute - Supports processing time attribute definition
DefinedRowtimeAttributes - Supports event time attribute definition
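For illustration, a concrete subclass only needs to wire createKafkaConsumer to a version-specific consumer. The sketch below assumes the universal FlinkKafkaConsumer and a matching constructor in the base class; the class name MyKafkaTableSource is hypothetical, not part of the connector API.

// Sketch only: MyKafkaTableSource is a hypothetical concrete subclass.
public class MyKafkaTableSource extends KafkaTableSource {

    public MyKafkaTableSource(TableSchema schema, String topic, Properties properties,
                              DeserializationSchema<Row> deserializationSchema) {
        // Assumes the abstract base class exposes a constructor with these arguments
        super(schema, topic, properties, deserializationSchema);
    }

    @Override
    protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(
            String topic,
            Properties properties,
            DeserializationSchema<Row> deserializationSchema) {
        // Delegate to the universal Kafka consumer from flink-connector-kafka
        return new FlinkKafkaConsumer<>(topic, deserializationSchema, properties);
    }
}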
Abstract base class for Kafka table sinks providing streaming table output functionality with partitioning support.

public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
// Abstract method implemented by version-specific concrete subclasses
protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
String topic,
Properties properties,
SerializationSchema<Row> serializationSchema,
FlinkKafkaPartitioner<Row> partitioner
);
}

Key Interface:
AppendStreamTableSink<Row> - Supports append-only table sink operations
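The sink side mirrors the source: createKafkaProducer returns a version-specific producer. A rough sketch, assuming the Kafka 0.10 producer (FlinkKafkaProducer010 extends FlinkKafkaProducerBase) and a matching base-class constructor; MyKafkaTableSink is a hypothetical name.

// Sketch only: MyKafkaTableSink is a hypothetical concrete subclass.
public class MyKafkaTableSink extends KafkaTableSink {

    public MyKafkaTableSink(TableSchema schema, String topic, Properties properties,
                            Optional<FlinkKafkaPartitioner<Row>> partitioner,
                            SerializationSchema<Row> serializationSchema) {
        // Assumes the abstract base class exposes a constructor with these arguments
        super(schema, topic, properties, partitioner, serializationSchema);
    }

    @Override
    protected FlinkKafkaProducerBase<Row> createKafkaProducer(
            String topic,
            Properties properties,
            SerializationSchema<Row> serializationSchema,
            FlinkKafkaPartitioner<Row> partitioner) {
        // FlinkKafkaProducer010 is one producer implementation that extends FlinkKafkaProducerBase
        return new FlinkKafkaProducer010<>(topic, serializationSchema, properties, partitioner);
    }
}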
Table source for JSON-formatted Kafka messages with field mapping support and schema inference.

public abstract class KafkaJsonTableSource extends KafkaTableSource implements DefinedFieldMapping {
public KafkaJsonTableSource(
TableSchema schema,
Optional<String> proctimeAttribute,
List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
Optional<Map<String, String>> fieldMapping,
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema,
boolean failOnMissingField,
boolean ignoreParseErrors
);
}

Parameters:
schema - Table schema defining column names and types
proctimeAttribute - Optional processing time attribute name
rowtimeAttributeDescriptors - Event time attribute descriptors
fieldMapping - Optional mapping from table fields to JSON fields
topic - Kafka topic to consume from
properties - Kafka consumer properties
deserializationSchema - Row deserialization schema
failOnMissingField - Whether to fail on missing JSON fields
ignoreParseErrors - Whether to skip records with parse errors

Usage Example:
// Define table schema
TableSchema schema = TableSchema.builder()
.field("user_id", DataTypes.STRING())
.field("action", DataTypes.STRING())
.field("timestamp", DataTypes.TIMESTAMP(3))
.field("proctime", DataTypes.TIMESTAMP(3))
.build();
// Create JSON table source
KafkaJsonTableSource source = new MyKafkaJsonTableSource(
schema,
Optional.of("proctime"),
Collections.emptyList(),
Optional.empty(),
"user-events",
kafkaProperties,
new JsonRowDeserializationSchema(schema.toRowType()),
false, // Don't fail on missing fields
true // Skip parse errors
);
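Once constructed, the source can be registered and queried. A brief sketch using the legacy registerTableSource API; the environment setup and table name are illustrative.

// Register the JSON source and run a continuous SQL query over it
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

tEnv.registerTableSource("user_events", source);

Table actionCounts = tEnv.sqlQuery(
    "SELECT user_id, COUNT(*) AS action_count FROM user_events GROUP BY user_id");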
Factory for creating KafkaJsonTableSource instances from declarative table descriptors or SQL DDL definitions.

public abstract class KafkaJsonTableSourceFactory implements TableSourceFactory<Row> {
public Map<String, String> requiredContext();
public List<String> supportedProperties();
public TableSource<Row> createTableSource(Map<String, String> properties);
}

SQL DDL Example:
CREATE TABLE user_events (
user_id STRING,
action STRING,
event_time TIMESTAMP(3),
proctime AS PROCTIME(),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'user-events',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'test-group',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset'
);
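The DDL above uses the newer 'connector' option keys and can also be submitted programmatically. A minimal sketch, assuming userEventsDdl holds the CREATE TABLE statement above as a String:

// Submit the DDL and query the resulting table
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);

tEnv.executeSql(userEventsDdl); // userEventsDdl: the CREATE TABLE statement shown above

Table clicks = tEnv.sqlQuery(
    "SELECT user_id, event_time FROM user_events WHERE action = 'click'");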
Table sink for writing JSON-formatted messages to Kafka with optional partitioning.

public abstract class KafkaJsonTableSink extends KafkaTableSink {
public KafkaJsonTableSink(
TableSchema schema,
String topic,
Properties properties,
Optional<FlinkKafkaPartitioner<Row>> partitioner,
SerializationSchema<Row> serializationSchema
);
}

Usage Example:
// Create JSON table sink
KafkaJsonTableSink sink = new MyKafkaJsonTableSink(
schema,
"output-events",
kafkaProperties,
Optional.of(new FlinkFixedPartitioner<>()),
new JsonRowSerializationSchema(schema.toRowType())
);
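A sketch of wiring the sink into a pipeline with the legacy registerTableSink API, assuming a user_events source table and tEnv as in the earlier examples:

// Register the sink and emit an append-only query result into it
tEnv.registerTableSink("output_events", sink);

tEnv.sqlQuery("SELECT user_id, action, `timestamp` FROM user_events")
    .insertInto("output_events");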
Table source for Avro-formatted Kafka messages with field mapping support and schema registry integration.

public abstract class KafkaAvroTableSource extends KafkaTableSource implements DefinedFieldMapping {
public KafkaAvroTableSource(
TableSchema schema,
Optional<String> proctimeAttribute,
List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
Optional<Map<String, String>> fieldMapping,
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema
);
}

Usage Example:
// Properties for Avro with Schema Registry
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("schema.registry.url", "http://localhost:8081");
// Create Avro table source
KafkaAvroTableSource source = new MyKafkaAvroTableSource(
schema,
Optional.empty(),
Collections.emptyList(),
Optional.empty(),
"avro-events",
props,
new AvroRowDeserializationSchema(avroSchema)
);
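The avroSchema argument is a plain Avro schema definition (AvroRowDeserializationSchema also accepts a SpecificRecord class). A hypothetical schema string matching a simple event record:

// Avro schema as a JSON string; record and field names are illustrative
String avroSchema =
    "{" +
    "  \"type\": \"record\"," +
    "  \"name\": \"UserEvent\"," +
    "  \"fields\": [" +
    "    {\"name\": \"user_id\", \"type\": \"string\"}," +
    "    {\"name\": \"action\", \"type\": \"string\"}," +
    "    {\"name\": \"event_time\", \"type\": \"long\"}" +
    "  ]" +
    "}";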
Programmatic configuration for Kafka table sources and sinks.

public class Kafka extends ConnectorDescriptor {
public Kafka();
public Kafka version(String version);
public Kafka topic(String topic);
public Kafka properties(Properties properties);
public Kafka property(String key, String value);
public Kafka startFromEarliest();
public Kafka startFromLatest();
public Kafka startFromGroupOffsets();
public Kafka startFromSpecificOffsets(Map<Integer, Long> specificOffsets);
}

Usage Example:
// Create execution and table environments
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// Define source using descriptor API
tEnv.connect(
new Kafka()
.version("universal")
.topic("input-topic")
.property("bootstrap.servers", "localhost:9092")
.property("group.id", "test-group")
.startFromEarliest()
)
.withFormat(new Json().failOnMissingField(false))
.withSchema(new Schema()
.field("user_id", DataTypes.STRING())
.field("action", DataTypes.STRING())
.field("timestamp", DataTypes.TIMESTAMP(3))
.field("proctime", DataTypes.TIMESTAMP(3).proctime())
.field("rowtime", DataTypes.TIMESTAMP(3).rowtime())
)
.createTemporaryTable("user_events");Validator for Kafka table descriptors ensuring proper configuration.
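The temporary table can then be queried like any other table. The sketch below also shows startFromSpecificOffsets; the partition/offset values are placeholders.

// Query the temporary table registered above
Table clicks = tEnv.sqlQuery(
    "SELECT user_id, action FROM user_events WHERE action = 'click'");

// Alternative startup mode: resume from explicit partition offsets
Map<Integer, Long> offsets = new HashMap<>();
offsets.put(0, 42L);    // partition 0 -> offset 42
offsets.put(1, 1337L);  // partition 1 -> offset 1337

Kafka kafkaFromOffsets = new Kafka()
    .version("universal")
    .topic("input-topic")
    .property("bootstrap.servers", "localhost:9092")
    .startFromSpecificOffsets(offsets);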
Validator for Kafka table descriptors ensuring proper configuration.

public class KafkaValidator extends ConnectorDescriptorValidator {
public static final String CONNECTOR_TYPE_VALUE_KAFKA = "kafka";
public static final String CONNECTOR_VERSION = "connector.version";
public static final String CONNECTOR_TOPIC = "connector.topic";
public static final String CONNECTOR_PROPERTIES = "connector.properties";
public static final String CONNECTOR_STARTUP_MODE = "connector.startup-mode";
public static final String CONNECTOR_SPECIFIC_OFFSETS = "connector.specific-offsets";
}
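These constants correspond to keys in the flattened property map that the legacy descriptor/factory stack validates. A rough sketch of such a map (the exact layout of the connector.properties.* keys varies between Flink versions):

// Property map in the legacy 'connector.*' key format checked by KafkaValidator
Map<String, String> connectorProps = new HashMap<>();
connectorProps.put("connector.type", "kafka");                    // CONNECTOR_TYPE_VALUE_KAFKA
connectorProps.put("connector.version", "universal");             // CONNECTOR_VERSION
connectorProps.put("connector.topic", "user-events");             // CONNECTOR_TOPIC
connectorProps.put("connector.startup-mode", "earliest-offset");  // CONNECTOR_STARTUP_MODE
connectorProps.put("connector.properties.bootstrap.servers", "localhost:9092");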
Define a processing time attribute for time-based operations:

// In table source constructor
Optional<String> proctimeAttribute = Optional.of("proctime");
// In SQL DDL
proctime AS PROCTIME()

Define event time attributes with watermark strategies:
// Rowtime descriptor with watermark strategy
List<RowtimeAttributeDescriptor> rowtimeAttributes = Arrays.asList(
new RowtimeAttributeDescriptor(
"rowtime",
new ExistingField("timestamp"),
new BoundedOutOfOrderTimestamps(5000) // 5 second out-of-orderness
)
);
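In a programmatic table source this list is what the DefinedRowtimeAttributes hook returns; a minimal sketch inside a KafkaTableSource subclass:

// Expose the descriptors through the DefinedRowtimeAttributes interface
@Override
public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
    return rowtimeAttributes; // the list built above
}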
// In SQL DDL
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
A complete SQL example: windowed aggregation over a Kafka stream.

-- Create source table
CREATE TABLE user_events (
user_id STRING,
action STRING,
amount DECIMAL(10,2),
event_time TIMESTAMP(3),
proctime AS PROCTIME(),
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'user-events',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
);
-- Create sink table
CREATE TABLE hourly_stats (
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
user_count BIGINT,
total_amount DECIMAL(10,2)
) WITH (
'connector' = 'kafka',
'topic' = 'hourly-stats',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
);
-- Windowed aggregation query
INSERT INTO hourly_stats
SELECT
TUMBLE_START(event_time, INTERVAL '1' HOUR) as window_start,
TUMBLE_END(event_time, INTERVAL '1' HOUR) as window_end,
COUNT(DISTINCT user_id) as user_count,
SUM(amount) as total_amount
FROM user_events
GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR);
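For comparison, the same tumbling-window aggregation can be written with the Table API. A sketch assuming the expression DSL and that user_events and hourly_stats are registered in tEnv:

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

// Hourly tumbling window over the user_events table
Table hourlyStats = tEnv.from("user_events")
    .window(Tumble.over(lit(1).hours()).on($("event_time")).as("w"))
    .groupBy($("w"))
    .select(
        $("w").start().as("window_start"),
        $("w").end().as("window_end"),
        $("user_id").count().distinct().as("user_count"),
        $("amount").sum().as("total_amount"));

hourlyStats.executeInsert("hourly_stats");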
A second example: enriching a Kafka order stream with a lookup join against a product catalog.

-- Kafka stream
CREATE TABLE orders (
order_id STRING,
user_id STRING,
product_id STRING,
quantity INT,
order_time TIMESTAMP(3),
proctime AS PROCTIME(),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
);
-- Enriched output
CREATE TABLE enriched_orders (
order_id STRING,
user_id STRING,
product_name STRING,
quantity INT,
total_price DECIMAL(10,2),
order_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'enriched-orders',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
);
-- Lookup join with product catalog (assuming a JDBC lookup table; see the sketch after this query)
INSERT INTO enriched_orders
SELECT
o.order_id,
o.user_id,
p.product_name,
o.quantity,
o.quantity * p.price as total_price,
o.order_time
FROM orders o
JOIN product_catalog FOR SYSTEM_TIME AS OF o.proctime AS p
ON o.product_id = p.product_id;
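The query above assumes a product_catalog lookup table registered separately; a hypothetical JDBC-backed definition (connection options are placeholders):

// Register the lookup table referenced by the join above
tEnv.executeSql(
    "CREATE TABLE product_catalog (" +
    "  product_id STRING," +
    "  product_name STRING," +
    "  price DECIMAL(10,2)" +
    ") WITH (" +
    "  'connector' = 'jdbc'," +
    "  'url' = 'jdbc:mysql://localhost:3306/shop'," +
    "  'table-name' = 'products'" +
    ")");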
Recommended consumer properties for Kafka table sources:

Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "localhost:9092");
consumerProps.setProperty("group.id", "flink-table-consumer");
consumerProps.setProperty("auto.offset.reset", "earliest");
consumerProps.setProperty("enable.auto.commit", "false"); // Managed by Flink
consumerProps.setProperty("max.poll.records", "500");

Recommended producer properties for Kafka table sinks:

Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");
producerProps.setProperty("transaction.timeout.ms", "900000");
producerProps.setProperty("enable.idempotence", "true");
producerProps.setProperty("acks", "all");

Properties for Avro with Schema Registry:

Properties avroProps = new Properties();
avroProps.setProperty("bootstrap.servers", "localhost:9092");
avroProps.setProperty("schema.registry.url", "http://localhost:8081");
avroProps.setProperty("specific.avro.reader", "true");
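These property sets plug directly into the constructors shown earlier; a short recap using the hypothetical concrete classes from the examples above:

// Consumer properties feed the table source; producer properties feed the table sink
KafkaJsonTableSource jsonSource = new MyKafkaJsonTableSource(
    schema, Optional.of("proctime"), Collections.emptyList(), Optional.empty(),
    "user-events", consumerProps,
    new JsonRowDeserializationSchema(schema.toRowType()),
    false, true);

KafkaJsonTableSink jsonSink = new MyKafkaJsonTableSink(
    schema, "output-events", producerProps,
    Optional.of(new FlinkFixedPartitioner<>()),
    new JsonRowSerializationSchema(schema.toRowType()));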