The Kinesis connector provides SQL and Table API support through dynamic table factories, enabling declarative stream processing with Kinesis sources and sinks and integration with Flink's unified batch and stream processing APIs.
Factory class for creating Kinesis table sources and sinks that integrate with Flink's Table API ecosystem.
@Internal
public class KinesisDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
public static final String IDENTIFIER = "kinesis";
/**
* Create a dynamic table source for reading from Kinesis.
*
* @param context Factory context with table schema and options
* @return Configured Kinesis table source
*/
public DynamicTableSource createDynamicTableSource(Context context);
/**
* Create a dynamic table sink for writing to Kinesis.
*
* @param context Factory context with table schema and options
* @return Configured Kinesis table sink
*/
public DynamicTableSink createDynamicTableSink(Context context);
/**
* Get the factory identifier for table DDL.
*
* @return Factory identifier string
*/
public String factoryIdentifier();
/**
* Get required configuration options.
*
* @return Set of required configuration options
*/
public Set<ConfigOption<?>> requiredOptions();
/**
* Get optional configuration options.
*
* @return Set of optional configuration options
*/
public Set<ConfigOption<?>> optionalOptions();
/**
* Validate Kinesis partitioner configuration.
*
* @param tableOptions Table configuration options
* @param targetTable Catalog table definition
*/
public static void validateKinesisPartitioner(ReadableConfig tableOptions, CatalogTable targetTable);
}
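The factory is registered through Java's ServiceLoader mechanism and resolved by the planner via its 'kinesis' identifier, so application code normally only supplies DDL options. As a minimal sketch (assuming flink-connector-kinesis is on the classpath), the same lookup can be performed directly with FactoryUtil, which is mainly useful in tests:
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
// Resolve the table factory registered under the 'kinesis' identifier via SPI.
DynamicTableSourceFactory factory =
        FactoryUtil.discoverFactory(
                Thread.currentThread().getContextClassLoader(),
                DynamicTableSourceFactory.class,
                "kinesis");
// The discovered instance is the KinesisDynamicTableFactory described above.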
Dynamic table source implementation for reading from Kinesis streams in Table API queries.
@Internal
public class KinesisDynamicSource implements ScanTableSource, SupportsReadingMetadata {
/**
* Get the change log mode supported by this source.
*
* @return Change log mode (INSERT only for Kinesis)
*/
public ChangelogMode getChangelogMode();
/**
* Create the actual source function for reading data.
*
* @param context Source function context
* @return Configured source function
*/
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context);
/**
* Create a copy of this source during planner optimizations.
*
* @return Deep copy of the source
*/
public DynamicTableSource copy();
/**
* Get summary string for debugging.
*
* @return Summary string
*/
public String asSummaryString();
}
Dynamic table sink implementation for writing to Kinesis streams from Table API queries.
@Internal
public class KinesisDynamicSink implements DynamicTableSink {
/**
* Get the change log mode accepted by this sink.
*
* @param requestedMode Requested change log mode
* @return Accepted change log mode
*/
public ChangelogMode getChangelogMode(ChangelogMode requestedMode);
/**
* Create the actual sink function for writing data.
*
* @param context Sink function context
* @return Configured sink function
*/
public SinkRuntimeProvider getSinkRuntimeProvider(Context context);
/**
* Create a copy of this sink during planner optimizations.
*
* @return Deep copy of the sink
*/
public DynamicTableSink copy();
/**
* Get summary string for debugging.
*
* @return Summary string
*/
public String asSummaryString();
}
-- Create a Kinesis source table
CREATE TABLE kinesis_source (
event_id STRING,
user_id BIGINT,
event_type STRING,
timestamp_col TIMESTAMP(3),
payload ROW<
action STRING,
properties MAP<STRING, STRING>
>,
-- Kinesis metadata columns
kinesis_partition_key STRING METADATA FROM 'partition-key',
kinesis_sequence_number STRING METADATA FROM 'sequence-number',
kinesis_shard_id STRING METADATA FROM 'shard-id',
kinesis_stream_name STRING METADATA FROM 'stream-name',
kinesis_arrival_timestamp TIMESTAMP(3) METADATA FROM 'arrival-timestamp',
-- Watermark for event time processing
WATERMARK FOR timestamp_col AS timestamp_col - INTERVAL '30' SECOND
) WITH (
'connector' = 'kinesis',
'stream' = 'user-events',
'aws.region' = 'us-west-2',
'aws.credentials.provider' = 'AUTO',
'scan.stream.initpos' = 'LATEST',
'format' = 'json'
);
-- Create a Kinesis sink table
CREATE TABLE kinesis_sink (
processed_event_id STRING,
user_id BIGINT,
aggregated_count BIGINT,
window_start TIMESTAMP(3),
window_end TIMESTAMP(3)
) WITH (
'connector' = 'kinesis',
'stream' = 'processed-events',
'aws.region' = 'us-west-2',
'aws.credentials.provider' = 'AUTO',
'format' = 'json',
'sink.partitioner' = 'fixed'
);
-- Real-time user event aggregation
INSERT INTO kinesis_sink
SELECT
CONCAT('agg_', event_id) as processed_event_id,
user_id,
COUNT(*) as aggregated_count,
TUMBLE_START(timestamp_col, INTERVAL '5' MINUTE) as window_start,
TUMBLE_END(timestamp_col, INTERVAL '5' MINUTE) as window_end
FROM kinesis_source
WHERE event_type = 'page_view'
GROUP BY
user_id,
event_id,
TUMBLE(timestamp_col, INTERVAL '5' MINUTE);
-- Create multiple Kinesis source tables
CREATE TABLE orders_stream (
order_id STRING,
customer_id STRING,
product_id STRING,
quantity INT,
price DECIMAL(10,2),
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '10' SECOND
) WITH (
'connector' = 'kinesis',
'stream' = 'orders',
'aws.region' = 'us-west-2',
'format' = 'json'
);
CREATE TABLE inventory_stream (
product_id STRING,
available_quantity INT,
update_time TIMESTAMP(3),
WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kinesis',
'stream' = 'inventory-updates',
'aws.region' = 'us-west-2',
'format' = 'json'
);
-- Join streams for real-time inventory management
CREATE TABLE inventory_alerts (
product_id STRING,
order_quantity INT,
available_quantity INT,
alert_message STRING,
alert_time TIMESTAMP(3)
) WITH (
'connector' = 'kinesis',
'stream' = 'inventory-alerts',
'aws.region' = 'us-west-2',
'format' = 'json'
);
INSERT INTO inventory_alerts
SELECT
o.product_id,
SUM(o.quantity) as order_quantity,
LAST_VALUE(i.available_quantity) as available_quantity,
CASE
WHEN LAST_VALUE(i.available_quantity) < SUM(o.quantity)
THEN 'LOW_STOCK_ALERT'
ELSE 'STOCK_OK'
END as alert_message,
CURRENT_TIMESTAMP as alert_time
FROM orders_stream o
LEFT JOIN inventory_stream i
ON o.product_id = i.product_id
AND i.update_time BETWEEN o.order_time - INTERVAL '1' HOUR AND o.order_time + INTERVAL '5' MINUTE
GROUP BY
o.product_id,
TUMBLE(o.order_time, INTERVAL '1' MINUTE);
-- Create table for DynamoDB Streams
CREATE TABLE dynamodb_changes (
event_name STRING,
table_name STRING,
partition_key STRING,
sort_key STRING,
old_image ROW<
user_id STRING,
username STRING,
email STRING
>,
new_image ROW<
user_id STRING,
username STRING,
email STRING
>,
approximate_creation_time TIMESTAMP(3),
WATERMARK FOR approximate_creation_time AS approximate_creation_time - INTERVAL '1' MINUTE
) WITH (
'connector' = 'kinesis',
'stream' = 'arn:aws:dynamodb:us-west-2:123456789012:table/Users/stream/2023-01-01T00:00:00.000',
'aws.region' = 'us-west-2',
'format' = 'json'
);
-- Create change log for audit purposes
CREATE TABLE user_audit_log (
change_id STRING,
user_id STRING,
change_type STRING,
old_values STRING,
new_values STRING,
change_timestamp TIMESTAMP(3)
) WITH (
'connector' = 'kinesis',
'stream' = 'user-audit-log',
'aws.region' = 'us-west-2',
'format' = 'json'
);
INSERT INTO user_audit_log
SELECT
CONCAT(table_name, '_', partition_key, '_', UNIX_TIMESTAMP(approximate_creation_time)) as change_id,
partition_key as user_id,
event_name as change_type,
CASE WHEN old_image IS NOT NULL THEN CAST(old_image AS STRING) ELSE NULL END as old_values,
CASE WHEN new_image IS NOT NULL THEN CAST(new_image AS STRING) ELSE NULL END as new_values,
approximate_creation_time as change_timestamp
FROM dynamodb_changes
WHERE event_name IN ('INSERT', 'MODIFY', 'REMOVE');
-- Create pattern detection table
CREATE TABLE user_behavior_events (
user_id STRING,
event_type STRING,
page_url STRING,
session_id STRING,
event_timestamp TIMESTAMP(3),
WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '30' SECOND
) WITH (
'connector' = 'kinesis',
'stream' = 'user-behavior',
'aws.region' = 'us-west-2',
'format' = 'json'
);
-- Fraud detection patterns
CREATE TABLE fraud_alerts (
user_id STRING,
alert_type STRING,
event_count BIGINT,
time_window_start TIMESTAMP(3),
time_window_end TIMESTAMP(3),
alert_timestamp TIMESTAMP(3)
) WITH (
'connector' = 'kinesis',
'stream' = 'fraud-alerts',
'aws.region' = 'us-west-2',
'format' = 'json'
);
-- Detect suspicious patterns (too many events in short time)
INSERT INTO fraud_alerts
SELECT
user_id,
'HIGH_FREQUENCY_ACTIVITY' as alert_type,
COUNT(*) as event_count,
TUMBLE_START(event_timestamp, INTERVAL '1' MINUTE) as time_window_start,
TUMBLE_END(event_timestamp, INTERVAL '1' MINUTE) as time_window_end,
CURRENT_TIMESTAMP as alert_timestamp
FROM user_behavior_events
GROUP BY
user_id,
TUMBLE(event_timestamp, INTERVAL '1' MINUTE)
HAVING COUNT(*) > 100; -- More than 100 events per minute
-- Create product catalog table (changelog stream)
CREATE TABLE product_catalog (
product_id STRING,
product_name STRING,
category STRING,
price DECIMAL(10,2),
update_time TIMESTAMP(3),
WATERMARK FOR update_time AS update_time - INTERVAL '10' SECOND,
PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
'connector' = 'kinesis',
'stream' = 'product-catalog-changes',
'aws.region' = 'us-west-2',
'format' = 'json'
);
-- Create versioned table for temporal joins
CREATE TABLE product_catalog_versioned (
product_id STRING,
product_name STRING,
category STRING,
price DECIMAL(10,2),
update_time TIMESTAMP(3),
WATERMARK FOR update_time AS update_time - INTERVAL '10' SECOND,
PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
'connector' = 'kinesis',
'stream' = 'product-catalog-changes',
'aws.region' = 'us-west-2',
'format' = 'json'
);
-- Join orders with product information as of order time
CREATE TABLE enriched_orders (
order_id STRING,
customer_id STRING,
product_id STRING,
product_name STRING,
category STRING,
quantity INT,
unit_price DECIMAL(10,2),
total_amount DECIMAL(10,2),
order_time TIMESTAMP(3)
) WITH (
'connector' = 'kinesis',
'stream' = 'enriched-orders',
'aws.region' = 'us-west-2',
'format' = 'json'
);
INSERT INTO enriched_orders
SELECT
o.order_id,
o.customer_id,
o.product_id,
p.product_name,
p.category,
o.quantity,
p.price as unit_price,
o.quantity * p.price as total_amount,
o.order_time
FROM orders_stream o
LEFT JOIN product_catalog_versioned FOR SYSTEM_TIME AS OF o.order_time AS p
ON o.product_id = p.product_id;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
// Create Table Environment
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// Create Kinesis source table programmatically
tableEnv.createTemporaryTable("kinesis_events",
TableDescriptor.forConnector("kinesis")
.schema(Schema.newBuilder()
.column("event_id", DataTypes.STRING())
.column("user_id", DataTypes.BIGINT())
.column("event_type", DataTypes.STRING())
.column("timestamp_col", DataTypes.TIMESTAMP(3))
.column("kinesis_partition_key", DataTypes.STRING())
.metadata("partition-key")
.column("kinesis_sequence_number", DataTypes.STRING())
.metadata("sequence-number")
.watermark("timestamp_col", "timestamp_col - INTERVAL '30' SECOND")
.build())
.option("stream", "user-events")
.option("aws.region", "us-west-2")
.option("aws.credentials.provider", "AUTO")
.option("scan.stream.initpos", "LATEST")
.option("format", "json")
.build());
// Execute query
Table result = tableEnv.sqlQuery(
"SELECT user_id, COUNT(*) as event_count " +
"FROM kinesis_events " +
"WHERE event_type = 'click' " +
"GROUP BY user_id"
);
// Write to another Kinesis stream
result.executeInsert("kinesis_sink");
// Register custom format
tableEnv.executeSql(
"CREATE TABLE custom_format_table (" +
" data STRING," +
" metadata_field STRING METADATA FROM 'partition-key'" +
") WITH (" +
" 'connector' = 'kinesis'," +
" 'stream' = 'custom-format-stream'," +
" 'aws.region' = 'us-west-2'," +
" 'format' = 'avro'," +
" 'avro.schema' = '{" +
" \"type\": \"record\"," +
" \"name\": \"CustomEvent\"," +
" \"fields\": [" +
" {\"name\": \"data\", \"type\": \"string\"}" +
" ]" +
" }'" +
")"
);
# Required options
connector = kinesis
stream = my-stream-name
aws.region = us-west-2
# Authentication options
aws.credentials.provider = AUTO | BASIC | PROFILE | ASSUME_ROLE | ENV_VAR | SYS_PROP
aws.access-key-id = your-access-key
aws.secret-access-key = your-secret-key
# Source-specific options
scan.stream.initpos = LATEST | TRIM_HORIZON | AT_TIMESTAMP
scan.stream.initpos.timestamp = 2023-01-01T00:00:00Z
scan.shard.getrecords.maxrecordcount = 10000
scan.shard.getrecords.intervalmillis = 200
# Sink-specific options
sink.partitioner = fixed | random | <fully-qualified custom KinesisPartitioner class>
sink.partitioner-field-delimiter = |
sink.flush-buffer.size = 1000
sink.flush-buffer.timeout = 2s
# Format options
format = json | avro | csv | raw
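As a brief sketch of how these options compose (reusing the tableEnv from the programmatic example above; the stream name and timestamp are the placeholder values from the listing), a source can be started at a fixed position by pairing 'scan.stream.initpos' = 'AT_TIMESTAMP' with 'scan.stream.initpos.timestamp':
// Sketch: start reading at a fixed timestamp instead of LATEST.
// 'scan.stream.initpos.timestamp' is only consulted when
// 'scan.stream.initpos' is set to AT_TIMESTAMP.
tableEnv.createTemporaryTable("timestamped_source",
    TableDescriptor.forConnector("kinesis")
        .schema(Schema.newBuilder()
            .column("data", DataTypes.STRING())
            .build())
        .option("stream", "my-stream-name")
        .option("aws.region", "us-west-2")
        .option("scan.stream.initpos", "AT_TIMESTAMP")
        .option("scan.stream.initpos.timestamp", "2023-01-01T00:00:00Z")
        .option("format", "json")
        .build());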
-- Enhanced Fan-Out configuration
CREATE TABLE efo_source (
data STRING
) WITH (
'connector' = 'kinesis',
'stream' = 'my-stream',
'aws.region' = 'us-west-2',
'scan.stream.recordpublisher' = 'EFO',
'scan.stream.efo.consumername' = 'my-flink-app',
'scan.stream.efo.registration' = 'LAZY'
);
-- Custom partitioning for sink
CREATE TABLE partitioned_sink (
user_id STRING,
data STRING
) WITH (
'connector' = 'kinesis',
'stream' = 'partitioned-output',
'aws.region' = 'us-west-2',
'format' = 'json',
'sink.partitioner' = 'com.example.MyCustomPartitioner'
);
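A custom partitioner referenced by class name, as in the table above, must be packaged with the job so the sink can instantiate it at runtime. Below is a minimal sketch of the hypothetical com.example.MyCustomPartitioner, assuming the table connector expects a KinesisPartitioner<RowData> implementation:
package com.example;

import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
import org.apache.flink.table.data.RowData;

// Hypothetical partitioner: route every record by its first column (user_id in
// the partitioned_sink table) so all events for a user land on the same shard.
public class MyCustomPartitioner extends KinesisPartitioner<RowData> {

    @Override
    public String getPartitionId(RowData element) {
        return element.getString(0).toString();
    }
}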