CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-clients

Apache Flink client library providing programmatic APIs and command-line interfaces for submitting, managing, and monitoring Flink jobs.

Pending
Overview
Eval results
Files

cluster-client.mddocs/

Cluster Client Management

Core interface for programmatic cluster interaction, supporting job submission, status monitoring, and cluster lifecycle management across different deployment targets including standalone, REST-based, and mini clusters.

Capabilities

Cluster Client Interface

Main interface for communicating with Flink clusters, providing comprehensive job and cluster management capabilities.

/**
 * Interface for cluster clients that communicate with Flink clusters
 * @param <T> Type of cluster identifier
 */
public interface ClusterClient<T> extends AutoCloseable {
    /**
     * Close the cluster client and release resources
     */
    @Override
    void close();
    
    /**
     * Get the cluster identifier
     * @return Cluster ID identifying the connected cluster
     */
    T getClusterId();
    
    /**
     * Get the Flink configuration used by this client
     * @return Flink configuration object
     */
    Configuration getFlinkConfiguration();
    
    /**
     * Shut down the cluster that this client communicates with
     */
    void shutDownCluster();
    
    /**
     * Get URL to the cluster web interface
     * @return Web interface URL as string
     */
    String getWebInterfaceURL();
    
    /**
     * List all jobs on the cluster (running and finished)
     * @return Future collection of job status messages
     * @throws Exception if connection fails
     */
    CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception;
    
    /**
     * Submit an execution plan to the cluster
     * @param executionPlan Execution plan to submit
     * @return Future with assigned job ID
     */
    CompletableFuture<JobID> submitJob(ExecutionPlan executionPlan);
    
    /**
     * Get status of a specific job
     * @param jobId ID of the job to query
     * @return Future with job status
     */
    CompletableFuture<JobStatus> getJobStatus(JobID jobId);
    
    /**
     * Cancel a running job
     * @param jobId ID of the job to cancel
     * @return Future with acknowledgment
     */
    CompletableFuture<Acknowledge> cancel(JobID jobId);
    
    /**
     * Stop a job with optional savepoint
     * @param jobId ID of the job to stop
     * @param advanceToEndOfEventTime Whether to advance to end of event time
     * @param savepointDir Optional savepoint directory
     * @param formatType Optional savepoint format type
     * @return Future with savepoint path
     */
    CompletableFuture<String> stopWithSavepoint(
        JobID jobId,
        boolean advanceToEndOfEventTime,
        @Nullable String savepointDir,
        SavepointFormatType formatType
    );
    
    /**
     * Trigger a savepoint for a running job
     * @param jobId ID of the job
     * @param savepointDir Target directory for savepoint
     * @param formatType Savepoint format type
     * @return Future with savepoint path
     */
    CompletableFuture<String> triggerSavepoint(
        JobID jobId,
        @Nullable String savepointDir,
        SavepointFormatType formatType
    );
    
    /**
     * Dispose a savepoint
     * @param savepointPath Path to the savepoint to dispose
     * @return Future with acknowledgment
     * @throws FlinkException if disposal fails
     */
    CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) throws FlinkException;
    
    /**
     * Cancel a job and trigger a savepoint
     * @param jobId ID of the job to cancel
     * @param savepointDirectory Directory for the savepoint
     * @param formatType Savepoint format type
     * @return Future with savepoint path
     */
    CompletableFuture<String> cancelWithSavepoint(
        JobID jobId, 
        @Nullable String savepointDirectory, 
        SavepointFormatType formatType
    );
    
    /**
     * Trigger a detached savepoint (returns quickly with trigger ID)
     * @param jobId ID of the job
     * @param savepointDirectory Target directory for savepoint
     * @param formatType Savepoint format type
     * @return Future with savepoint trigger ID
     */
    CompletableFuture<String> triggerDetachedSavepoint(
        JobID jobId, 
        @Nullable String savepointDirectory, 
        SavepointFormatType formatType
    );
    
    /**
     * Trigger a checkpoint for a job
     * @param jobId ID of the job
     * @param checkpointType Type of checkpoint (configured/full/incremental)
     * @return Future with checkpoint ID
     */
    CompletableFuture<Long> triggerCheckpoint(JobID jobId, CheckpointType checkpointType);
    
    /**
     * Get accumulators for a job
     * @param jobID Job identifier
     * @param loader Class loader for deserializing results
     * @return Future with accumulator map
     */
    CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader);
}

Usage Examples:

import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.client.program.rest.RestClusterClientConfiguration;

// Create and use a REST cluster client
Configuration config = new Configuration();
config.setString(RestOptions.ADDRESS, "localhost");
config.setInteger(RestOptions.PORT, 8081);

RestClusterClientConfiguration clientConfig = 
    RestClusterClientConfiguration.fromConfiguration(config);

try (ClusterClient<?> client = new RestClusterClient<>(clientConfig, "default")) {
    // List all jobs
    Collection<JobStatusMessage> jobs = client.listJobs().get();
    System.out.println("Found " + jobs.size() + " jobs");
    
    // Get job status
    JobID jobId = JobID.fromHexString("a1b2c3d4e5f6");
    JobStatus status = client.getJobStatus(jobId).get();
    System.out.println("Job status: " + status);
    
    // Trigger savepoint
    String savepointPath = client.triggerSavepoint(
        jobId, 
        "/path/to/savepoints",
        SavepointFormatType.CANONICAL
    ).get();
    System.out.println("Savepoint created: " + savepointPath);
}

REST Cluster Client

REST-based implementation of cluster client for communication with Flink clusters via REST API.

/**
 * REST-based cluster client implementation
 */
public class RestClusterClient<T> implements ClusterClient<T> {
    /**
     * Create REST cluster client with configuration and cluster ID
     * @param configuration REST client configuration
     * @param clusterId Cluster identifier
     */
    public RestClusterClient(
        RestClusterClientConfiguration configuration, 
        T clusterId
    );
    
    /**
     * Create REST cluster client with configuration, cluster ID, and thread pool size
     * @param configuration REST client configuration
     * @param clusterId Cluster identifier
     * @param maxRetryAttempts Maximum retry attempts for requests
     */
    public RestClusterClient(
        RestClusterClientConfiguration configuration,
        T clusterId,
        int maxRetryAttempts
    );
}

/**
 * Configuration for REST cluster clients
 */
public class RestClusterClientConfiguration {
    /**
     * Create configuration from Flink configuration
     * @param config Flink configuration
     * @return REST client configuration
     */
    public static RestClusterClientConfiguration fromConfiguration(Configuration config);
    
    /**
     * Get REST server endpoint
     * @return REST endpoint configuration
     */
    public RestServerEndpointConfiguration getRestServerEndpointConfiguration();
    
    /**
     * Get executor service for async operations
     * @return Scheduled executor service
     */
    public ScheduledExecutorService getExecutorService();
}

Mini Cluster Client

Cluster client implementation for mini clusters used in testing and local development.

/**
 * Cluster client for mini clusters
 */
public class MiniClusterClient implements ClusterClient<MiniClusterClient.MiniClusterId> {
    /**
     * Create mini cluster client
     * @param configuration Flink configuration
     * @param miniCluster Mini cluster instance
     */
    public MiniClusterClient(Configuration configuration, MiniCluster miniCluster);
    
    /**
     * Cluster ID type for mini clusters
     */
    public static class MiniClusterId {
        public MiniClusterId();
    }
}

Cluster Client Provider

Provider interface for cluster clients, enabling lazy creation and resource management.

/**
 * Provider interface for cluster clients
 * @param <T> Type of cluster identifier
 */
public interface ClusterClientProvider<T> extends AutoCloseable {
    /**
     * Get the cluster client instance
     * @return Cluster client
     */
    ClusterClient<T> getClusterClient();
    
    /**
     * Get the cluster ID
     * @return Cluster identifier
     */
    T getClusterId();
    
    /**
     * Close the provider and release resources
     */
    @Override
    void close() throws Exception;
}

Cluster Client Job Client Adapter

Adapter that bridges between cluster client and job client interfaces for unified job management.

/**
 * Adapter between cluster client and job client interfaces
 * @param <T> Type of cluster identifier
 */
public class ClusterClientJobClientAdapter<T> implements JobClient {
    /**
     * Create adapter with cluster client and job ID
     * @param clusterClient Cluster client instance
     * @param jobID Job identifier
     * @param userCodeClassLoader User code class loader
     */
    public ClusterClientJobClientAdapter(
        ClusterClientProvider<T> clusterClient,
        JobID jobID,
        ClassLoader userCodeClassLoader
    );
    
    @Override
    public JobID getJobID();
    
    @Override
    public CompletableFuture<JobStatus> getJobStatus();
    
    @Override
    public CompletableFuture<Void> cancel();
    
    @Override
    public CompletableFuture<String> stopWithSavepoint(
        boolean advanceToEndOfEventTime,
        @Nullable String savepointDir,
        SavepointFormatType formatType
    );
    
    @Override
    public CompletableFuture<String> triggerSavepoint(
        @Nullable String savepointDir,
        SavepointFormatType formatType
    );
    
    @Override
    public CompletableFuture<JobExecutionResult> getJobExecutionResult();
}

Retry Strategies

Configurable retry mechanisms for REST operations with exponential backoff support.

/**
 * Interface for wait strategies in retry logic
 */
public interface WaitStrategy {
    /**
     * Calculate wait time for given attempt
     * @param attempt Current attempt number (starting from 1)
     * @return Wait duration
     */
    Duration calculateWaitTime(int attempt);
}

/**
 * Exponential backoff wait strategy
 */
public class ExponentialWaitStrategy implements WaitStrategy {
    /**
     * Create exponential wait strategy
     * @param initialWait Initial wait duration
     * @param maxWait Maximum wait duration
     * @param multiplier Backoff multiplier
     */
    public ExponentialWaitStrategy(
        Duration initialWait,
        Duration maxWait,
        double multiplier
    );
    
    @Override
    public Duration calculateWaitTime(int attempt);
}

Types

/**
 * Job status message containing job information
 */
public class JobStatusMessage {
    /**
     * Get job ID
     * @return Job identifier
     */
    public JobID getJobId();
    
    /**
     * Get job name
     * @return Job name
     */
    public String getJobName();
    
    /**
     * Get current job state
     * @return Job status enum
     */
    public JobStatus getJobState();
    
    /**
     * Get job start time
     * @return Start time as timestamp
     */
    public long getStartTime();
}

/**
 * Job execution result containing final job information
 */
public class JobExecutionResult {
    public JobID getJobID();
    public long getNetRuntime();
    public Map<String, Object> getAllAccumulatorResults();
}

Error Handling

Cluster client operations handle various error conditions:

  • Connection Errors: Network failures, cluster unavailability
  • Authentication Errors: Security configuration issues
  • Job Errors: Invalid job submissions, job not found
  • Resource Errors: Insufficient cluster resources
  • Timeout Errors: Long-running operations exceeding timeouts

All asynchronous operations return CompletableFuture instances that can be composed and handle exceptions appropriately using handle(), exceptionally(), or whenComplete() methods.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-clients

docs

application-mode.md

artifact-management.md

cli-frontend.md

cluster-client.md

deployment-management.md

index.md

program-packaging.md

tile.json