Mesos-specific resource manager implementation that handles dynamic TaskManager allocation, lifecycle management, and integration with Mesos cluster resources. The resource management system provides automatic scaling, fault tolerance, and efficient resource utilization.
Central service factory interface for creating and managing all Mesos-related components including worker stores, artifact servers, and scheduler drivers.
/**
* Service factory interface for Mesos components
* Provides lifecycle management for all Mesos integration services
*/
public interface MesosServices extends AutoCloseable {
/**
* Creates the worker store used for persistent TaskManager state.
*
* @param configuration configuration for the store implementation
* @return a {@code MesosWorkerStore} instance (standalone or ZooKeeper-based)
* @throws Exception if the worker store could not be created
*/
MesosWorkerStore createMesosWorkerStore(Configuration configuration) throws Exception;
/**
* Creates the factory for Mesos resource manager actors.
*
* @return the actor factory for resource management
*/
MesosResourceManagerActorFactory createMesosResourceManagerActorFactory();
/**
* Returns the artifact server for distributing job files to tasks.
*
* @return the artifact server instance
*/
MesosArtifactServer getArtifactServer();
/**
* Creates the Mesos scheduler driver for framework communication.
*
* @param mesosConfig Mesos-specific configuration
* @param scheduler scheduler implementation
* @param implicitAcknowledgements whether to configure the driver for implicit acknowledgements
* @return the configured {@code SchedulerDriver} instance
*/
SchedulerDriver createMesosSchedulerDriver(MesosConfiguration mesosConfig,
Scheduler scheduler,
boolean implicitAcknowledgements);
/**
* Closes all services and cleans up resources.
*
* <p>NOTE(review): the enclosing interface is declared as extending {@code AutoCloseable},
* so a no-argument {@code close()} is also part of the contract; this document does not
* state which {@code cleanup} value that variant corresponds to. Confirm before relying
* on try-with-resources.
*
* @param cleanup whether to perform cleanup operations
* @throws Exception if the closing operation failed
*/
void close(boolean cleanup) throws Exception;
}
Utility class for creating appropriate MesosServices implementations based on high availability configuration.
/**
* Utilities for creating MesosServices instances
* Handles selection between standalone and ZooKeeper-based implementations
*/
public class MesosServicesUtils {
/**
* Creates a MesosServices instance based on the HA configuration.
*
* @param config Flink configuration containing the HA settings
* @param hostname hostname for service binding
* @return the appropriate {@code MesosServices} implementation
*/
public static MesosServices createMesosServices(Configuration config, String hostname);
}
Usage Example:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
// Create services based on configuration (this example selects ZooKeeper-based HA)
Configuration config = new Configuration();
config.setString("high-availability", "zookeeper");
config.setString("high-availability.zookeeper.quorum", "zk1:2181,zk2:2181");
MesosServices services = MesosServicesUtils.createMesosServices(config, "master-host");
// Use services.
// createMesosWorkerStore is declared with `throws Exception`, so the enclosing
// method must handle or propagate it.
// NOTE(review): in real code, wrap the usage in try/finally so the close(...) call
// that follows still runs if one of these calls throws.
MesosWorkerStore workerStore = services.createMesosWorkerStore(config);
MesosArtifactServer artifactServer = services.getArtifactServer();
// Cleanup when done
services.close(true);
Factory for creating Mesos-specific resource managers that integrate with Flink's active resource management system.
/**
* Factory for creating Mesos resource managers
* Integrates with Flink's ActiveResourceManager framework
*/
public class MesosResourceManagerFactory extends ActiveResourceManagerFactory<RegisteredMesosWorkerNode> {
/**
* Creates a Mesos resource manager factory.
*
* @param mesosServices Mesos services instance
* @param mesosConfiguration Mesos scheduler configuration
*/
public MesosResourceManagerFactory(MesosServices mesosServices,
MesosConfiguration mesosConfiguration);
}
Implementation of task launching for Mesos workers, handling the conversion from Flink's container specifications to Mesos TaskInfo.
/**
* Handles launching of TaskManager processes in Mesos containers
* Converts Flink ContainerSpecification to Mesos TaskInfo
*/
public class LaunchableMesosWorker implements LaunchableTask {
/**
* Returns the unique task identifier.
*
* @return the Mesos task ID for this worker
*/
public Protos.TaskID taskID();
/**
* Returns the Fenzo task requirements used for resource scheduling.
*
* @return a {@code TaskRequest} with resource and constraint requirements
*/
public TaskRequest taskRequest();
/**
* Launches the TaskManager task on the specified Mesos slave.
*
* @param slaveId target Mesos slave for task execution
* @param allocation allocated resources for the task
* @return the {@code TaskInfo} for the Mesos task launch
*/
public Protos.TaskInfo launch(Protos.SlaveID slaveId, MesosResourceAllocation allocation);
/**
* Extracts the port configuration keys from the Flink configuration.
*
* @param config Flink configuration
* @return the set of port keys requiring dynamic assignment
*/
public static Set<String> extractPortKeys(Configuration config);
/**
* Configures the artifact server for task artifact distribution.
*
* @param artifactServer server instance for artifact hosting
* @param taskManagerParameters TaskManager configuration parameters
* @param config Flink configuration
* @param logger logger for operation reporting
*/
public static void configureArtifactServer(MesosArtifactServer artifactServer,
MesosTaskManagerParameters taskManagerParameters,
Configuration config,
Logger logger);
}
Worker Launch Example:
import org.apache.flink.mesos.runtime.clusterframework.LaunchableMesosWorker;
import org.apache.flink.mesos.util.MesosResourceAllocation;
// Create worker with resource requirements (constructor arguments are elided in this example)
LaunchableMesosWorker worker = new LaunchableMesosWorker(/* constructor params */);
// Get resource requirements for scheduling (taskRequest() is documented as returning the Fenzo TaskRequest)
TaskRequest taskRequest = worker.taskRequest();
double cpuCores = taskRequest.getCPUs();
double memoryMB = taskRequest.getMemory();
// Launch on allocated resources: build the target slave ID and the allocation first
// (the allocation's constructor arguments are elided as well)
Protos.SlaveID slaveId = Protos.SlaveID.newBuilder().setValue("slave-001").build();
MesosResourceAllocation allocation = new MesosResourceAllocation(/* resources */);
Protos.TaskInfo taskInfo = worker.launch(slaveId, allocation);
Resource specification factory for creating Mesos worker resource specs that integrate with Flink's resource management system.
/**
* Factory for creating Mesos worker resource specifications.
* Handles resource requirement calculation and specification.
*/
public class MesosWorkerResourceSpecFactory implements WorkerResourceSpecFactory<RegisteredMesosWorkerNode> {
/**
* Creates a worker resource specification from TaskManager parameters.
*
* @param taskManagerParameters resource requirements
* @return the worker resource specification
*/
public RegisteredMesosWorkerNode createWorkerResourceSpec(MesosTaskManagerParameters taskManagerParameters);
}
/**
* Registered Mesos worker node with resource information
* Represents a TaskManager instance registered with the resource manager
*/
public class RegisteredMesosWorkerNode extends WorkerResourceSpec {
/**
* Returns the worker resource specification.
*
* @return the resource requirements and allocation details
*/
public WorkerResourceSpec getResourceSpec();
/**
* Returns the Mesos task ID for this worker.
*
* @return the unique task identifier
*/
public Protos.TaskID getTaskId();
}
Interface defining actions that can be performed by the Mesos resource manager for cluster lifecycle management.
/**
* Actions interface for Mesos resource manager operations
* Defines cluster management operations available to the resource manager
*/
public interface MesosResourceManagerActions {
/**
* Requests allocation of new TaskManager resources.
*
* @param resourceProfile required resource profile
* @param timeout timeout for the resource allocation
*/
void requestNewWorker(ResourceProfile resourceProfile, Duration timeout);
/**
* Releases allocated TaskManager resources.
*
* @param workerId identifier of the worker to release
*/
void releaseWorker(ResourceID workerId);
/**
* Returns the current cluster resource status.
*
* @return the current allocation and utilization information
*/
ClusterResourceStatus getClusterResourceStatus();
}
The resource manager supports automatic scaling based on job requirements:
// Configure auto-scaling behavior
Configuration config = new Configuration();
config.setString("resourcemanager.rpc.port", "0");
config.setString("resourcemanager.rpc.bind-port", "0");
// Enable reactive scaling
// NOTE(review): the upstream Flink configuration reference appears to spell this option
// as `scheduler-mode` with the string value `reactive` — confirm the key before relying on it.
config.setBoolean("scheduler-mode.reactive", true);
config.setString("execution.checkpointing.mode", "EXACTLY_ONCE");
// Resource constraints
// NOTE(review): this snippet sets the CPU option with setDouble, while the reservation
// snippet elsewhere in this document sets the same key with setString — pick one style.
config.setDouble("mesos.resourcemanager.tasks.cpus", 2.0);
config.setString("taskmanager.memory.process.size", "2g");
Configure resource reservation for guaranteed allocation:
// Fresh configuration for the reservation example
Configuration config = new Configuration();
// Framework role for reservations
config.setString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_ROLE, "production");
// Resource constraints with reservations
// NOTE(review): the upstream Flink Mesos options appear to name the hard-constraint key
// `mesos.constraints.hard.hostattribute` with `key:value` pairs — confirm both the key
// and the value syntax used here.
config.setString("mesos.constraints.hard.attribute", "rack:LIKE:rack-1");
config.setString("mesos.resourcemanager.tasks.cpus", "2.0");
config.setString("mesos.resourcemanager.tasks.mem", "2048");
Support for both Mesos native containers and Docker containers:
// Fresh configuration for the container example
Configuration config = new Configuration();
// Docker container configuration
config.setString("mesos.resourcemanager.tasks.container.type", "docker");
// NOTE(review): the image and network keys below should be checked against the Flink
// Mesos configuration reference (the image option appears to be documented upstream as
// `mesos.resourcemanager.tasks.container.image.name`) — confirm before use.
config.setString("mesos.resourcemanager.tasks.container.docker.image", "flink:1.13.6-scala_2.11");
config.setString("mesos.resourcemanager.tasks.container.docker.network", "HOST");
// Volume mounts: comma-separated host-path:container-path:mode entries (RO / RW)
config.setString("mesos.resourcemanager.tasks.container.volumes",
"/host-data:/container-data:RO,/host-logs:/container-logs:RW");
// Environment variables
config.setString("containerized.master.env.FLINK_CONF_DIR", "/opt/flink/conf");
The resource management system provides comprehensive error handling:
All resource management classes are deprecated as of Flink 1.13. Migration paths:
- org.apache.flink.kubernetes.operator.* for resource management
- org.apache.flink.yarn.* resource management classes
/**
* Resource allocation details for Mesos tasks
*/
public class MesosResourceAllocation {
// NOTE(review): this document lists the accessors without descriptions; the notes
// below are read off the method names only and must be confirmed against the implementation.
// presumably the allocated CPU amount
public double cpus();
// presumably the allocated memory, in megabytes
public double memoryMB();
// presumably the allocated disk space, in megabytes
public double diskMB();
// presumably the allocated network bandwidth, in Mbps
public double networkMbps();
// presumably the underlying Mesos resource descriptors backing this allocation
public List<Protos.Resource> mesosResources();
}
/**
* Cluster resource status information.
*/
public class ClusterResourceStatus {
// NOTE(review): this document lists the accessors without descriptions; the notes
// below are read off the method names only and must be confirmed against the implementation.
// presumably the number of TaskManagers in the cluster
public int totalTaskManagers();
// presumably the number of task slots that are currently free
public int availableTaskSlots();
// presumably the number of task slots that are currently in use
public int allocatedTaskSlots();
// presumably the aggregate resources of the cluster
public ResourceProfile totalResources();
// presumably the resources that are still unallocated
public ResourceProfile availableResources();
}
/**
* Task launch context and parameters.
*/
public class TaskLaunchContext {
// NOTE(review): this document lists the accessors without descriptions; the notes
// below are read off the method names only and must be confirmed against the implementation.
// presumably the Flink container specification the task is launched from
public ContainerSpecification containerSpec();
// presumably the TaskManager parameters used for the launch
public MesosTaskManagerParameters taskManagerParameters();
// presumably the environment variables passed to the launched task (name -> value)
public Map<String, String> environmentVariables();
// presumably the command-line arguments passed to the launched task
public List<String> commandLineArguments();
}