Table API Integration

SQL and Table API support is provided through dynamic table factories that expose Kinesis streams as declarative sources and sinks, integrating the connector with Flink's unified batch and stream processing APIs.

Capabilities

KinesisDynamicTableFactory

Factory class for creating Kinesis table sources and sinks that integrate with Flink's Table API ecosystem.

@Internal
public class KinesisDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
    
    public static final String IDENTIFIER = "kinesis";
    
    /**
     * Create a dynamic table source for reading from Kinesis.
     *
     * @param context Factory context with table schema and options
     * @return Configured Kinesis table source
     */
    public DynamicTableSource createDynamicTableSource(Context context);
    
    /**
     * Create a dynamic table sink for writing to Kinesis.
     *
     * @param context Factory context with table schema and options
     * @return Configured Kinesis table sink
     */
    public DynamicTableSink createDynamicTableSink(Context context);
    
    /**
     * Get the factory identifier for table DDL.
     *
     * @return Factory identifier string
     */
    public String factoryIdentifier();
    
    /**
     * Get required configuration options.
     *
     * @return Set of required configuration options
     */
    public Set<ConfigOption<?>> requiredOptions();
    
    /**
     * Get optional configuration options.
     *
     * @return Set of optional configuration options
     */
    public Set<ConfigOption<?>> optionalOptions();
    
    /**
     * Validate Kinesis partitioner configuration.
     *
     * @param tableOptions Table configuration options
     * @param targetTable Catalog table definition
     */
    public static void validateKinesisPartitioner(ReadableConfig tableOptions, CatalogTable targetTable);
}
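
The factory is never instantiated directly in application code; Flink's planner discovers it through the 'connector' = 'kinesis' option in table DDL. The following is a minimal, purely illustrative sketch of that discovery step, assuming the Kinesis connector jar is on the classpath:

import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

// Resolve the table factory by its identifier, mirroring what the SQL planner
// does when a table declares 'connector' = 'kinesis'.
DynamicTableSourceFactory factory = FactoryUtil.discoverFactory(
        Thread.currentThread().getContextClassLoader(),
        DynamicTableSourceFactory.class,
        "kinesis");

// factory.factoryIdentifier() returns "kinesis" (KinesisDynamicTableFactory.IDENTIFIER)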

KinesisDynamicSource

Dynamic table source implementation for reading from Kinesis streams in Table API queries.

@Internal
public class KinesisDynamicSource implements ScanTableSource, SupportsReadingMetadata {
    
    /**
     * Get the change log mode supported by this source.
     *
     * @return Change log mode (INSERT only for Kinesis)
     */
    public ChangelogMode getChangelogMode();
    
    /**
     * Create the runtime provider that encapsulates the source implementation for reading data.
     *
     * @param context Scan context providing runtime information
     * @return Scan runtime provider wrapping the configured source
     */
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context);
    
    /**
     * Create a copy of this table source, used by the planner during optimization.
     *
     * @return Copy of the source
     */
    public DynamicTableSource copy();
    
    /**
     * Get summary string for debugging.
     *
     * @return Summary string
     */
    public String asSummaryString();
}

KinesisDynamicSink

Dynamic table sink implementation for writing to Kinesis streams from Table API queries.

@Internal
public class KinesisDynamicSink implements DynamicTableSink {
    
    /**
     * Get the change log mode accepted by this sink.
     *
     * @param requestedMode Requested change log mode
     * @return Accepted change log mode
     */
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode);
    
    /**
     * Create the runtime provider that encapsulates the sink implementation for writing data.
     *
     * @param context Sink context providing runtime information
     * @return Sink runtime provider wrapping the configured sink
     */
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context);
    
    /**
     * Create a copy of this table sink, used by the planner during optimization.
     *
     * @return Copy of the sink
     */
    public DynamicTableSink copy();
    
    /**
     * Get summary string for debugging.
     *
     * @return Summary string
     */
    public String asSummaryString();
}

Usage Examples

Creating Kinesis Tables with DDL

-- Create a Kinesis source table
CREATE TABLE kinesis_source (
    event_id STRING,
    user_id BIGINT,
    event_type STRING,
    timestamp_col TIMESTAMP(3),
    payload ROW<
        action STRING,
        properties MAP<STRING, STRING>
    >,
    -- Kinesis metadata columns
    kinesis_partition_key STRING METADATA FROM 'partition-key',
    kinesis_sequence_number STRING METADATA FROM 'sequence-number',
    kinesis_shard_id STRING METADATA FROM 'shard-id',
    kinesis_stream_name STRING METADATA FROM 'stream-name',
    kinesis_arrival_timestamp TIMESTAMP(3) METADATA FROM 'arrival-timestamp',
    -- Watermark for event time processing
    WATERMARK FOR timestamp_col AS timestamp_col - INTERVAL '30' SECOND
) WITH (
    'connector' = 'kinesis',
    'stream' = 'user-events',
    'aws.region' = 'us-west-2',
    'aws.credentials.provider' = 'AUTO',
    'scan.stream.initpos' = 'LATEST',
    'format' = 'json'
);

-- Create a Kinesis sink table
CREATE TABLE kinesis_sink (
    processed_event_id STRING,
    user_id BIGINT,
    aggregated_count BIGINT,
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3)
) WITH (
    'connector' = 'kinesis',
    'stream' = 'processed-events',
    'aws.region' = 'us-west-2',
    'aws.credentials.provider' = 'AUTO',
    'format' = 'json',
    'sink.partitioner' = 'fixed'
);

Real-Time Analytics Query

-- Real-time user event aggregation
INSERT INTO kinesis_sink
SELECT 
    CONCAT('agg_', event_id) as processed_event_id,
    user_id,
    COUNT(*) as aggregated_count,
    TUMBLE_START(timestamp_col, INTERVAL '5' MINUTE) as window_start,
    TUMBLE_END(timestamp_col, INTERVAL '5' MINUTE) as window_end
FROM kinesis_source
WHERE event_type = 'page_view'
GROUP BY 
    user_id,
    event_id,
    TUMBLE(timestamp_col, INTERVAL '5' MINUTE);

Multi-Stream Processing

-- Create multiple Kinesis source tables
CREATE TABLE orders_stream (
    order_id STRING,
    customer_id STRING,
    product_id STRING,
    quantity INT,
    price DECIMAL(10,2),
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kinesis',
    'stream' = 'orders',
    'aws.region' = 'us-west-2',
    'format' = 'json'
);

CREATE TABLE inventory_stream (
    product_id STRING,
    available_quantity INT,
    update_time TIMESTAMP(3),
    WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kinesis',
    'stream' = 'inventory-updates',
    'aws.region' = 'us-west-2',
    'format' = 'json'
);

-- Join streams for real-time inventory management
CREATE TABLE inventory_alerts (
    product_id STRING,
    order_quantity INT,
    available_quantity INT,
    alert_message STRING,
    alert_time TIMESTAMP(3)
) WITH (
    'connector' = 'kinesis',
    'stream' = 'inventory-alerts',
    'aws.region' = 'us-west-2',
    'format' = 'json'
);

INSERT INTO inventory_alerts
SELECT 
    o.product_id,
    SUM(o.quantity) as order_quantity,
    LAST_VALUE(i.available_quantity) as available_quantity,
    CASE 
        WHEN LAST_VALUE(i.available_quantity) < SUM(o.quantity) 
        THEN 'LOW_STOCK_ALERT'
        ELSE 'STOCK_OK'
    END as alert_message,
    CURRENT_TIMESTAMP as alert_time
FROM orders_stream o
LEFT JOIN inventory_stream i 
    ON o.product_id = i.product_id
    AND i.update_time BETWEEN o.order_time - INTERVAL '1' HOUR AND o.order_time + INTERVAL '5' MINUTE
GROUP BY 
    o.product_id,
    TUMBLE(o.order_time, INTERVAL '1' MINUTE);

DynamoDB Streams Integration

-- Create table for DynamoDB Streams
CREATE TABLE dynamodb_changes (
    event_name STRING,
    table_name STRING,
    partition_key STRING,
    sort_key STRING,
    old_image ROW<
        user_id STRING,
        username STRING,
        email STRING
    >,
    new_image ROW<
        user_id STRING,
        username STRING,
        email STRING
    >,
    approximate_creation_time TIMESTAMP(3),
    WATERMARK FOR approximate_creation_time AS approximate_creation_time - INTERVAL '1' MINUTE
) WITH (
    'connector' = 'kinesis',
    'stream' = 'arn:aws:dynamodb:us-west-2:123456789012:table/Users/stream/2023-01-01T00:00:00.000',
    'aws.region' = 'us-west-2',
    'format' = 'json'
);

-- Create change log for audit purposes
CREATE TABLE user_audit_log (
    change_id STRING,
    user_id STRING,
    change_type STRING,
    old_values STRING,
    new_values STRING,
    change_timestamp TIMESTAMP(3)
) WITH (
    'connector' = 'kinesis',
    'stream' = 'user-audit-log',
    'aws.region' = 'us-west-2',
    'format' = 'json'
);

INSERT INTO user_audit_log
SELECT 
    CONCAT(table_name, '_', partition_key, '_', DATE_FORMAT(approximate_creation_time, 'yyyyMMddHHmmssSSS')) as change_id,
    partition_key as user_id,
    event_name as change_type,
    CASE WHEN old_image IS NOT NULL THEN CAST(old_image AS STRING) ELSE NULL END as old_values,
    CASE WHEN new_image IS NOT NULL THEN CAST(new_image AS STRING) ELSE NULL END as new_values,
    approximate_creation_time as change_timestamp
FROM dynamodb_changes
WHERE event_name IN ('INSERT', 'MODIFY', 'REMOVE');

Complex Event Processing

-- Create pattern detection table
CREATE TABLE user_behavior_events (
    user_id STRING,
    event_type STRING,
    page_url STRING,
    session_id STRING,
    event_timestamp TIMESTAMP(3),
    WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '30' SECOND
) WITH (
    'connector' = 'kinesis',
    'stream' = 'user-behavior',
    'aws.region' = 'us-west-2',
    'format' = 'json'
);

-- Fraud detection patterns
CREATE TABLE fraud_alerts (
    user_id STRING,
    alert_type STRING,
    event_count BIGINT,
    time_window_start TIMESTAMP(3),
    time_window_end TIMESTAMP(3),
    alert_timestamp TIMESTAMP(3)
) WITH (
    'connector' = 'kinesis',
    'stream' = 'fraud-alerts',
    'aws.region' = 'us-west-2',
    'format' = 'json'
);

-- Detect suspicious patterns (too many events in short time)
INSERT INTO fraud_alerts
SELECT 
    user_id,
    'HIGH_FREQUENCY_ACTIVITY' as alert_type,
    COUNT(*) as event_count,
    TUMBLE_START(event_timestamp, INTERVAL '1' MINUTE) as time_window_start,
    TUMBLE_END(event_timestamp, INTERVAL '1' MINUTE) as time_window_end,
    CURRENT_TIMESTAMP as alert_timestamp
FROM user_behavior_events
GROUP BY 
    user_id,
    TUMBLE(event_timestamp, INTERVAL '1' MINUTE)
HAVING COUNT(*) > 100;  -- More than 100 events per minute

Temporal Table Joins

-- Create a versioned product catalog table (changelog stream) for temporal joins
CREATE TABLE product_catalog_versioned (
    product_id STRING,
    product_name STRING,
    category STRING,
    price DECIMAL(10,2),
    update_time TIMESTAMP(3),
    WATERMARK FOR update_time AS update_time - INTERVAL '10' SECOND,
    PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
    'connector' = 'kinesis',
    'stream' = 'product-catalog-changes',
    'aws.region' = 'us-west-2',
    'format' = 'json'
);

-- Join orders with product information as of order time
CREATE TABLE enriched_orders (
    order_id STRING,
    customer_id STRING,
    product_id STRING,
    product_name STRING,
    category STRING,
    quantity INT,
    unit_price DECIMAL(10,2),
    total_amount DECIMAL(10,2),
    order_time TIMESTAMP(3)
) WITH (
    'connector' = 'kinesis',
    'stream' = 'enriched-orders',
    'aws.region' = 'us-west-2',
    'format' = 'json'
);

INSERT INTO enriched_orders
SELECT 
    o.order_id,
    o.customer_id,
    o.product_id,
    p.product_name,
    p.category,
    o.quantity,
    p.price as unit_price,
    o.quantity * p.price as total_amount,
    o.order_time
FROM orders_stream o
LEFT JOIN product_catalog_versioned FOR SYSTEM_TIME AS OF o.order_time AS p
    ON o.product_id = p.product_id;

Java Table API Examples

Programmatic Table Creation

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;

// Create Table Environment
EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()
    .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Create Kinesis source table programmatically
tableEnv.createTemporaryTable("kinesis_events",
    TableDescriptor.forConnector("kinesis")
        .schema(Schema.newBuilder()
            .column("event_id", DataTypes.STRING())
            .column("user_id", DataTypes.BIGINT())
            .column("event_type", DataTypes.STRING())
            .column("timestamp_col", DataTypes.TIMESTAMP(3))
            .column("kinesis_partition_key", DataTypes.STRING())
                .metadata("partition-key")
            .column("kinesis_sequence_number", DataTypes.STRING())
                .metadata("sequence-number")
            .watermark("timestamp_col", "timestamp_col - INTERVAL '30' SECOND")
            .build())
        .option("stream", "user-events")
        .option("aws.region", "us-west-2")
        .option("aws.credentials.provider", "AUTO")
        .option("scan.stream.initpos", "LATEST")
        .option("format", "json")
        .build());

// Execute query
Table result = tableEnv.sqlQuery(
    "SELECT user_id, COUNT(*) as event_count " +
    "FROM kinesis_events " +
    "WHERE event_type = 'click' " +
    "GROUP BY user_id"
);

// Write to another Kinesis stream
result.executeInsert("kinesis_sink");

Custom Formats and Serialization

// Register custom format
tableEnv.executeSql(
    "CREATE TABLE custom_format_table (" +
    "  data STRING," +
    "  metadata_field STRING METADATA FROM 'partition-key'" +
    ") WITH (" +
    "  'connector' = 'kinesis'," +
    "  'stream' = 'custom-format-stream'," +
    "  'aws.region' = 'us-west-2'," +
    "  'format' = 'avro'," +
    "  'avro.schema' = '{" +
    "    \"type\": \"record\"," +
    "    \"name\": \"CustomEvent\"," +
    "    \"fields\": [" +
    "      {\"name\": \"data\", \"type\": \"string\"}" +
    "    ]" +
    "  }'" +
    ")"
);

Configuration Options

Common Table Options

# Required options
connector = kinesis
stream = my-stream-name
aws.region = us-west-2

# Authentication options
aws.credentials.provider = AUTO | BASIC | PROFILE | ASSUME_ROLE | ENV_VAR | SYS_PROP
aws.access-key-id = your-access-key
aws.secret-access-key = your-secret-key

# Source-specific options
scan.stream.initpos = LATEST | TRIM_HORIZON | AT_TIMESTAMP
scan.stream.initpos.timestamp = 2023-01-01T00:00:00Z
scan.shard.getrecords.maxrecordcount = 10000
scan.shard.getrecords.intervalmillis = 200

# Sink-specific options
sink.partitioner = fixed | random | <fully-qualified custom partitioner class>
sink.partitioner-field-delimiter = |
sink.flush-buffer.size = 1000
sink.flush-buffer.timeout = 2s

# Format options
format = json | avro | csv | raw

Advanced Configuration

-- Enhanced Fan-Out configuration
CREATE TABLE efo_source (
    data STRING
) WITH (
    'connector' = 'kinesis',
    'stream' = 'my-stream',
    'aws.region' = 'us-west-2',
    'scan.stream.recordpublisher' = 'EFO',
    'scan.stream.efo.consumername' = 'my-flink-app',
    'scan.stream.efo.registration' = 'LAZY'
);

-- Custom partitioning for sink
CREATE TABLE partitioned_sink (
    user_id STRING,
    data STRING
) WITH (
    'connector' = 'kinesis',
    'stream' = 'partitioned-output',
    'aws.region' = 'us-west-2',
    'format' = 'json',
    'sink.partitioner' = 'com.example.MyCustomPartitioner'
);

Best Practices

  1. Schema Evolution: Prefer flexible formats such as JSON or Avro so table schemas can evolve without breaking existing consumers
  2. Watermarks: Configure watermark strategies that reflect the expected out-of-orderness of each stream for event-time processing
  3. Metadata: Leverage Kinesis metadata columns (partition key, sequence number, shard ID, arrival timestamp) for debugging and monitoring
  4. Partitioning: Choose a sink partitioning strategy that spreads records evenly across shards
  5. Error Handling: Implement proper error handling and dead-letter queues for malformed records (see the sketch below)
  6. Resource Management: Size parallelism and resource allocation to match shard count and expected throughput
  7. Monitoring: Combine Flink metrics with Kinesis CloudWatch metrics to track lag and throughput
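
As a minimal sketch of point 5, the standard Flink JSON format options json.ignore-parse-errors and json.fail-on-missing-field let a Kinesis source tolerate malformed records instead of failing the job; routing bad records to a true dead-letter stream would require additional logic (for example a secondary raw-format table). The snippet reuses the tableEnv from the programmatic example above; table and stream names are illustrative:

// Sketch: tolerate malformed JSON records rather than failing the job.
tableEnv.executeSql(
    "CREATE TABLE tolerant_source (" +
    "  event_id STRING," +
    "  payload STRING" +
    ") WITH (" +
    "  'connector' = 'kinesis'," +
    "  'stream' = 'user-events'," +
    "  'aws.region' = 'us-west-2'," +
    "  'format' = 'json'," +
    "  'json.ignore-parse-errors' = 'true'," +     // skip records that fail JSON parsing
    "  'json.fail-on-missing-field' = 'false'" +   // treat missing fields as NULL
    ")"
);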