Application-specific deployment classes for running Flink applications in application mode with full lifecycle management. Provides specialized executors, job clients, and deployment utilities for application clusters.
Configuration class for application deployment containing program arguments, class information, and classpaths.
/**
* Configuration for application deployment
*/
public class ApplicationConfiguration {
/**
* Gets the program arguments for the application
* @return Array of program arguments
*/
public String[] getProgramArguments();
/**
* Gets the application main class name
* @return Fully qualified application class name
*/
public String getApplicationClassName();
/**
* Gets the application classpaths
* @return List of classpath strings
*/
public List<String> getApplicationClasspaths();
/**
* Builder for creating ApplicationConfiguration instances
*/
public static class ApplicationConfigurationBuilder {
/**
* Sets the application class name
* @param applicationClassName Fully qualified class name
* @return This builder instance
*/
public ApplicationConfigurationBuilder setApplicationClassName(String applicationClassName);
/**
* Sets the program arguments
* @param programArguments Array of program arguments
* @return This builder instance
*/
public ApplicationConfigurationBuilder setProgramArguments(String[] programArguments);
/**
* Sets the application classpaths
* @param applicationClasspaths List of classpath strings
* @return This builder instance
*/
public ApplicationConfigurationBuilder setApplicationClasspaths(List<String> applicationClasspaths);
/**
* Builds the ApplicationConfiguration
* @return ApplicationConfiguration instance
*/
public ApplicationConfiguration build();
}
}

Usage Example:
import org.apache.flink.client.deployment.application.ApplicationConfiguration;

import java.util.Arrays;

// Create application configuration: main class, program arguments and classpath entries
ApplicationConfiguration appConfig = new ApplicationConfiguration.ApplicationConfigurationBuilder()
        .setApplicationClassName("com.example.MyStreamingApplication")
        .setProgramArguments(new String[]{"--parallelism", "4", "--input", "/data/stream"})
        .setApplicationClasspaths(Arrays.asList("/path/to/app.jar", "/path/to/lib"))
        .build();

// Read the configured main class back from the built configuration
System.out.println("Application class: " + appConfig.getApplicationClassName());
System.out.println("Arguments: " + Arrays.toString(appConfig.getProgramArguments()));

Interface for running applications with dispatcher integration.
/**
 * Contract for running a packaged application against a dispatcher.
 */
public interface ApplicationRunner {

    /**
     * Runs the given program using the supplied dispatcher gateway and configuration.
     *
     * @param dispatcherGateway gateway to the dispatcher
     * @param program packaged program to run
     * @param configuration Flink configuration
     * @return future that completes when the application finishes
     */
    CompletableFuture<Void> run(
            DispatcherGateway dispatcherGateway, PackagedProgram program, Configuration configuration);
}
/**
* Runs applications in detached mode
*/
public class DetachedApplicationRunner implements ApplicationRunner {
@Override
public CompletableFuture<Void> run(
DispatcherGateway dispatcherGateway,
PackagedProgram program,
Configuration configuration);
}

Entry point class for application clusters providing cluster startup and lifecycle management.
/**
* Entry point for application clusters
*/
public class ApplicationClusterEntryPoint extends ClusterEntrypoint {
/**
* Main method for starting application clusters
* @param args Command line arguments
*/
public static void main(String[] args);
// Extends ClusterEntrypoint with application-specific initialization
}

Specialized job clients for application mode execution.
/**
 * Job client used for embedded execution in application mode.
 */
public class EmbeddedJobClient implements JobClient, CoordinationRequestGateway {

    /**
     * Creates an embedded job client.
     *
     * @param jobId ID of the job
     * @param jobResultFuture future containing the job result
     * @param userCodeClassloader classloader for user code
     * @param coordinationRequestGateway gateway for coordination requests
     */
    public EmbeddedJobClient(
            JobID jobId,
            CompletableFuture<JobResult> jobResultFuture,
            ClassLoader userCodeClassloader,
            CoordinationRequestGateway coordinationRequestGateway);

    // The remaining members implement the JobClient interface for embedded execution.
}
/**
* Job client for web submission
*/
public class WebSubmissionJobClient implements JobClient {
// Implements JobClient interface for web-based submission
// Used when submitting applications through REST API
}

Providers for extracting entry class information from different sources.
/**
 * Source of entry class information: the job class name and its parameters.
 */
public interface EntryClassInformationProvider {

    /**
     * Returns the job class name.
     *
     * @return fully qualified job class name
     */
    String getJobClassName();

    /**
     * Returns the job parameters.
     *
     * @return job parameters as a list of strings
     */
    List<String> getJobParameters();
}
/**
 * {@link EntryClassInformationProvider} whose entry class information comes from the classpath.
 */
public class FromClasspathEntryClassInformationProvider implements EntryClassInformationProvider {

    /**
     * Creates a provider from a classpath configuration.
     *
     * @param programClassName program class name from the classpath
     * @param programArguments program arguments
     */
    public FromClasspathEntryClassInformationProvider(
            String programClassName, String[] programArguments);

    @Override
    public String getJobClassName();

    @Override
    public List<String> getJobParameters();
}
/**
* Entry class information from JAR manifest
*/
public class FromJarEntryClassInformationProvider implements EntryClassInformationProvider {
/**
* Creates provider from JAR file
* @param jarFile JAR file to analyze
* @param programArguments Program arguments
*/
public FromJarEntryClassInformationProvider(File jarFile, String[] programArguments);
@Override
public String getJobClassName();
@Override
public List<String> getJobParameters();
}

Utility for parsing JAR manifest files to extract entry class information.
/**
* Parser for JAR manifest information
*/
public class JarManifestParser {
/**
* Finds entry class in JAR manifest
* @param jarFile JAR file to analyze
* @return Optional containing entry class name, empty if not found
*/
public static Optional<String> findEntryClass(JarFile jarFile);
}

Usage Example:
import org.apache.flink.client.deployment.application.JarManifestParser;

import java.io.File;
import java.util.Optional;
import java.util.jar.JarFile;

// Parse JAR manifest for entry class.
// Note: the JarFile constructor can throw IOException, and the JarFile
// should be closed once it is no longer needed.
JarFile jarFile = new JarFile(new File("my-application.jar"));
Optional<String> entryClass = JarManifestParser.findEntryClass(jarFile);
if (entryClass.isPresent()) {
System.out.println("Found entry class: " + entryClass.get());
// Create entry class provider
EntryClassInformationProvider provider = new FromJarEntryClassInformationProvider(
new File("my-application.jar"),
new String[]{"--config", "production"}
);
System.out.println("Job class: " + provider.getJobClassName());
System.out.println("Parameters: " + provider.getJobParameters());
}

Specialized executors for application mode deployment and execution.
/**
 * {@link PipelineExecutor} for embedded application execution.
 */
public class EmbeddedExecutor implements PipelineExecutor {

    @Override
    public CompletableFuture<JobClient> execute(
            Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader);
}
/**
 * {@link PipelineExecutorFactory} that supplies embedded executors.
 */
public class EmbeddedExecutorFactory implements PipelineExecutorFactory {

    @Override
    public String getName();

    @Override
    public boolean isCompatibleWith(Configuration configuration);

    @Override
    public PipelineExecutor getExecutor(Configuration configuration);
}
/**
 * {@link PipelineExecutorServiceLoader} for embedded executors.
 */
public class EmbeddedExecutorServiceLoader implements PipelineExecutorServiceLoader {

    @Override
    public Stream<PipelineExecutorFactory> getExecutorFactories();
}
/**
* Factory for web submission executors
*/
public class WebSubmissionExecutorFactory implements PipelineExecutorFactory {
@Override
public String getName();
@Override
public boolean isCompatibleWith(Configuration configuration);
@Override
public PipelineExecutor getExecutor(Configuration configuration);
}

Utilities for polling job status and handling job completion.
/**
* Utilities for polling job status
*/
public class JobStatusPollingUtils {
/**
* Polls job result asynchronously
* @param jobStatusSupplier Supplier for job status
* @param jobResultSupplier Supplier for job result
* @param userCodeClassloader Classloader for user code
* @return CompletableFuture containing final job result
*/
public static CompletableFuture<JobResult> pollJobResultAsync(
Supplier<CompletableFuture<JobStatus>> jobStatusSupplier,
Supplier<CompletableFuture<JobResult>> jobResultSupplier,
ClassLoader userCodeClassloader);
}

Dispatcher-related components for application cluster management.
/**
 * {@link DispatcherBootstrap} implementation for application mode.
 */
public class ApplicationDispatcherBootstrap implements DispatcherBootstrap {
    // Members implement the dispatcher bootstrap for application mode.
}
/**
 * Dispatcher gateway service factory for application mode.
 */
public class ApplicationDispatcherGatewayServiceFactory
        extends AbstractDispatcherLeaderService.DispatcherGatewayServiceFactory {
    // Members create the dispatcher gateway services for application mode.
}
/**
* Factory for application dispatcher leader process factory
*/
public class ApplicationDispatcherLeaderProcessFactoryFactory
implements DispatcherResourceManagerComponentFactory.DispatcherLeaderProcessFactoryFactory {
// Creates dispatcher leader process factories for application mode
}

CLI-specific deployer for application clusters.
/**
 * {@link ApplicationDeployer} that deploys application clusters via the CLI.
 */
public class ApplicationClusterDeployer implements ApplicationDeployer {

    @Override
    public <ClusterID> void run(
            Configuration configuration, ApplicationConfiguration applicationConfiguration);
}

/**
 * Signals that executing an application failed.
 */
public class ApplicationExecutionException extends FlinkException {

    /**
     * Creates the exception with a message only.
     *
     * @param message error message
     */
    public ApplicationExecutionException(String message);

    /**
     * Creates the exception with a message and the underlying cause.
     *
     * @param message error message
     * @param cause root cause throwable
     */
    public ApplicationExecutionException(String message, Throwable cause);
}
/**
 * Signals an unsuccessful execution.
 */
public class UnsuccessfulExecutionException extends JobExecutionException {

    /**
     * Creates the exception with a message, the job ID and the underlying cause.
     *
     * @param message error message
     * @param jobId ID of the job that failed
     * @param cause root cause throwable
     */
    public UnsuccessfulExecutionException(String message, JobID jobId, Throwable cause);
}

/**
 * Deployer contract: runs an application described by an {@code ApplicationConfiguration}
 * using the given Flink {@code Configuration}.
 */
public interface ApplicationDeployer {

    <ClusterID> void run(
            Configuration configuration, ApplicationConfiguration applicationConfiguration);
}
/**
 * Gateway interface for dispatcher communication.
 */
public interface DispatcherGateway {

    /**
     * Submits a job graph.
     *
     * @param jobGraph job graph to submit
     * @param timeout duration supplied with the call (presumably the request timeout; confirm)
     * @return future holding the {@code Acknowledge} for the submission
     */
    CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Duration timeout);

    /**
     * Lists jobs.
     *
     * @param timeout duration supplied with the call (presumably the request timeout; confirm)
     * @return future holding a collection of {@code JobStatusMessage} entries
     */
    CompletableFuture<Collection<JobStatusMessage>> listJobs(Duration timeout);
}
/**
 * Gateway for sending coordination requests.
 */
public interface CoordinationRequestGateway {

    /**
     * Sends a coordination request addressed by job ID and operator ID.
     *
     * @param jobId ID of the job
     * @param operatorId ID of the operator
     * @param request coordination request to send
     * @return future holding the {@code CoordinationResponse}
     */
    CompletableFuture<CoordinationResponse> sendCoordinationRequest(
            JobID jobId, OperatorID operatorId, CoordinationRequest request);
}
/**
 * Creator of {@code JobClient} instances, keyed by job ID.
 */
public interface EmbeddedJobClientCreator {

    /**
     * Provides a job client for the given job.
     *
     * @param jobId ID of the job
     * @return future holding the {@code JobClient}
     */
    CompletableFuture<JobClient> getJobClient(JobID jobId);
}
/**
 * Abstract base class for cluster entry points.
 */
public abstract class ClusterEntrypoint {

    /**
     * Creates the {@code DispatcherResourceManagerComponentFactory}; subclasses must implement this.
     *
     * @param configuration Flink configuration
     * @return the component factory
     */
    protected abstract DispatcherResourceManagerComponentFactory
            createDispatcherResourceManagerComponentFactory(Configuration configuration);

    /**
     * Main method.
     *
     * @param args command line arguments
     */
    public static void main(String[] args);
}
/**
 * Bootstrap contract for a dispatcher.
 */
public interface DispatcherBootstrap {

    /**
     * Initializes services.
     *
     * @return future of type {@code Void}
     * @throws Exception declared by the signature; the failure conditions are not specified here
     */
    CompletableFuture<Void> initializeServices() throws Exception;
}
/**
 * Executor of pipelines.
 */
public interface PipelineExecutor {

    /**
     * Executes the given pipeline.
     *
     * @param pipeline pipeline to execute
     * @param configuration Flink configuration
     * @param userCodeClassloader classloader for user code
     * @return future holding the {@code JobClient}
     */
    CompletableFuture<JobClient> execute(
            Pipeline pipeline,
            Configuration configuration,
            ClassLoader userCodeClassloader);
}
/**
 * Factory for {@code PipelineExecutor} instances.
 */
public interface PipelineExecutorFactory {

    /**
     * @return name of this factory
     */
    String getName();

    /**
     * @param configuration Flink configuration to check
     * @return whether this factory is compatible with the given configuration
     */
    boolean isCompatibleWith(Configuration configuration);

    /**
     * @param configuration Flink configuration
     * @return executor for the given configuration
     */
    PipelineExecutor getExecutor(Configuration configuration);
}
/**
 * Loader that exposes the available {@code PipelineExecutorFactory} instances.
 */
public interface PipelineExecutorServiceLoader {

    /**
     * @return stream of executor factories
     */
    Stream<PipelineExecutorFactory> getExecutorFactories();
}
/**
 * Exception type for job execution, derived from {@code FlinkException}.
 */
// NOTE(review): UnsuccessfulExecutionException in this document extends this class and takes a
// JobID, which neither constructor listed here accepts. Confirm the constructor list against
// the actual class.
public class JobExecutionException extends FlinkException {

    /**
     * @param message error message
     */
    public JobExecutionException(String message);

    /**
     * @param message error message
     * @param cause root cause throwable
     */
    public JobExecutionException(String message, Throwable cause);
}