Comprehensive configuration options for customizing Mesos framework behavior, resource requirements, authentication, and cluster settings. The configuration system provides type-safe options for all aspects of Mesos integration.
Central configuration constants defining all Mesos-related settings for framework registration, resource management, and cluster behavior.
/**
 * Configuration options for Mesos integration.
 *
 * <p>Provides type-safe configuration constants for all Mesos settings. This listing
 * shows the declarations only; option keys and default values are not shown here.
 */
public class MesosOptions {
    /**
     * The Mesos master URL to connect to.
     *
     * <p>Format: {@code mesos://host:port} or {@code zk://host1:port1,host2:port2/path}.
     */
    public static final ConfigOption<String> MASTER_URL;
    /**
     * Failover timeout, in seconds, for framework re-registration.
     *
     * <p>Controls how long Mesos waits before considering the framework failed.
     */
    public static final ConfigOption<Integer> FAILOVER_TIMEOUT_SECONDS;
    /**
     * Port for the artifact server that distributes job artifacts.
     *
     * <p>Use 0 for automatic port assignment.
     */
    public static final ConfigOption<Integer> ARTIFACT_SERVER_PORT;
    /**
     * Name of the Mesos framework, used for identification.
     *
     * <p>Must be unique within the Mesos cluster.
     */
    public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_NAME;
    /**
     * Mesos framework role, used for resource reservation and quotas.
     */
    public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_ROLE;
    /**
     * Principal for Mesos framework authentication.
     */
    public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_PRINCIPAL;
    /**
     * Secret for Mesos framework authentication.
     */
    public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_SECRET;
    /**
     * User context for running framework tasks.
     */
    public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_USER;
    /**
     * Enables SSL/TLS for the artifact server.
     */
    public static final ConfigOption<Boolean> ARTIFACT_SERVER_SSL_ENABLED;
    /**
     * Dynamic port assignment configuration keys.
     *
     * <p>Comma-separated list of port names to assign dynamically.
     */
    public static final ConfigOption<String> PORT_ASSIGNMENTS;
    /**
     * Time, in milliseconds, to hold unused resource offers.
     */
    public static final ConfigOption<Long> UNUSED_OFFER_EXPIRATION;
    /**
     * Duration, in milliseconds, to refuse declined offers.
     */
    public static final ConfigOption<Long> DECLINED_OFFER_REFUSE_DURATION;
}
Basic Configuration Example:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.mesos.configuration.MesosOptions;

Configuration config = new Configuration();

// Required settings.
// A list of hosts is only valid in the zk:// form (the ZooKeeper ensemble that tracks
// the Mesos masters); the mesos:// form takes a single host:port.
config.setString(MesosOptions.MASTER_URL, "zk://zk1:2181,zk2:2181,zk3:2181/mesos");
config.setString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME, "production-flink");

// Optional performance tuning
config.setInteger(MesosOptions.FAILOVER_TIMEOUT_SECONDS, 900);       // seconds
config.setLong(MesosOptions.UNUSED_OFFER_EXPIRATION, 30000L);        // milliseconds
config.setLong(MesosOptions.DECLINED_OFFER_REFUSE_DURATION, 5000L);  // milliseconds
Environment variable names used for passing configuration to Mesos containers and tasks.
/**
 * Environment variable names for Mesos container configuration.
 *
 * <p>Used internally by the framework to pass settings to TaskManager containers.
 * This listing shows the declarations only; the literal variable names are not shown here.
 */
public class MesosConfigKeys {
    /**
     * Name of the environment variable carrying the Flink temporary directory path.
     */
    public static final String ENV_FLINK_TMP_DIR;
    /**
     * Name of the environment variable carrying JVM arguments.
     */
    public static final String ENV_JVM_ARGS;
    /**
     * Name of the environment variable carrying the task name (DCOS integration).
     */
    public static final String ENV_TASK_NAME;
    /**
     * Name of the environment variable carrying the framework name (DCOS integration).
     */
    public static final String ENV_FRAMEWORK_NAME;
}
Typed configuration object for Mesos scheduler settings, providing structured access to framework information and credentials.
/**
 * Structured configuration for the Mesos scheduler.
 *
 * <p>Provides typed access to framework settings and authentication. Only the method
 * signatures are listed here.
 */
public class MesosConfiguration {
    /**
     * Returns the configured Mesos master URL.
     *
     * @return Mesos master URL string
     */
    public String masterUrl();
    /**
     * Returns the Mesos framework information builder.
     *
     * @return FrameworkInfo builder with configured settings
     */
    public Protos.FrameworkInfo.Builder frameworkInfo();
    /**
     * Returns the authentication credentials, if configured.
     *
     * <p>NOTE(review): {@code Option} is presumably {@code scala.Option}; confirm against
     * the actual class.
     *
     * @return optional credential builder for framework authentication
     */
    public Option<Protos.Credential.Builder> credential();
    /**
     * Creates a new configuration with updated framework info.
     *
     * @param frameworkInfo updated framework information
     * @return new MesosConfiguration instance
     */
    public MesosConfiguration withFrameworkInfo(Protos.FrameworkInfo.Builder frameworkInfo);
    /**
     * Returns the configured framework roles.
     *
     * @return set of role names for resource allocation
     */
    public Set<String> roles();
    /**
     * Logs configuration details for debugging.
     *
     * @param logger logger instance for output
     * @param config configuration to log
     */
    public static void logMesosConfig(Logger logger, MesosConfiguration config);
}
Authentication Configuration Example:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.mesos.configuration.MesosOptions;
import org.apache.flink.mesos.util.MesosConfiguration;
import org.apache.flink.mesos.util.MesosUtils;

Configuration config = new Configuration();

// Framework authentication
config.setString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL, "flink-framework");
// Placeholder value only: load the real secret from a protected source, never commit it.
config.setString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET, "secret-key");
config.setString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_ROLE, "production");
config.setString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_USER, "flink");

// Create typed configuration
MesosConfiguration mesosConfig = MesosUtils.createMesosSchedulerConfiguration(config, "hostname");
Configuration parameters specific to TaskManager processes running in Mesos containers, including resource requirements and container settings.
/**
 * Mesos-specific parameters for TaskManager processes.
 *
 * <p>Defines resource requirements and container configuration. Only the method
 * signatures are listed here.
 */
public class MesosTaskManagerParameters {
    /**
     * Creates TaskManager parameters from configuration.
     *
     * @param config Flink configuration
     * @return configured TaskManager parameters
     */
    public static MesosTaskManagerParameters create(Configuration config);
    /**
     * Returns the CPU core requirements for the TaskManager.
     *
     * @return number of CPU cores needed
     */
    public double cpus();
    /**
     * Returns the GPU requirements for the TaskManager.
     *
     * @return number of GPUs needed
     */
    public double gpus();
    /**
     * Returns the disk space requirements in MB.
     *
     * @return disk space needed, in megabytes
     */
    public int disk();
    /**
     * Returns the network bandwidth requirements in Mbps.
     *
     * @return network bandwidth needed
     */
    public int network();
    /**
     * Returns the container type used for TaskManager execution.
     *
     * @return container type ({@code MESOS} or {@code DOCKER})
     */
    public ContainerType containerType();
    /**
     * Returns the Docker image name, if Docker containers are used.
     *
     * <p>NOTE(review): {@code Option} is presumably {@code scala.Option}; confirm against
     * the actual class.
     *
     * @return optional Docker image name
     */
    public Option<String> dockerImageName();
    /**
     * Returns the memory requirements in MB.
     *
     * @return memory needed, in megabytes
     */
    public int memoryMB();
    /**
     * Container types supported by the Mesos TaskManager.
     */
    public enum ContainerType {
        /** Use the Mesos native containerizer. */
        MESOS,
        /** Use the Docker containerizer. */
        DOCKER
    }
}
Resource Configuration Example:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;

Configuration config = new Configuration();

// TaskManager resource requirements
// The slot count is an integer option; storing it as a double ("4.0") does not parse as an int.
config.setInteger("taskmanager.numberOfTaskSlots", 4);
config.setString("taskmanager.memory.process.size", "2g");
config.setString("taskmanager.memory.managed.size", "512m");
config.setDouble("mesos.resourcemanager.tasks.cpus", 2.0);
config.setInteger("mesos.resourcemanager.tasks.disk", 1024);  // MB

// Container configuration
config.setString("mesos.resourcemanager.tasks.container.type", "docker");
// The image is configured through "...container.image.name"; there is no "...container.docker.image" key.
config.setString("mesos.resourcemanager.tasks.container.image.name", "flink:1.13.6");

// Create typed parameters
MesosTaskManagerParameters params = MesosTaskManagerParameters.create(config);
Recommended configuration for production Mesos deployments:
Configuration config = new Configuration();

// Framework settings
config.setString(MesosOptions.MASTER_URL, "zk://zk1:2181,zk2:2181,zk3:2181/mesos");
config.setString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME, "production-flink");
config.setString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_ROLE, "production");
config.setInteger(MesosOptions.FAILOVER_TIMEOUT_SECONDS, 1800);  // seconds

// Resource management
config.setLong(MesosOptions.UNUSED_OFFER_EXPIRATION, 60000L);         // milliseconds
config.setLong(MesosOptions.DECLINED_OFFER_REFUSE_DURATION, 10000L);  // milliseconds

// Artifact distribution
config.setInteger(MesosOptions.ARTIFACT_SERVER_PORT, 0); // Auto-assign
config.setBoolean(MesosOptions.ARTIFACT_SERVER_SSL_ENABLED, true);

// High availability
config.setString("high-availability", "zookeeper");
config.setString("high-availability.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");
// ZooKeeper HA keeps only pointers in ZooKeeper; the recovery metadata itself needs a
// durable storage directory. Example path: replace with your own shared file system.
config.setString("high-availability.storageDir", "hdfs:///flink/ha/");
Simplified configuration for development and testing:
// Development / testing setup: a single local master and short offer timings.
Configuration config = new Configuration();

// Offer timings in milliseconds, kept short so resources turn around quickly
long unusedOfferExpirationMs = 10000L;
long declinedOfferRefuseMs = 1000L;

// Minimal required settings
config.setString(MesosOptions.MASTER_URL, "mesos://localhost:5050");
config.setString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME, "dev-flink");
config.setInteger(MesosOptions.FAILOVER_TIMEOUT_SECONDS, 300);

// Fast resource allocation for development
config.setLong(MesosOptions.UNUSED_OFFER_EXPIRATION, unusedOfferExpirationMs);
config.setLong(MesosOptions.DECLINED_OFFER_REFUSE_DURATION, declinedOfferRefuseMs);
The configuration system handles common configuration errors:
All configuration classes in this module are deprecated as of Flink 1.13. For migration:
- For Kubernetes deployments, use the org.apache.flink.kubernetes.configuration.* packages
- For YARN deployments, use the org.apache.flink.yarn.configuration.* packages
/**
* Resource allocation information for Mesos tasks
*/
public class MesosResourceAllocation {
    /** CPU amount of this allocation (unit not stated here; presumably cores, to be confirmed). */
    public double cpus();
    /** Memory amount of this allocation; the method name indicates megabytes. */
    public double memoryMB();
    /** Disk amount of this allocation; the method name indicates megabytes. */
    public double diskMB();
    /**
     * The Mesos {@code Protos.Resource} entries associated with this allocation.
     * NOTE(review): whether the list is a copy or a live view is not shown here; confirm.
     */
    public List<Protos.Resource> resources();
}
/**
 * Framework information and credentials.
 *
 * <p>Only the accessor signatures are listed here. NOTE(review): the accessors presumably
 * mirror the {@code RESOURCEMANAGER_FRAMEWORK_*} and {@code FAILOVER_TIMEOUT_SECONDS}
 * options of {@code MesosOptions}; confirm against the implementation.
 */
public class FrameworkConfiguration {
    /** Framework name. */
    public String name();
    /** Framework role. */
    public String role();
    /** Framework principal. */
    public String principal();
    /** Framework user. */
    public String user();
    /** Failover timeout; the method name indicates seconds. */
    public int failoverTimeoutSeconds();
}