Apache Flink client library providing programmatic APIs and command-line interfaces for submitting, managing, and monitoring Flink jobs.
—
Core interface for programmatic cluster interaction, supporting job submission, status monitoring, and cluster lifecycle management across different deployment targets including standalone, REST-based, and mini clusters.
Main interface for communicating with Flink clusters, providing comprehensive job and cluster management capabilities.
/**
 * Client for communicating with a running Flink cluster.
 *
 * <p>All job-related operations are asynchronous and return {@link CompletableFuture}
 * instances; callers compose them or handle failures via {@code handle()},
 * {@code exceptionally()}, or {@code whenComplete()}.
 *
 * @param <T> type of the cluster identifier (deployment-specific, e.g. an application ID)
 */
public interface ClusterClient<T> extends AutoCloseable {
/**
 * Closes this client and releases its local resources (connections, executors).
 * To shut down the cluster itself, see {@link #shutDownCluster()}.
 */
@Override
void close();
/**
 * Returns the identifier of the cluster this client is connected to.
 *
 * @return cluster ID identifying the connected cluster
 */
T getClusterId();
/**
 * Returns the Flink configuration this client was created with.
 *
 * @return Flink configuration object
 */
Configuration getFlinkConfiguration();
/**
 * Requests shutdown of the remote cluster that this client communicates with.
 * Note: this affects the cluster, not just this client; use {@link #close()}
 * to release only client-side resources.
 */
void shutDownCluster();
/**
 * Returns the URL of the cluster's web interface (dashboard / REST endpoint).
 *
 * @return web interface URL as a string
 */
String getWebInterfaceURL();
/**
 * Lists all jobs known to the cluster, both currently running and finished.
 *
 * @return future completing with a collection of job status messages
 * @throws Exception if the request cannot be issued (e.g. connection failure)
 */
CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception;
/**
 * Submits an execution plan to the cluster for execution.
 *
 * @param executionPlan execution plan to submit
 * @return future completing with the job ID assigned to the submitted job
 */
CompletableFuture<JobID> submitJob(ExecutionPlan executionPlan);
/**
 * Queries the current status of a specific job.
 *
 * @param jobId ID of the job to query
 * @return future completing with the job's current {@link JobStatus}
 */
CompletableFuture<JobStatus> getJobStatus(JobID jobId);
/**
 * Cancels a running job. Unlike {@link #stopWithSavepoint}, no savepoint is taken.
 *
 * @param jobId ID of the job to cancel
 * @return future completing with an acknowledgment once cancellation is triggered
 */
CompletableFuture<Acknowledge> cancel(JobID jobId);
/**
 * Stops a job gracefully, taking a savepoint as part of the stop.
 *
 * @param jobId ID of the job to stop
 * @param advanceToEndOfEventTime whether to advance the event-time clock to the
 *        end of time before stopping (flushes event-time timers/windows)
 * @param savepointDir target savepoint directory, or {@code null} to use the
 *        cluster's configured default savepoint directory
 * @param formatType savepoint format type
 * @return future completing with the path of the created savepoint
 */
CompletableFuture<String> stopWithSavepoint(
JobID jobId,
boolean advanceToEndOfEventTime,
@Nullable String savepointDir,
SavepointFormatType formatType
);
/**
 * Triggers a savepoint for a running job; the job keeps running afterwards.
 *
 * @param jobId ID of the job
 * @param savepointDir target directory for the savepoint, or {@code null} to use
 *        the cluster's configured default savepoint directory
 * @param formatType savepoint format type
 * @return future completing with the path of the created savepoint
 */
CompletableFuture<String> triggerSavepoint(
JobID jobId,
@Nullable String savepointDir,
SavepointFormatType formatType
);
/**
 * Disposes (deletes) a savepoint that is no longer needed.
 *
 * @param savepointPath path to the savepoint to dispose
 * @return future completing with an acknowledgment once disposal is done
 * @throws FlinkException if the disposal request fails
 */
CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) throws FlinkException;
/**
 * Cancels a job, triggering a savepoint before the cancellation.
 *
 * @param jobId ID of the job to cancel
 * @param savepointDirectory target directory for the savepoint, or {@code null}
 *        to use the cluster's configured default savepoint directory
 * @param formatType savepoint format type
 * @return future completing with the path of the created savepoint
 */
CompletableFuture<String> cancelWithSavepoint(
JobID jobId,
@Nullable String savepointDirectory,
SavepointFormatType formatType
);
/**
 * Triggers a savepoint in detached mode: the call returns quickly with a trigger
 * ID rather than waiting for the savepoint to complete.
 *
 * @param jobId ID of the job
 * @param savepointDirectory target directory for the savepoint, or {@code null}
 *        to use the cluster's configured default savepoint directory
 * @param formatType savepoint format type
 * @return future completing with the savepoint trigger ID (not the final path)
 */
CompletableFuture<String> triggerDetachedSavepoint(
JobID jobId,
@Nullable String savepointDirectory,
SavepointFormatType formatType
);
/**
 * Triggers a checkpoint for a running job.
 *
 * @param jobId ID of the job
 * @param checkpointType type of checkpoint to take (configured/full/incremental)
 * @return future completing with the ID of the triggered checkpoint
 */
CompletableFuture<Long> triggerCheckpoint(JobID jobId, CheckpointType checkpointType);
/**
 * Retrieves the accumulators of a job.
 *
 * @param jobID job identifier
 * @param loader class loader used to deserialize user-defined accumulator results
 * @return future completing with a map from accumulator name to value
 */
CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader);
}Usage Examples:
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.client.program.rest.RestClusterClientConfiguration;
// Create and use a REST cluster client
Configuration config = new Configuration();
config.setString(RestOptions.ADDRESS, "localhost");
config.setInteger(RestOptions.PORT, 8081);
RestClusterClientConfiguration clientConfig =
RestClusterClientConfiguration.fromConfiguration(config);
try (ClusterClient<?> client = new RestClusterClient<>(clientConfig, "default")) {
// List all jobs
Collection<JobStatusMessage> jobs = client.listJobs().get();
System.out.println("Found " + jobs.size() + " jobs");
// Get job status
JobID jobId = JobID.fromHexString("2f4e9a1b8c3d7e6f5a0b1c2d3e4f5a6b"); // a JobID is 16 bytes, i.e. 32 hex characters
JobStatus status = client.getJobStatus(jobId).get();
System.out.println("Job status: " + status);
// Trigger savepoint
String savepointPath = client.triggerSavepoint(
jobId,
"/path/to/savepoints",
SavepointFormatType.CANONICAL
).get();
System.out.println("Savepoint created: " + savepointPath);
}REST-based implementation of cluster client for communication with Flink clusters via REST API.
/**
 * REST-based {@link ClusterClient} implementation that talks to the cluster's
 * REST endpoint.
 *
 * @param <T> type of the cluster identifier
 */
public class RestClusterClient<T> implements ClusterClient<T> {
/**
 * Creates a REST cluster client from a configuration and cluster ID.
 *
 * @param configuration REST client configuration
 * @param clusterId cluster identifier
 */
public RestClusterClient(
RestClusterClientConfiguration configuration,
T clusterId
);
/**
 * Creates a REST cluster client with an explicit retry limit for REST requests.
 *
 * @param configuration REST client configuration
 * @param clusterId cluster identifier
 * @param maxRetryAttempts maximum number of retry attempts for failed requests
 */
public RestClusterClient(
RestClusterClientConfiguration configuration,
T clusterId,
int maxRetryAttempts
);
}
/**
 * Configuration holder for {@link RestClusterClient} instances.
 */
public class RestClusterClientConfiguration {
/**
 * Builds a REST client configuration from a general Flink configuration
 * (e.g. REST address/port options).
 *
 * @param config Flink configuration
 * @return REST client configuration derived from {@code config}
 */
public static RestClusterClientConfiguration fromConfiguration(Configuration config);
/**
 * Returns the configuration of the REST server endpoint to connect to.
 *
 * @return REST endpoint configuration
 */
public RestServerEndpointConfiguration getRestServerEndpointConfiguration();
/**
 * Returns the executor service used for asynchronous REST operations.
 *
 * @return scheduled executor service
 */
public ScheduledExecutorService getExecutorService();
}Cluster client implementation for mini clusters used in testing and local development.
/**
 * {@link ClusterClient} implementation backed by an in-process {@code MiniCluster},
 * intended for testing and local development rather than production deployments.
 */
public class MiniClusterClient implements ClusterClient<MiniClusterClient.MiniClusterId> {
/**
 * Creates a client bound to the given mini cluster instance.
 *
 * @param configuration Flink configuration
 * @param miniCluster mini cluster instance to communicate with
 */
public MiniClusterClient(Configuration configuration, MiniCluster miniCluster);
/**
 * Marker cluster-ID type for mini clusters (no meaningful identity beyond the
 * instance itself).
 */
public static class MiniClusterId {
public MiniClusterId();
}
}Provider interface for cluster clients, enabling lazy creation and resource management.
/**
 * Provider for {@link ClusterClient} instances, enabling lazy creation and
 * centralized resource management of the underlying client.
 *
 * @param <T> type of the cluster identifier
 */
public interface ClusterClientProvider<T> extends AutoCloseable {
/**
 * Returns a cluster client instance. Implementations may create it lazily;
 * ownership/closing semantics depend on the implementation.
 *
 * @return cluster client
 */
ClusterClient<T> getClusterClient();
/**
 * Returns the identifier of the cluster the provided clients connect to.
 *
 * @return cluster identifier
 */
T getClusterId();
/**
 * Closes this provider and releases any resources it manages.
 *
 * @throws Exception if releasing resources fails
 */
@Override
void close() throws Exception;
}Adapter that bridges between cluster client and job client interfaces for unified job management.
/**
 * Adapter exposing the {@code JobClient} interface on top of a
 * {@link ClusterClientProvider}, so a single job can be managed through the
 * unified job-client API.
 *
 * @param <T> type of the cluster identifier
 */
public class ClusterClientJobClientAdapter<T> implements JobClient {
/**
 * Creates an adapter for the given job.
 *
 * @param clusterClient provider for the cluster client used to issue requests
 *        (note: a provider, not a client instance — clients may be obtained lazily)
 * @param jobID identifier of the job this adapter manages
 * @param userCodeClassLoader class loader for deserializing user-code results
 */
public ClusterClientJobClientAdapter(
ClusterClientProvider<T> clusterClient,
JobID jobID,
ClassLoader userCodeClassLoader
);
// Returns the ID of the managed job.
@Override
public JobID getJobID();
// Asynchronously queries the managed job's current status.
@Override
public CompletableFuture<JobStatus> getJobStatus();
// Asynchronously cancels the managed job (no savepoint taken).
@Override
public CompletableFuture<Void> cancel();
/**
 * Stops the managed job with a savepoint; mirrors
 * {@code ClusterClient#stopWithSavepoint} for this job's ID.
 */
@Override
public CompletableFuture<String> stopWithSavepoint(
boolean advanceToEndOfEventTime,
@Nullable String savepointDir,
SavepointFormatType formatType
);
/**
 * Triggers a savepoint for the managed job; mirrors
 * {@code ClusterClient#triggerSavepoint} for this job's ID.
 */
@Override
public CompletableFuture<String> triggerSavepoint(
@Nullable String savepointDir,
SavepointFormatType formatType
);
// Asynchronously retrieves the final execution result of the managed job.
@Override
public CompletableFuture<JobExecutionResult> getJobExecutionResult();
}Configurable retry mechanisms for REST operations with exponential backoff support.
/**
 * Strategy interface for computing how long to wait between retry attempts
 * of REST operations.
 */
public interface WaitStrategy {
/**
 * Calculates the wait time before the given retry attempt.
 *
 * @param attempt current attempt number, starting from 1
 * @return duration to wait before this attempt
 */
Duration calculateWaitTime(int attempt);
}
/**
 * {@link WaitStrategy} implementing exponential backoff: the wait grows by a
 * constant multiplier per attempt, capped at a maximum duration.
 */
public class ExponentialWaitStrategy implements WaitStrategy {
/**
 * Creates an exponential backoff strategy.
 *
 * @param initialWait wait duration before the first retry
 * @param maxWait upper bound on the wait duration (cap for the backoff)
 * @param multiplier factor by which the wait grows per attempt (typically &gt; 1)
 */
public ExponentialWaitStrategy(
Duration initialWait,
Duration maxWait,
double multiplier
);
// Returns min(maxWait, initialWait * multiplier^(attempt-1)) — presumed formula; confirm against implementation.
@Override
public Duration calculateWaitTime(int attempt);
}/**
 * Lightweight message describing one job's identity and current status, as
 * returned by {@code ClusterClient#listJobs()}.
 */
public class JobStatusMessage {
/**
 * Returns the job's identifier.
 *
 * @return job identifier
 */
public JobID getJobId();
/**
 * Returns the job's human-readable name.
 *
 * @return job name
 */
public String getJobName();
/**
 * Returns the job's current state.
 *
 * @return job status enum value
 */
public JobStatus getJobState();
/**
 * Returns when the job started.
 *
 * @return start time as an epoch-based timestamp (milliseconds, presumably; confirm with implementation)
 */
public long getStartTime();
}
/**
 * Final result of a completed job execution: identity, runtime, and the values
 * of all user-defined accumulators.
 */
public class JobExecutionResult {
// Returns the identifier of the finished job.
public JobID getJobID();
// Returns the job's net runtime (duration in milliseconds, presumably; confirm with implementation).
public long getNetRuntime();
// Returns all accumulator results keyed by accumulator name.
public Map<String, Object> getAllAccumulatorResults();
}Cluster client operations surface error conditions asynchronously through their returned futures:
All asynchronous operations return CompletableFuture instances that can be composed and handle exceptions appropriately using handle(), exceptionally(), or whenComplete() methods.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-clients