tessl/maven-org-apache-flink--flink-clients

Apache Flink client library providing programmatic APIs and command-line interfaces for submitting, managing, and monitoring Flink jobs.


Deployment Management

A cluster deployment abstraction that supports multiple deployment targets (standalone, YARN, Kubernetes, and other containerized environments) through a pluggable factory pattern and a common resource specification.

Capabilities

Cluster Descriptor Interface

Core interface for cluster descriptors that manage cluster lifecycle including deployment, retrieval, and termination.

/**
 * Interface for cluster descriptors that manage cluster lifecycle
 * @param <T> Type of cluster identifier
 */
public interface ClusterDescriptor<T> extends AutoCloseable {
    /**
     * Retrieve an existing cluster by ID
     * @param clusterId Cluster identifier
     * @return Cluster client provider for the existing cluster
     * @throws ClusterRetrieveException if cluster cannot be retrieved
     */
    ClusterClientProvider<T> retrieve(T clusterId) throws ClusterRetrieveException;
    
    /**
     * Deploy a new session cluster
     * @param clusterSpecification Resource specification for the cluster
     * @return Cluster client provider for the new cluster
     * @throws ClusterDeploymentException if deployment fails
     */
    ClusterClientProvider<T> deploySessionCluster(ClusterSpecification clusterSpecification) 
        throws ClusterDeploymentException;
    
    /**
     * Deploy an application cluster (application mode)
     * @param clusterSpecification Resource specification for the cluster
     * @param applicationConfiguration Application configuration
     * @return Cluster client provider for the application cluster
     * @throws ClusterDeploymentException if deployment fails
     */
    default ClusterClientProvider<T> deployApplicationCluster(
        ClusterSpecification clusterSpecification,
        ApplicationConfiguration applicationConfiguration
    ) throws ClusterDeploymentException {
        throw new UnsupportedOperationException(
            "Application mode is not supported by this cluster descriptor."
        );
    }
    
    /**
     * Terminate the given cluster
     * @param clusterId Cluster identifier
     * @throws FlinkException if termination fails
     */
    void terminateCluster(T clusterId) throws FlinkException;
    
    /**
     * Close the cluster descriptor and release resources
     */
    @Override
    void close() throws Exception;
}

Usage Examples:

import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;

// Create cluster specification
ClusterSpecification spec = new ClusterSpecification.Builder()
    .setMasterMemoryMB(1024)
    .setTaskManagerMemoryMB(2048)
    .setNumberTaskManagers(2)
    .createClusterSpecification();

// Deploy session cluster.
// `config`, `highAvailabilityServices`, and `rpcService` are assumed to be
// initialized elsewhere. Resources declared in one try-with-resources
// statement are closed in reverse order: client, provider, then descriptor.
try (ClusterDescriptor<StandaloneClusterId> descriptor =
         new StandaloneClusterDescriptor(config, highAvailabilityServices, rpcService);
     ClusterClientProvider<StandaloneClusterId> provider =
         descriptor.deploySessionCluster(spec);
     ClusterClient<StandaloneClusterId> client = provider.getClusterClient()) {

    System.out.println("Cluster deployed: " + client.getClusterId());
    System.out.println("Web UI: " + client.getWebInterfaceURL());

    // Use cluster...
}
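
The default deployApplicationCluster shown in the interface above throws UnsupportedOperationException, so a caller that does not know which descriptor it holds may want to handle that case explicitly. The following is a minimal, self-contained sketch of one way to do so. SketchDescriptor, SessionOnlyDescriptor, and ApplicationModeFallback are hypothetical stand-ins rather than Flink classes, and falling back to a session cluster is an illustrative choice, not documented Flink behavior.

```java
// Hypothetical stand-in for a cluster descriptor; NOT the Flink interface.
interface SketchDescriptor {
    String deploySessionCluster(int taskManagers);

    // Mirrors the documented default: unsupported unless a subtype overrides it.
    default String deployApplicationCluster(int taskManagers, String mainClass) {
        throw new UnsupportedOperationException(
            "Application mode is not supported by this cluster descriptor.");
    }
}

// A descriptor that only implements session deployment.
final class SessionOnlyDescriptor implements SketchDescriptor {
    @Override
    public String deploySessionCluster(int taskManagers) {
        return "session-cluster[taskManagers=" + taskManagers + "]";
    }
}

final class ApplicationModeFallback {
    // Try application mode first; fall back to a session cluster if unsupported.
    static String deploy(SketchDescriptor descriptor, int taskManagers, String mainClass) {
        try {
            return descriptor.deployApplicationCluster(taskManagers, mainClass);
        } catch (UnsupportedOperationException e) {
            return descriptor.deploySessionCluster(taskManagers);
        }
    }

    public static void main(String[] args) {
        // "com.example.Job" is a placeholder entry-point name.
        System.out.println(deploy(new SessionOnlyDescriptor(), 2, "com.example.Job"));
    }
}
```

Whether a silent fallback is appropriate depends on the application; surfacing the exception to the user is an equally reasonable design.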

Cluster Client Factory

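Factory interface that creates cluster descriptors and derives the cluster ID and resource specification from configuration. Each factory reports whether it is compatible with a given configuration, which is how the deployment target is detected.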

/**
 * Factory for creating cluster clients
 * @param <T> Type of cluster identifier
 */
public interface ClusterClientFactory<T> {
    /**
     * Check if this factory is compatible with the given configuration
     * @param configuration Flink configuration
     * @return true if compatible
     */
    boolean isCompatibleWith(Configuration configuration);
    
    /**
     * Create cluster descriptor from configuration
     * @param configuration Flink configuration
     * @return Cluster descriptor instance
     */
    ClusterDescriptor<T> createClusterDescriptor(Configuration configuration);
    
    /**
     * Get cluster ID from configuration
     * @param configuration Flink configuration
     * @return Cluster identifier or null if not specified
     */
    @Nullable
    T getClusterId(Configuration configuration);
    
    /**
     * Get cluster specification from configuration
     * @param configuration Flink configuration
     * @return Cluster specification
     */
    ClusterSpecification getClusterSpecification(Configuration configuration);
}

Cluster Client Service Loader

Service loader for discovering and loading cluster client factories dynamically.

/**
 * Service loader for cluster client factories
 */
public interface ClusterClientServiceLoader {
    /**
     * Get cluster client factory compatible with configuration
     * @param configuration Flink configuration
     * @return Compatible cluster client factory
     * @throws UnsupportedOperationException if no compatible factory found
     */
    <T> ClusterClientFactory<T> getClusterClientFactory(Configuration configuration);
}

/**
 * Default implementation of cluster client service loader
 */
public class DefaultClusterClientServiceLoader implements ClusterClientServiceLoader {
    /**
     * Create service loader instance
     */
    public DefaultClusterClientServiceLoader();
    
    @Override
    public <T> ClusterClientFactory<T> getClusterClientFactory(Configuration configuration);
}
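
The lookup described above can be pictured as a filter over the available factories using isCompatibleWith. The sketch below illustrates that idea with stand-in types only: SketchFactory, TargetFactory, and FactorySelectionSketch are hypothetical, a plain Map stands in for the Flink Configuration, and the use of the "execution.target" key is an assumption made for illustration. Throwing UnsupportedOperationException when nothing matches follows the contract documented above; rejecting multiple matches with IllegalStateException is a choice made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a cluster client factory; NOT the Flink interface.
interface SketchFactory {
    boolean isCompatibleWith(Map<String, String> configuration);
    String name();
}

// A factory that matches when the configured target equals its own target name.
final class TargetFactory implements SketchFactory {
    private final String target;

    TargetFactory(String target) {
        this.target = target;
    }

    @Override
    public boolean isCompatibleWith(Map<String, String> configuration) {
        // "execution.target" is assumed here as the key that names the target.
        return target.equals(configuration.get("execution.target"));
    }

    @Override
    public String name() {
        return target;
    }
}

final class FactorySelectionSketch {
    // Return the single factory compatible with the configuration.
    static SketchFactory select(List<SketchFactory> factories,
                                Map<String, String> configuration) {
        List<SketchFactory> compatible = new ArrayList<>();
        for (SketchFactory factory : factories) {
            if (factory.isCompatibleWith(configuration)) {
                compatible.add(factory);
            }
        }
        if (compatible.isEmpty()) {
            throw new UnsupportedOperationException("No compatible factory found");
        }
        if (compatible.size() > 1) {
            throw new IllegalStateException("Multiple compatible factories found");
        }
        return compatible.get(0);
    }

    public static void main(String[] args) {
        List<SketchFactory> factories =
            List.of(new TargetFactory("remote"), new TargetFactory("kubernetes-session"));
        SketchFactory chosen = select(factories, Map.of("execution.target", "remote"));
        System.out.println("Selected factory: " + chosen.name());
    }
}
```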

Standalone Deployment

Implementations for standalone cluster deployment and management.

/**
 * Cluster descriptor for standalone clusters
 */
public class StandaloneClusterDescriptor implements ClusterDescriptor<StandaloneClusterId> {
    /**
     * Create standalone cluster descriptor
     * @param flinkConfig Flink configuration
     * @param haServices High availability services
     * @param rpcService RPC service
     */
    public StandaloneClusterDescriptor(
        Configuration flinkConfig,
        HighAvailabilityServices haServices,
        RpcService rpcService
    );
    
    @Override
    public ClusterClientProvider<StandaloneClusterId> retrieve(StandaloneClusterId clusterId)
        throws ClusterRetrieveException;
    
    @Override
    public ClusterClientProvider<StandaloneClusterId> deploySessionCluster(
        ClusterSpecification clusterSpecification
    ) throws ClusterDeploymentException;
}

/**
 * Factory for standalone cluster clients
 */
public class StandaloneClientFactory implements ClusterClientFactory<StandaloneClusterId> {
    @Override
    public boolean isCompatibleWith(Configuration configuration);
    
    @Override
    public ClusterDescriptor<StandaloneClusterId> createClusterDescriptor(
        Configuration configuration
    );
    
    @Override
    public StandaloneClusterId getClusterId(Configuration configuration);
}

/**
 * Cluster ID for standalone clusters
 */
public class StandaloneClusterId {
    /**
     * Create standalone cluster ID
     */
    public StandaloneClusterId();
    
    @Override
    public String toString();
    
    @Override
    public boolean equals(Object obj);
    
    @Override
    public int hashCode();
}

Abstract Containerized Factory

Base class for containerized cluster client factories providing common functionality for Docker, Kubernetes, and other container-based deployments.

/**
 * Base factory for containerized cluster clients
 * @param <ClusterID> Type of cluster identifier
 * @param <ApplicationClusterID> Type of application cluster identifier
 */
public abstract class AbstractContainerizedClusterClientFactory<
    ClusterID, ApplicationClusterID> implements ClusterClientFactory<ClusterID> {
    
    /**
     * Get deployment target for this factory
     * @return Deployment target string
     */
    protected abstract String getDeploymentTargetName();
    
    /**
     * Check if cluster ID is compatible
     * @param clusterId Cluster ID to check
     * @return true if compatible
     */
    protected abstract boolean isCompatibleWith(ClusterID clusterId);
    
    @Override
    public boolean isCompatibleWith(Configuration configuration);
    
    @Override
    public ClusterSpecification getClusterSpecification(Configuration configuration);
}

Cluster Specification

Resource specification for cluster deployment: master and task manager memory, the number of task managers, and the number of slots per task manager.

/**
 * Specification for cluster resource requirements
 */
public class ClusterSpecification {
    /**
     * Get master memory in MB
     * @return Master memory size
     */
    public int getMasterMemoryMB();
    
    /**
     * Get task manager memory in MB
     * @return Task manager memory size
     */
    public int getTaskManagerMemoryMB();
    
    /**
     * Get number of task managers
     * @return Number of task managers
     */
    public int getNumberTaskManagers();
    
    /**
     * Get slots per task manager
     * @return Number of slots per task manager
     */
    public int getSlotsPerTaskManager();
    
    /**
     * Builder for creating cluster specifications
     */
    public static class Builder {
        /**
         * Set master memory size
         * @param masterMemoryMB Memory in MB
         * @return Builder instance
         */
        public Builder setMasterMemoryMB(int masterMemoryMB);
        
        /**
         * Set task manager memory size
         * @param taskManagerMemoryMB Memory in MB
         * @return Builder instance
         */
        public Builder setTaskManagerMemoryMB(int taskManagerMemoryMB);
        
        /**
         * Set number of task managers
         * @param numberTaskManagers Number of task managers
         * @return Builder instance
         */
        public Builder setNumberTaskManagers(int numberTaskManagers);
        
        /**
         * Set slots per task manager
         * @param slotsPerTaskManager Number of slots
         * @return Builder instance
         */
        public Builder setSlotsPerTaskManager(int slotsPerTaskManager);
        
        /**
         * Create cluster specification
         * @return Configured cluster specification
         */
        public ClusterSpecification createClusterSpecification();
    }
}
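
To make the relationship between these fields concrete, the sketch below mirrors the builder shape with a hypothetical stand-in class and derives two totals from it. SketchSpec, its build() method, totalSlots(), and totalMemoryMB() are assumptions introduced for illustration; the listing above does not show ClusterSpecification exposing such helpers, and the default of one slot per task manager is a choice made for this sketch.

```java
// Hypothetical stand-in mirroring the builder shape above; NOT the Flink class.
final class SketchSpec {
    private final int masterMemoryMB;
    private final int taskManagerMemoryMB;
    private final int numberTaskManagers;
    private final int slotsPerTaskManager;

    private SketchSpec(Builder builder) {
        this.masterMemoryMB = builder.masterMemoryMB;
        this.taskManagerMemoryMB = builder.taskManagerMemoryMB;
        this.numberTaskManagers = builder.numberTaskManagers;
        this.slotsPerTaskManager = builder.slotsPerTaskManager;
    }

    // Total parallel slots offered by the cluster.
    int totalSlots() {
        return numberTaskManagers * slotsPerTaskManager;
    }

    // Memory requested for the master plus all task managers, in MB.
    int totalMemoryMB() {
        return masterMemoryMB + numberTaskManagers * taskManagerMemoryMB;
    }

    static final class Builder {
        private int masterMemoryMB;
        private int taskManagerMemoryMB;
        private int numberTaskManagers;
        private int slotsPerTaskManager = 1; // sketch default, an assumption

        Builder setMasterMemoryMB(int value) { this.masterMemoryMB = value; return this; }
        Builder setTaskManagerMemoryMB(int value) { this.taskManagerMemoryMB = value; return this; }
        Builder setNumberTaskManagers(int value) { this.numberTaskManagers = value; return this; }
        Builder setSlotsPerTaskManager(int value) { this.slotsPerTaskManager = value; return this; }

        SketchSpec build() {
            return new SketchSpec(this);
        }
    }

    public static void main(String[] args) {
        SketchSpec spec = new Builder()
            .setMasterMemoryMB(1024)
            .setTaskManagerMemoryMB(2048)
            .setNumberTaskManagers(2)
            .setSlotsPerTaskManager(4)
            .build();
        System.out.println("slots=" + spec.totalSlots() + ", memoryMB=" + spec.totalMemoryMB());
    }
}
```

With two task managers of 2048 MB and four slots each plus a 1024 MB master, the sketch reports 8 slots and 5120 MB in total.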

Types

/**
 * Provider for cluster clients with resource management
 * @param <T> Type of cluster identifier
 */
public interface ClusterClientProvider<T> extends AutoCloseable {
    /**
     * Get cluster client instance
     * @return Cluster client
     */
    ClusterClient<T> getClusterClient();
    
    /**
     * Get cluster identifier
     * @return Cluster ID
     */
    T getClusterId();
    
    /**
     * Check if cluster is in per-job mode
     * @return true if per-job mode
     */
    default boolean isPerJobMode() {
        return false;
    }
    
    @Override
    void close() throws Exception;
}
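
Because the provider is listed above as AutoCloseable, it can be managed by try-with-resources alongside the descriptor and the client. The sketch below uses a hypothetical Tracked resource (not a Flink type) to show the order in which Java closes resources declared in a single try-with-resources statement, which is the reverse of their declaration order.

```java
import java.util.ArrayList;
import java.util.List;

final class CloseOrderSketch {
    // Hypothetical resource that records its name when closed.
    static final class Tracked implements AutoCloseable {
        private final String name;
        private final List<String> log;

        Tracked(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void close() {
            log.add(name);
        }
    }

    // Open three resources in dependency order and return the order they closed in.
    static List<String> run() {
        List<String> closed = new ArrayList<>();
        try (Tracked descriptor = new Tracked("descriptor", closed);
             Tracked provider = new Tracked("provider", closed);
             Tracked client = new Tracked("client", closed)) {
            // Work with the client here.
        }
        return closed;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Declaring the resources in dependency order (descriptor, provider, client) means the client is released first and the descriptor last, without a manual finally block.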

Exception Handling

Deployment operations can fail in several ways:

  • Deployment Errors: ClusterDeploymentException for failed cluster deployments
  • Retrieval Errors: ClusterRetrieveException for failed cluster retrieval
  • Configuration Errors: Invalid cluster specifications or missing configuration
  • Resource Errors: Insufficient resources for cluster deployment
  • Network Errors: Communication failures with cluster management systems

Error Handling Examples:

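// `descriptor`, `spec`, and `clusterId` are assumed to be initialized elsewhere.
try {
    // Deploy a new session cluster (may throw ClusterDeploymentException)
    ClusterClientProvider<StandaloneClusterId> provider = 
        descriptor.deploySessionCluster(spec);
    
    // Or attach to an existing cluster (may throw ClusterRetrieveException)
    ClusterClientProvider<StandaloneClusterId> existing = 
        descriptor.retrieve(clusterId);
    
    // Use cluster...
    
} catch (ClusterDeploymentException e) {
    System.err.println("Failed to deploy cluster: " + e.getMessage());
    // Handle deployment failure
} catch (ClusterRetrieveException e) {
    System.err.println("Failed to retrieve cluster: " + e.getMessage());
    // Handle retrieval failure
} finally {
    // close() is declared to throw Exception, so the enclosing method
    // must declare or handle it
    descriptor.close();
}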

The deployment management layer is pluggable: each deployment target supplies its own ClusterClientFactory and ClusterDescriptor behind the same interfaces, so the target can be changed through configuration without changing client code.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-clients
