Apache Flink Mesos integration module that provides resource manager implementation for running Flink clusters on Apache Mesos.
Comprehensive configuration options for customizing Mesos framework behavior, resource requirements, authentication, and cluster settings. The configuration system provides type-safe options for all aspects of Mesos integration.
Central configuration constants defining all Mesos-related settings for framework registration, resource management, and cluster behavior.
/**
* Configuration options for Mesos integration
* Provides type-safe configuration constants for all Mesos settings
*/
public class MesosOptions {
/**
* The Mesos master URL to connect to
* Format: mesos://host:port or zk://host1:port1,host2:port2/path
*/
public static final ConfigOption<String> MASTER_URL;
/**
* Failover timeout in seconds for framework re-registration
* Controls how long Mesos waits before considering framework failed
*/
public static final ConfigOption<Integer> FAILOVER_TIMEOUT_SECONDS;
/**
* Port for the artifact server that distributes job artifacts
* Use 0 for automatic port assignment
*/
public static final ConfigOption<Integer> ARTIFACT_SERVER_PORT;
/**
* Name of the Mesos framework for identification
* Must be unique within the Mesos cluster
*/
public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_NAME;
/**
* Mesos framework role for resource reservation and quotas
*/
public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_ROLE;
/**
* Principal for Mesos framework authentication
*/
public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_PRINCIPAL;
/**
* Secret for Mesos framework authentication
*/
public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_SECRET;
/**
* User context for running framework tasks
*/
public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_USER;
/**
* Enable SSL/TLS for artifact server
*/
public static final ConfigOption<Boolean> ARTIFACT_SERVER_SSL_ENABLED;
/**
* Dynamic port assignment configuration keys
* Comma-separated list of port names to assign dynamically
*/
public static final ConfigOption<String> PORT_ASSIGNMENTS;
/**
* Time in milliseconds to hold unused resource offers
*/
public static final ConfigOption<Long> UNUSED_OFFER_EXPIRATION;
/**
* Duration in milliseconds to refuse declined offers
*/
public static final ConfigOption<Long> DECLINED_OFFER_REFUSE_DURATION;
}
Basic Configuration Example:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.mesos.configuration.MesosOptions;
Configuration config = new Configuration();
// Required settings
// A comma-separated host list is only valid with the zk:// form; mesos:// addresses a single host:port
config.setString(MesosOptions.MASTER_URL, "zk://zk1:2181,zk2:2181,zk3:2181/mesos");
config.setString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME, "production-flink");
// Optional performance tuning
config.setInteger(MesosOptions.FAILOVER_TIMEOUT_SECONDS, 900); // seconds
config.setLong(MesosOptions.UNUSED_OFFER_EXPIRATION, 30000L); // milliseconds
config.setLong(MesosOptions.DECLINED_OFFER_REFUSE_DURATION, 5000L); // milliseconds
Environment variable names used for passing configuration to Mesos containers and tasks.
/**
* Environment variable names for Mesos container configuration
* Used internally by the framework to pass settings to TaskManager containers
*/
public class MesosConfigKeys {
/**
* Environment variable for Flink temporary directory path
*/
public static final String ENV_FLINK_TMP_DIR;
/**
* Environment variable for JVM arguments
*/
public static final String ENV_JVM_ARGS;
/**
* Environment variable for task name (DCOS integration)
*/
public static final String ENV_TASK_NAME;
/**
* Environment variable for framework name (DCOS integration)
*/
public static final String ENV_FRAMEWORK_NAME;
}
Typed configuration object for Mesos scheduler settings, providing structured access to framework information and credentials.
/**
* Structured configuration for Mesos scheduler
* Provides typed access to framework settings and authentication
*/
public class MesosConfiguration {
/**
* Get the configured Mesos master URL
* @return Mesos master URL string
*/
public String masterUrl();
/**
* Get Mesos framework information builder
* @return FrameworkInfo builder with configured settings
*/
public Protos.FrameworkInfo.Builder frameworkInfo();
/**
* Get authentication credentials if configured
* @return Optional credential builder for framework authentication
*/
public Option<Protos.Credential.Builder> credential();
/**
* Create a new configuration with updated framework info
* @param frameworkInfo - Updated framework information
* @return New MesosConfiguration instance
*/
public MesosConfiguration withFrameworkInfo(Protos.FrameworkInfo.Builder frameworkInfo);
/**
* Get configured framework roles
* @return Set of role names for resource allocation
*/
public Set<String> roles();
/**
* Log configuration details for debugging
* @param logger - Logger instance for output
* @param config - Configuration to log
*/
public static void logMesosConfig(Logger logger, MesosConfiguration config);
}
Authentication Configuration Example:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.mesos.configuration.MesosOptions;
import org.apache.flink.mesos.util.MesosUtils;
Configuration config = new Configuration();
// Framework authentication
config.setString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL, "flink-framework");
config.setString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET, "secret-key");
config.setString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_ROLE, "production");
config.setString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_USER, "flink");
// Create typed configuration
MesosConfiguration mesosConfig = MesosUtils.createMesosSchedulerConfiguration(config, "hostname");
Configuration parameters specific to TaskManager processes running in Mesos containers, including resource requirements and container settings.
/**
* Mesos-specific parameters for TaskManager processes
* Defines resource requirements and container configuration
*/
public class MesosTaskManagerParameters {
/**
* Create TaskManager parameters from configuration
* @param config - Flink configuration
* @return Configured TaskManager parameters
*/
public static MesosTaskManagerParameters create(Configuration config);
/**
* Get CPU core requirements for TaskManager
* @return Number of CPU cores needed
*/
public double cpus();
/**
* Get GPU requirements for TaskManager
* @return Number of GPUs needed
*/
public double gpus();
/**
* Get disk space requirements in MB
* @return Disk space needed in megabytes
*/
public int disk();
/**
* Get network bandwidth requirements in Mbps
* @return Network bandwidth needed
*/
public int network();
/**
* Get container type for TaskManager execution
* @return Container type (MESOS or DOCKER)
*/
public ContainerType containerType();
/**
* Get Docker image name if using Docker containers
* @return Optional Docker image name
*/
public Option<String> dockerImageName();
/**
* Get memory requirements in MB
* @return Memory needed in megabytes
*/
public int memoryMB();
/**
* Container types supported by Mesos TaskManager
*/
public enum ContainerType {
/** Use Mesos native containerizer */
MESOS,
/** Use Docker containerizer */
DOCKER
}
}
Resource Configuration Example:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
Configuration config = new Configuration();
// TaskManager resource requirements
// The slot count is an integer option, so it is written with setInteger rather than as a double
config.setInteger("taskmanager.numberOfTaskSlots", 4);
config.setString("taskmanager.memory.process.size", "2g");
config.setString("taskmanager.memory.managed.size", "512m");
config.setDouble("mesos.resourcemanager.tasks.cpus", 2.0);
config.setInteger("mesos.resourcemanager.tasks.disk", 1024);
// Container configuration
config.setString("mesos.resourcemanager.tasks.container.type", "docker");
// The container image is configured through the "container.image.name" key
config.setString("mesos.resourcemanager.tasks.container.image.name", "flink:1.13.6");
// Create typed parameters
MesosTaskManagerParameters params = MesosTaskManagerParameters.create(config);
Recommended configuration for production Mesos deployments:
Configuration config = new Configuration();
// Framework settings
config.setString(MesosOptions.MASTER_URL, "zk://zk1:2181,zk2:2181,zk3:2181/mesos");
config.setString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME, "production-flink");
config.setString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_ROLE, "production");
config.setInteger(MesosOptions.FAILOVER_TIMEOUT_SECONDS, 1800);
// Resource management
config.setLong(MesosOptions.UNUSED_OFFER_EXPIRATION, 60000L);
config.setLong(MesosOptions.DECLINED_OFFER_REFUSE_DURATION, 10000L);
// Artifact distribution
config.setInteger(MesosOptions.ARTIFACT_SERVER_PORT, 0); // Auto-assign
config.setBoolean(MesosOptions.ARTIFACT_SERVER_SSL_ENABLED, true);
// High availability
config.setString("high-availability", "zookeeper");
config.setString("high-availability.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");
Simplified configuration for development and testing:
Configuration config = new Configuration();
// Minimal required settings
config.setString(MesosOptions.MASTER_URL, "mesos://localhost:5050");
config.setString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME, "dev-flink");
config.setInteger(MesosOptions.FAILOVER_TIMEOUT_SECONDS, 300);
// Fast resource allocation for development
config.setLong(MesosOptions.UNUSED_OFFER_EXPIRATION, 10000L);
config.setLong(MesosOptions.DECLINED_OFFER_REFUSE_DURATION, 1000L);
The configuration system handles common configuration errors:
All configuration classes in this module are deprecated as of Flink 1.13. For migration:
- Kubernetes deployments: use the org.apache.flink.kubernetes.configuration.* packages
- YARN deployments: use the org.apache.flink.yarn.configuration.* packages
/**
* Resource allocation information for Mesos tasks
*/
public class MesosResourceAllocation {
/**
* Get the CPU amount of this allocation
* @return CPU amount as a double (unit not shown here; presumably cores - TODO confirm)
*/
public double cpus();
/**
* Get the memory amount of this allocation
* @return Memory as a double; the method name indicates megabytes
*/
public double memoryMB();
/**
* Get the disk amount of this allocation
* @return Disk space as a double; the method name indicates megabytes
*/
public double diskMB();
/**
* Get the Mesos resources associated with this allocation
* @return List of Mesos Protos.Resource entries (presumably the resources backing the values above - TODO confirm)
*/
public List<Protos.Resource> resources();
}
/**
* Framework information and credentials
*/
public class FrameworkConfiguration {
public String name();
public String role();
public String principal();
public String user();
public int failoverTimeoutSeconds();
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-mesos-2-11