CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-clients-2-10

Apache Flink client library providing APIs and utilities for submitting, monitoring and managing Flink jobs programmatically

Overview
Eval results
Files

cluster-management.mddocs/

Cluster Management

Core functionality for connecting to and managing Flink clusters, including job submission, monitoring, and lifecycle operations.

Capabilities

ClusterClient (Abstract Base Class)

Base class for all cluster clients that communicate with Flink clusters. Provides the core interface for job management operations.

/**
 * Abstract base class for all cluster clients that communicate with Flink clusters.
 * Encapsulates the functionality necessary to submit a program to a remote cluster.
 */
public abstract class ClusterClient {
    /**
     * Creates an instance that submits the programs to the JobManager defined in the configuration.
     * This method will try to resolve the JobManager hostname and throw an exception if that is not possible.
     * @param flinkConfig The config used to obtain the job-manager's address, and used to configure the optimizer.
     * @throws Exception when cannot create the high availability services
     */
    public ClusterClient(Configuration flinkConfig) throws Exception;
    
    /**
     * Creates an instance that submits the programs to the JobManager defined in the configuration.
     * This method will try to resolve the JobManager hostname and throw an exception if that is not possible.
     * @param flinkConfig The config used to obtain the job-manager's address, and used to configure the optimizer.
     * @param highAvailabilityServices HighAvailabilityServices to use for leader retrieval
     */
    public ClusterClient(Configuration flinkConfig, HighAvailabilityServices highAvailabilityServices);

    // Configuration Methods
    
    /**
     * Shuts down the client. This stops the internal actor system and actors.
     * @throws Exception In case the shutdown did not complete successfully
     */
    public void shutdown() throws Exception;
    
    /**
     * Configures whether the client should print progress updates during the execution to System.out.
     * All updates are logged via the SLF4J loggers regardless of this setting.
     * @param print True to print updates to standard out during execution, false to not print them.
     */
    public void setPrintStatusDuringExecution(boolean print);
    
    /**
     * Returns whether the client will print progress updates during the execution to System.out
     * @return boolean indicating print status
     */
    public boolean getPrintStatusDuringExecution();
    
    /**
     * Gets the current JobManager address (may change in case of a HA setup).
     * @return The address (host and port) of the leading JobManager
     */
    public InetSocketAddress getJobManagerAddress();
    
    /**
     * Return the Flink configuration object
     * @return The Flink configuration object
     */
    public Configuration getFlinkConfiguration();

    // Program Execution Methods
    
    /**
     * General purpose method to run a user jar from the CliFrontend in either blocking or detached mode,
     * depending on whether {@code setDetached(true)} or {@code setDetached(false)} was called beforehand.
     * @param prog The packaged program to execute
     * @param parallelism The parallelism level for job execution  
     * @return The result of the execution
     * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file,
     *         or if the submission failed. That might be either due to an I/O problem, i.e. the job-manager 
     *         is unreachable, or due to the fact that the parallel execution failed.
     * @throws ProgramMissingJobException Thrown, if the submitted program cannot be executed, because it lacks
     *         a job definition.
     */
    public JobSubmissionResult run(PackagedProgram prog, int parallelism) throws ProgramInvocationException, ProgramMissingJobException;
    
    /**
     * Runs a program on the Flink cluster to which this client is connected.
     * @param program The job with associated JAR files
     * @param parallelism The parallelism level for job execution
     * @return JobSubmissionResult containing job ID and execution details
     * @throws ProgramInvocationException if job submission fails
     */
    public JobSubmissionResult run(JobWithJars program, int parallelism) throws ProgramInvocationException;
    
    /**
     * Runs a program on the Flink cluster to which this client is connected. The call blocks until the
     * execution is complete, and returns afterwards.
     * @param jobWithJars The job with associated JAR files
     * @param parallelism The parallelism level for job execution
     * @param savepointSettings Savepoint restore settings
     * @return JobSubmissionResult containing job ID and execution details
     * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
     * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file,
     *         or if the submission failed. That might be either due to an I/O problem, i.e. the job-manager 
     *         is unreachable, or due to the fact that the parallel execution failed.
     */
    public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, SavepointRestoreSettings savepointSettings) throws CompilerException, ProgramInvocationException;
    
    /**
     * Submits a program to the cluster.
     * @param compiledPlan The optimized program plan to submit
     * @param libraries The libraries that contain the program and all dependencies.
     * @param classpaths The classpaths that contain the program and all dependencies.
     * @param classLoader The class-loader to deserialize the job and result.
     * @return JobSubmissionResult containing job ID and execution details
     * @throws ProgramInvocationException if job submission fails
     */
    public JobSubmissionResult run(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths, ClassLoader classLoader) throws ProgramInvocationException;
    
    /**
     * Submits a program to the cluster.
     * @param compiledPlan The optimized program plan to submit
     * @param libraries The libraries that contain the program and all dependencies.
     * @param classpaths The classpaths that contain the program and all dependencies.
     * @param classLoader The class-loader to deserialize the job and result.
     * @param savepointSettings Savepoint restore settings
     * @return JobSubmissionResult containing job ID and execution details
     * @throws ProgramInvocationException if job submission fails
     */
    public JobSubmissionResult run(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, SavepointRestoreSettings savepointSettings) throws ProgramInvocationException;
    
    /**
     * Submits a JobGraph blocking.
     * @param jobGraph The job graph to execute
     * @param classLoader User code class loader to deserialize the results and errors (may contain custom classes)
     * @return JobExecutionResult with execution details and results
     * @throws ProgramInvocationException if job execution fails
     */
    public JobExecutionResult run(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException;
    
    /**
     * Submits a JobGraph detached.
     * @param jobGraph The job graph to execute
     * @param classLoader User code class loader to deserialize the results and errors (may contain custom classes)
     * @return JobSubmissionResult containing job ID
     * @throws ProgramInvocationException if job submission fails
     */
    public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException;

    // Job Management Methods
    
    /**
     * Reattaches to a running job from the supplied job id.
     * @param jobID The job id of the job to attach to
     * @return The JobExecutionResult for the jobID
     * @throws JobExecutionException if an error occurs during monitoring the job execution
     */
    public JobExecutionResult retrieveJob(JobID jobID) throws JobExecutionException;
    
    /**
     * Reattaches to a running job with the given job id.
     * @param jobID The job id of the job to attach to
     * @return The JobExecutionResult for the jobID
     * @throws JobExecutionException if an error occurs during monitoring the job execution
     */
    public JobListeningContext connectToJob(JobID jobID) throws JobExecutionException;
    
    /**
     * Cancels a job identified by the job id.
     * @param jobId the job id
     * @throws Exception In case an error occurred.
     */
    public void cancel(JobID jobId) throws Exception;
    
    /**
     * Stops a program on the Flink cluster whose job-manager is configured in this client's configuration.
     * Stopping works only for streaming programs. Be aware that the program might continue to run for a while after sending the stop command,
     * because after the sources have stopped emitting data, all operators still need to finish processing.
     * @param jobId the job ID of the streaming program to stop
     * @throws Exception If the job ID is invalid (ie, is unknown or refers to a batch job) or if sending the stop signal failed.
     *         That might be due to an I/O problem, ie, the job-manager is unreachable.
     */
    public void stop(final JobID jobId) throws Exception;

    // Accumulator Methods
    
    /**
     * Requests and returns the accumulators for the given job identifier. Accumulators can be requested while a job
     * is running or after it has finished. The default class loader is used to deserialize the incoming accumulator results.
     * @param jobID The job identifier of a job.
     * @return A Map containing the accumulator's name and its value.
     */
    public Map<String, Object> getAccumulators(JobID jobID) throws Exception;
    
    /**
     * Requests and returns the accumulators for the given job identifier. Accumulators can be requested while a job
     * is running or after it has finished.
     * @param jobID The job identifier of a job.
     * @param loader The class loader for deserializing the accumulator results.
     * @return A Map containing the accumulator's name and its value.
     */
    public Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader) throws Exception;

    // Session Management Methods
    
    /**
     * Tells the JobManager to finish the session (job) defined by the given ID.
     * @param jobId The ID that identifies the session.
     */
    public void endSession(JobID jobId) throws Exception;
    
    /**
     * Tells the JobManager to finish the sessions (jobs) defined by the given IDs.
     * @param jobIds The IDs that identify the sessions.
     */
    public void endSessions(List<JobID> jobIds) throws Exception;

    // Static Plan Methods
    
    /**
     * Returns the optimized execution plan of the program as a JSON string.
     * @param compiler The optimizer to use
     * @param prog The program to get the plan for
     * @param parallelism The parallelism to compile the plan for
     * @return String representation of optimized plan as JSON
     * @throws CompilerException if compilation fails
     * @throws ProgramInvocationException if program cannot be invoked
     */
    public static String getOptimizedPlanAsJson(Optimizer compiler, PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException;
    
    /**
     * Returns the optimized execution plan of the program.
     * @param compiler The optimizer to use
     * @param prog The program to get the plan for
     * @param parallelism The parallelism to compile the plan for
     * @return FlinkPlan optimized plan
     * @throws CompilerException if compilation fails
     * @throws ProgramInvocationException if program cannot be invoked
     */
    public static FlinkPlan getOptimizedPlan(Optimizer compiler, PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException;
    
    /**
     * Returns the optimized execution plan of the program.
     * @param compiler The optimizer to use
     * @param p The program plan to optimize
     * @param parallelism The parallelism to compile the plan for
     * @return OptimizedPlan
     * @throws CompilerException if compilation fails
     */
    public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int parallelism) throws CompilerException;

    // Utility Methods
    
    /**
     * Creates a JobGraph from a packaged program and optimized plan.
     * @param prog The packaged program
     * @param optPlan The optimized plan
     * @param savepointSettings Savepoint restore settings
     * @return JobGraph
     * @throws ProgramInvocationException if JobGraph creation fails
     */
    public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan, SavepointRestoreSettings savepointSettings) throws ProgramInvocationException;
    
    /**
     * Returns the ActorGateway of the current job manager leader using the LeaderRetrievalService.
     * @return ActorGateway of the current job manager leader
     * @throws Exception
     */
    public ActorGateway getJobManagerGateway() throws Exception;

    // Client Configuration Methods
    
    /**
     * Set the mode of this client (detached or blocking job execution).
     * @param isDetached If true, the client will submit programs detached via the run method
     */
    public void setDetached(boolean isDetached);
    
    /**
     * A flag to indicate whether this client submits jobs detached.
     * @return True if the Client submits detached, false otherwise
     */
    public boolean isDetached();

    // Abstract Methods (Must be implemented by subclasses)
    
    /**
     * Blocks until the client has determined that the cluster is ready for Job submission.
     * This is delayed until right before job submission to report any other errors first (e.g. invalid job definitions/errors in the user jar)
     */
    public abstract void waitForClusterToBeReady();
    
    /**
     * Returns a URL (as a string) to the JobManager web interface.
     */
    public abstract String getWebInterfaceURL();
    
    /**
     * Returns the latest cluster status, with number of Taskmanagers and slots
     */
    public abstract GetClusterStatusResponse getClusterStatus();
    
    /**
     * Returns a string representation of the cluster.
     */
    public abstract String getClusterIdentifier();
    
    /**
     * The client may define an upper limit on the number of slots to use
     * @return -1 if unknown
     */
    public abstract int getMaxSlots();
    
    /**
     * Returns true if the client already has the user jar and providing it again would result in duplicate uploading of the jar.
     */
    public abstract boolean hasUserJarsInClassPath(List<URL> userJarFiles);
}

StandaloneClusterClient

Client implementation for connecting to standalone Flink clusters.

/**
 * Client for connecting to standalone Flink clusters
 */
public class StandaloneClusterClient extends ClusterClient {
    /**
     * Creates a client for connecting to a standalone cluster
     * @param config Configuration containing cluster connection details
     */
    public StandaloneClusterClient(Configuration config);
    
    /**
     * Creates a client with high availability services
     * @param config Configuration containing cluster connection details
     * @param highAvailabilityServices High availability services for cluster coordination
     */
    public StandaloneClusterClient(Configuration config, HighAvailabilityServices highAvailabilityServices);
}

Usage Examples:

import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.jobgraph.JobID;

// Create configuration for cluster connection
Configuration config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.setInteger(JobManagerOptions.PORT, 6123);

// Create cluster client
StandaloneClusterClient client = new StandaloneClusterClient(config);

// Submit a job
PackagedProgram program = new PackagedProgram(new File("my-job.jar"));
JobSubmissionResult result = client.run(program, 4);
JobID jobId = result.getJobID();

// Monitor job execution
JobExecutionResult executionResult = client.retrieveJob(jobId);
System.out.println("Job execution time: " + executionResult.getNetRuntime() + "ms");

// Get accumulator results
Map<String, Object> accumulators = client.getAccumulators(jobId);
System.out.println("Accumulators: " + accumulators);

// Cancel job if needed
client.cancel(jobId);

// Clean up
client.shutdown();

Job Result Types

Result types returned by cluster operations.

/**
 * Result of job submission operations
 */
public class JobSubmissionResult {
    /**
     * Gets the unique identifier of the submitted job
     * @return JobID of the submitted job
     */
    public JobID getJobID();
    
    /**
     * Checks if this result contains execution details
     * @return true if this is a JobExecutionResult with execution details
     */
    public boolean isJobExecutionResult();
}

/**
 * Extended result containing job execution details and performance metrics
 */
public class JobExecutionResult extends JobSubmissionResult {
    /**
     * Gets the net runtime of the job execution in milliseconds
     * @return Job execution time in milliseconds
     */
    public long getNetRuntime();
    
    /**
     * Gets all accumulator results from the job execution
     * @return Map of accumulator names to their final values
     */
    public Map<String, Object> getAllAccumulatorResults();
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-clients-2-10

docs

cli.md

cluster-management.md

execution-context.md

execution-environments.md

index.md

program-management.md

tile.json