CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-core

Apache Flink Core runtime components, type system, and foundational APIs for stream processing applications

Pending
Overview
Eval results
Files

docs/configuration.md

Configuration System

Apache Flink's configuration system provides a flexible and type-safe way to manage application and cluster settings. The system supports default values, validation, documentation, and seamless integration with the execution environment.

Configuration Basics

Configuration Objects

The core configuration classes for reading and writing configuration values.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Basic configuration usage
/**
 * Walks through the string-keyed API of {@link Configuration}: writing primitive values,
 * reading them back with fallbacks, and converting to and from plain maps.
 */
public class ConfigurationBasics {

    /** Stores one entry per primitive kind and then reads every entry back. */
    public static void basicConfigurationExample() {
        Configuration conf = new Configuration();

        // Write one entry for each primitive kind
        conf.setString("my.string.key", "hello world");
        conf.setInteger("my.int.key", 42);
        conf.setBoolean("my.boolean.key", true);
        conf.setLong("my.long.key", 1234567890L);
        conf.setDouble("my.double.key", 3.14159);

        // Typed getters: the second argument is the default passed for the lookup
        String text = conf.getString("my.string.key", "default");
        int number = conf.getInteger("my.int.key", 0);
        boolean flag = conf.getBoolean("my.boolean.key", false);
        long bigNumber = conf.getLong("my.long.key", 0L);
        double fraction = conf.getDouble("my.double.key", 0.0);

        // Presence check for a single key
        boolean present = conf.containsKey("my.string.key");

        // All keys currently stored
        Set<String> keys = conf.keySet();

        // Flat string-to-string view of the whole configuration
        Map<String, String> asMap = conf.toMap();
    }

    /** Builds a {@link Configuration} from an existing string-to-string map. */
    public static void createFromMap() {
        Map<String, String> settings = new HashMap<>();
        settings.put("parallelism.default", "4");
        settings.put("taskmanager.memory.process.size", "1024m");

        Configuration conf = Configuration.fromMap(settings);
    }
}

ConfigOption System

Type-safe configuration options with metadata and validation.

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.description.Description;

/**
 * Example option catalogue showing one {@link ConfigOption} per supported value type.
 *
 * <p>Each constant declares its key, value type, default (if any) and a description.
 */
public class MyConfigOptions {

    /**
     * Compression algorithms selectable through {@link #COMPRESSION}.
     *
     * <p>Illustrative set only; it exists so this example is self-contained. Extend as needed.
     */
    public enum CompressionType {
        NONE,
        GZIP
    }

    // String configuration option (no default: callers must set it or handle absence)
    public static final ConfigOption<String> DATABASE_URL =
        ConfigOptions.key("database.url")
            .stringType()
            .noDefaultValue()
            .withDescription("The URL of the database to connect to");

    // Integer option with default value
    public static final ConfigOption<Integer> CONNECTION_POOL_SIZE =
        ConfigOptions.key("database.connection.pool.size")
            .intType()
            .defaultValue(10)
            .withDescription("Maximum number of database connections in the pool");

    // Boolean option
    public static final ConfigOption<Boolean> ENABLE_METRICS =
        ConfigOptions.key("metrics.enabled")
            .booleanType()
            .defaultValue(true)
            .withDescription("Whether to enable metrics collection");

    // Duration option
    public static final ConfigOption<Duration> TIMEOUT =
        ConfigOptions.key("request.timeout")
            .durationType()
            .defaultValue(Duration.ofSeconds(30))
            .withDescription("Timeout for requests to external systems");

    // Memory size option
    public static final ConfigOption<MemorySize> BUFFER_SIZE =
        ConfigOptions.key("buffer.size")
            .memoryType()
            .defaultValue(MemorySize.ofMebiBytes(64))
            .withDescription("Size of the internal buffer");

    // Enum option
    public static final ConfigOption<CompressionType> COMPRESSION =
        ConfigOptions.key("compression.type")
            .enumType(CompressionType.class)
            .defaultValue(CompressionType.GZIP)
            .withDescription("Compression algorithm to use");

    // List option
    public static final ConfigOption<List<String>> ALLOWED_HOSTS =
        ConfigOptions.key("security.allowed.hosts")
            .stringType()
            .asList()
            .defaultValues("localhost", "127.0.0.1")
            .withDescription("List of allowed host names");

    // Map option
    public static final ConfigOption<Map<String, String>> CUSTOM_PROPERTIES =
        ConfigOptions.key("custom.properties")
            .mapType()
            .defaultValue(Collections.emptyMap())
            .withDescription("Custom key-value properties");
}

// Using config options
/**
 * Shows the typed {@code set}/{@code get} API of {@link Configuration} using the constants
 * declared in {@code MyConfigOptions}.
 */
public class ConfigOptionUsage {

    /** Writes three options, then reads them back through the typed accessors. */
    public static void useConfigOptions() {
        Configuration conf = new Configuration();

        // Typed writes: the value type is checked against the option's type at compile time
        conf.set(MyConfigOptions.DATABASE_URL, "jdbc:postgresql://localhost:5432/mydb");
        conf.set(MyConfigOptions.CONNECTION_POOL_SIZE, 20);
        conf.set(MyConfigOptions.ENABLE_METRICS, false);

        // Typed reads
        String url = conf.get(MyConfigOptions.DATABASE_URL);
        int maxConnections = conf.get(MyConfigOptions.CONNECTION_POOL_SIZE);
        boolean collectMetrics = conf.get(MyConfigOptions.ENABLE_METRICS);

        // Optional-returning read for an option that may not have been set
        Optional<String> maybeUrl = conf.getOptional(MyConfigOptions.DATABASE_URL);

        // Explicit presence check
        boolean urlConfigured = conf.contains(MyConfigOptions.DATABASE_URL);
    }
}

Advanced Configuration Options

// Complex configuration options with validation and fallbacks
/**
 * Options that use deprecated keys, fallback keys and rich descriptions, plus an explicit
 * range check for one of them.
 */
public class AdvancedConfigOptions {

    // Option with deprecated keys
    public static final ConfigOption<Integer> PARALLELISM =
        ConfigOptions.key("parallelism.default")
            .intType()
            .defaultValue(1)
            .withDeprecatedKeys("env.parallelism", "taskmanager.parallelism")
            .withDescription("The default parallelism for operators");

    // Option with fallback keys
    public static final ConfigOption<String> CHECKPOINT_DIR =
        ConfigOptions.key("state.checkpoints.dir")
            .stringType()
            .noDefaultValue()
            .withFallbackKeys("state.backend.fs.checkpointdir")
            .withDescription("Directory for storing checkpoints");

    // Option with rich description
    public static final ConfigOption<Duration> CHECKPOINT_INTERVAL =
        ConfigOptions.key("execution.checkpointing.interval")
            .durationType()
            .noDefaultValue()
            .withDescription(
                Description.builder()
                    .text("Interval between consecutive checkpoints. ")
                    .text("Setting this value enables checkpointing. ")
                    .linebreak()
                    .text("Example: 10s, 5min, 1h")
                    .build()
            );

    // Option whose value range is checked by validateNetworkBuffers(int)
    public static final ConfigOption<Integer> NETWORK_BUFFERS =
        ConfigOptions.key("taskmanager.network.numberOfBuffers")
            .intType()
            .defaultValue(2048)
            .withDescription("Number of network buffers available to each TaskManager");

    /**
     * Checks that a {@link #NETWORK_BUFFERS} value lies in the range 1 to 100000.
     *
     * <p>{@code ConfigOption} offers no hook for attaching a validator, so the check is a plain
     * method that callers invoke after reading the value, e.g.
     * {@code validateNetworkBuffers(config.get(NETWORK_BUFFERS))}.
     *
     * @param value the configured number of network buffers
     * @throws IllegalArgumentException if {@code value} is below 1 or above 100000
     */
    public static void validateNetworkBuffers(int value) {
        if (value < 1) {
            throw new IllegalArgumentException("Number of buffers must be positive");
        }
        if (value > 100000) {
            throw new IllegalArgumentException("Number of buffers too large (max: 100000)");
        }
    }
}

Built-in Configuration Options

Core Options

import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.TaskManagerOptions;

/**
 * Demonstrates writing and reading the option constants that ship with Flink
 * ({@code CoreOptions}, {@code CheckpointingOptions}, {@code TaskManagerOptions}).
 */
public class BuiltInOptions {

    /** Populates a fresh configuration with core, checkpointing and TaskManager settings. */
    public static void coreConfigurationExample() {
        Configuration conf = new Configuration();

        // Core execution options
        // NOTE(review): confirm that FLINK_SHUTDOWN_TIMEOUT exists in the targeted Flink version.
        conf.set(CoreOptions.DEFAULT_PARALLELISM, 4);
        conf.set(CoreOptions.TMP_DIRS, "/tmp/flink");
        conf.set(CoreOptions.FLINK_SHUTDOWN_TIMEOUT, Duration.ofMinutes(1));

        // Checkpointing options
        // NOTE(review): confirm that CHECKPOINT_STORAGE_ACCESS_ENV_VAR exists in the targeted
        // Flink version.
        conf.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(1));
        conf.set(CheckpointingOptions.CHECKPOINTING_TIMEOUT, Duration.ofMinutes(10));
        conf.set(CheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, 1);
        conf.set(CheckpointingOptions.CHECKPOINT_STORAGE_ACCESS_ENV_VAR, "CHECKPOINT_STORAGE");

        // TaskManager options
        conf.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(1024));
        conf.set(TaskManagerOptions.NUM_TASK_SLOTS, 2);
        conf.set(TaskManagerOptions.TASK_HEAP_MEMORY, MemorySize.ofMebiBytes(512));
    }

    /** Reads built-in options from an empty configuration and reports the checkpoint interval. */
    public static void readBuiltInOptions() {
        Configuration conf = new Configuration();

        // Core options
        int defaultParallelism = conf.get(CoreOptions.DEFAULT_PARALLELISM);
        String tempDirectories = conf.get(CoreOptions.TMP_DIRS);

        // Checkpointing: print only when an interval has been configured
        conf.getOptional(CheckpointingOptions.CHECKPOINTING_INTERVAL)
            .ifPresent(interval ->
                System.out.println("Checkpointing enabled with interval: " + interval));
    }
}

Memory Configuration

import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;

/**
 * Examples for configuring TaskManager memory and for working with {@link MemorySize} values.
 */
public class MemoryConfiguration {

    /**
     * Sets the total process memory together with the individual memory components and the
     * network memory bounds on a fresh configuration.
     */
    public static void configureTaskManagerMemory() {
        Configuration config = new Configuration();

        // Total process memory
        config.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("2g"));

        // Individual memory components. This example sets them in addition to the total above.
        // NOTE(review): presumably real deployments choose either the total or the fine-grained
        // components, and the values must be mutually consistent — confirm against the Flink
        // memory setup guide.
        config.set(TaskManagerOptions.TASK_HEAP_MEMORY, MemorySize.ofMebiBytes(1024));
        config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.ofMebiBytes(512));
        config.set(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY, MemorySize.ofMebiBytes(128));
        config.set(TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY, MemorySize.ofMebiBytes(128));
        config.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, MemorySize.ofMebiBytes(64));

        // Network memory
        config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.ofMebiBytes(64));
        config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.ofMebiBytes(1024));
        config.set(TaskManagerOptions.NETWORK_MEMORY_FRACTION, 0.1f);
    }

    /** Creates, combines, converts and formats {@link MemorySize} values. */
    public static void memoryUtilities() {
        // Creating memory sizes
        MemorySize size1 = MemorySize.ofBytes(1024);
        MemorySize size2 = MemorySize.ofMebiBytes(64);
        MemorySize size3 = MemorySize.parse("1gb");
        MemorySize size4 = MemorySize.parse("512mb");

        // Memory arithmetic
        MemorySize total = size2.add(size3);
        MemorySize difference = size3.subtract(size2);
        MemorySize scaled = size2.multiply(2);

        // Conversions (the accessors are binary units: KiB and MiB)
        long bytes = size3.getBytes();
        long kibiBytes = size3.getKibiBytes();
        long mebiBytes = size3.getMebiBytes();

        // Formatting
        // NOTE(review): toHumanReadableString() is believed to return an approximate form that
        // also includes the raw byte count, not the plain "1 gb" that toString() gives —
        // confirm the exact output against the Flink version in use.
        String humanReadable = size3.toHumanReadableString();
    }
}

Configuration Validation and Utilities

Configuration Validation

/**
 * Example of validating a {@link Configuration} after it has been assembled.
 *
 * <p>{@code ConfigOption} has no hook for attaching a validator, so every check here is ordinary
 * code that runs when {@link #validateConfiguration(Configuration)} is called.
 */
public class ConfigurationValidation {

    // Port option; its range is enforced by validatePort via validateConfiguration
    public static final ConfigOption<Integer> PORT =
        ConfigOptions.key("server.port")
            .intType()
            .defaultValue(8080)
            .withDescription("Server port number");

    /**
     * Rejects ports outside 1..65535 and prints a warning for ports below 1024.
     *
     * @param port the port number to check
     * @throws IllegalArgumentException if {@code port} is outside 1..65535
     */
    private static void validatePort(Integer port) {
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException(
                "Port must be between 1 and 65535, got: " + port);
        }
        if (port < 1024) {
            System.out.println("Warning: Using privileged port " + port);
        }
    }

    /**
     * Validates required options, option combinations, the server port and memory settings.
     *
     * @param config the configuration to check
     * @throws IllegalArgumentException if the database URL is missing, the port is out of
     *     range, or the task heap memory exceeds the total process memory
     */
    public static void validateConfiguration(Configuration config) {
        // Check required options
        if (!config.contains(MyConfigOptions.DATABASE_URL)) {
            throw new IllegalArgumentException("Database URL is required");
        }

        // Range-check the port (PORT has a default, so the value is never null)
        validatePort(config.get(PORT));

        // Validate option combinations
        boolean metricsEnabled = config.get(MyConfigOptions.ENABLE_METRICS);
        if (metricsEnabled && !config.contains(MyConfigOptions.CUSTOM_PROPERTIES)) {
            System.out.println("Warning: Metrics enabled but no custom properties set");
        }

        // Validate memory settings. Both options are declared without a default value, so
        // get() yields null when they are unset; compare only when both are present.
        MemorySize heapMemory = config.get(TaskManagerOptions.TASK_HEAP_MEMORY);
        MemorySize totalMemory = config.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY);

        if (heapMemory != null && totalMemory != null
                && heapMemory.compareTo(totalMemory) > 0) {
            throw new IllegalArgumentException(
                "Heap memory cannot exceed total process memory");
        }
    }
}

Configuration Utilities

import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.ConfigurationUtils;

/**
 * Examples for the helper classes {@code ConfigUtils} and {@code ConfigurationUtils}, and for
 * map-typed options.
 */
public class ConfigurationUtilities {

    /** Demonstrates list encoding/decoding and temporary-directory helpers. */
    public static void configurationUtilsExample() {
        Configuration config = new Configuration();

        // Parse string to map
        // NOTE(review): a three-argument parseStringToMap(String, String, String) could not be
        // confirmed in ConfigurationUtils — verify against the Flink version in use.
        String propertiesString = "key1:value1,key2:value2";
        Map<String, String> parsed = ConfigurationUtils.parseStringToMap(
            propertiesString, ",", ":");

        // Encode a collection into a list-typed option. The key is a ConfigOption<List<..>>,
        // not a raw string, and the method writes into the configuration without returning.
        List<String> hosts = Arrays.asList("host1", "host2", "host3");
        ConfigUtils.encodeCollectionToConfig(
            config, MyConfigOptions.ALLOWED_HOSTS, hosts, Object::toString);

        // Decode the same option back, mapping each stored element to the desired type
        List<String> decoded = ConfigUtils.decodeListFromConfig(
            config, MyConfigOptions.ALLOWED_HOSTS, host -> host);

        // Temporary directories are read from the configuration ("io.tmp.dirs" is the key of
        // CoreOptions.TMP_DIRS), not from a raw string
        config.setString("io.tmp.dirs", "/tmp1,/tmp2,/tmp3");
        String[] tempDirs = ConfigurationUtils.parseTempDirectories(config);
        java.io.File randomTempDir = ConfigurationUtils.getRandomTempDirectory(config);
    }

    /** Writes and reads a map-typed option, then adds entries under the option's key prefix. */
    public static void workingWithMaps() {
        Configuration config = new Configuration();

        // Set map values
        Map<String, String> properties = new HashMap<>();
        properties.put("timeout", "30s");
        properties.put("retries", "3");
        properties.put("compression", "gzip");

        config.set(MyConfigOptions.CUSTOM_PROPERTIES, properties);

        // Read map values
        Map<String, String> readProperties = config.get(MyConfigOptions.CUSTOM_PROPERTIES);

        // Add individual entries under the "custom.properties." prefix
        // NOTE(review): presumably these are picked up as entries of CUSTOM_PROPERTIES via
        // prefix notation — confirm how they combine with the value set above.
        config.setString("custom.properties.max-connections", "100");
        config.setString("custom.properties.buffer-size", "64kb");
    }
}

Dynamic Configuration

Runtime Configuration Updates

/**
 * Holder for a {@link Configuration} that can be swapped out while the application runs.
 *
 * <p>Every access to the held configuration goes through {@code configLock}; callers only ever
 * receive copies or single option values.
 */
public class DynamicConfiguration {

    private final Object configLock = new Object();

    // The active configuration; replaced wholesale on update
    private volatile Configuration currentConfig;

    /**
     * @param initialConfig the starting configuration; a copy is stored
     */
    public DynamicConfiguration(Configuration initialConfig) {
        currentConfig = new Configuration(initialConfig);
    }

    /**
     * Returns a copy of the active configuration.
     */
    public Configuration getSnapshot() {
        synchronized (configLock) {
            Configuration copy = new Configuration(currentConfig);
            return copy;
        }
    }

    /**
     * Looks up a single option in the active configuration.
     *
     * @param option the option to read
     * @return the value {@code Configuration.get} yields for {@code option}
     */
    public <T> T getConfigValue(ConfigOption<T> option) {
        synchronized (configLock) {
            T value = currentConfig.get(option);
            return value;
        }
    }

    /**
     * Validates {@code newConfig}, installs a copy of it as the active configuration and then
     * triggers the change notification, all under the lock.
     *
     * @param newConfig the configuration to switch to
     */
    public void updateConfiguration(Configuration newConfig) {
        synchronized (configLock) {
            // Validation runs first, before the active configuration is replaced
            validateConfiguration(newConfig);

            Configuration replacement = new Configuration(newConfig);
            currentConfig = replacement;

            notifyConfigurationChange();
        }
    }

    private void notifyConfigurationChange() {
        // Notify listeners about configuration changes
    }

    private void validateConfiguration(Configuration config) {
        // Validation logic
    }
}

Configuration Providers

// Configuration provider interface
/**
 * Source of a {@link Configuration} that also lets interested parties register and
 * unregister {@link ConfigurationListener}s.
 */
public interface ConfigurationProvider {
    /** Returns the provider's configuration. */
    Configuration getConfiguration();
    /** Registers {@code listener} with this provider. */
    void addListener(ConfigurationListener listener);
    /** Unregisters {@code listener} from this provider. */
    void removeListener(ConfigurationListener listener);
}

/** Callback interface for receiving a changed {@link Configuration}. */
public interface ConfigurationListener {
    /**
     * Called with the new configuration.
     *
     * @param newConfig the configuration after the change
     */
    void onConfigurationChanged(Configuration newConfig);
}

// File-based configuration provider
/**
 * {@link ConfigurationProvider} backed by a Java properties file that is re-read every five
 * seconds on a background thread.
 *
 * <p>When a reload differs from the current configuration, the new configuration is installed
 * and all registered listeners are notified. Call {@link #close()} to stop the background
 * polling once the provider is no longer needed.
 */
public class FileConfigurationProvider implements ConfigurationProvider, AutoCloseable {
    private final Path configFile;
    private final List<ConfigurationListener> listeners;
    // Written by the watcher thread and read by callers of getConfiguration(); volatile makes
    // each newly installed configuration visible across threads.
    private volatile Configuration currentConfig;
    private final ScheduledExecutorService watcherService;

    /**
     * Loads the file once and starts polling it for changes.
     *
     * @param configFile path of the properties file to load and watch
     * @throws RuntimeException if the initial load fails
     */
    public FileConfigurationProvider(Path configFile) {
        this.configFile = configFile;
        this.listeners = new CopyOnWriteArrayList<>();
        this.currentConfig = loadConfiguration();
        this.watcherService = Executors.newSingleThreadScheduledExecutor();

        // Watch for file changes; scheduled last so all fields are assigned before the first run
        watcherService.scheduleWithFixedDelay(this::checkForUpdates, 5, 5, TimeUnit.SECONDS);
    }

    /** Returns a copy of the most recently loaded configuration. */
    @Override
    public Configuration getConfiguration() {
        return new Configuration(currentConfig);
    }

    @Override
    public void addListener(ConfigurationListener listener) {
        listeners.add(listener);
    }

    @Override
    public void removeListener(ConfigurationListener listener) {
        listeners.remove(listener);
    }

    /** Stops the background polling thread. */
    @Override
    public void close() {
        watcherService.shutdownNow();
    }

    /**
     * Reads the properties file into a fresh {@link Configuration}.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the file cannot be read
     */
    private Configuration loadConfiguration() {
        Properties props = new Properties();
        // try-with-resources closes the reader on every poll
        try (java.io.Reader reader = Files.newBufferedReader(configFile)) {
            props.load(reader);
        } catch (IOException e) {
            throw new RuntimeException("Failed to load configuration from " + configFile, e);
        }

        Configuration config = new Configuration();
        for (String key : props.stringPropertyNames()) {
            config.setString(key, props.getProperty(key));
        }
        return config;
    }

    /** Reloads the file and, if the content changed, installs it and notifies listeners. */
    private void checkForUpdates() {
        try {
            Configuration newConfig = loadConfiguration();
            if (!newConfig.equals(currentConfig)) {
                currentConfig = newConfig;
                notifyListeners(newConfig);
            }
        } catch (Exception e) {
            // Swallowed on purpose: an exception escaping a scheduled task would cancel all
            // future runs, so a failed poll is reported and retried on the next tick.
            System.err.println("Error checking for configuration updates: " + e.getMessage());
        }
    }

    /** Invokes every listener; a failure in one listener does not prevent the others running. */
    private void notifyListeners(Configuration newConfig) {
        for (ConfigurationListener listener : listeners) {
            try {
                listener.onConfigurationChanged(newConfig);
            } catch (Exception e) {
                System.err.println("Error notifying configuration listener: " + e.getMessage());
            }
        }
    }
}

Configuration Best Practices

Configuration Management Patterns

// Singleton configuration holder, created eagerly when the class is initialized
/**
 * Application-wide configuration assembled once from JVM system properties, environment
 * variables and an optional properties file.
 *
 * <p>Sources are applied in that order into the same {@link Configuration}, so for an identical
 * key a later source overwrites an earlier one.
 */
public class ApplicationConfig {

    // Configuration options
    public static final ConfigOption<String> APP_NAME =
        ConfigOptions.key("app.name")
            .stringType()
            .defaultValue("MyFlinkApp")
            .withDescription("Application name");

    public static final ConfigOption<Duration> HEARTBEAT_INTERVAL =
        ConfigOptions.key("app.heartbeat.interval")
            .durationType()
            .defaultValue(Duration.ofSeconds(30))
            .withDescription("Heartbeat interval for health checks");

    // Declared after the option constants: static initializers run in textual order, so the
    // constants are already assigned if the constructor ever needs them.
    private static final ApplicationConfig INSTANCE = new ApplicationConfig();

    private final Configuration config;

    private ApplicationConfig() {
        this.config = loadConfiguration();
    }

    /** Returns the single shared instance. */
    public static ApplicationConfig getInstance() {
        return INSTANCE;
    }

    /** Returns a copy of the loaded configuration. */
    public Configuration getConfiguration() {
        return new Configuration(config);
    }

    /**
     * Reads one option from the loaded configuration.
     *
     * @param option the option to read
     */
    public <T> T get(ConfigOption<T> option) {
        return config.get(option);
    }

    /** Collects settings from system properties, environment variables and the config file. */
    private Configuration loadConfiguration() {
        Configuration config = new Configuration();

        // Load from system properties: only keys starting with "app."
        System.getProperties().forEach((key, value) -> {
            if (key.toString().startsWith("app.")) {
                config.setString(key.toString(), value.toString());
            }
        });

        // Load from environment variables: FLINK_FOO_BAR becomes "flink.foo.bar".
        // Locale.ROOT keeps the lower-casing independent of the JVM's default locale.
        System.getenv().forEach((key, value) -> {
            if (key.startsWith("FLINK_")) {
                String configKey = key.toLowerCase(java.util.Locale.ROOT).replace("_", ".");
                config.setString(configKey, value);
            }
        });

        // Load from configuration file
        loadFromFile(config);

        return config;
    }

    /**
     * Copies every entry of {@code conf/application.properties} into {@code config} if the file
     * exists. A read failure is reported on stderr and otherwise ignored.
     */
    private void loadFromFile(Configuration config) {
        Path configPath = Paths.get("conf/application.properties");
        if (!Files.exists(configPath)) {
            return;
        }

        // try-with-resources closes the reader even when load() fails
        try (java.io.Reader reader = Files.newBufferedReader(configPath)) {
            Properties props = new Properties();
            props.load(reader);

            props.forEach((key, value) ->
                config.setString(key.toString(), value.toString()));
        } catch (IOException e) {
            System.err.println("Could not load configuration file: " + e.getMessage());
        }
    }
}

// Configuration builder pattern
/**
 * Fluent builder for {@link Configuration} objects.
 *
 * <p>Every {@code with...} method records one setting and returns the builder itself so calls
 * can be chained; {@link #build()} hands out a copy of what has been collected.
 */
public class ConfigurationBuilder {
    private final Configuration config = new Configuration();

    /** Starts a new, empty builder. */
    public static ConfigurationBuilder create() {
        return new ConfigurationBuilder();
    }

    /** Generic setter: stores {@code value} under {@code option}. */
    public <T> ConfigurationBuilder with(ConfigOption<T> option, T value) {
        config.set(option, value);
        return this;
    }

    /** Sets {@code CoreOptions.DEFAULT_PARALLELISM}. */
    public ConfigurationBuilder withParallelism(int parallelism) {
        return with(CoreOptions.DEFAULT_PARALLELISM, parallelism);
    }

    /** Sets {@code CheckpointingOptions.CHECKPOINTING_INTERVAL}. */
    public ConfigurationBuilder withCheckpointing(Duration interval) {
        return with(CheckpointingOptions.CHECKPOINTING_INTERVAL, interval);
    }

    /** Sets {@code TaskManagerOptions.TOTAL_PROCESS_MEMORY}. */
    public ConfigurationBuilder withMemory(MemorySize totalMemory) {
        return with(TaskManagerOptions.TOTAL_PROCESS_MEMORY, totalMemory);
    }

    /** Returns a copy of the collected settings; the builder can keep being used afterwards. */
    public Configuration build() {
        Configuration result = new Configuration(config);
        return result;
    }
}

// Usage example
// Chain the typed helpers for common settings and with(...) for arbitrary options;
// build() returns a copy of everything collected by the builder.
Configuration config = ConfigurationBuilder.create()
    .withParallelism(8)
    .withCheckpointing(Duration.ofMinutes(2))
    .withMemory(MemorySize.parse("4g"))
    .with(MyConfigOptions.DATABASE_URL, "jdbc:postgresql://localhost/db")
    .with(MyConfigOptions.ENABLE_METRICS, true)
    .build();

Environment-Specific Configuration

/**
 * Produces a {@link Configuration} tailored to a deployment environment by layering
 * environment-specific overrides on top of a shared base configuration.
 */
public class EnvironmentConfiguration {

    /** Deployment environments with dedicated overrides. */
    public enum Environment {
        DEVELOPMENT, TESTING, PRODUCTION
    }

    /**
     * Builds the configuration for {@code env}.
     *
     * @param env the target environment
     * @return the base configuration with the overrides for {@code env} applied
     */
    public static Configuration createConfiguration(Environment env) {
        final Configuration base = createBaseConfiguration();

        final Configuration result;
        switch (env) {
            case DEVELOPMENT:
                result = applyDevelopmentOverrides(base);
                break;
            case TESTING:
                result = applyTestingOverrides(base);
                break;
            case PRODUCTION:
                result = applyProductionOverrides(base);
                break;
            default:
                result = base;
                break;
        }
        return result;
    }

    /** Settings shared by every environment. */
    private static Configuration createBaseConfiguration() {
        ConfigurationBuilder builder = ConfigurationBuilder.create();
        builder.withParallelism(1);
        builder.with(MyConfigOptions.ENABLE_METRICS, true);
        builder.with(CoreOptions.TMP_DIRS, "/tmp/flink");
        return builder.build();
    }

    /** Development: parallelism 2, five-minute checkpoints, DEBUG logging. */
    private static Configuration applyDevelopmentOverrides(Configuration base) {
        Configuration dev = new Configuration(base);
        dev.set(CoreOptions.DEFAULT_PARALLELISM, 2);
        dev.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(5));
        dev.setString("logging.level", "DEBUG");
        return dev;
    }

    /** Testing: parallelism 1, in-memory H2 database URL, INFO logging. */
    private static Configuration applyTestingOverrides(Configuration base) {
        Configuration test = new Configuration(base);
        test.set(CoreOptions.DEFAULT_PARALLELISM, 1);
        test.set(MyConfigOptions.DATABASE_URL, "jdbc:h2:mem:testdb");
        test.setString("logging.level", "INFO");
        return test;
    }

    /** Production: parallelism 16, one-minute checkpoints, 8g process memory, WARN logging. */
    private static Configuration applyProductionOverrides(Configuration base) {
        Configuration prod = new Configuration(base);
        prod.set(CoreOptions.DEFAULT_PARALLELISM, 16);
        prod.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(1));
        prod.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("8g"));
        prod.setString("logging.level", "WARN");
        return prod;
    }
}

Apache Flink's configuration system provides powerful capabilities for managing application settings in a type-safe, validated, and flexible manner. By leveraging ConfigOptions, proper validation, and configuration management patterns, you can build maintainable and robust Flink applications that adapt to different deployment environments and requirements.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-core

docs

configuration.md

connectors.md

event-time-watermarks.md

execution-jobs.md

functions-and-operators.md

index.md

state-management.md

type-system-serialization.md

utilities.md

tile.json