Apache Flink Core runtime components, type system, and foundational APIs for stream processing applications
—
Apache Flink's configuration system provides a flexible and type-safe way to manage application and cluster settings. The system supports default values, validation, documentation, and seamless integration with the execution environment.
The examples below demonstrate the core configuration classes used to read and write configuration values.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
// Basic configuration usage
/**
 * Demonstrates the basic, string-keyed {@code Configuration} API: writing and
 * reading primitive values, key inspection, and conversion to a plain map.
 */
public class ConfigurationBasics {

    public static void basicConfigurationExample() {
        Configuration config = new Configuration();

        // Write one value of each supported primitive type.
        config.setString("my.string.key", "hello world");
        config.setInteger("my.int.key", 42);
        config.setBoolean("my.boolean.key", true);
        config.setLong("my.long.key", 1234567890L);
        config.setDouble("my.double.key", 3.14159);

        // Read each value back, supplying a fallback used when the key is absent.
        String readString = config.getString("my.string.key", "default");
        int readInt = config.getInteger("my.int.key", 0);
        boolean readBoolean = config.getBoolean("my.boolean.key", false);
        long readLong = config.getLong("my.long.key", 0L);
        double readDouble = config.getDouble("my.double.key", 0.0);

        // Key inspection and bulk export.
        boolean present = config.containsKey("my.string.key");
        Set<String> keys = config.keySet();
        Map<String, String> asMap = config.toMap();
    }

    public static void createFromMap() {
        // Build a Configuration directly from plain string properties.
        Map<String, String> properties = new HashMap<>();
        properties.put("parallelism.default", "4");
        properties.put("taskmanager.memory.process.size", "1024m");
        Configuration config = Configuration.fromMap(properties);
    }
}
Type-safe configuration options with metadata and validation.
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.description.Description;
/**
 * Example {@code ConfigOption} declarations covering each supported value type:
 * string, int, boolean, duration, memory size, enum, list, and map.
 */
public class MyConfigOptions {

    /** Required JDBC URL; has no default, so callers must set it explicitly. */
    public static final ConfigOption<String> DATABASE_URL =
            ConfigOptions.key("database.url")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The URL of the database to connect to");

    /** Upper bound on pooled database connections; defaults to 10. */
    public static final ConfigOption<Integer> CONNECTION_POOL_SIZE =
            ConfigOptions.key("database.connection.pool.size")
                    .intType()
                    .defaultValue(10)
                    .withDescription("Maximum number of database connections in the pool");

    /** Metrics collection toggle; on by default. */
    public static final ConfigOption<Boolean> ENABLE_METRICS =
            ConfigOptions.key("metrics.enabled")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription("Whether to enable metrics collection");

    /** Timeout for external requests; declared as a Duration option. */
    public static final ConfigOption<Duration> TIMEOUT =
            ConfigOptions.key("request.timeout")
                    .durationType()
                    .defaultValue(Duration.ofSeconds(30))
                    .withDescription("Timeout for requests to external systems");

    /** Internal buffer size; 64 MiB unless overridden. */
    public static final ConfigOption<MemorySize> BUFFER_SIZE =
            ConfigOptions.key("buffer.size")
                    .memoryType()
                    .defaultValue(MemorySize.ofMebiBytes(64))
                    .withDescription("Size of the internal buffer");

    /** Compression algorithm, restricted to the CompressionType enum values. */
    public static final ConfigOption<CompressionType> COMPRESSION =
            ConfigOptions.key("compression.type")
                    .enumType(CompressionType.class)
                    .defaultValue(CompressionType.GZIP)
                    .withDescription("Compression algorithm to use");

    /** Host allow-list; defaults to local-only access. */
    public static final ConfigOption<List<String>> ALLOWED_HOSTS =
            ConfigOptions.key("security.allowed.hosts")
                    .stringType()
                    .asList()
                    .defaultValues("localhost", "127.0.0.1")
                    .withDescription("List of allowed host names");

    /** Free-form key/value properties; empty map by default. */
    public static final ConfigOption<Map<String, String>> CUSTOM_PROPERTIES =
            ConfigOptions.key("custom.properties")
                    .mapType()
                    .defaultValue(Collections.emptyMap())
                    .withDescription("Custom key-value properties");
}
// Using config options
// Reading and writing values through typed ConfigOption handles
public class ConfigOptionUsage {

    public static void useConfigOptions() {
        Configuration config = new Configuration();

        // Typed writes: the option carries both the key and the value type.
        config.set(MyConfigOptions.DATABASE_URL, "jdbc:postgresql://localhost:5432/mydb");
        config.set(MyConfigOptions.CONNECTION_POOL_SIZE, 20);
        config.set(MyConfigOptions.ENABLE_METRICS, false);

        // Typed reads: defaults declared on the option apply automatically.
        String dbUrl = config.get(MyConfigOptions.DATABASE_URL);
        int poolSize = config.get(MyConfigOptions.CONNECTION_POOL_SIZE);
        boolean metricsEnabled = config.get(MyConfigOptions.ENABLE_METRICS);

        // Optional-based access for options that may be unset.
        Optional<String> optionalUrl = config.getOptional(MyConfigOptions.DATABASE_URL);

        // Membership test for a typed option.
        boolean hasUrl = config.contains(MyConfigOptions.DATABASE_URL);
    }
}
// Complex configuration options with validation and fallbacks
// Advanced option declarations: deprecated keys, fallback keys, rich descriptions, validators
public class AdvancedConfigOptions {

    /** Parallelism option that still honors two legacy key names. */
    public static final ConfigOption<Integer> PARALLELISM =
            ConfigOptions.key("parallelism.default")
                    .intType()
                    .defaultValue(1)
                    .withDeprecatedKeys("env.parallelism", "taskmanager.parallelism")
                    .withDescription("The default parallelism for operators");

    /** Checkpoint directory, read from the fallback key when the primary key is unset. */
    public static final ConfigOption<String> CHECKPOINT_DIR =
            ConfigOptions.key("state.checkpoints.dir")
                    .stringType()
                    .noDefaultValue()
                    .withFallbackKeys("state.backend.fs.checkpointdir")
                    .withDescription("Directory for storing checkpoints");

    /** Checkpoint interval documented with a structured, multi-line Description. */
    public static final ConfigOption<Duration> CHECKPOINT_INTERVAL =
            ConfigOptions.key("execution.checkpointing.interval")
                    .durationType()
                    .noDefaultValue()
                    .withDescription(
                            Description.builder()
                                    .text("Interval between consecutive checkpoints. ")
                                    .text("Setting this value enables checkpointing. ")
                                    .linebreak()
                                    .text("Example: 10s, 5min, 1h")
                                    .build());

    /** Buffer count constrained to [1, 100000] by a validator. */
    public static final ConfigOption<Integer> NETWORK_BUFFERS =
            ConfigOptions.key("taskmanager.network.numberOfBuffers")
                    .intType()
                    .defaultValue(2048)
                    .withDescription("Number of network buffers available to each TaskManager")
                    .withValidator(value -> {
                        if (value < 1) {
                            throw new IllegalArgumentException("Number of buffers must be positive");
                        }
                        if (value > 100000) {
                            throw new IllegalArgumentException("Number of buffers too large (max: 100000)");
                        }
                    });
}
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.TaskManagerOptions;
// Working with Flink's predefined option classes
public class BuiltInOptions {

    public static void coreConfigurationExample() {
        Configuration config = new Configuration();

        // Core execution options.
        config.set(CoreOptions.DEFAULT_PARALLELISM, 4);
        config.set(CoreOptions.TMP_DIRS, "/tmp/flink");
        config.set(CoreOptions.FLINK_SHUTDOWN_TIMEOUT, Duration.ofMinutes(1));

        // Checkpointing options.
        config.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(1));
        config.set(CheckpointingOptions.CHECKPOINTING_TIMEOUT, Duration.ofMinutes(10));
        config.set(CheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, 1);
        config.set(CheckpointingOptions.CHECKPOINT_STORAGE_ACCESS_ENV_VAR, "CHECKPOINT_STORAGE");

        // TaskManager options.
        config.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(1024));
        config.set(TaskManagerOptions.NUM_TASK_SLOTS, 2);
        config.set(TaskManagerOptions.TASK_HEAP_MEMORY, MemorySize.ofMebiBytes(512));
    }

    public static void readBuiltInOptions() {
        Configuration config = new Configuration();

        // Core options always yield a value thanks to their declared defaults.
        int parallelism = config.get(CoreOptions.DEFAULT_PARALLELISM);
        String tmpDirs = config.get(CoreOptions.TMP_DIRS);

        // The checkpoint interval has no default; report it only when set.
        Optional<Duration> checkpointInterval =
                config.getOptional(CheckpointingOptions.CHECKPOINTING_INTERVAL);
        checkpointInterval.ifPresent(interval ->
                System.out.println("Checkpointing enabled with interval: " + interval));
    }
}
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
// Configuring TaskManager memory and manipulating MemorySize values
public class MemoryConfiguration {

    public static void configureTaskManagerMemory() {
        Configuration config = new Configuration();

        // Coarse-grained: a single total process memory budget.
        config.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("2g"));

        // Fine-grained: size each memory component individually.
        config.set(TaskManagerOptions.TASK_HEAP_MEMORY, MemorySize.ofMebiBytes(1024));
        config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.ofMebiBytes(512));
        config.set(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY, MemorySize.ofMebiBytes(128));
        config.set(TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY, MemorySize.ofMebiBytes(128));
        config.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, MemorySize.ofMebiBytes(64));

        // Network memory: a bounded range plus a fraction of total memory.
        config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.ofMebiBytes(64));
        config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.ofMebiBytes(1024));
        config.set(TaskManagerOptions.NETWORK_MEMORY_FRACTION, 0.1f);
    }

    public static void memoryUtilities() {
        // Construction from raw units or parse strings.
        MemorySize fromBytes = MemorySize.ofBytes(1024);
        MemorySize sixtyFourMib = MemorySize.ofMebiBytes(64);
        MemorySize oneGig = MemorySize.parse("1gb");
        MemorySize halfGig = MemorySize.parse("512mb");

        // Arithmetic on sizes.
        MemorySize sum = sixtyFourMib.add(oneGig);
        MemorySize diff = oneGig.subtract(sixtyFourMib);
        MemorySize doubled = sixtyFourMib.multiply(2);

        // Unit conversions.
        long inBytes = oneGig.getBytes();
        long inKib = oneGig.getKibiBytes();
        long inMib = oneGig.getMebiBytes();

        // Human-readable rendering; exact format is defined by MemorySize.
        String humanReadable = oneGig.toHumanReadableString();
    }
}
public class ConfigurationValidation {
// Option-level validation via a validator reference.
public static final ConfigOption<Integer> PORT =
        ConfigOptions.key("server.port")
                .intType()
                .defaultValue(8080)
                .withDescription("Server port number")
                .withValidator(ConfigurationValidation::validatePort);

/** Rejects ports outside [1, 65535]; warns when a privileged port (< 1024) is used. */
private static void validatePort(Integer port) {
    if (port < 1 || port > 65535) {
        throw new IllegalArgumentException(
                "Port must be between 1 and 65535, got: " + port);
    }
    if (port < 1024) {
        System.out.println("Warning: Using privileged port " + port);
    }
}

/** Cross-field validation of an assembled Configuration object. */
public static void validateConfiguration(Configuration config) {
    // Required options must be present.
    if (!config.contains(MyConfigOptions.DATABASE_URL)) {
        throw new IllegalArgumentException("Database URL is required");
    }

    // Validate combinations of related options.
    boolean metricsEnabled = config.get(MyConfigOptions.ENABLE_METRICS);
    if (metricsEnabled && !config.contains(MyConfigOptions.CUSTOM_PROPERTIES)) {
        System.out.println("Warning: Metrics enabled but no custom properties set");
    }

    // Memory sanity check: task heap must fit inside the total process budget.
    MemorySize heapMemory = config.get(TaskManagerOptions.TASK_HEAP_MEMORY);
    MemorySize totalMemory = config.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY);
    if (heapMemory.compareTo(totalMemory) > 0) {
        throw new IllegalArgumentException(
                "Heap memory cannot exceed total process memory");
    }
}
}
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.ConfigurationUtils;
// Helper utilities from ConfigUtils and ConfigurationUtils
public class ConfigurationUtilities {

    public static void configurationUtilsExample() {
        Configuration config = new Configuration();

        // Parse a delimited string into a map.
        String propertiesString = "key1:value1,key2:value2";
        Map<String, String> parsed = ConfigurationUtils.parseStringToMap(
                propertiesString, ",", ":");

        // Encode a collection into the configuration, then decode it back.
        List<String> hosts = Arrays.asList("host1", "host2", "host3");
        String encoded = ConfigUtils.encodeCollectionToConfig(
                config, "allowed.hosts", hosts, Object::toString);
        List<String> decoded = ConfigUtils.decodeListFromConfig(
                config, "allowed.hosts", String::valueOf);

        // Split a temp-directory list and pick one of the entries at random.
        String tempDirsConfig = "/tmp1,/tmp2,/tmp3";
        String[] tempDirs = ConfigurationUtils.parseTempDirectories(tempDirsConfig);
        String randomTempDir = ConfigurationUtils.getRandomTempDirectory(tempDirs);
    }

    public static void workingWithMaps() {
        Configuration config = new Configuration();

        // Store a whole map under a single typed option.
        Map<String, String> properties = new HashMap<>();
        properties.put("timeout", "30s");
        properties.put("retries", "3");
        properties.put("compression", "gzip");
        config.set(MyConfigOptions.CUSTOM_PROPERTIES, properties);

        // Retrieve the whole map back.
        Map<String, String> readProperties = config.get(MyConfigOptions.CUSTOM_PROPERTIES);

        // Individual entries can also be written under prefixed string keys.
        config.setString("custom.properties.max-connections", "100");
        config.setString("custom.properties.buffer-size", "64kb");
    }
}
public class DynamicConfiguration {
// Current snapshot; volatile so unsynchronized readers still see the latest reference.
private volatile Configuration currentConfig;
private final Object configLock = new Object();

public DynamicConfiguration(Configuration initialConfig) {
    // Defensive copy: the caller cannot mutate our internal state afterwards.
    this.currentConfig = new Configuration(initialConfig);
}

/** Validates the new configuration, swaps in a copy, and notifies components. */
public void updateConfiguration(Configuration newConfig) {
    synchronized (configLock) {
        validateConfiguration(newConfig);
        this.currentConfig = new Configuration(newConfig);
        notifyConfigurationChange();
    }
}

/** Reads a single typed value under the lock for a consistent view. */
public <T> T getConfigValue(ConfigOption<T> option) {
    synchronized (configLock) {
        return currentConfig.get(option);
    }
}

/** Returns an independent copy of the current configuration. */
public Configuration getSnapshot() {
    synchronized (configLock) {
        return new Configuration(currentConfig);
    }
}

private void validateConfiguration(Configuration config) {
    // Validation logic
}

private void notifyConfigurationChange() {
    // Notify listeners about configuration changes
}
}
// Configuration provider interface
/**
 * Read access to a configuration, plus listener registration for change
 * notifications.
 */
public interface ConfigurationProvider {
/** Returns the provider's current configuration. */
Configuration getConfiguration();
/** Registers a listener to be notified of configuration changes. */
void addListener(ConfigurationListener listener);
/** Deregisters a previously added listener. */
void removeListener(ConfigurationListener listener);
}
/** Callback invoked by a ConfigurationProvider when its configuration changes. */
public interface ConfigurationListener {
/** Called with the updated configuration after a change is detected. */
void onConfigurationChanged(Configuration newConfig);
}
// File-based configuration provider
/**
 * File-based {@link ConfigurationProvider} that reloads a java.util.Properties
 * style file on a fixed schedule and notifies listeners when the contents change.
 *
 * <p>Call {@link #close()} when the provider is no longer needed; otherwise the
 * background watcher thread keeps running.
 */
public class FileConfigurationProvider implements ConfigurationProvider, AutoCloseable {

    private final Path configFile;
    private final List<ConfigurationListener> listeners;
    // Written by the watcher thread and read by arbitrary callers: volatile for visibility.
    private volatile Configuration currentConfig;
    private final ScheduledExecutorService watcherService;

    public FileConfigurationProvider(Path configFile) {
        this.configFile = configFile;
        this.listeners = new CopyOnWriteArrayList<>();
        this.currentConfig = loadConfiguration();
        this.watcherService = Executors.newSingleThreadScheduledExecutor();
        // Poll the file for changes every 5 seconds (first check after 5 seconds).
        watcherService.scheduleWithFixedDelay(this::checkForUpdates, 5, 5, TimeUnit.SECONDS);
    }

    /** Returns a defensive copy of the most recently loaded configuration. */
    @Override
    public Configuration getConfiguration() {
        return new Configuration(currentConfig);
    }

    @Override
    public void addListener(ConfigurationListener listener) {
        listeners.add(listener);
    }

    @Override
    public void removeListener(ConfigurationListener listener) {
        listeners.remove(listener);
    }

    /** Stops the background watcher thread. Previously there was no way to shut it down. */
    @Override
    public void close() {
        watcherService.shutdownNow();
    }

    /**
     * Loads the file as java.util.Properties and copies every entry into a Configuration.
     *
     * @throws RuntimeException if the file cannot be read
     */
    private Configuration loadConfiguration() {
        // try-with-resources: the previous version never closed this reader (resource leak).
        try (java.io.BufferedReader reader = Files.newBufferedReader(configFile)) {
            Properties props = new Properties();
            props.load(reader);
            Configuration config = new Configuration();
            for (String key : props.stringPropertyNames()) {
                config.setString(key, props.getProperty(key));
            }
            return config;
        } catch (IOException e) {
            throw new RuntimeException("Failed to load configuration from " + configFile, e);
        }
    }

    /** Reloads the file; when the result differs from the current snapshot, swaps and notifies. */
    private void checkForUpdates() {
        try {
            Configuration newConfig = loadConfiguration();
            if (!newConfig.equals(currentConfig)) {
                currentConfig = newConfig;
                notifyListeners(newConfig);
            }
        } catch (Exception e) {
            // Keep the old configuration on failure; never let an exception kill the watcher task.
            System.err.println("Error checking for configuration updates: " + e.getMessage());
        }
    }

    /** Delivers the new configuration to every listener, isolating per-listener failures. */
    private void notifyListeners(Configuration newConfig) {
        for (ConfigurationListener listener : listeners) {
            try {
                listener.onConfigurationChanged(newConfig);
            } catch (Exception e) {
                System.err.println("Error notifying configuration listener: " + e.getMessage());
            }
        }
    }
}
// Configuration holder with lazy initialization
/**
 * Application-wide configuration singleton (eagerly initialized at class load).
 * Layers three sources, later ones overwriting earlier ones:
 * system properties, environment variables, then a properties file.
 */
public class ApplicationConfig {

    private static final ApplicationConfig INSTANCE = new ApplicationConfig();

    /** Application name; defaults to "MyFlinkApp". */
    public static final ConfigOption<String> APP_NAME =
            ConfigOptions.key("app.name")
                    .stringType()
                    .defaultValue("MyFlinkApp")
                    .withDescription("Application name");

    /** Health-check heartbeat interval; defaults to 30 seconds. */
    public static final ConfigOption<Duration> HEARTBEAT_INTERVAL =
            ConfigOptions.key("app.heartbeat.interval")
                    .durationType()
                    .defaultValue(Duration.ofSeconds(30))
                    .withDescription("Heartbeat interval for health checks");

    private final Configuration config;

    private ApplicationConfig() {
        this.config = loadConfiguration();
    }

    public static ApplicationConfig getInstance() {
        return INSTANCE;
    }

    /** Returns a defensive copy so callers cannot mutate the shared configuration. */
    public Configuration getConfiguration() {
        return new Configuration(config);
    }

    public <T> T get(ConfigOption<T> option) {
        return config.get(option);
    }

    /** Assembles the configuration from system properties, environment, and file. */
    private Configuration loadConfiguration() {
        Configuration config = new Configuration();
        // 1. System properties whose key starts with "app.".
        System.getProperties().forEach((key, value) -> {
            if (key.toString().startsWith("app.")) {
                config.setString(key.toString(), value.toString());
            }
        });
        // 2. Environment variables prefixed FLINK_, mapped to dotted lowercase keys
        //    (e.g. FLINK_APP_NAME -> flink.app.name).
        System.getenv().forEach((key, value) -> {
            if (key.startsWith("FLINK_")) {
                String configKey = key.toLowerCase().replace("_", ".");
                config.setString(configKey, value);
            }
        });
        // 3. Optional properties file; wins over the previous sources.
        loadFromFile(config);
        return config;
    }

    /** Merges conf/application.properties into the configuration, if the file exists. */
    private void loadFromFile(Configuration config) {
        try {
            Path configPath = Paths.get("conf/application.properties");
            if (Files.exists(configPath)) {
                Properties props = new Properties();
                // try-with-resources: the previous version never closed this reader.
                try (java.io.BufferedReader reader = Files.newBufferedReader(configPath)) {
                    props.load(reader);
                }
                props.forEach((key, value) ->
                        config.setString(key.toString(), value.toString()));
            }
        } catch (IOException e) {
            // A missing/unreadable file is non-fatal: fall back to the other sources.
            System.err.println("Could not load configuration file: " + e.getMessage());
        }
    }
}
// Configuration builder pattern
// Fluent builder that assembles an immutable Configuration snapshot
public class ConfigurationBuilder {

    private final Configuration config = new Configuration();

    /** Entry point for the fluent chain. */
    public static ConfigurationBuilder create() {
        return new ConfigurationBuilder();
    }

    public ConfigurationBuilder withParallelism(int parallelism) {
        config.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
        return this;
    }

    public ConfigurationBuilder withCheckpointing(Duration interval) {
        config.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, interval);
        return this;
    }

    public ConfigurationBuilder withMemory(MemorySize totalMemory) {
        config.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, totalMemory);
        return this;
    }

    /** Generic escape hatch for any typed option. */
    public <T> ConfigurationBuilder with(ConfigOption<T> option, T value) {
        config.set(option, value);
        return this;
    }

    /** Returns a copy, so further builder use cannot mutate the built result. */
    public Configuration build() {
        return new Configuration(config);
    }
}
// Usage example: assemble a Configuration fluently with the builder
Configuration config = ConfigurationBuilder.create()
        .withParallelism(8)
        .withCheckpointing(Duration.ofMinutes(2))
        .withMemory(MemorySize.parse("4g"))
        .with(MyConfigOptions.DATABASE_URL, "jdbc:postgresql://localhost/db")
        .with(MyConfigOptions.ENABLE_METRICS, true)
.build();
// Environment-specific configuration profiles
public class EnvironmentConfiguration {

    public enum Environment {
        DEVELOPMENT, TESTING, PRODUCTION
    }

    /** Builds the shared base configuration, then layers per-environment overrides on top. */
    public static Configuration createConfiguration(Environment env) {
        Configuration baseConfig = createBaseConfiguration();
        switch (env) {
            case DEVELOPMENT:
                return applyDevelopmentOverrides(baseConfig);
            case TESTING:
                return applyTestingOverrides(baseConfig);
            case PRODUCTION:
                return applyProductionOverrides(baseConfig);
            default:
                return baseConfig;
        }
    }

    /** Settings shared by every environment. */
    private static Configuration createBaseConfiguration() {
        return ConfigurationBuilder.create()
                .withParallelism(1)
                .with(MyConfigOptions.ENABLE_METRICS, true)
                .with(CoreOptions.TMP_DIRS, "/tmp/flink")
                .build();
    }

    /** Development: low parallelism, relaxed checkpointing, verbose logging. */
    private static Configuration applyDevelopmentOverrides(Configuration baseConfig) {
        Configuration config = new Configuration(baseConfig);
        config.set(CoreOptions.DEFAULT_PARALLELISM, 2);
        config.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(5));
        config.setString("logging.level", "DEBUG");
        return config;
    }

    /** Testing: single-threaded, in-memory database. */
    private static Configuration applyTestingOverrides(Configuration baseConfig) {
        Configuration config = new Configuration(baseConfig);
        config.set(CoreOptions.DEFAULT_PARALLELISM, 1);
        config.set(MyConfigOptions.DATABASE_URL, "jdbc:h2:mem:testdb");
        config.setString("logging.level", "INFO");
        return config;
    }

    /** Production: high parallelism, tight checkpointing, large memory, quiet logging. */
    private static Configuration applyProductionOverrides(Configuration baseConfig) {
        Configuration config = new Configuration(baseConfig);
        config.set(CoreOptions.DEFAULT_PARALLELISM, 16);
        config.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(1));
        config.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("8g"));
        config.setString("logging.level", "WARN");
        return config;
    }
}
Apache Flink's configuration system provides powerful capabilities for managing application settings in a type-safe, validated, and flexible manner. By leveraging ConfigOptions, proper validation, and configuration management patterns, you can build maintainable and robust Flink applications that adapt to different deployment environments and requirements.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-core