or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

configuration.md consumer.md dynamodb-streams.md index.md partitioning.md producer.md serialization.md table-api.md
tile.json

docs/configuration.md

Configuration Management

Comprehensive configuration system for AWS credentials, regions, consumer behavior, producer settings, and advanced features like Enhanced Fan-Out and watermark management.

Capabilities

AWS Configuration

Base configuration constants for AWS service access, credential management, and regional settings.

@PublicEvolving
public class AWSConfigConstants {
    
    // Core AWS Configuration
    public static final String AWS_REGION = "aws.region";
    public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
    public static final String AWS_ENDPOINT = "aws.endpoint";
    
    // Credential Provider Types
    public enum CredentialProvider {
        ENV_VAR,             // Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
        SYS_PROP,            // System properties (aws.accessKeyId, aws.secretKey)
        PROFILE,             // AWS profile file (~/.aws/credentials)
        BASIC,               // Basic access key/secret key
        ASSUME_ROLE,         // IAM role assumption
        WEB_IDENTITY_TOKEN,  // Web identity token for OIDC
        AUTO                 // Automatic credential chain
    }
    
    // Helper methods for prefixed configuration keys
    public static String accessKeyId(String prefix);
    public static String secretKey(String prefix);
    public static String profilePath(String prefix);
    public static String profileName(String prefix);
    public static String roleArn(String prefix);
    public static String roleSessionName(String prefix);
    public static String externalId(String prefix);
    public static String roleCredentialsProvider(String prefix);
    public static String webIdentityTokenFile(String prefix);
}

Consumer Configuration

Consumer-specific configuration constants for stream positioning, shard management, Enhanced Fan-Out, and watermark handling.

/**
 * Consumer configuration keys for stream start position, shard polling, Enhanced Fan-Out (EFO)
 * and watermark handling. The AWS region, endpoint and credential keys are inherited from
 * {@link AWSConfigConstants}.
 */
@PublicEvolving
public class ConsumerConfigConstants extends AWSConfigConstants {

    // Stream Configuration

    /** Record publisher type; one of the {@link RecordPublisherType} names. */
    public static final String RECORD_PUBLISHER_TYPE = "flink.stream.recordpublisher";

    /** Name of the EFO consumer to register (used with LAZY or EAGER registration). */
    public static final String EFO_CONSUMER_NAME = "flink.stream.efo.consumername";

    /** Initial position in the stream; one of the {@link InitialPosition} names. */
    public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";

    /**
     * Start time for {@link InitialPosition#AT_TIMESTAMP}. The value is either a date string in
     * the {@link #STREAM_TIMESTAMP_DATE_FORMAT} pattern (default
     * {@link #DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT}, e.g. {@code "2023-01-01T00:00:00.000Z"}) or a
     * non-negative Unix epoch in seconds (e.g. {@code "1672531200"}).
     */
    public static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.initpos.timestamp";

    /** {@link java.text.SimpleDateFormat} pattern used to parse {@link #STREAM_INITIAL_TIMESTAMP}. */
    public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";

    /** Default timestamp pattern; the milliseconds field ({@code .SSS}) is mandatory. */
    public static final String DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";

    // Shard Configuration

    /** Maximum number of records requested per GetRecords call (polling publisher). */
    public static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount";

    /** Delay between consecutive GetRecords calls per shard, in milliseconds (polling publisher). */
    public static final String SHARD_GETRECORDS_INTERVAL_MILLIS = "flink.shard.getrecords.intervalmillis";

    /** Time in milliseconds after which a shard without new records is treated as idle for watermarking. */
    public static final String SHARD_IDLE_INTERVAL_MILLIS = "flink.shard.idle.interval";

    /** Whether the polling publisher adapts its read size per call ({@code "true"}/{@code "false"}). */
    public static final String SHARD_USE_ADAPTIVE_READS = "flink.shard.adaptivereads";

    // Watermark Configuration

    /** Interval, in milliseconds, for synchronizing the shared (global) watermark. */
    public static final String WATERMARK_SYNC_MILLIS = "flink.watermark.sync.interval";

    /** Maximum amount, in milliseconds, a reader may advance ahead of the shared watermark. */
    public static final String WATERMARK_LOOKAHEAD_MILLIS = "flink.watermark.lookahead.millis";

    // Enhanced Fan-Out Configuration

    /** EFO registration strategy; one of the {@link EFORegistrationType} names. */
    public static final String EFO_REGISTRATION_TYPE = "flink.stream.efo.registration";

    /** Prefix of the per-stream consumer-ARN keys built by {@link #efoConsumerArn(String)}. */
    public static final String EFO_CONSUMER_ARN_PREFIX = "flink.stream.efo.consumerarn";

    /** Supported values of {@link #STREAM_INITIAL_POSITION}. */
    public enum InitialPosition {
        /** Start from the oldest available record. */
        TRIM_HORIZON,
        /** Start from the latest record. */
        LATEST,
        /** Start from {@link #STREAM_INITIAL_TIMESTAMP}. */
        AT_TIMESTAMP
    }

    /** Supported values of {@link #RECORD_PUBLISHER_TYPE}. */
    public enum RecordPublisherType {
        /** Enhanced Fan-Out with dedicated per-consumer throughput. */
        EFO,
        /** Standard polling with throughput shared between consumers. */
        POLLING
    }

    /** Supported values of {@link #EFO_REGISTRATION_TYPE}. */
    public enum EFORegistrationType {
        /** The consumer is registered by the source tasks at runtime. */
        LAZY,
        /** The consumer is registered up front, when the job is created. */
        EAGER,
        /** No registration; an existing consumer ARN must be supplied per stream via {@link #efoConsumerArn(String)}. */
        NONE
    }

    /**
     * Builds the property key under which the EFO consumer ARN of {@code streamName} is
     * configured. Note that this returns a configuration key, not an ARN.
     *
     * @param streamName name of the Kinesis stream
     * @return {@code EFO_CONSUMER_ARN_PREFIX + "." + streamName}
     */
    public static String efoConsumerArn(String streamName) {
        return EFO_CONSUMER_ARN_PREFIX + "." + streamName;
    }
}

Producer Configuration (Deprecated)

Legacy producer configuration constants (deprecated in favor of direct KPL properties).

/**
 * Legacy producer configuration keys, kept only for backwards compatibility.
 *
 * @deprecated Set Kinesis Producer Library (KPL) properties such as {@code "AggregationMaxCount"}
 *     and {@code "CollectionMaxCount"} directly in the producer configuration instead.
 */
@Deprecated
public class ProducerConfigConstants extends AWSConfigConstants {

    /**
     * Legacy key corresponding to the KPL {@code "AggregationMaxCount"} setting.
     *
     * @deprecated Use KPL property "AggregationMaxCount" instead
     */
    @Deprecated
    public static final String AGGREGATION_MAX_COUNT = "aws.producer.aggregationMaxCount";

    /**
     * Legacy key corresponding to the KPL {@code "CollectionMaxCount"} setting.
     *
     * @deprecated Use KPL property "CollectionMaxCount" instead
     */
    @Deprecated
    public static final String COLLECTION_MAX_COUNT = "aws.producer.collectionMaxCount";
}

Configuration Examples

Basic AWS Configuration

import java.util.Properties;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

Properties props = new Properties();

// Basic configuration with static access keys. The values below are AWS's published example
// keys; never commit real keys. Prefer IAM roles or the AUTO provider chain in real deployments.
props.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");
props.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); // explicit provider type
props.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "AKIAIOSFODNN7EXAMPLE");
props.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY");

Advanced AWS Credential Configuration

// The groups below are alternatives: configure exactly one credential provider.
// AWS_CREDENTIALS_PROVIDER is a single key, so running all groups in sequence leaves only the
// last value (AUTO) in effect.

// Using AWS profiles
props.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "PROFILE");
props.setProperty(AWSConfigConstants.AWS_PROFILE_NAME, "my-profile");
props.setProperty(AWSConfigConstants.AWS_PROFILE_PATH, "/path/to/credentials");

// Using IAM role assumption
props.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE");
props.setProperty(AWSConfigConstants.AWS_ROLE_ARN, "arn:aws:iam::123456789012:role/MyRole");
props.setProperty(AWSConfigConstants.AWS_ROLE_SESSION_NAME, "flink-kinesis-session");
props.setProperty(AWSConfigConstants.AWS_ROLE_EXTERNAL_ID, "external-id");

// Using web identity tokens (for EKS/Fargate)
// AWS_ROLE_ARN is the same key as in the ASSUME_ROLE group, so this overwrites that value.
props.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
props.setProperty(AWSConfigConstants.AWS_WEB_IDENTITY_TOKEN_FILE, "/var/run/secrets/token");
props.setProperty(AWSConfigConstants.AWS_ROLE_ARN, "arn:aws:iam::123456789012:role/EKSRole");

// Using automatic credential chain
props.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");

Consumer Configuration

import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

// Stream positioning (choose one initial position)
props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
// or for a specific timestamp. The default date pattern is yyyy-MM-dd'T'HH:mm:ss.SSSXXX, so the
// milliseconds field is required ("2023-01-01T00:00:00Z" is rejected). A Unix epoch in seconds
// such as "1672531200" is also accepted, and the pattern can be changed via
// ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT.
props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2023-01-01T00:00:00.000Z");

// Shard configuration for performance tuning
props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10000");             // GetRecords per-call maximum
props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "200");   // at most 5 calls/s per shard
props.setProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, "30000");
props.setProperty(ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS, "true");

// Watermark configuration for event-time processing
props.setProperty(ConsumerConfigConstants.WATERMARK_SYNC_MILLIS, "30000");
props.setProperty(ConsumerConfigConstants.WATERMARK_LOOKAHEAD_MILLIS, "180000");

Enhanced Fan-Out Configuration

// Enable Enhanced Fan-Out for dedicated throughput
props.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, "EFO");
props.setProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-flink-application");

// EFO registration strategies (choose one)
props.setProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, "LAZY");    // Register on demand
// props.setProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, "EAGER"); // Register immediately
// props.setProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, "NONE");  // Use existing consumer

// For using an existing EFO consumer: registration must be NONE, and a consumer ARN must be
// supplied for every stream. With LAZY/EAGER the connector registers the consumer itself (by
// EFO_CONSUMER_NAME), so a manually supplied ARN would not be used.
String consumerArn = "arn:aws:kinesis:us-west-2:123456789012:stream/my-stream/consumer/my-consumer:1234567890";
props.setProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, "NONE");
props.setProperty(ConsumerConfigConstants.efoConsumerArn("my-stream"), consumerArn);

Multi-Region Configuration

// Configure for cross-region access.
// The region normally determines the public Kinesis endpoint on its own, so setting
// AWS_ENDPOINT to that same public URL is optional; it is shown here for illustration.
props.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");
props.setProperty(AWSConfigConstants.AWS_ENDPOINT, "https://kinesis.us-west-2.amazonaws.com");

// For custom endpoints (testing, VPC endpoints). This is an alternative to the endpoint above:
// AWS_ENDPOINT holds a single value, so this call replaces the previous one.
props.setProperty(AWSConfigConstants.AWS_ENDPOINT, "https://vpce-123456-xyz.kinesis.us-west-2.vpce.amazonaws.com");

Producer KPL Configuration

// KPL-specific configuration (passed directly to KPL; property names are case-sensitive and
// must match a KPL setting, otherwise they are ignored)
props.setProperty("RecordMaxBufferedTime", "100");        // Batching delay (ms)
props.setProperty("RecordTtl", "30000");                  // Record TTL (ms)
props.setProperty("RequestTimeout", "6000");              // Request timeout (ms)
props.setProperty("MaxConnections", "24");                // HTTP connections

// Aggregation settings
props.setProperty("AggregationEnabled", "true");
props.setProperty("AggregationMaxCount", "4294967295");
props.setProperty("AggregationMaxSize", "51200");

// Retry behavior: the KPL has no separate retry-duration property. Failed puts are retried
// until the record's RecordTtl (set above) expires, after which the record is failed.

// Metrics configuration
props.setProperty("MetricsLevel", "DETAILED");            // NONE, SUMMARY, DETAILED
props.setProperty("MetricsGranularity", "SHARD");         // GLOBAL, STREAM, SHARD
props.setProperty("MetricsNamespace", "MyApp/KinesisProducer");

Configuration Best Practices

Security

  • Use IAM roles instead of hardcoded access keys when possible
  • Implement credential rotation policies
  • Use least-privilege access policies
  • Enable CloudTrail logging for audit trails

Performance

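  • Provision enough shards for the expected throughput, and tune SHARD_GETRECORDS_MAX and SHARD_GETRECORDS_INTERVAL_MILLIS to stay within the per-shard GetRecords limits (5 calls per second; 2 MB/s of read throughput shared by all polling consumers)
  • Use Enhanced Fan-Out when several applications read the same stream or low latency is required; each EFO consumer gets a dedicated 2 MB/s per shard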
  • Tune GetRecords intervals based on latency requirements
  • Enable adaptive reads for variable workloads

Reliability

  • Configure appropriate timeouts and retry policies
  • Set up monitoring and alerting for consumer lag
  • Use checkpointing for exactly-once processing guarantees
  • Configure shard idle timeouts to prevent watermark stalling

Cost Optimization

  • Use standard polling instead of EFO for low-throughput streams; EFO adds per-consumer-shard-hour and data-retrieval charges
  • Configure appropriate retention periods
  • Use shard-level metrics only when needed
  • Optimize batch sizes and intervals to reduce API calls

Environment-Specific Configuration

Development Environment

// Development profile: summary-level producer metrics, slower polling and a longer
// shard idle timeout than the production profile.
props.setProperty("MetricsLevel", "SUMMARY");
props.setProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, "60000");
props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "1000");

Production Environment

// Optimized settings for production
props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "200");
props.setProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, "30000");
props.setProperty(ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS, "true");
props.setProperty("MetricsLevel", "DETAILED");
props.setProperty("MetricsGranularity", "SHARD");

// Enhanced monitoring: the KPL uploads metrics to CloudWatch whenever MetricsLevel is not NONE,
// so there is no separate "enabled" flag; only the CloudWatch namespace needs to be set.
props.setProperty("MetricsNamespace", "MyApp/Kinesis");

Testing Environment

// Integration-test profile: read the whole stream from the oldest record, with producer
// metrics switched off, against a local LocalStack endpoint.
props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
props.setProperty("MetricsLevel", "NONE");
props.setProperty(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566"); // LocalStack edge port