Apache Flink client library providing programmatic APIs and command-line interfaces for submitting, managing, and monitoring Flink jobs.
—
Specialized deployment mode for long-running applications with dedicated cluster resources, lifecycle management, and optimized resource utilization for application-centric workloads.
Core interface for running applications in application mode with full lifecycle management.
/**
* Core interface for running applications in application mode.
* <p>
* Declares a single asynchronous entry point, {@code run}. This is an API listing:
* only the signature is shown.
*/
public interface ApplicationRunner {
/**
* Runs an application using the given dispatcher gateway, scheduled executor and
* application configuration.
*
* @param dispatcherGateway Gateway to the dispatcher
* @param scheduledExecutor Executor for scheduled operations
* @param applicationConfiguration Application configuration
* @return Future completing when the application finishes; it carries no value ({@code Void})
*/
CompletableFuture<Void> run(
DispatcherGateway dispatcherGateway,
ScheduledExecutor scheduledExecutor,
ApplicationConfiguration applicationConfiguration
);
}
Application runner implementation for detached execution where the client doesn't wait for job completion.
/**
* {@code ApplicationRunner} implementation for detached execution, i.e. the client does not
* wait for job completion.
* <p>
* API listing only: the constructor and {@code run} are shown as signatures without bodies.
*/
public class DetachedApplicationRunner implements ApplicationRunner {
/**
* Creates a detached application runner.
*
* @param highAvailabilityServices High availability services
* @param configuration Flink configuration
* @param mainThreadExecutor Main thread executor
*/
public DetachedApplicationRunner(
HighAvailabilityServices highAvailabilityServices,
Configuration configuration,
Executor mainThreadExecutor
);
/**
* Runs the application in detached mode.
*
* @param dispatcherGateway Gateway to the dispatcher
* @param scheduledExecutor Executor for scheduled operations
* @param applicationConfiguration Application configuration
* @return Future carrying no value ({@code Void})
*/
@Override
public CompletableFuture<Void> run(
DispatcherGateway dispatcherGateway,
ScheduledExecutor scheduledExecutor,
ApplicationConfiguration applicationConfiguration
);
}
Configuration class for application deployments containing program arguments, entry point information, and execution settings.
/**
* Configuration for application deployments.
* <p>
* Instances are obtained through the nested {@code Builder}, which is created by
* {@code fromConfiguration(Configuration)} and finished with {@code build()}.
* The getters below expose the values the builder accepts.
* <p>
* NOTE(review): this is a signature listing; confirm the builder-style API against the
* flink-clients release you target.
*/
public class ApplicationConfiguration {
/**
* Creates a configuration builder from a Flink configuration.
*
* @param configuration Flink configuration
* @return Configuration builder
*/
public static Builder fromConfiguration(Configuration configuration);
/**
* Gets the program arguments.
*
* @return Array of program arguments
*/
public String[] getProgramArguments();
/**
* Gets the application arguments.
* <p>
* NOTE(review): the listing does not state how these differ from the program
* arguments; confirm before relying on either.
*
* @return Array of application arguments
*/
public String[] getApplicationArgs();
/**
* Gets the parallelism setting.
*
* @return Parallelism, or {@code null} if not set
*/
@Nullable
public Integer getParallelism();
/**
* Gets the savepoint restore settings.
*
* @return Savepoint restore settings
*/
public SavepointRestoreSettings getSavepointRestoreSettings();
/**
* Builder for {@code ApplicationConfiguration}.
* <p>
* Every setter returns a {@code Builder}, so calls can be chained and finished with
* {@code build()}.
*/
public static class Builder {
/**
* Sets the program arguments.
*
* @param programArguments Arguments, passed as varargs
* @return Builder instance
*/
public Builder setProgramArguments(String... programArguments);
/**
* Sets the application arguments.
*
* @param applicationArgs Application arguments, passed as varargs
* @return Builder instance
*/
public Builder setApplicationArgs(String... applicationArgs);
/**
* Sets the parallelism.
*
* @param parallelism Parallelism setting (boxed {@code Integer})
* @return Builder instance
*/
public Builder setParallelism(Integer parallelism);
/**
* Sets the savepoint restore settings.
*
* @param savepointRestoreSettings Savepoint settings
* @return Builder instance
*/
public Builder setSavepointRestoreSettings(
SavepointRestoreSettings savepointRestoreSettings
);
/**
* Builds the application configuration.
*
* @return Configured application configuration
*/
public ApplicationConfiguration build();
}
}
Usage Examples:
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
// Create a basic application configuration: start from an existing Flink configuration
// (flinkConfig, defined elsewhere), set the program arguments and a parallelism of 4.
ApplicationConfiguration config = ApplicationConfiguration
.fromConfiguration(flinkConfig)
.setProgramArguments("--input", "input.txt", "--output", "output.txt")
.setParallelism(4)
.build();
// Create a configuration that restores from a savepoint.
// NOTE(review): the second forPath argument (false) is presumably the
// "allow non-restored state" flag; confirm against SavepointRestoreSettings.
SavepointRestoreSettings savepointSettings = SavepointRestoreSettings
.forPath("/path/to/savepoint", false);
ApplicationConfiguration configWithSavepoint = ApplicationConfiguration
.fromConfiguration(flinkConfig)
.setProgramArguments("--mode", "batch")
.setSavepointRestoreSettings(savepointSettings)
.build();
Entry point for application cluster mode providing cluster initialization and application execution coordination.
/**
* Entry point for application cluster mode; extends {@code ClusterEntrypoint}.
* <p>
* API listing only: members are shown as signatures without bodies.
*/
public class ApplicationClusterEntryPoint extends ClusterEntrypoint {
/**
* Main method for the application cluster entry point.
*
* @param args Command-line arguments
*/
public static void main(String[] args);
/**
* Creates an application cluster entry point. The constructor is {@code protected},
* so it is reachable only from subclasses and the same package.
*
* @param configuration Cluster configuration
* @param pluginManager Plugin manager
*/
protected ApplicationClusterEntryPoint(
Configuration configuration,
PluginManager pluginManager
);
/**
* Creates the dispatcher/resource-manager component factory for this entry point.
*
* @param configuration Cluster configuration
* @return Factory for the dispatcher resource manager component
*/
@Override
protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(
Configuration configuration
);
}
Specialized job client implementations for application mode execution.
/**
* Job client for embedded application execution.
* <p>
* API listing only: members are shown as signatures without bodies.
*/
public class EmbeddedJobClient implements JobClient {
/**
* Creates an embedded job client.
*
* @param jobID Job identifier
* @param dispatcher Dispatcher gateway
* @param executorService Executor service
* @param classLoader User code class loader
*/
public EmbeddedJobClient(
JobID jobID,
DispatcherGateway dispatcher,
ScheduledExecutorService executorService,
ClassLoader classLoader
);
/**
* @return Job identifier
*/
@Override
public JobID getJobID();
/**
* @return Future with the job status
*/
@Override
public CompletableFuture<JobStatus> getJobStatus();
/**
* Requests cancellation of the job.
*
* @return Future carrying no value ({@code Void})
*/
@Override
public CompletableFuture<Void> cancel();
/**
* Stops the job with a savepoint.
*
* @param advanceToEndOfEventTime Flag as named; semantics are not described in this listing
* @param savepointDir Savepoint directory, may be {@code null}
* @param formatType Savepoint format type
* @return Future with a {@code String} result; presumably the savepoint location, TODO confirm
*/
@Override
public CompletableFuture<String> stopWithSavepoint(
boolean advanceToEndOfEventTime,
@Nullable String savepointDir,
SavepointFormatType formatType
);
/**
* @return Future with the job execution result
*/
@Override
public CompletableFuture<JobExecutionResult> getJobExecutionResult();
}
/**
* Job client for web submission application execution.
* <p>
* This listing shows only {@code getJobID}, {@code getJobStatus} and
* {@code getJobExecutionResult}. The {@code cancel} and {@code stopWithSavepoint}
* overrides listed for {@code EmbeddedJobClient} are not shown here.
*/
public class WebSubmissionJobClient implements JobClient {
/**
* Creates a web submission job client.
*
* @param jobSubmissionResult Job submission result
* @param jobStatusSupplier Supplier for job status (supplies a future on each call)
* @param jobResultSupplier Supplier for job result (supplies a future on each call)
* @param classLoader User code class loader
*/
public WebSubmissionJobClient(
JobSubmissionResult jobSubmissionResult,
Supplier<CompletableFuture<JobStatus>> jobStatusSupplier,
Supplier<CompletableFuture<JobResult>> jobResultSupplier,
ClassLoader classLoader
);
/**
* @return Job identifier
*/
@Override
public JobID getJobID();
/**
* @return Future with the job status
*/
@Override
public CompletableFuture<JobStatus> getJobStatus();
/**
* @return Future with the job execution result
*/
@Override
public CompletableFuture<JobExecutionResult> getJobExecutionResult();
}
System for discovering and providing entry class information from JARs and classpaths.
/**
* Provider for entry class information: the job class name and the JAR file.
* Both accessors are annotated {@code @Nullable}, so callers must handle a missing value.
*/
public interface EntryClassInformationProvider {
/**
* Gets the program entry point class name.
*
* @return Entry point class name, or {@code null}
*/
@Nullable
String getJobClassName();
/**
* Gets the JAR file location.
*
* @return JAR file, or {@code null}
*/
@Nullable
File getJarFile();
}
/**
* Entry class provider from JAR manifest.
* <p>
* The overrides below do not repeat the {@code @Nullable} annotation declared on
* {@code EntryClassInformationProvider}. This listing does not state whether they can
* return {@code null}.
*/
public class FromJarEntryClassInformationProvider implements EntryClassInformationProvider {
/**
* Creates a provider from a JAR file.
*
* @param jarFile JAR file to analyze
* @param programArguments Program arguments
*/
public FromJarEntryClassInformationProvider(File jarFile, String[] programArguments);
/**
* @return Program entry point class name
*/
@Override
public String getJobClassName();
/**
* @return JAR file
*/
@Override
public File getJarFile();
}
/**
* Entry class provider from classpath scanning.
* <p>
* The overrides below do not repeat the {@code @Nullable} annotation declared on
* {@code EntryClassInformationProvider}. This listing does not state whether they can
* return {@code null}.
*/
public class FromClasspathEntryClassInformationProvider implements EntryClassInformationProvider {
/**
* Creates a provider from the classpath.
*
* @param jobClassName Job class name
* @param programArguments Program arguments
*/
public FromClasspathEntryClassInformationProvider(
String jobClassName,
String[] programArguments
);
/**
* @return Program entry point class name
*/
@Override
public String getJobClassName();
/**
* NOTE(review): no JAR file is passed to the constructor; what this returns is not
* shown in the listing, so confirm before use.
*
* @return JAR file
*/
@Override
public File getJarFile();
}
Executor implementations specifically designed for application mode execution.
/**
* Executor for embedded application execution; implements {@code PipelineExecutor}.
* <p>
* API listing only: members are shown as signatures without bodies.
*/
public class EmbeddedExecutor implements PipelineExecutor {
/**
* Creates an embedded executor.
*
* @param dispatcherGateway Dispatcher gateway
* @param executorService Executor service
* @param configuration Flink configuration
* @param userCodeClassLoader User code class loader
*/
public EmbeddedExecutor(
DispatcherGateway dispatcherGateway,
ScheduledExecutorService executorService,
Configuration configuration,
ClassLoader userCodeClassLoader
);
/**
* Executes the given pipeline.
*
* @param pipeline Pipeline to execute
* @param configuration Configuration for this execution
* @param userCodeClassloader User code class loader
* @return Future with a {@code JobClient}
* @throws Exception declared by the signature; the listing does not name specific failure cases
*/
@Override
public CompletableFuture<JobClient> execute(
Pipeline pipeline,
Configuration configuration,
ClassLoader userCodeClassloader
) throws Exception;
}
/**
* Factory for embedded executors; implements {@code PipelineExecutorFactory}.
*/
public class EmbeddedExecutorFactory implements PipelineExecutorFactory {
/**
* @return Name of this factory
*/
@Override
public String getName();
/**
* @param configuration Configuration to check
* @return {@code true} if this factory is compatible with the given configuration
*/
@Override
public boolean isCompatibleWith(Configuration configuration);
/**
* @param configuration Configuration to create the executor for
* @return A {@code PipelineExecutor}
*/
@Override
public PipelineExecutor getExecutor(Configuration configuration);
}
Utility classes for application mode operations and status monitoring.
/**
* Utilities for polling job status in application mode.
* All members shown are {@code static}.
*/
public class JobStatusPollingUtils {
/**
* Polls the job status until completion.
*
* @param jobClient Job client
* @param scheduledExecutor Scheduled executor
* @param timeout Polling timeout
* @return Future with the final job result
*/
public static CompletableFuture<JobExecutionResult> pollJobStatusUntilFinished(
JobClient jobClient,
ScheduledExecutorService scheduledExecutor,
Duration timeout
);
/**
* Polls the job status with a custom polling interval.
*
* @param jobStatusSupplier Job status supplier (supplies a future on each call)
* @param scheduledExecutor Scheduled executor
* @param pollingInterval Polling interval
* @param timeout Total timeout
* @return Future with the final job status
*/
public static CompletableFuture<JobStatus> pollJobStatus(
Supplier<CompletableFuture<JobStatus>> jobStatusSupplier,
ScheduledExecutorService scheduledExecutor,
Duration pollingInterval,
Duration timeout
);
}
/**
* Parser for JAR manifest files.
* All members shown are {@code static}.
*/
public class JarManifestParser {
/**
* Finds the entry class from a JAR manifest.
*
* @param jarFile JAR file to analyze
* @return Entry class name, or {@code null}
*/
@Nullable
public static String findEntryClass(File jarFile);
/**
* Checks whether the JAR contains a main class.
*
* @param jarFile JAR file to check
* @return {@code true} if a main class is found
*/
public static boolean hasMainClass(File jarFile);
}
/**
* Job submission result for application mode.
*/
public class JobSubmissionResult {
/**
* Gets the submitted job ID.
*
* @return Job identifier
*/
public JobID getJobID();
/**
* Checks whether the submission was successful.
*
* @return {@code true} if successful
*/
public boolean isSuccess();
}
/**
* Embedded job client creator: a static factory for {@code EmbeddedJobClient}.
*/
public class EmbeddedJobClientCreator {
/**
* Creates an embedded job client.
*
* @param dispatcherGateway Dispatcher gateway
* @param executorService Executor service
* @param archivedExecutionGraph Archived execution graph
* @param userCodeClassLoader User code class loader
* @return Embedded job client
*/
public static EmbeddedJobClient create(
DispatcherGateway dispatcherGateway,
ScheduledExecutorService executorService,
ArchivedExecutionGraph archivedExecutionGraph,
ClassLoader userCodeClassLoader
);
}
Application mode operations handle specific error conditions:
- ApplicationExecutionException for application runtime failures
- UnsuccessfulExecutionException for failed job executions

Error Handling Examples:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
// Runs an application and reports failures. CompletableFuture.get() never throws the
// application-mode exceptions directly: a failed run surfaces as an ExecutionException
// whose cause is the original failure, so the cause is inspected instead.
try {
    ApplicationConfiguration config = ApplicationConfiguration
        .fromConfiguration(flinkConfig)
        .setProgramArguments("--input", "data.txt")
        .build();
    CompletableFuture<Void> applicationResult = runner.run(
        dispatcherGateway,
        scheduledExecutor,
        config
    );
    applicationResult.get(); // Wait for completion
} catch (ExecutionException e) {
    // Unwrap the failure of the asynchronous run.
    Throwable cause = e.getCause();
    if (cause instanceof ApplicationExecutionException) {
        System.err.println("Application execution failed: " + cause.getMessage());
    } else if (cause instanceof UnsuccessfulExecutionException) {
        System.err.println("Job execution unsuccessful: " + cause.getMessage());
    } else {
        // Any other failure: print the cause itself so its type is not lost.
        System.err.println("Application failed: " + cause);
    }
} catch (InterruptedException e) {
    // Restore the interrupt flag so callers further up can still observe it.
    Thread.currentThread().interrupt();
    System.err.println("Interrupted while waiting for the application to finish");
}
Application mode provides dedicated cluster resources for long-running applications, enabling better resource isolation, simplified deployment, and optimized lifecycle management compared to session mode deployments.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-clients