Apache Flink Container Module providing standalone application cluster entry point functionality for containerized Flink deployments.
Install:

```
npx @tessl/cli install tessl/maven-org-apache-flink--flink-container@1.14.0
```

The Apache Flink Container Module provides standalone application cluster entry point functionality for containerized Flink deployments. It enables Flink applications to run as standalone application clusters with predefined job locations and configurations, supporting reactive mode scaling and container orchestration platforms.

Maven dependency:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-container_2.11</artifactId>
    <version>1.14.6</version>
</dependency>
```

Core imports:

```java
import org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint;
import org.apache.flink.container.entrypoint.StandaloneApplicationClusterConfiguration;
import org.apache.flink.container.entrypoint.StandaloneApplicationClusterConfigurationParserFactory;
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
```

The primary usage is through the main method for containerized deployment:
```java
// Command line execution - typically invoked from a container entry point:
// java -cp flink-container.jar org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint \
//     --configDir /opt/flink/conf \
//     --job-classname com.example.MyFlinkJob \
//     --webui-port 8081 \
//     --host localhost

// For programmatic usage with configuration parsing
StandaloneApplicationClusterConfigurationParserFactory factory =
        new StandaloneApplicationClusterConfigurationParserFactory();
CommandLineParser<StandaloneApplicationClusterConfiguration> parser =
        new CommandLineParser<>(factory);
StandaloneApplicationClusterConfiguration config = parser.parse(args);

// Configuration loading and program setup (as done in the main method)
Configuration configuration =
        StandaloneApplicationClusterEntryPoint.loadConfigurationFromClusterConfig(config);
```

StandaloneApplicationClusterEntryPoint provides the main entry point for running Flink applications in containerized standalone application cluster mode.
```java
@Internal
public final class StandaloneApplicationClusterEntryPoint extends ApplicationClusterEntryPoint {

    // Main entry point for the containerized Flink application
    public static void main(String[] args);

    // Indicates support for reactive scaling mode
    @Override
    protected boolean supportsReactiveMode();

    // Loads configuration from the cluster configuration (package-visible for testing)
    @VisibleForTesting
    static Configuration loadConfigurationFromClusterConfig(
            StandaloneApplicationClusterConfiguration clusterConfiguration);
}
```

Key Methods:

- main(String[] args) - Starts the standalone application cluster with the given command line arguments
- supportsReactiveMode() - Returns true to indicate reactive mode support for dynamic scaling (see the sketch below)
- loadConfigurationFromClusterConfig() - Loads the Flink configuration from the cluster configuration, sets the static job ID, and applies savepoint restore settings
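Because supportsReactiveMode() returns true, the entry point can run with Flink's reactive scheduler. The sketch below only shows how reactive mode is expressed in a Flink Configuration; it uses the standard JobManagerOptions.SCHEDULER_MODE option, and whether that value is set in flink-conf.yaml or passed as a -D dynamic property on the entry point's command line is a deployment choice, not something this module prescribes.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;

public class ReactiveModeConfigSketch {
    public static void main(String[] args) {
        Configuration configuration = new Configuration();

        // Equivalent to passing -Dscheduler-mode=reactive to the entry point,
        // or setting "scheduler-mode: reactive" in flink-conf.yaml.
        configuration.set(JobManagerOptions.SCHEDULER_MODE, SchedulerExecutionMode.REACTIVE);

        System.out.println(configuration.get(JobManagerOptions.SCHEDULER_MODE)); // prints REACTIVE
    }
}
```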
StandaloneApplicationClusterConfigurationParserFactory parses command line arguments to create the configuration for the standalone application cluster.

```java
public class StandaloneApplicationClusterConfigurationParserFactory
        implements ParserResultFactory<StandaloneApplicationClusterConfiguration> {

    // Returns the available command line options
    @Override
    public Options getOptions();

    // Creates the configuration from the parsed command line
    @Override
    public StandaloneApplicationClusterConfiguration createResult(@Nonnull CommandLine commandLine)
            throws FlinkParseException;
}
```

Command Line Options:

- --configDir, -c - Configuration directory (required)
- --job-classname, -j - Class name of the job to run
- --job-id, -jid - Job ID of the job to run
- --webui-port, -r - REST port for the web UI
- --host, -h - Hostname override
- --fromSavepoint, -s - Savepoint path for restoration
- --allowNonRestoredState, -n - Allow non-restored state when restoring from a savepoint
- -D<key>=<value> - Dynamic configuration properties
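To make the option set concrete, here is a sketch that parses a complete argument vector with the factory shown above. The job class name, job ID, paths, and the trailing --input argument are placeholders, not values defined by this module; the sketch is declared in the entry point's package because, as noted in the configuration class listing further below, the configuration type and its getters are package-private.

```java
// Illustrative placement: the configuration type is package-private.
package org.apache.flink.container.entrypoint;

import org.apache.flink.runtime.entrypoint.FlinkParseException;
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;

public class OptionParsingSketch {

    public static void main(String[] args) throws FlinkParseException {
        // Placeholder argument vector exercising the full option set.
        String[] exampleArgs = {
            "--configDir", "/opt/flink/conf",
            "--job-classname", "com.example.MyFlinkJob",
            "--job-id", "aaaabbbbccccddddeeeeffff00001111",   // 32 hex characters
            "--webui-port", "8081",
            "--host", "jobmanager",
            "--fromSavepoint", "s3://my-bucket/savepoints/savepoint-123abc",
            "--allowNonRestoredState",
            "-Dscheduler-mode=reactive",
            "--input", "/data/input"   // unrecognized trailing tokens become job arguments
        };

        CommandLineParser<StandaloneApplicationClusterConfiguration> parser =
                new CommandLineParser<>(new StandaloneApplicationClusterConfigurationParserFactory());

        StandaloneApplicationClusterConfiguration clusterConfiguration = parser.parse(exampleArgs);
    }
}
```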
The entry point handles configuration loading and savepoint restoration for the application cluster:

```java
@VisibleForTesting
static Configuration loadConfigurationFromClusterConfig(
        StandaloneApplicationClusterConfiguration clusterConfiguration);

// Private helper methods (part of the main() workflow)
private static PackagedProgram getPackagedProgram(
        StandaloneApplicationClusterConfiguration clusterConfiguration,
        Configuration flinkConfiguration) throws FlinkException;

private static void setStaticJobId(
        StandaloneApplicationClusterConfiguration clusterConfiguration,
        Configuration configuration);

private static void configureExecution(
        Configuration configuration,
        PackagedProgram program) throws Exception;
```

Configuration Features:
- loadConfigurationFromClusterConfig() - Loads the Flink configuration from the cluster configuration, sets the static job ID, and applies savepoint restore settings (see the sketch below)
- getPackagedProgram() - Retrieves the packaged program from the user lib directory using the job class name and arguments
- setStaticJobId() - Sets a static job ID in the configuration when one is provided on the command line
- configureExecution() - Applies the program configuration to the execution environment
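The configuration-loading behavior can be approximated with public Flink APIs. The following is a minimal sketch, not the module's implementation: it assumes the config directory, dynamic properties, optional job ID, and savepoint settings have already been pulled out of the parsed cluster configuration, and it relies on GlobalConfiguration, ConfigurationUtils, PipelineOptionsInternal, and SavepointRestoreSettings from Flink itself.

```java
import java.util.Properties;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

public class ConfigurationLoadingSketch {

    // Rough equivalent of loadConfigurationFromClusterConfig() plus setStaticJobId();
    // the parameter values are assumed to come from the parsed cluster configuration.
    static Configuration loadConfiguration(
            String configDir,
            Properties dynamicProperties,
            JobID jobId,
            SavepointRestoreSettings savepointRestoreSettings) {

        // Load flink-conf.yaml from the config directory and overlay the -D dynamic properties.
        Configuration configuration =
                GlobalConfiguration.loadConfiguration(
                        configDir, ConfigurationUtils.createConfiguration(dynamicProperties));

        // Pin the job ID so the single application job always runs under the same ID.
        if (jobId != null) {
            configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId.toHexString());
        }

        // Write the savepoint path and allow-non-restored-state flag into the configuration.
        SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, configuration);

        return configuration;
    }
}
```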
The parsed values are carried in a package-private configuration class:

```java
// Package-private configuration class
final class StandaloneApplicationClusterConfiguration extends EntrypointClusterConfiguration {

    // Package-private constructor with all configuration parameters
    StandaloneApplicationClusterConfiguration(
            @Nonnull String configDir,
            @Nonnull Properties dynamicProperties,
            @Nonnull String[] args,
            @Nullable String hostname,
            int restPort,
            @Nonnull SavepointRestoreSettings savepointRestoreSettings,
            @Nullable JobID jobId,
            @Nullable String jobClassName);

    // Access savepoint restore settings (package-private)
    @Nonnull SavepointRestoreSettings getSavepointRestoreSettings();

    // Access the job ID (may be null, package-private)
    @Nullable JobID getJobId();

    // Access the job class name (may be null, package-private)
    @Nullable String getJobClassName();
}
```
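The job class name, program arguments, and savepoint settings held by this configuration describe the user program that the entry point runs. The real entry point resolves the program through an internal retriever that scans the user lib directory; the sketch below is only a rough approximation using the public PackagedProgram builder, with all parameter values assumed to come from the parsed configuration.

```java
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

public class PackagedProgramSketch {

    // Builds a PackagedProgram from values that correspond to the parsed cluster configuration
    // (job class name, program arguments, savepoint restore settings).
    static PackagedProgram buildProgram(
            Configuration flinkConfiguration,
            String jobClassName,                        // e.g. "com.example.MyFlinkJob" (placeholder)
            String[] programArguments,
            SavepointRestoreSettings savepointRestoreSettings)
            throws ProgramInvocationException {

        return PackagedProgram.newBuilder()
                .setConfiguration(flinkConfiguration)
                .setEntryPointClassName(jobClassName)   // class must be resolvable from the classpath / user lib dir
                .setArguments(programArguments)
                .setSavepointRestoreSettings(savepointRestoreSettings)
                .build();
    }
}
```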
The parser factory defines its command line options as Option constants:

```java
// Job class name option
private static final Option JOB_CLASS_NAME_OPTION =
        Option.builder("j").longOpt("job-classname").required(false)
                .hasArg(true).argName("job class name")
                .desc("Class name of the job to run.").build();

// Job ID option
private static final Option JOB_ID_OPTION =
        Option.builder("jid").longOpt("job-id").required(false)
                .hasArg(true).argName("job id")
                .desc("Job ID of the job to run.").build();

// Additional options from base classes:
// REST_PORT_OPTION: Option.builder("r").longOpt("webui-port")...
// HOST_OPTION: Option.builder("h").longOpt("host")...
// CONFIG_DIR_OPTION: Option.builder("c").longOpt("configDir")...
// DYNAMIC_PROPERTY_OPTION: Option.builder("D")...
// SAVEPOINT_PATH_OPTION: Option.builder("s").longOpt("fromSavepoint")...
// SAVEPOINT_ALLOW_NON_RESTORED_OPTION: Option.builder("n").longOpt("allowNonRestoredState")...
```

The module handles parsing and configuration errors:
- FlinkParseException - thrown when command line parsing fails (from the createResult() method)
- FlinkParseException with an IllegalArgumentException cause - thrown when the job ID cannot be parsed (from getJobId())
- FlinkParseException with a NumberFormatException cause - thrown when the REST port is not a valid number (from getRestPort())
- getPackagedProgram() throws FlinkException
- configureExecution() throws a generic Exception

A typical guard around the parsing step is sketched below.
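The following sketch mirrors the parsing error handling described above: catch the FlinkParseException, print the generated help text, and exit. The main method already does this itself (with logging), so the class name and package placement here are purely illustrative.

```java
// Illustrative placement: the configuration type is package-private.
package org.apache.flink.container.entrypoint;

import org.apache.flink.runtime.entrypoint.FlinkParseException;
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;

public class ParseErrorHandlingSketch {

    public static void main(String[] args) {
        CommandLineParser<StandaloneApplicationClusterConfiguration> parser =
                new CommandLineParser<>(new StandaloneApplicationClusterConfigurationParserFactory());

        StandaloneApplicationClusterConfiguration clusterConfiguration = null;
        try {
            clusterConfiguration = parser.parse(args);
        } catch (FlinkParseException e) {
            // A missing --configDir, an unparsable --job-id, or a non-numeric --webui-port ends up here.
            parser.printHelp(StandaloneApplicationClusterEntryPoint.class.getSimpleName());
            System.exit(1);
        }
        // ... continue with configuration loading and program setup
    }
}
```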
Dependencies:

- org.apache.flink:flink-runtime - Flink runtime components
- org.apache.flink:flink-clients_${scala.binary.version} - Flink client libraries

This module is designed for containerized deployments in which the job location and configuration are predefined.

The module follows Flink's entry point pattern with this main method workflow:

1. Parse the command line arguments into a StandaloneApplicationClusterConfiguration
2. Load the Flink configuration via loadConfigurationFromClusterConfig() and apply dynamic properties
3. Retrieve the packaged user program via getPackagedProgram()
4. Apply the program configuration via configureExecution()
5. Start the standalone application cluster

Key architectural components: