Collection of utility classes for Mesos integration, including artifact distribution, resource management, configuration helpers, and Mesos protobuf utilities. These utilities provide essential support functions for all aspects of Flink-Mesos integration.
HTTP server and resolver interfaces for distributing job artifacts to Mesos tasks via Mesos Fetcher integration.
/**
* Interface for resolving artifact URIs for Mesos Fetcher
* Provides URI resolution for files that need to be distributed to tasks
*/
public interface MesosArtifactResolver {
/**
* Resolve artifact URI for the given remote file path
* @param remoteFile - Path to file as it should appear in task container
* @return Optional URL where file can be fetched, empty if not found
*/
Option<URL> resolve(Path remoteFile);
}
/**
* HTTP server for distributing artifacts to Mesos tasks via Mesos Fetcher
* Extends resolver interface with server lifecycle management
*/
public interface MesosArtifactServer extends MesosArtifactResolver {
/**
* Add a local file path for distribution to tasks
* @param path - Local file system path to serve
* @param remoteFile - Path as it should appear in task containers
* @return URL where file can be fetched by Mesos Fetcher
*/
URL addPath(Path path, Path remoteFile);
/**
* Stop the artifact server and cleanup resources
* Closes all connections and releases server port
*/
void stop();
}
/**
* HTTP server implementation for artifact distribution
* Provides secure, scalable file distribution to Mesos tasks
*/
public class MesosArtifactServerImpl implements MesosArtifactServer {
/**
* Create artifact server with configuration
* @param hostname - Hostname for server binding
* @param port - Port for server (0 for automatic assignment)
* @param sslConfig - Optional SSL configuration for secure distribution
*/
public MesosArtifactServerImpl(String hostname, int port, SSLConfiguration sslConfig);
/**
* Start the artifact server
* @return Server URL for artifact access
*/
public URL start();
/**
* Get the server port (useful when auto-assigned)
* @return Actual port number being used
*/
public int getPort();
}Artifact Server Usage Example:
import org.apache.flink.mesos.util.MesosArtifactServer;
import org.apache.flink.mesos.util.MesosArtifactServerImpl;
// Create and start artifact server
MesosArtifactServerImpl server = new MesosArtifactServerImpl("master-host", 0, null);
URL serverUrl = server.start();
// Add job JAR for distribution
Path jobJarPath = Paths.get("/path/to/job.jar");
URL jarUrl = server.addPath(jobJarPath, Paths.get("lib/job.jar"));
// Add configuration files
Path configPath = Paths.get("/opt/flink/conf/flink-conf.yaml");
URL configUrl = server.addPath(configPath, Paths.get("conf/flink-conf.yaml"));
// URLs can now be used in Mesos TaskInfo URIs
// Mesos Fetcher will download files to task containersComprehensive utilities for creating and managing Mesos-specific configurations, TaskManager parameters, and container specifications.
/**
* Collection of Mesos-related utility methods
* Provides configuration creation and management functions
*/
public class MesosUtils {
/**
* Create Mesos scheduler configuration from Flink configuration
* @param config - Flink configuration with Mesos settings
* @param hostname - Hostname for framework registration
* @return Configured MesosConfiguration for scheduler
*/
public static MesosConfiguration createMesosSchedulerConfiguration(Configuration config,
String hostname);
/**
* Create TaskManager parameters from configuration
* @param config - Flink configuration
* @param logger - Logger for parameter validation messages
* @return Configured TaskManager parameters for Mesos deployment
*/
public static MesosTaskManagerParameters createTmParameters(Configuration config,
Logger logger);
/**
* Create container specification from configuration
* @param config - Flink configuration with container settings
* @return Container specification for TaskManager deployment
*/
public static ContainerSpecification createContainerSpec(Configuration config);
/**
* Apply configuration overlays to container specification
* Merges environment variables, volumes, and other container settings
* @param config - Flink configuration
* @param containerSpec - Base container specification to modify
*/
public static void applyOverlays(Configuration config,
ContainerSpecification containerSpec);
/**
* Load and merge Flink configuration from multiple sources
* @param baseConfig - Base configuration
* @param logger - Logger for configuration loading messages
* @return Merged configuration with all sources applied
*/
public static Configuration loadConfiguration(Configuration baseConfig, Logger logger);
}Configuration Creation Example:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.mesos.util.MesosUtils;
import org.apache.flink.mesos.configuration.MesosOptions;
// Create base configuration
Configuration config = new Configuration();
config.setString(MesosOptions.MASTER_URL, "mesos://master:5050");
config.setString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME, "flink-cluster");
config.setDouble("taskmanager.numberOfTaskSlots", 4.0);
config.setString("taskmanager.memory.process.size", "2g");
// Create Mesos scheduler configuration
MesosConfiguration mesosConfig = MesosUtils.createMesosSchedulerConfiguration(
config, "cluster-master"
);
// Create TaskManager parameters
MesosTaskManagerParameters tmParams = MesosUtils.createTmParameters(
config, LoggerFactory.getLogger(MyClass.class)
);
// Create container specification
ContainerSpecification containerSpec = MesosUtils.createContainerSpec(config);
MesosUtils.applyOverlays(config, containerSpec);Collection of utility methods for creating and manipulating Mesos protobuf objects including resources, environment variables, and URIs.
/**
* Collection of Mesos protobuf and resource utility methods
* Provides helpers for creating Mesos protocol buffer objects
*/
public class Utils {
/**
* Create CPU resource specification
* @param cpus - Number of CPU cores
* @return Mesos Resource for CPU allocation
*/
public static Protos.Resource cpus(double cpus);
/**
* Create memory resource specification
* @param mem - Memory amount in MB
* @return Mesos Resource for memory allocation
*/
public static Protos.Resource mem(double mem);
/**
* Create GPU resource specification
* @param gpus - Number of GPU units
* @return Mesos Resource for GPU allocation
*/
public static Protos.Resource gpus(double gpus);
/**
* Create disk resource specification
* @param disk - Disk space in MB
* @return Mesos Resource for disk allocation
*/
public static Protos.Resource disk(double disk);
/**
* Create network resource specification
* @param bandwidth - Network bandwidth in Mbps
* @return Mesos Resource for network allocation
*/
public static Protos.Resource network(double bandwidth);
/**
* Create port range resource specification
* @param begin - Start of port range (inclusive)
* @param end - End of port range (inclusive)
* @return Mesos Resource for port range allocation
*/
public static Protos.Resource ports(long begin, long end);
/**
* Create environment variable for Mesos tasks
* @param key - Environment variable name
* @param value - Environment variable value
* @return Mesos Environment.Variable object
*/
public static Protos.Environment.Variable variable(String key, String value);
/**
* Create URI specification for Mesos Fetcher
* @param uri - URI to fetch
* @param extract - Whether to extract archives
* @param executable - Whether file should be executable
* @param cache - Whether to cache the URI
* @param outputFile - Optional output filename
* @return Mesos CommandInfo.URI object
*/
public static Protos.CommandInfo.URI uri(String uri,
boolean extract,
boolean executable,
boolean cache,
String outputFile);
/**
* Convert range values to string representation
* @param ranges - List of Value.Range objects
* @return String representation of ranges (e.g., "8000-8010,9000-9010")
*/
public static String rangeValues(List<Protos.Value.Range> ranges);
/**
* Convert Mesos resource to string representation
* @param resource - Mesos Resource object
* @return Human-readable string representation
*/
public static String toString(Protos.Resource resource);
}Protobuf Utilities Example:
import org.apache.flink.mesos.Utils;
import org.apache.mesos.Protos;
// Create resource specifications
Protos.Resource cpuResource = Utils.cpus(2.0);
Protos.Resource memResource = Utils.mem(2048.0);
Protos.Resource diskResource = Utils.disk(1024.0);
Protos.Resource portsResource = Utils.ports(8000, 8010);
// Create environment variables
Protos.Environment.Variable javaHome = Utils.variable("JAVA_HOME", "/usr/lib/jvm/java-8");
Protos.Environment.Variable flinkHome = Utils.variable("FLINK_HOME", "/opt/flink");
// Create URIs for Mesos Fetcher
Protos.CommandInfo.URI jobJarUri = Utils.uri(
"http://artifact-server:8080/job.jar",
false, // don't extract
false, // not executable
true, // cache
"lib/job.jar" // output filename
);
Protos.CommandInfo.URI configUri = Utils.uri(
"http://artifact-server:8080/flink-conf.yaml",
false, false, true, "conf/flink-conf.yaml"
);
// Use in TaskInfo creation
Protos.TaskInfo taskInfo = Protos.TaskInfo.newBuilder()
.addAllResources(Arrays.asList(cpuResource, memResource, diskResource, portsResource))
.setCommand(Protos.CommandInfo.newBuilder()
.addAllUris(Arrays.asList(jobJarUri, configUri))
.setEnvironment(Protos.Environment.newBuilder()
.addAllVariables(Arrays.asList(javaHome, flinkHome))
)
)
.build();Utilities for managing Mesos resource allocations and calculations.
/**
* Represents allocated Mesos resources for a task
* Provides resource information and allocation details
*/
public class MesosResourceAllocation {
/**
* Create resource allocation from Mesos resources
* @param resources - List of allocated Mesos resources
*/
public MesosResourceAllocation(List<Protos.Resource> resources);
/**
* Get allocated CPU cores
* @return Number of CPU cores allocated
*/
public double cpus();
/**
* Get allocated memory in megabytes
* @return Memory allocation in MB
*/
public double memoryMB();
/**
* Get allocated disk space in megabytes
* @return Disk allocation in MB
*/
public double diskMB();
/**
* Get allocated network bandwidth in Mbps
* @return Network bandwidth allocation
*/
public double networkMbps();
/**
* Get allocated GPU units
* @return Number of GPUs allocated
*/
public double gpus();
/**
* Get allocated port ranges
* @return List of allocated port ranges
*/
public List<Protos.Value.Range> ports();
/**
* Get all allocated Mesos resources
* @return Complete list of Mesos Resource objects
*/
public List<Protos.Resource> mesosResources();
/**
* Check if allocation satisfies resource requirements
* @param requirements - Required resource amounts
* @return true if allocation meets or exceeds requirements
*/
public boolean satisfies(ResourceProfile requirements);
}Comprehensive validation of Mesos configurations:
// Configuration validation utility
public class MesosConfigValidator {
public static void validateConfiguration(Configuration config) {
// Validate required settings
if (!config.contains(MesosOptions.MASTER_URL)) {
throw new IllegalArgumentException("Mesos master URL is required");
}
// Validate resource requirements
double cpus = config.getDouble("mesos.resourcemanager.tasks.cpus", 1.0);
if (cpus <= 0) {
throw new IllegalArgumentException("CPU requirement must be positive");
}
// Validate framework settings
String frameworkName = config.getString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME);
if (frameworkName == null || frameworkName.trim().isEmpty()) {
throw new IllegalArgumentException("Framework name cannot be empty");
}
// Validate timeout settings
int failoverTimeout = config.getInteger(MesosOptions.FAILOVER_TIMEOUT_SECONDS);
if (failoverTimeout < 0) {
throw new IllegalArgumentException("Failover timeout cannot be negative");
}
}
}Helper methods for resource requirement calculations:
// Resource calculation utilities
public class ResourceCalculator {
public static ResourceProfile calculateTaskManagerProfile(Configuration config) {
// Calculate memory requirements
MemorySize processMemory = TaskExecutorResourceUtils.extractTotalProcessMemoryConfiguration(config);
MemorySize managedMemory = TaskExecutorResourceUtils.extractManagedMemoryConfiguration(config);
// Calculate CPU requirements
double cpuCores = config.getDouble("taskmanager.numberOfTaskSlots", 1.0);
// Calculate disk requirements
long diskSize = config.getLong("mesos.resourcemanager.tasks.disk", 1024);
return ResourceProfile.newBuilder()
.setCpuCores(cpuCores)
.setTaskHeapMemory(processMemory)
.setManagedMemory(managedMemory)
.build();
}
public static boolean isResourceSufficient(ResourceProfile required,
MesosResourceAllocation available) {
return available.cpus() >= required.getCpuCores().getValue() &&
available.memoryMB() >= required.getTotalMemory().getMebiBytes() &&
available.diskMB() >= 1024; // Minimum disk requirement
}
}Utilities for Docker container image handling:
// Container image utilities
public class ContainerImageUtils {
public static String resolveImageName(Configuration config) {
String imageName = config.getString("mesos.resourcemanager.tasks.container.docker.image");
if (imageName == null) {
// Default image based on Flink version
String flinkVersion = config.getString("flink.version", "1.13.6");
String scalaVersion = config.getString("scala.version", "2.11");
imageName = String.format("flink:%s-scala_%s", flinkVersion, scalaVersion);
}
return imageName;
}
public static List<String> buildDockerParameters(Configuration config) {
List<String> parameters = new ArrayList<>();
// Network configuration
String network = config.getString("mesos.resourcemanager.tasks.container.docker.network", "HOST");
parameters.add("--net=" + network);
// Volume mounts
String volumes = config.getString("mesos.resourcemanager.tasks.container.volumes", "");
for (String volume : volumes.split(",")) {
if (!volume.trim().isEmpty()) {
parameters.add("-v");
parameters.add(volume.trim());
}
}
return parameters;
}
}Comprehensive error handling patterns for utility operations:
Enhanced logging for troubleshooting:
// Enhanced logging utilities
public class MesosLoggingUtils {
public static void logResourceAllocation(Logger logger, MesosResourceAllocation allocation) {
logger.info("Resource allocation: CPU={}, Memory={}MB, Disk={}MB, GPU={}",
allocation.cpus(), allocation.memoryMB(),
allocation.diskMB(), allocation.gpus());
}
public static void logConfigurationSummary(Logger logger, Configuration config) {
logger.info("Mesos configuration summary:");
logger.info(" Master URL: {}", config.getString(MesosOptions.MASTER_URL));
logger.info(" Framework: {}", config.getString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME));
logger.info(" Resources: CPU={}, Memory={}",
config.getDouble("mesos.resourcemanager.tasks.cpus", 1.0),
config.getString("taskmanager.memory.process.size", "1g"));
}
}Efficient caching for expensive operations:
Optimal connection handling for external services:
All utility classes are deprecated as of Flink 1.13. Migration guidance:
org.apache.flink.kubernetes.utils.*org.apache.flink.yarn.utils.*org.apache.flink.runtime.util.*/**
* SSL configuration for artifact server
*/
public class SSLConfiguration {
public String getKeystorePath();
public String getKeystorePassword();
public String getTruststorePath();
public String getTruststorePassword();
public boolean isClientAuthRequired();
}
/**
* Container specification for TaskManager deployment
*/
public class ContainerSpecification {
public String getImageName();
public ContainerType getType();
public Map<String, String> getEnvironmentVariables();
public List<VolumeMount> getVolumeMounts();
public List<String> getCommand();
public ResourceProfile getResourceProfile();
}
/**
* Volume mount specification
*/
public class VolumeMount {
public String getHostPath();
public String getContainerPath();
public MountMode getMode(); // READ_ONLY, READ_WRITE
}
/**
* Artifact distribution statistics
*/
public class ArtifactStats {
public int getTotalArtifacts();
public long getTotalSizeBytes();
public int getDownloadCount();
public double getAverageDownloadTime();
public List<String> getMostRequestedArtifacts();
}