Utilities and Helpers

Utility classes for Flink's Mesos integration, covering artifact distribution, resource management, configuration helpers, and Mesos protobuf helpers.

Capabilities

Artifact Distribution

HTTP server and resolver interfaces for distributing job artifacts to Mesos tasks via Mesos Fetcher integration.

/**
 * Interface for resolving artifact URIs for Mesos Fetcher
 * Provides URI resolution for files that need to be distributed to tasks
 */
public interface MesosArtifactResolver {
    /**
     * Resolve artifact URI for the given remote file path
     * @param remoteFile - Path to file as it should appear in task container
     * @return Optional URL where file can be fetched, empty if not found
     */
    Option<URL> resolve(Path remoteFile);
}

/**
 * HTTP server for distributing artifacts to Mesos tasks via Mesos Fetcher
 * Extends resolver interface with server lifecycle management
 */
public interface MesosArtifactServer extends MesosArtifactResolver {
    /**
     * Add a local file path for distribution to tasks
     * @param path - Local file system path to serve
     * @param remoteFile - Path as it should appear in task containers
     * @return URL where file can be fetched by Mesos Fetcher
     */
    URL addPath(Path path, Path remoteFile);
    
    /**
     * Stop the artifact server and cleanup resources
     * Closes all connections and releases server port
     */
    void stop();
}

/**
 * HTTP server implementation for artifact distribution
 * Provides secure, scalable file distribution to Mesos tasks
 */
public class MesosArtifactServerImpl implements MesosArtifactServer {
    /**
     * Create artifact server with configuration
     * @param hostname - Hostname for server binding
     * @param port - Port for server (0 for automatic assignment)
     * @param sslConfig - Optional SSL configuration for secure distribution
     */
    public MesosArtifactServerImpl(String hostname, int port, SSLConfiguration sslConfig);
    
    /**
     * Start the artifact server
     * @return Server URL for artifact access
     */
    public URL start();
    
    /**
     * Get the server port (useful when auto-assigned)
     * @return Actual port number being used
     */
    public int getPort();
}

Artifact Server Usage Example:

import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.apache.flink.mesos.util.MesosArtifactServer;
import org.apache.flink.mesos.util.MesosArtifactServerImpl;

// Create and start artifact server
MesosArtifactServerImpl server = new MesosArtifactServerImpl("master-host", 0, null);
URL serverUrl = server.start();

// Add job JAR for distribution
Path jobJarPath = Paths.get("/path/to/job.jar");
URL jarUrl = server.addPath(jobJarPath, Paths.get("lib/job.jar"));

// Add configuration files
Path configPath = Paths.get("/opt/flink/conf/flink-conf.yaml");
URL configUrl = server.addPath(configPath, Paths.get("conf/flink-conf.yaml"));

// URLs can now be used in Mesos TaskInfo URIs
// Mesos Fetcher will download files to task containers
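
The bookkeeping behind addPath and resolve can be pictured as a lookup table from remote path to download URL. The sketch below is a hypothetical in-memory stand-in, not Flink's implementation: the class name InMemoryArtifactRegistry, the string-based paths, and the use of java.util.Optional and java.net.URI in place of Option and URL are all assumptions made for illustration.

```java
import java.net.URI;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the path-to-URL bookkeeping of an artifact server. */
final class InMemoryArtifactRegistry {
    // Base URI of the (imagined) HTTP server; it should end with "/" so that
    // relative paths resolve below it.
    private final URI baseUri;
    private final Map<String, URI> urisByRemotePath = new ConcurrentHashMap<>();

    InMemoryArtifactRegistry(URI baseUri) {
        this.baseUri = baseUri;
    }

    /** Registers a remote path and returns the URI a fetcher would download from. */
    URI addPath(String localPath, String remotePath) {
        // localPath only mirrors the addPath(path, remoteFile) shape;
        // a real server would serve the bytes stored at that location.
        URI uri = baseUri.resolve(remotePath);
        urisByRemotePath.put(remotePath, uri);
        return uri;
    }

    /** Returns the registered URI, or empty if the path was never added. */
    Optional<URI> resolve(String remotePath) {
        return Optional.ofNullable(urisByRemotePath.get(remotePath));
    }
}
```

With a base of http://master-host:8080/, adding lib/job.jar yields http://master-host:8080/lib/job.jar, and resolving an unregistered path yields an empty Optional.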

Configuration Utilities

Utilities for creating and managing Mesos-specific configurations, TaskManager parameters, and container specifications.

/**
 * Collection of Mesos-related utility methods
 * Provides configuration creation and management functions
 */
public class MesosUtils {
    /**
     * Create Mesos scheduler configuration from Flink configuration
     * @param config - Flink configuration with Mesos settings
     * @param hostname - Hostname for framework registration
     * @return Configured MesosConfiguration for scheduler
     */
    public static MesosConfiguration createMesosSchedulerConfiguration(Configuration config, 
                                                                       String hostname);
    
    /**
     * Create TaskManager parameters from configuration
     * @param config - Flink configuration
     * @param logger - Logger for parameter validation messages
     * @return Configured TaskManager parameters for Mesos deployment
     */
    public static MesosTaskManagerParameters createTmParameters(Configuration config, 
                                                                Logger logger);
    
    /**
     * Create container specification from configuration
     * @param config - Flink configuration with container settings
     * @return Container specification for TaskManager deployment
     */
    public static ContainerSpecification createContainerSpec(Configuration config);
    
    /**
     * Apply configuration overlays to container specification
     * Merges environment variables, volumes, and other container settings
     * @param config - Flink configuration
     * @param containerSpec - Base container specification to modify
     */
    public static void applyOverlays(Configuration config, 
                                     ContainerSpecification containerSpec);
    
    /**
     * Load and merge Flink configuration from multiple sources
     * @param baseConfig - Base configuration
     * @param logger - Logger for configuration loading messages
     * @return Merged configuration with all sources applied
     */
    public static Configuration loadConfiguration(Configuration baseConfig, Logger logger);
}

Configuration Creation Example:

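import org.apache.flink.configuration.Configuration;
import org.apache.flink.mesos.configuration.MesosOptions;
import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
import org.apache.flink.mesos.util.MesosConfiguration;
import org.apache.flink.mesos.util.MesosUtils;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.slf4j.LoggerFactory;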

// Create base configuration
Configuration config = new Configuration();
config.setString(MesosOptions.MASTER_URL, "mesos://master:5050");
config.setString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME, "flink-cluster");
config.setInteger("taskmanager.numberOfTaskSlots", 4);
config.setString("taskmanager.memory.process.size", "2g");

// Create Mesos scheduler configuration
MesosConfiguration mesosConfig = MesosUtils.createMesosSchedulerConfiguration(
    config, "cluster-master"
);

// Create TaskManager parameters
MesosTaskManagerParameters tmParams = MesosUtils.createTmParameters(
    config, LoggerFactory.getLogger(MyClass.class)
);

// Create container specification
ContainerSpecification containerSpec = MesosUtils.createContainerSpec(config);
MesosUtils.applyOverlays(config, containerSpec);

Mesos Protobuf Utilities

Collection of utility methods for creating and manipulating Mesos protobuf objects including resources, environment variables, and URIs.

/**
 * Collection of Mesos protobuf and resource utility methods
 * Provides helpers for creating Mesos protocol buffer objects
 */
public class Utils {
    /**
     * Create CPU resource specification
     * @param cpus - Number of CPU cores
     * @return Mesos Resource for CPU allocation
     */
    public static Protos.Resource cpus(double cpus);
    
    /**
     * Create memory resource specification  
     * @param mem - Memory amount in MB
     * @return Mesos Resource for memory allocation
     */
    public static Protos.Resource mem(double mem);
    
    /**
     * Create GPU resource specification
     * @param gpus - Number of GPU units
     * @return Mesos Resource for GPU allocation
     */
    public static Protos.Resource gpus(double gpus);
    
    /**
     * Create disk resource specification
     * @param disk - Disk space in MB
     * @return Mesos Resource for disk allocation
     */
    public static Protos.Resource disk(double disk);
    
    /**
     * Create network resource specification
     * @param bandwidth - Network bandwidth in Mbps
     * @return Mesos Resource for network allocation
     */
    public static Protos.Resource network(double bandwidth);
    
    /**
     * Create port range resource specification
     * @param begin - Start of port range (inclusive)
     * @param end - End of port range (inclusive)
     * @return Mesos Resource for port range allocation
     */
    public static Protos.Resource ports(long begin, long end);
    
    /**
     * Create environment variable for Mesos tasks
     * @param key - Environment variable name
     * @param value - Environment variable value
     * @return Mesos Environment.Variable object
     */
    public static Protos.Environment.Variable variable(String key, String value);
    
    /**
     * Create URI specification for Mesos Fetcher
     * @param uri - URI to fetch
     * @param extract - Whether to extract archives
     * @param executable - Whether file should be executable
     * @param cache - Whether to cache the URI
     * @param outputFile - Optional output filename
     * @return Mesos CommandInfo.URI object
     */
    public static Protos.CommandInfo.URI uri(String uri, 
                                             boolean extract, 
                                             boolean executable, 
                                             boolean cache, 
                                             String outputFile);
    
    /**
     * Convert range values to string representation
     * @param ranges - List of Value.Range objects
     * @return String representation of ranges (e.g., "8000-8010,9000-9010")
     */
    public static String rangeValues(List<Protos.Value.Range> ranges);
    
    /**
     * Convert Mesos resource to string representation
     * @param resource - Mesos Resource object
     * @return Human-readable string representation
     */
    public static String toString(Protos.Resource resource);
}

Protobuf Utilities Example:

import java.util.Arrays;

import org.apache.flink.mesos.Utils;
import org.apache.mesos.Protos;

// Create resource specifications
Protos.Resource cpuResource = Utils.cpus(2.0);
Protos.Resource memResource = Utils.mem(2048.0);
Protos.Resource diskResource = Utils.disk(1024.0);
Protos.Resource portsResource = Utils.ports(8000, 8010);

// Create environment variables
Protos.Environment.Variable javaHome = Utils.variable("JAVA_HOME", "/usr/lib/jvm/java-8");
Protos.Environment.Variable flinkHome = Utils.variable("FLINK_HOME", "/opt/flink");

// Create URIs for Mesos Fetcher
Protos.CommandInfo.URI jobJarUri = Utils.uri(
    "http://artifact-server:8080/job.jar",
    false, // don't extract
    false, // not executable
    true,  // cache
    "lib/job.jar" // output filename
);

Protos.CommandInfo.URI configUri = Utils.uri(
    "http://artifact-server:8080/flink-conf.yaml",
    false, false, true, "conf/flink-conf.yaml"
);

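// Use in TaskInfo creation. name, task_id, and slave_id are required fields,
// so build() fails without them; the values below are placeholders.
Protos.TaskInfo taskInfo = Protos.TaskInfo.newBuilder()
    .setName("flink-taskmanager")
    .setTaskId(Protos.TaskID.newBuilder().setValue("taskmanager-00001"))
    .setSlaveId(Protos.SlaveID.newBuilder().setValue("agent-id-from-offer"))
    .addAllResources(Arrays.asList(cpuResource, memResource, diskResource, portsResource))
    .setCommand(Protos.CommandInfo.newBuilder()
        .addAllUris(Arrays.asList(jobJarUri, configUri))
        .setEnvironment(Protos.Environment.newBuilder()
            .addAllVariables(Arrays.asList(javaHome, flinkHome))
        )
    )
    .build();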
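
The string form that rangeValues is documented to produce ("8000-8010,9000-9010") can be reproduced with a few lines of plain Java. This is a minimal sketch under stated assumptions: PortRange is a hypothetical stand-in for Protos.Value.Range, and PortRangeFormatter is not part of Flink or Mesos.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical stand-in for Protos.Value.Range: an inclusive [begin, end] range. */
final class PortRange {
    final long begin;
    final long end;

    PortRange(long begin, long end) {
        if (begin > end) {
            throw new IllegalArgumentException("begin must not exceed end");
        }
        this.begin = begin;
        this.end = end;
    }
}

/** Hypothetical helper that mimics the documented output format of rangeValues. */
final class PortRangeFormatter {
    private PortRangeFormatter() {}

    /** Renders each range as "begin-end" and joins the pieces with commas. */
    static String format(List<PortRange> ranges) {
        return ranges.stream()
                .map(r -> r.begin + "-" + r.end)
                .collect(Collectors.joining(","));
    }

    /** Counts the ports covered, treating both ends of every range as inclusive. */
    static long portCount(List<PortRange> ranges) {
        return ranges.stream().mapToLong(r -> r.end - r.begin + 1).sum();
    }
}
```

Because both ends are inclusive, the range 8000-8010 covers 11 ports, which matters when checking whether an offer holds enough ports for a task.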

Resource Allocation Utilities

Utilities for managing Mesos resource allocations and calculations.

/**
 * Represents allocated Mesos resources for a task
 * Provides resource information and allocation details
 */
public class MesosResourceAllocation {
    /**
     * Create resource allocation from Mesos resources
     * @param resources - List of allocated Mesos resources
     */
    public MesosResourceAllocation(List<Protos.Resource> resources);
    
    /**
     * Get allocated CPU cores
     * @return Number of CPU cores allocated
     */
    public double cpus();
    
    /**
     * Get allocated memory in megabytes
     * @return Memory allocation in MB
     */
    public double memoryMB();
    
    /**
     * Get allocated disk space in megabytes
     * @return Disk allocation in MB
     */
    public double diskMB();
    
    /**
     * Get allocated network bandwidth in Mbps
     * @return Network bandwidth allocation
     */
    public double networkMbps();
    
    /**
     * Get allocated GPU units
     * @return Number of GPUs allocated
     */
    public double gpus();
    
    /**
     * Get allocated port ranges
     * @return List of allocated port ranges
     */
    public List<Protos.Value.Range> ports();
    
    /**
     * Get all allocated Mesos resources
     * @return Complete list of Mesos Resource objects
     */
    public List<Protos.Resource> mesosResources();
    
    /**
     * Check if allocation satisfies resource requirements
     * @param requirements - Required resource amounts
     * @return true if allocation meets or exceeds requirements
     */
    public boolean satisfies(ResourceProfile requirements);
}

Utility Patterns

Configuration Validation

Comprehensive validation of Mesos configurations:

// Configuration validation utility
public class MesosConfigValidator {
    public static void validateConfiguration(Configuration config) {
        // Validate required settings
        if (!config.contains(MesosOptions.MASTER_URL)) {
            throw new IllegalArgumentException("Mesos master URL is required");
        }
        
        // Validate resource requirements
        double cpus = config.getDouble("mesos.resourcemanager.tasks.cpus", 1.0);
        if (cpus <= 0) {
            throw new IllegalArgumentException("CPU requirement must be positive");
        }
        
        // Validate framework settings
        String frameworkName = config.getString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME);
        if (frameworkName == null || frameworkName.trim().isEmpty()) {
            throw new IllegalArgumentException("Framework name cannot be empty");
        }
        
        // Validate timeout settings
        int failoverTimeout = config.getInteger(MesosOptions.FAILOVER_TIMEOUT_SECONDS);
        if (failoverTimeout < 0) {
            throw new IllegalArgumentException("Failover timeout cannot be negative");
        }
    }
}
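
The validator above stops at the first problem it finds. A variation that reports every problem in one pass is sketched below. It is an illustration only: it works on a plain Map rather than Flink's Configuration so that it stays self-contained, the class name CollectingConfigValidator is hypothetical, and the key strings are assumed to match the options used above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical validator over a plain key/value map that collects all problems. */
final class CollectingConfigValidator {
    private CollectingConfigValidator() {}

    /** Returns one message per problem; an empty list means the settings passed. */
    static List<String> validate(Map<String, String> settings) {
        List<String> problems = new ArrayList<>();

        // Required setting: the Mesos master address (key name is an assumption).
        String master = settings.get("mesos.master");
        if (master == null || master.trim().isEmpty()) {
            problems.add("mesos.master is required, e.g. host:5050");
        }

        // CPU share per task must parse as a number and be positive.
        String cpus = settings.getOrDefault("mesos.resourcemanager.tasks.cpus", "1.0");
        try {
            if (Double.parseDouble(cpus) <= 0) {
                problems.add("mesos.resourcemanager.tasks.cpus must be positive, got " + cpus);
            }
        } catch (NumberFormatException e) {
            problems.add("mesos.resourcemanager.tasks.cpus is not a number: " + cpus);
        }

        // Failover timeout must parse as an integer and be non-negative.
        String timeout = settings.getOrDefault("mesos.failover-timeout", "0");
        try {
            if (Integer.parseInt(timeout) < 0) {
                problems.add("mesos.failover-timeout cannot be negative, got " + timeout);
            }
        } catch (NumberFormatException e) {
            problems.add("mesos.failover-timeout is not an integer: " + timeout);
        }
        return problems;
    }
}
```

Returning a list lets the caller log all findings together or join them into a single exception message.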

Resource Calculation

Helper methods for resource requirement calculations:

// Resource calculation utilities  
public class ResourceCalculator {
    public static ResourceProfile calculateTaskManagerProfile(Configuration config) {
        // Calculate memory requirements
        MemorySize processMemory = TaskExecutorResourceUtils.extractTotalProcessMemoryConfiguration(config);
        MemorySize managedMemory = TaskExecutorResourceUtils.extractManagedMemoryConfiguration(config);
        
        // Approximate CPU requirements as one core per task slot
        double cpuCores = config.getInteger("taskmanager.numberOfTaskSlots", 1);
        
        // Calculate disk requirements  
        long diskSize = config.getLong("mesos.resourcemanager.tasks.disk", 1024);
        
        return ResourceProfile.newBuilder()
            .setCpuCores(cpuCores)
            .setTaskHeapMemory(processMemory)
            .setManagedMemory(managedMemory)
            .build();
    }
    
    public static boolean isResourceSufficient(ResourceProfile required, 
                                               MesosResourceAllocation available) {
        return available.cpus() >= required.getCpuCores().getValue() &&
               available.memoryMB() >= required.getTotalMemory().getMebiBytes() &&
               available.diskMB() >= 1024; // Minimum disk requirement
    }
}

Container Image Management

Utilities for Docker container image handling:

// Container image utilities
public class ContainerImageUtils {
    public static String resolveImageName(Configuration config) {
        // String keys need the two-argument getString; null signals "not configured"
        String imageName = config.getString("mesos.resourcemanager.tasks.container.image.name", null);
        
        if (imageName == null) {
            // Default image based on Flink version
            String flinkVersion = config.getString("flink.version", "1.13.6");
            String scalaVersion = config.getString("scala.version", "2.11");
            imageName = String.format("flink:%s-scala_%s", flinkVersion, scalaVersion);
        }
        
        return imageName;
    }
    
    public static List<String> buildDockerParameters(Configuration config) {
        List<String> parameters = new ArrayList<>();
        
        // Network configuration
        String network = config.getString("mesos.resourcemanager.tasks.container.docker.network", "HOST");
        parameters.add("--net=" + network);
        
        // Volume mounts
        String volumes = config.getString("mesos.resourcemanager.tasks.container.volumes", "");
        for (String volume : volumes.split(",")) {
            if (!volume.trim().isEmpty()) {
                parameters.add("-v");
                parameters.add(volume.trim());
            }
        }
        
        return parameters;
    }
}

Error Handling

Robust Error Handling

Error handling patterns for utility operations:

  • Configuration errors: Clear validation messages with suggestions
  • Resource allocation failures: Detailed resource requirement analysis
  • Network failures: Retry mechanisms with exponential backoff
  • File system errors: Graceful degradation and alternative paths
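
The retry-with-exponential-backoff pattern named in the list above could look roughly like the following. BackoffRetry is a hypothetical helper, not a class from Flink's Mesos module, and the attempt count and initial delay are parameters the caller would have to choose.

```java
import java.util.concurrent.Callable;

/** Hypothetical retry helper that doubles the wait time after every failed attempt. */
final class BackoffRetry {
    private BackoffRetry() {}

    /**
     * Runs the action up to maxAttempts times. After each failure except the
     * last, it sleeps for the current delay and then doubles that delay.
     * The last failure is rethrown once all attempts are used up.
     */
    static <T> T call(Callable<T> action, int maxAttempts, long initialDelayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delay = initialDelayMillis;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (InterruptedException e) {
                // Do not retry on interruption; restore the flag and give up.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delay);
                    delay *= 2;
                }
            }
        }
        throw last;
    }
}
```

A call such as BackoffRetry.call(() -> fetchArtifact(url), 5, 200L), where fetchArtifact is a placeholder for the real operation, would wait 200 ms, 400 ms, 800 ms, and 1600 ms between its five attempts. Production code often also caps the delay and adds jitter.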

Logging and Debugging

Enhanced logging for troubleshooting:

// Enhanced logging utilities
public class MesosLoggingUtils {
    public static void logResourceAllocation(Logger logger, MesosResourceAllocation allocation) {
        logger.info("Resource allocation: CPU={}, Memory={}MB, Disk={}MB, GPU={}", 
                   allocation.cpus(), allocation.memoryMB(), 
                   allocation.diskMB(), allocation.gpus());
    }
    
    public static void logConfigurationSummary(Logger logger, Configuration config) {
        logger.info("Mesos configuration summary:");
        logger.info("  Master URL: {}", config.getString(MesosOptions.MASTER_URL));
        logger.info("  Framework: {}", config.getString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME));
        logger.info("  Resources: CPU={}, Memory={}", 
                   config.getDouble("mesos.resourcemanager.tasks.cpus", 1.0),
                   config.getString("taskmanager.memory.process.size", "1g"));
    }
}

Performance Optimization

Caching Strategies

Efficient caching for expensive operations:

  • Configuration parsing: Cache parsed configurations
  • Resource calculations: Memoize resource requirement calculations
  • Network operations: Cache artifact server connections
  • Protobuf objects: Reuse commonly created protobuf objects
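
Memoization, as suggested for resource calculations above, can be sketched with a concurrent map. Memoizer is a hypothetical helper rather than an existing Flink class, and it assumes the computed value depends only on the key and is never null.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical memoizer: computes a value once per key and reuses it afterwards. */
final class Memoizer<K, V> {
    private final Map<K, V> cache = new ConcurrentHashMap<>();
    private final Function<K, V> compute;

    Memoizer(Function<K, V> compute) {
        this.compute = compute;
    }

    /** Returns the cached value for the key, computing it on first access. */
    V get(K key) {
        // ConcurrentHashMap.computeIfAbsent applies the function at most once
        // per key; a null result is not stored and would be recomputed.
        return cache.computeIfAbsent(key, compute);
    }

    /** Number of keys currently cached. */
    int size() {
        return cache.size();
    }
}
```

A cache like this never evicts entries, so it suits a small, bounded set of keys such as a handful of configuration profiles.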

Connection Management

Optimal connection handling for external services:

  • HTTP connection pooling: Reuse connections for artifact distribution
  • ZooKeeper session management: Persistent sessions with reconnection
  • Mesos master connections: Connection pooling and load balancing

Deprecation Notice

All utility classes are deprecated as of Flink 1.13, and Mesos support was removed in Flink 1.14. Migration guidance:

  • Kubernetes utilities: Use org.apache.flink.kubernetes.utils.*
  • YARN utilities: Use org.apache.flink.yarn.utils.*
  • Generic utilities: Use org.apache.flink.runtime.util.*

Types

/**
 * SSL configuration for artifact server
 */
public class SSLConfiguration {
    public String getKeystorePath();
    public String getKeystorePassword();
    public String getTruststorePath();
    public String getTruststorePassword();
    public boolean isClientAuthRequired();
}

/**
 * Container specification for TaskManager deployment
 */
public class ContainerSpecification {
    public String getImageName();
    public ContainerType getType();
    public Map<String, String> getEnvironmentVariables();
    public List<VolumeMount> getVolumeMounts();
    public List<String> getCommand();
    public ResourceProfile getResourceProfile();
}

/**
 * Volume mount specification
 */
public class VolumeMount {
    public String getHostPath();
    public String getContainerPath();
    public MountMode getMode(); // READ_ONLY, READ_WRITE
}

/**
 * Artifact distribution statistics
 */
public class ArtifactStats {
    public int getTotalArtifacts();
    public long getTotalSizeBytes();
    public int getDownloadCount();
    public double getAverageDownloadTime();
    public List<String> getMostRequestedArtifacts();
}