# Flink Container

The Apache Flink container module (`flink-container`) provides the cluster entry point for running a Flink application as a standalone application cluster inside a container. The job to run and its configuration are fixed when the container starts, and the entry point supports reactive mode scaling, which makes it a fit for container orchestration platforms.

## Package Information

- **Package Name**: org.apache.flink:flink-container_2.11
- **Package Type**: Maven JAR
- **Language**: Java
- **Installation**: `<dependency><groupId>org.apache.flink</groupId><artifactId>flink-container_2.11</artifactId><version>1.14.6</version></dependency>`

## Core Imports

```java
import org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint;
// Package-private class: this import only compiles from code in org.apache.flink.container.entrypoint
import org.apache.flink.container.entrypoint.StandaloneApplicationClusterConfiguration;
import org.apache.flink.container.entrypoint.StandaloneApplicationClusterConfigurationParserFactory;
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
```

## Basic Usage

The module is primarily used through the entry point's `main` method in containerized deployments:

```java
// Command-line execution, typically invoked from a container start script.
// Simplified: the classpath must also contain the Flink distribution and the job JAR.
// java -cp flink-container.jar org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint \
//     --configDir /opt/flink/conf \
//     --job-classname com.example.MyFlinkJob \
//     --webui-port 8081 \
//     --host localhost

// Programmatic parsing of the same arguments; parse() throws FlinkParseException on invalid input
StandaloneApplicationClusterConfigurationParserFactory factory =
        new StandaloneApplicationClusterConfigurationParserFactory();
CommandLineParser<StandaloneApplicationClusterConfiguration> parser =
        new CommandLineParser<>(factory);
StandaloneApplicationClusterConfiguration config = parser.parse(args);

// Configuration loading, as done in main(). Both this method and StandaloneApplicationClusterConfiguration
// are package-private, so this only compiles inside org.apache.flink.container.entrypoint (e.g. in tests).
Configuration configuration =
        StandaloneApplicationClusterEntryPoint.loadConfigurationFromClusterConfig(config);
```
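If a supervisor written in Java needs to spawn the entry point itself, the command above can be assembled with `ProcessBuilder`. This is a minimal sketch: the helper class `EntryPointCommandSketch` and the classpath `/opt/flink/lib/*` are hypothetical, and only the main class name and the option names come from this document. The process is built but deliberately not started.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: assembles (but does not start) the command that launches the entry point.
final class EntryPointCommandSketch {
    static final String MAIN_CLASS =
            "org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint";

    static List<String> buildCommand(
            String classpath, String configDir, String jobClassName, int webUiPort) {
        List<String> command = new ArrayList<>();
        command.add("java");
        command.add("-cp");
        command.add(classpath); // assumption: must include the Flink distribution and this module
        command.add(MAIN_CLASS);
        command.add("--configDir");
        command.add(configDir); // the only required option
        command.add("--job-classname");
        command.add(jobClassName);
        command.add("--webui-port");
        command.add(Integer.toString(webUiPort));
        return command;
    }

    public static void main(String[] args) {
        List<String> command = buildCommand(
                "/opt/flink/lib/*", "/opt/flink/conf", "com.example.MyFlinkJob", 8081);
        ProcessBuilder builder = new ProcessBuilder(command).inheritIO();
        // builder.start() would launch the JobManager process; it is not called here.
        System.out.println(String.join(" ", builder.command()));
    }
}
```

`ProcessBuilder` passes each argument directly without a shell, so no line-continuation backslashes are needed and the `/opt/flink/lib/*` wildcard is expanded by the `java` launcher itself.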

## Capabilities

### Application Cluster Entry Point

Provides the main entry point for running Flink applications in containerized standalone application cluster mode.

```java { .api }
@Internal
public final class StandaloneApplicationClusterEntryPoint extends ApplicationClusterEntryPoint {

    // Main entry point for containerized Flink application
    public static void main(String[] args);

    // Indicates support for reactive scaling mode
    @Override
    protected boolean supportsReactiveMode();

    // Load configuration from cluster configuration (package-visible for testing)
    @VisibleForTesting
    static Configuration loadConfigurationFromClusterConfig(
            StandaloneApplicationClusterConfiguration clusterConfiguration);
}
```

**Key Methods:**

- `main(String[] args)` - Starts the standalone application cluster with command line arguments
- `supportsReactiveMode()` - Returns `true`, so this entry point accepts a configuration with reactive mode enabled
- `loadConfigurationFromClusterConfig()` - Loads the Flink configuration from the cluster configuration, sets the static job ID (if one was given), and applies the savepoint restore settings

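
The check that consumes `supportsReactiveMode()` lives in Flink's base entry point classes, which this document does not show. A minimal sketch, assuming the base class refuses to start when reactive mode is requested (`scheduler-mode: reactive`) but the subclass returns `false`; `EntryPointSketch`, `StandaloneApplicationSketch`, and `UnsupportedEntryPointSketch` are hypothetical names, and a plain `Map` stands in for Flink's `Configuration`.

```java
import java.util.Map;

// Hypothetical base class: models a startup check that consults supportsReactiveMode().
abstract class EntryPointSketch {
    protected abstract boolean supportsReactiveMode();

    // Assumption: reactive mode is requested with "scheduler-mode: reactive".
    final void checkReactiveMode(Map<String, String> config) {
        boolean reactiveRequested = "reactive".equalsIgnoreCase(config.get("scheduler-mode"));
        if (reactiveRequested && !supportsReactiveMode()) {
            throw new IllegalStateException(
                    "Reactive mode is configured, but this entry point does not support it");
        }
    }
}

// Plays the role of StandaloneApplicationClusterEntryPoint, which returns true.
final class StandaloneApplicationSketch extends EntryPointSketch {
    @Override
    protected boolean supportsReactiveMode() {
        return true;
    }
}

// Plays the role of an entry point without reactive mode support.
final class UnsupportedEntryPointSketch extends EntryPointSketch {
    @Override
    protected boolean supportsReactiveMode() {
        return false;
    }
}
```

Putting the capability flag in an overridable method lets a shared startup path reject unsupported combinations early, before any cluster components are created.
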
### Configuration Parsing

Parses command line arguments to create the configuration for the standalone application cluster.

```java { .api }
public class StandaloneApplicationClusterConfigurationParserFactory
        implements ParserResultFactory<StandaloneApplicationClusterConfiguration> {

    // Returns available command line options
    @Override
    public Options getOptions();

    // Creates configuration from parsed command line
    @Override
    public StandaloneApplicationClusterConfiguration createResult(@Nonnull CommandLine commandLine)
            throws FlinkParseException;
}
```

**Command Line Options:**

- `--configDir`, `-c` - Configuration directory (required)
- `--job-classname`, `-j` - Class name of the job to run
- `--job-id`, `-jid` - Job ID of the job to run
- `--webui-port`, `-r` - REST port for the web UI
- `--host`, `-h` - Hostname override
- `--fromSavepoint`, `-s` - Savepoint path to restore from
- `--allowNonRestoredState`, `-n` - Allow savepoint state that cannot be mapped to the job to be skipped (flag, takes no value)
- `-D<key>=<value>` - Dynamic configuration properties

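
The options above map a long and a short spelling onto one setting, and `-D` entries become dynamic properties. A simplified, stdlib-only sketch of that mapping follows; `ArgsSketch` is hypothetical (the module itself uses Apache Commons CLI through `CommandLineParser`), and treating the first unrecognized token and everything after it as job arguments is an assumption about how leftover arguments are handled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

// Hypothetical, simplified parser for the documented options (not the module's real parser).
final class ArgsSketch {
    // Maps every accepted spelling (long and short) to one canonical name.
    private static final Map<String, String> ALIASES = new HashMap<>();

    static {
        alias("configDir", "c");
        alias("job-classname", "j");
        alias("job-id", "jid");
        alias("webui-port", "r");
        alias("host", "h");
        alias("fromSavepoint", "s");
        alias("allowNonRestoredState", "n");
    }

    private static void alias(String longName, String shortName) {
        ALIASES.put("--" + longName, longName);
        ALIASES.put("-" + shortName, longName);
    }

    final Map<String, String> options = new HashMap<>();
    final Properties dynamicProperties = new Properties();
    final List<String> jobArgs = new ArrayList<>();

    static ArgsSketch parse(String[] args) {
        ArgsSketch result = new ArgsSketch();
        int i = 0;
        while (i < args.length) {
            String arg = args[i];
            if (arg.startsWith("-D") && arg.contains("=")) {
                // -D<key>=<value> becomes a dynamic configuration property.
                int eq = arg.indexOf('=');
                result.dynamicProperties.setProperty(arg.substring(2, eq), arg.substring(eq + 1));
                i++;
            } else if (ALIASES.containsKey(arg)) {
                String name = ALIASES.get(arg);
                if (name.equals("allowNonRestoredState")) {
                    result.options.put(name, "true"); // flag, takes no value
                    i++;
                } else {
                    if (i + 1 >= args.length) {
                        throw new IllegalArgumentException("Missing value for " + arg);
                    }
                    result.options.put(name, args[i + 1]);
                    i += 2;
                }
            } else {
                // Assumption: the first unrecognized token and everything after it are job arguments.
                for (int j = i; j < args.length; j++) {
                    result.jobArgs.add(args[j]);
                }
                break;
            }
        }
        if (!result.options.containsKey("configDir")) {
            throw new IllegalArgumentException("Missing required option: --configDir");
        }
        return result;
    }
}
```

For example, `-c /opt/flink/conf -j com.example.MyFlinkJob -n -Dparallelism.default=4 --input /data` yields `configDir`, `job-classname`, the `allowNonRestoredState` flag, one dynamic property, and the job arguments `--input /data`.
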
### Configuration Management

Handles configuration loading and savepoint restoration for the application cluster.

```java { .api }
@VisibleForTesting
static Configuration loadConfigurationFromClusterConfig(
        StandaloneApplicationClusterConfiguration clusterConfiguration);

// Private helper methods (part of main() workflow)
private static PackagedProgram getPackagedProgram(
        StandaloneApplicationClusterConfiguration clusterConfiguration,
        Configuration flinkConfiguration) throws FlinkException;

private static void setStaticJobId(
        StandaloneApplicationClusterConfiguration clusterConfiguration,
        Configuration configuration);

private static void configureExecution(
        Configuration configuration,
        PackagedProgram program) throws Exception;
```

**Configuration Features:**

- `loadConfigurationFromClusterConfig()` - Loads the Flink configuration from the cluster configuration and applies the static job ID and savepoint restore settings
- `getPackagedProgram()` - Retrieves the packaged program from the user lib directory using the job class name and program arguments
- `setStaticJobId()` - Sets the static job ID in the configuration when one is provided via `--job-id`
- `configureExecution()` - Applies the program's configuration to the execution configuration

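
`loadConfigurationFromClusterConfig()` layers several sources into one configuration. A minimal sketch of that layering, with a plain `Map` in place of Flink's `Configuration`: the class name `ClusterConfigSketch` is hypothetical; `rest.port`, `jobmanager.rpc.address`, `execution.savepoint.path`, and `execution.savepoint.ignore-unclaimed-state` are Flink option keys; the key `$internal.pipeline.job-id` for the static job ID, the `-1` "port not set" marker, and exactly where the port and hostname overrides are applied are assumptions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical model of the configuration-loading steps (not Flink's implementation).
final class ClusterConfigSketch {
    static Map<String, String> load(
            Map<String, String> fileConfig,   // stands in for flink-conf.yaml in --configDir
            Properties dynamicProperties,     // from -D<key>=<value>
            int restPort,                     // assumption: -1 when --webui-port is absent
            String hostname,                  // null when --host is absent
            String jobIdHex,                  // null when --job-id is absent
            String savepointPath,             // null when --fromSavepoint is absent
            boolean allowNonRestoredState) {  // --allowNonRestoredState flag
        Map<String, String> conf = new HashMap<>(fileConfig);
        // Dynamic properties take precedence over values from the configuration file.
        for (String key : dynamicProperties.stringPropertyNames()) {
            conf.put(key, dynamicProperties.getProperty(key));
        }
        if (restPort >= 0) {
            conf.put("rest.port", Integer.toString(restPort));
        }
        if (hostname != null) {
            conf.put("jobmanager.rpc.address", hostname);
        }
        // Static job ID, only when one was given (assumed internal key).
        if (jobIdHex != null) {
            conf.put("$internal.pipeline.job-id", jobIdHex);
        }
        // Savepoint restore settings, only when a savepoint path was given.
        if (savepointPath != null) {
            conf.put("execution.savepoint.path", savepointPath);
            conf.put("execution.savepoint.ignore-unclaimed-state", Boolean.toString(allowNonRestoredState));
        }
        return conf;
    }
}
```

Applying the command-line values last means a container can reuse one shared `flink-conf.yaml` and still override per-deployment settings at startup.
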
## Types

### StandaloneApplicationClusterConfiguration

```java { .api }
// Package-private configuration class
final class StandaloneApplicationClusterConfiguration extends EntrypointClusterConfiguration {

    // Package-private constructor with all configuration parameters
    StandaloneApplicationClusterConfiguration(
            @Nonnull String configDir,
            @Nonnull Properties dynamicProperties,
            @Nonnull String[] args,
            @Nullable String hostname,
            int restPort,
            @Nonnull SavepointRestoreSettings savepointRestoreSettings,
            @Nullable JobID jobId,
            @Nullable String jobClassName);

    // Access savepoint restore settings (package-private)
    @Nonnull SavepointRestoreSettings getSavepointRestoreSettings();

    // Access job ID (may be null, package-private)
    @Nullable JobID getJobId();

    // Access job class name (may be null, package-private)
    @Nullable String getJobClassName();
}
```

### Command Line Options Constants

```java { .api }
// Job class name option
private static final Option JOB_CLASS_NAME_OPTION =
        Option.builder("j").longOpt("job-classname").required(false)
                .hasArg(true).argName("job class name")
                .desc("Class name of the job to run.").build();

// Job ID option
private static final Option JOB_ID_OPTION =
        Option.builder("jid").longOpt("job-id").required(false)
                .hasArg(true).argName("job id")
                .desc("Job ID of the job to run.").build();

// Additional options reused from shared Flink parser classes (abbreviated)
// REST_PORT_OPTION: Option.builder("r").longOpt("webui-port")...
// HOST_OPTION: Option.builder("h").longOpt("host")...
// CONFIG_DIR_OPTION: Option.builder("c").longOpt("configDir")...
// DYNAMIC_PROPERTY_OPTION: Option.builder("D")...
// SAVEPOINT_PATH_OPTION: Option.builder("s").longOpt("fromSavepoint")...
// SAVEPOINT_ALLOW_NON_RESTORED_OPTION: Option.builder("n").longOpt("allowNonRestoredState")...
```

## Error Handling

The module reports parsing and configuration errors as follows:

- `FlinkParseException` - Thrown when command line parsing fails (from `createResult()`, or from `CommandLineParser.parse()`)
- An invalid job ID format throws `FlinkParseException` with an `IllegalArgumentException` cause (from the parser factory's private `getJobId()` helper, not the configuration's accessor)
- An invalid REST port throws `FlinkParseException` with a `NumberFormatException` cause (from the parser factory's private `getRestPort()` helper)
- A missing `--configDir`, which is a required option, makes command line parsing fail with a `FlinkParseException`
- Program loading failures in `getPackagedProgram()` throw `FlinkException`
- Configuration application failures in `configureExecution()` throw a generic `Exception`

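
Both value errors above follow the same pattern: a low-level exception is wrapped as the cause of one checked parse exception. A minimal sketch of that pattern, with a hypothetical `ParseFailure` standing in for `FlinkParseException`; the 32-hex-character job ID format (16 bytes) and the `-1` marker for an absent port are assumptions made for illustration.

```java
// Hypothetical stand-in for FlinkParseException.
final class ParseFailure extends Exception {
    ParseFailure(String message, Throwable cause) {
        super(message, cause);
    }
}

final class OptionValidationSketch {
    private static final String HEX_DIGITS = "0123456789abcdefABCDEF";

    // Assumption: a job ID is 32 hexadecimal characters (16 bytes).
    static String parseJobId(String value) throws ParseFailure {
        try {
            if (value.length() != 32) {
                throw new IllegalArgumentException("expected 32 hex characters, got " + value.length());
            }
            for (char c : value.toCharArray()) {
                if (HEX_DIGITS.indexOf(c) < 0) {
                    throw new IllegalArgumentException("not a hex character: " + c);
                }
            }
            return value.toLowerCase();
        } catch (IllegalArgumentException e) {
            // The low-level error becomes the cause of the parse failure.
            throw new ParseFailure("Failed to parse --job-id: " + value, e);
        }
    }

    // Assumption: -1 signals that --webui-port was not given.
    static int parseRestPort(String value) throws ParseFailure {
        if (value == null) {
            return -1;
        }
        try {
            return Integer.parseInt(value);
        } catch (NumberFormatException e) {
            throw new ParseFailure("Failed to parse --webui-port: " + value, e);
        }
    }
}
```

Keeping the original exception as the cause preserves the detail (bad length, non-hex character, non-numeric port) for the logs, while callers only need to handle one checked exception type.
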
## Dependencies

### Required Dependencies (provided scope)

- `org.apache.flink:flink-runtime` - Flink runtime components
- `org.apache.flink:flink-clients_${scala.binary.version}` - Flink client libraries

### Key External Dependencies

- Apache Commons CLI for command line parsing
- Flink's configuration and job management APIs
- Flink's entry point and resource manager frameworks

## Deployment Context

This module is designed for containerized deployments where:

- **Container Orchestration**: The entry point runs as the JobManager process on Kubernetes, Docker Swarm, or other container platforms
- **Standalone Mode**: It runs standalone application clusters (not session clusters)
- **Predefined Jobs**: The job's location and configuration are specified at container startup
- **Reactive Scaling**: Supports reactive mode, in which the job's parallelism adapts to the TaskManagers currently available; adding or removing TaskManagers is left to the platform or an external autoscaler
- **Configuration**: Uses external configuration files and command-line parameters
- **Savepoint Recovery**: Can start the job from a savepoint (`--fromSavepoint`), for example after a redeployment or upgrade

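
In reactive mode, Flink sets the job's parallelism to the highest value the currently available task slots allow, capped by the maximum parallelism, and rescales when TaskManagers join or leave. A simplified model of that rule, assuming a single slot-sharing group (one slot per parallel subtask) and the same `taskmanager.numberOfTaskSlots` on every TaskManager; the class name is hypothetical.

```java
// Simplified model (assumption): one slot-sharing group, so each parallel subtask needs one slot.
final class ReactiveParallelismSketch {
    static int effectiveParallelism(int taskManagers, int slotsPerTaskManager, int maxParallelism) {
        if (taskManagers < 0 || slotsPerTaskManager < 1 || maxParallelism < 1) {
            throw new IllegalArgumentException("invalid cluster shape");
        }
        // Use every available slot...
        long availableSlots = (long) taskManagers * slotsPerTaskManager;
        // ...but never exceed the maximum parallelism.
        return (int) Math.min(availableSlots, maxParallelism);
    }

    public static void main(String[] args) {
        // Scaling from 2 to 5 TaskManagers with 2 slots each raises parallelism from 4 to 10.
        System.out.println(effectiveParallelism(2, 2, 128)); // 4
        System.out.println(effectiveParallelism(5, 2, 128)); // 10
    }
}
```

The model ignores multiple slot-sharing groups and per-operator limits, so it only illustrates why adding TaskManagers raises parallelism; it is not the scheduler's exact computation.
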
## Architecture

The module follows Flink's entry point pattern; its `main` method runs these steps:

1. **Environment Setup** - Logs environment information, registers signal handlers, and installs the JVM shutdown safeguard
2. **Command Line Parsing** - Parses arguments with Apache Commons CLI through `StandaloneApplicationClusterConfigurationParserFactory`
3. **Configuration Loading** - Loads the Flink configuration, including dynamic properties, via `loadConfigurationFromClusterConfig()`
4. **Program Discovery** - Locates the packaged program in the user lib directory using `getPackagedProgram()`
5. **Execution Configuration** - Applies the program configuration through `configureExecution()`
6. **Cluster Initialization** - Creates the standalone application cluster entry point with `StandaloneResourceManagerFactory`
7. **Lifecycle Management** - Runs the cluster entry point with error handling and system exit codes

Key architectural components:

- **Entry Point Pattern**: Extends `ApplicationClusterEntryPoint` for consistent Flink startup behavior
- **Configuration Management**: Centralizes configuration loading, static job ID setting, and savepoint restore settings
- **Resource Management**: Uses `StandaloneResourceManagerFactory` for the standalone deployment mode
- **Reactive Scaling**: Supports reactive mode (`supportsReactiveMode()` returns `true`), so the job's parallelism follows the available TaskManagers
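
The seven-step `main` workflow listed under Architecture is a linear sequence in which a failing step stops startup. A minimal sketch of that control flow, assuming exit code `1` for any startup failure; `StartupSequenceSketch` and its stage names are hypothetical and do not correspond to Flink APIs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical skeleton of a linear startup sequence with exit codes.
final class StartupSequenceSketch {
    static final int OK = 0;
    static final int STARTUP_FAILURE = 1; // assumption: one non-zero code for all startup failures

    @FunctionalInterface
    interface Stage {
        void run() throws Exception;
    }

    final List<String> completedStages = new ArrayList<>();

    // Runs stages in insertion order and stops at the first failure.
    int run(LinkedHashMap<String, Stage> stages) {
        for (Map.Entry<String, Stage> entry : stages.entrySet()) {
            try {
                entry.getValue().run();
                completedStages.add(entry.getKey());
            } catch (Exception e) {
                System.err.println("Startup failed in stage '" + entry.getKey() + "': " + e.getMessage());
                return STARTUP_FAILURE;
            }
        }
        return OK;
    }
}
```

A `LinkedHashMap` keeps the stage order explicit; the real entry point performs its steps directly in `main` rather than through a stage list.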