or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.md
tile.json

tessl/maven-org-apache-flink--flink-container

Apache Flink Container Module providing standalone application cluster entry point functionality for containerized Flink deployments.

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.flink/flink-container_2.11@1.14.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-container@1.14.0

index.mddocs/

Flink Container

Apache Flink Container Module provides standalone application cluster entry point functionality for containerized Flink deployments. It enables Flink applications to run as standalone application clusters with predefined job locations and configurations, supporting reactive mode scaling and container orchestration platforms.

Package Information

  • Package Name: org.apache.flink:flink-container_2.11
  • Package Type: Maven JAR
  • Language: Java
  • Installation: <dependency><groupId>org.apache.flink</groupId><artifactId>flink-container_2.11</artifactId><version>1.14.6</version></dependency>

Core Imports

import org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint;
import org.apache.flink.container.entrypoint.StandaloneApplicationClusterConfiguration;
import org.apache.flink.container.entrypoint.StandaloneApplicationClusterConfigurationParserFactory;
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

Basic Usage

The primary usage is through the main method for containerized deployment:

// Command line execution - typically invoked from container
// java -cp flink-container.jar org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint 
//   --configDir /opt/flink/conf 
//   --job-classname com.example.MyFlinkJob
//   --webui-port 8081
//   --host localhost

// For programmatic usage with configuration parsing
StandaloneApplicationClusterConfigurationParserFactory factory = 
    new StandaloneApplicationClusterConfigurationParserFactory();
CommandLineParser<StandaloneApplicationClusterConfiguration> parser = 
    new CommandLineParser<>(factory);
StandaloneApplicationClusterConfiguration config = parser.parse(args);

// Configuration loading and program setup (as done in main method)
Configuration configuration = StandaloneApplicationClusterEntryPoint
    .loadConfigurationFromClusterConfig(config);

Capabilities

Application Cluster Entry Point

Provides the main entry point for running Flink applications in containerized standalone application cluster mode.

@Internal
public final class StandaloneApplicationClusterEntryPoint extends ApplicationClusterEntryPoint {
    
    // Main entry point for containerized Flink application
    public static void main(String[] args);
    
    // Indicates support for reactive scaling mode
    @Override
    protected boolean supportsReactiveMode();
    
    // Load configuration from cluster configuration (package-visible for testing)
    @VisibleForTesting
    static Configuration loadConfigurationFromClusterConfig(
        StandaloneApplicationClusterConfiguration clusterConfiguration);
}

Key Methods:

  • main(String[] args) - Starts the standalone application cluster with command line arguments
  • supportsReactiveMode() - Returns true to indicate reactive mode support for dynamic scaling
  • loadConfigurationFromClusterConfig() - Loads Flink configuration from cluster configuration, sets static job ID, and applies savepoint restore settings

Configuration Parsing

Parses command line arguments to create configuration for the standalone application cluster.

public class StandaloneApplicationClusterConfigurationParserFactory 
    implements ParserResultFactory<StandaloneApplicationClusterConfiguration> {
    
    // Returns available command line options
    @Override
    public Options getOptions();
    
    // Creates configuration from parsed command line
    @Override
    public StandaloneApplicationClusterConfiguration createResult(@Nonnull CommandLine commandLine) 
        throws FlinkParseException;
}

Command Line Options:

  • --configDir, -c - Configuration directory (required)
  • --job-classname, -j - Class name of the job to run
  • --job-id, -jid - Job ID of the job to run
  • --webui-port, -r - REST port for web UI
  • --host, -h - Hostname override
  • --fromSavepoint, -s - Savepoint path for restoration
  • --allowNonRestoredState, -n - Allow non-restored state from savepoint
  • -D<key>=<value> - Dynamic configuration properties

Configuration Management

Handles configuration loading and savepoint restoration for the application cluster.

@VisibleForTesting
static Configuration loadConfigurationFromClusterConfig(
    StandaloneApplicationClusterConfiguration clusterConfiguration);

// Private helper methods (part of main() workflow)
private static PackagedProgram getPackagedProgram(
    StandaloneApplicationClusterConfiguration clusterConfiguration,
    Configuration flinkConfiguration) throws FlinkException;

private static void setStaticJobId(
    StandaloneApplicationClusterConfiguration clusterConfiguration,
    Configuration configuration);

private static void configureExecution(
    Configuration configuration,
    PackagedProgram program) throws Exception;

Configuration Features:

  • loadConfigurationFromClusterConfig() - Loads Flink configuration from cluster configuration, sets static job ID and savepoint restore settings
  • getPackagedProgram() - Retrieves packaged program from user lib directory using job class name and arguments
  • setStaticJobId() - Sets static job ID in configuration when provided via command line
  • configureExecution() - Applies program configuration to execution environment

Types

StandaloneApplicationClusterConfiguration

// Package-private configuration class
final class StandaloneApplicationClusterConfiguration extends EntrypointClusterConfiguration {
    
    // Package-private constructor with all configuration parameters
    StandaloneApplicationClusterConfiguration(
        @Nonnull String configDir,
        @Nonnull Properties dynamicProperties,
        @Nonnull String[] args,
        @Nullable String hostname,
        int restPort,
        @Nonnull SavepointRestoreSettings savepointRestoreSettings,
        @Nullable JobID jobId,
        @Nullable String jobClassName);
    
    // Access savepoint restore settings (package-private)
    @Nonnull SavepointRestoreSettings getSavepointRestoreSettings();
    
    // Access job ID (may be null, package-private)
    @Nullable JobID getJobId();
    
    // Access job class name (may be null, package-private)  
    @Nullable String getJobClassName();
}

Command Line Options Constants

// Job class name option
private static final Option JOB_CLASS_NAME_OPTION = 
    Option.builder("j").longOpt("job-classname").required(false)
          .hasArg(true).argName("job class name")
          .desc("Class name of the job to run.").build();

// Job ID option  
private static final Option JOB_ID_OPTION = 
    Option.builder("jid").longOpt("job-id").required(false)
          .hasArg(true).argName("job id")
          .desc("Job ID of the job to run.").build();

// Additional options from base classes
// REST_PORT_OPTION: Option.builder("r").longOpt("webui-port")...
// HOST_OPTION: Option.builder("h").longOpt("host")...
// CONFIG_DIR_OPTION: Option.builder("c").longOpt("configDir")...
// DYNAMIC_PROPERTY_OPTION: Option.builder("D")...
// SAVEPOINT_PATH_OPTION: Option.builder("s").longOpt("fromSavepoint")...
// SAVEPOINT_ALLOW_NON_RESTORED_OPTION: Option.builder("n").longOpt("allowNonRestoredState")...

Error Handling

The module handles parsing and configuration errors:

  • FlinkParseException - Thrown when command line parsing fails (from createResult() method)
  • Invalid job ID format throws FlinkParseException with IllegalArgumentException cause (from getJobId())
  • Invalid REST port throws FlinkParseException with NumberFormatException cause (from getRestPort())
  • Missing required configuration directory throws parsing exceptions
  • Program loading failures in getPackagedProgram() throw FlinkException
  • Configuration application failures in configureExecution() throw generic Exception

Dependencies

Required Dependencies (provided scope)

  • org.apache.flink:flink-runtime - Flink runtime components
  • org.apache.flink:flink-clients_${scala.binary.version} - Flink client libraries

Key External Dependencies

  • Apache Commons CLI for command line parsing
  • Flink's configuration and job management APIs
  • Flink's entry point and resource manager frameworks

Deployment Context

This module is designed for containerized deployments where:

  • Container Orchestration: Integrates with Kubernetes, Docker Swarm, or other container platforms
  • Standalone Mode: Runs as standalone application clusters (not session clusters)
  • Predefined Jobs: Job location and configuration are specified at container startup
  • Reactive Scaling: Supports dynamic resource scaling based on workload
  • Configuration: Uses external configuration files and command-line parameters
  • Savepoint Recovery: Supports job recovery from savepoints for fault tolerance

Architecture

The module follows Flink's entry point pattern with this main method workflow:

  1. Environment Setup - Logs environment information, registers signal handlers, and installs JVM shutdown safeguard
  2. Command Line Parsing - Uses Apache Commons CLI through StandaloneApplicationClusterConfigurationParserFactory
  3. Configuration Loading - Loads Flink configuration via loadConfigurationFromClusterConfig() and applies dynamic properties
  4. Program Discovery - Locates packaged programs in user lib directory using getPackagedProgram()
  5. Execution Configuration - Applies program configuration through configureExecution()
  6. Cluster Initialization - Creates standalone application cluster with StandaloneResourceManagerFactory
  7. Lifecycle Management - Runs cluster entry point with proper error handling and system exit codes

Key architectural components:

  • Entry Point Pattern: Extends ApplicationClusterEntryPoint for consistent Flink startup behavior
  • Configuration Management: Centralizes all configuration loading, job ID setting, and savepoint restoration
  • Resource Management: Uses StandaloneResourceManagerFactory for standalone deployment mode
  • Reactive Scaling: Built-in support for dynamic resource scaling based on workload