Apache Flink Mesos integration module that provides resource manager implementation for running Flink clusters on Apache Mesos.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-mesos-2-11@1.13.0

Apache Flink Mesos integration module that provides a resource manager implementation for running Flink clusters on Apache Mesos. This module enables Flink to dynamically allocate and manage TaskManager resources through Mesos, supporting both session and per-job cluster modes with automatic resource scaling and fault tolerance.
Important: This module was deprecated in Apache Flink 1.13 (FLINK-22352) and is scheduled for removal in future versions. Users are encouraged to migrate to Kubernetes or YARN resource managers.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-mesos_${scala.binary.version}</artifactId>
<version>1.13.6</version>
</dependency>

import org.apache.flink.mesos.configuration.MesosOptions;
import org.apache.flink.mesos.entrypoint.MesosJobClusterEntrypoint;
import org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint;

import org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint;
import org.apache.flink.configuration.Configuration;
// Configure Mesos settings
Configuration config = new Configuration();
config.setString("mesos.master", "mesos://localhost:5050");
config.setString("mesos.resourcemanager.framework.name", "flink-session");
// Start session cluster
MesosSessionClusterEntrypoint.main(new String[]{});

import org.apache.flink.mesos.entrypoint.MesosJobClusterEntrypoint;
import org.apache.flink.configuration.Configuration;
// Configure Mesos settings for per-job cluster
Configuration config = new Configuration();
config.setString("mesos.master", "mesos://localhost:5050");
config.setString("mesos.resourcemanager.framework.name", "flink-job-cluster");
// Start per-job cluster
MesosJobClusterEntrypoint.main(new String[]{"--job-classname", "com.example.MyJob"});

The Flink Mesos integration is built around several key components:
Entry points (MesosJobClusterEntrypoint, MesosSessionClusterEntrypoint)

Main entry points for launching Flink clusters on Mesos, supporting both session and per-job deployment modes.
/**
 * Entry point for launching a Flink session cluster on Mesos.
 *
 * <p>This is an API signature listing: the method body is not shown here.
 */
public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint {
/**
 * Program entry point for the session cluster.
 *
 * @param args command-line arguments; the session usage example in this document passes an
 *     empty array
 */
public static void main(String[] args);
}
public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
public static void main(String[] args);
}

Comprehensive configuration options for customizing Mesos framework behavior, resource requirements, and cluster settings.
public class MesosOptions {
public static final ConfigOption<String> MASTER_URL;
public static final ConfigOption<Integer> FAILOVER_TIMEOUT_SECONDS;
public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_NAME;
public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_ROLE;
// ... additional configuration options
}

Mesos-specific resource manager implementation that handles dynamic TaskManager allocation, lifecycle management, and integration with Mesos cluster resources.
public interface MesosServices {
MesosWorkerStore createMesosWorkerStore(Configuration configuration) throws Exception;
MesosResourceManagerActorFactory createMesosResourceManagerActorFactory();
MesosArtifactServer getArtifactServer();
SchedulerDriver createMesosSchedulerDriver(MesosConfiguration mesosConfig,
Scheduler scheduler,
boolean implicitAcknowledgements);
void close(boolean cleanup) throws Exception;
}

Advanced task scheduling capabilities using Netflix Fenzo integration for optimal resource utilization and task placement on Mesos clusters.
/**
 * A task exposed to the scheduling layer of the Mesos integration.
 *
 * <p>NOTE(review): only the signatures are visible here; the descriptions below are inferred
 * from the method names and types and should be confirmed against the implementation.
 */
public interface LaunchableTask {
/**
 * @return the {@code TaskRequest} associated with this task (presumably the request handed to
 *     the Fenzo scheduler; confirm)
 */
TaskRequest taskRequest();
/**
 * Produces the Mesos task description for this task.
 *
 * @param slaveId identifier of the Mesos agent the task is to be launched on (presumably the
 *     agent selected by the scheduler; confirm)
 * @param allocation the resources allocated for the task
 * @return the {@code Protos.TaskInfo} describing the task
 */
Protos.TaskInfo launch(Protos.SlaveID slaveId, MesosResourceAllocation allocation);
}
public class Offer implements VirtualMachineLease {
public double cpuCores();
public double memoryMB();
public double diskMB();
// ... resource availability methods
}

Persistent storage interfaces for maintaining cluster state and worker information across framework restarts and failures.
public interface MesosWorkerStore {
void start() throws Exception;
void stop(boolean cleanup) throws Exception;
Option<Protos.FrameworkID> getFrameworkID() throws Exception;
void setFrameworkID(Option<Protos.FrameworkID> frameworkID) throws Exception;
List<Worker> recoverWorkers() throws Exception;
Protos.TaskID newTaskID() throws Exception;
void putWorker(Worker worker) throws Exception;
boolean removeWorker(Protos.TaskID taskID) throws Exception;
}

Collection of utility classes for Mesos integration, including artifact distribution, resource management, and configuration helpers.
/**
 * Artifact server of the Mesos integration; it is also a {@code MesosArtifactResolver}.
 *
 * <p>NOTE(review): only the signatures are visible here; the method descriptions are inferred
 * from names and types and should be confirmed against the implementation.
 */
public interface MesosArtifactServer extends MesosArtifactResolver {
/**
 * Adds a path to the server.
 *
 * @param path the path to add (presumably the local artifact to serve; confirm)
 * @param remoteFile the second path argument (presumably the name under which the artifact is
 *     exposed; confirm)
 * @return a {@code URL} for the added path
 */
URL addPath(Path path, Path remoteFile);
/** Stops the server. */
void stop();
}
public class MesosUtils {
public static MesosConfiguration createMesosSchedulerConfiguration(Configuration config, String hostname);
public static MesosTaskManagerParameters createTmParameters(Configuration config, Logger logger);
}

public class MesosConfiguration {
public String masterUrl();
public Protos.FrameworkInfo.Builder frameworkInfo();
public Option<Protos.Credential.Builder> credential();
public Set<String> roles();
}
/**
 * Parameters describing a TaskManager launched through Mesos: resource amounts and the
 * container type.
 *
 * <p>This is an accessor listing only. The units of the numeric values are not shown here.
 * NOTE(review): confirm units (e.g. for {@code disk()} and {@code network()}) against the
 * configuration reference before relying on them.
 */
public class MesosTaskManagerParameters {
/** @return the CPU amount as a {@code double} */
public double cpus();
/** @return the GPU amount as a {@code double} */
public double gpus();
/** @return the disk amount as an {@code int} (unit not shown here) */
public int disk();
/** @return the network amount as an {@code int} (unit not shown here) */
public int network();
/** @return the container type, one of the {@link ContainerType} values */
public ContainerType containerType();
/** Container types listed for a TaskManager: {@code MESOS} or {@code DOCKER}. */
public enum ContainerType {
MESOS, DOCKER
}
}
/**
 * A worker record as handled by {@code MesosWorkerStore} (the type used by its
 * {@code recoverWorkers} and {@code putWorker} methods).
 *
 * <p>The dotted name is documentation notation; presumably {@code Worker} is a nested class of
 * {@code MesosWorkerStore} (confirm against the source).
 */
public class MesosWorkerStore.Worker {
/** @return the Mesos task ID of this worker */
public Protos.TaskID taskID();
/** @return the {@code LaunchableMesosWorker} associated with this worker */
public LaunchableMesosWorker launchableMesosWorker();
/** @return the current state of this worker, one of the {@link WorkerState} values */
public WorkerState state();
/**
 * States listed for a worker: {@code New}, {@code Launched}, {@code Released}.
 * NOTE(review): the exact meaning of, and transitions between, these states are not shown
 * here.
 */
public enum WorkerState {
New, Launched, Released
}
}