or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

tessl/maven-org-apache-flink--flink-container

Apache Flink Container Module providing standalone application cluster entry point functionality for containerized Flink deployments.

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.flink/flink-container_2.11@1.14.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-container@1.14.0

0

# Flink Container

1

2

Apache Flink Container Module provides standalone application cluster entry point functionality for containerized Flink deployments. It enables Flink applications to run as standalone application clusters with predefined job locations and configurations, supporting reactive mode scaling and container orchestration platforms.

3

4

## Package Information

5

6

- **Package Name**: org.apache.flink:flink-container_2.11

7

- **Package Type**: Maven JAR

8

- **Language**: Java

9

- **Installation**: `<dependency><groupId>org.apache.flink</groupId><artifactId>flink-container_2.11</artifactId><version>1.14.6</version></dependency>`

10

11

## Core Imports

12

13

```java

14

import org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint;

15

import org.apache.flink.container.entrypoint.StandaloneApplicationClusterConfiguration;

16

import org.apache.flink.container.entrypoint.StandaloneApplicationClusterConfigurationParserFactory;

17

import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;

18

import org.apache.flink.client.program.PackagedProgram;

19

import org.apache.flink.configuration.Configuration;

20

import org.apache.flink.api.common.JobID;

21

import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

22

```

23

24

## Basic Usage

25

26

The primary usage is through the main method for containerized deployment:

27

28

```java

29

// Command line execution - typically invoked from container

30

// java -cp flink-container.jar org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint

31

// --configDir /opt/flink/conf

32

// --job-classname com.example.MyFlinkJob

33

// --webui-port 8081

34

// --host localhost

35

36

// For programmatic usage with configuration parsing

37

StandaloneApplicationClusterConfigurationParserFactory factory =

38

new StandaloneApplicationClusterConfigurationParserFactory();

39

CommandLineParser<StandaloneApplicationClusterConfiguration> parser =

40

new CommandLineParser<>(factory);

41

StandaloneApplicationClusterConfiguration config = parser.parse(args);

42

43

// Configuration loading and program setup (as done in main method)

44

Configuration configuration = StandaloneApplicationClusterEntryPoint

45

.loadConfigurationFromClusterConfig(config);

46

```

47

48

## Capabilities

49

50

### Application Cluster Entry Point

51

52

Provides the main entry point for running Flink applications in containerized standalone application cluster mode.

53

54

```java { .api }

55

@Internal

56

public final class StandaloneApplicationClusterEntryPoint extends ApplicationClusterEntryPoint {

57

58

// Main entry point for containerized Flink application

59

public static void main(String[] args);

60

61

// Indicates support for reactive scaling mode

62

@Override

63

protected boolean supportsReactiveMode();

64

65

// Load configuration from cluster configuration (package-visible for testing)

66

@VisibleForTesting

67

static Configuration loadConfigurationFromClusterConfig(

68

StandaloneApplicationClusterConfiguration clusterConfiguration);

69

}

70

```

71

72

**Key Methods:**

73

74

- `main(String[] args)` - Starts the standalone application cluster with command line arguments

75

- `supportsReactiveMode()` - Returns `true` to indicate reactive mode support for dynamic scaling

76

- `loadConfigurationFromClusterConfig()` - Loads Flink configuration from cluster configuration, sets static job ID, and applies savepoint restore settings

77

78

### Configuration Parsing

79

80

Parses command line arguments to create configuration for the standalone application cluster.

81

82

```java { .api }

83

public class StandaloneApplicationClusterConfigurationParserFactory

84

implements ParserResultFactory<StandaloneApplicationClusterConfiguration> {

85

86

// Returns available command line options

87

@Override

88

public Options getOptions();

89

90

// Creates configuration from parsed command line

91

@Override

92

public StandaloneApplicationClusterConfiguration createResult(@Nonnull CommandLine commandLine)

93

throws FlinkParseException;

94

}

95

```

96

97

**Command Line Options:**

98

99

- `--configDir`, `-c` - Configuration directory (required)

100

- `--job-classname`, `-j` - Class name of the job to run

101

- `--job-id`, `-jid` - Job ID of the job to run

102

- `--webui-port`, `-r` - REST port for web UI

103

- `--host`, `-h` - Hostname override

104

- `--fromSavepoint`, `-s` - Savepoint path for restoration

105

- `--allowNonRestoredState`, `-n` - Allow non-restored state from savepoint

106

- `-D<key>=<value>` - Dynamic configuration properties

107

108

### Configuration Management

109

110

Handles configuration loading and savepoint restoration for the application cluster.

111

112

```java { .api }

113

@VisibleForTesting

114

static Configuration loadConfigurationFromClusterConfig(

115

StandaloneApplicationClusterConfiguration clusterConfiguration);

116

117

// Private helper methods (part of main() workflow)

118

private static PackagedProgram getPackagedProgram(

119

StandaloneApplicationClusterConfiguration clusterConfiguration,

120

Configuration flinkConfiguration) throws FlinkException;

121

122

private static void setStaticJobId(

123

StandaloneApplicationClusterConfiguration clusterConfiguration,

124

Configuration configuration);

125

126

private static void configureExecution(

127

Configuration configuration,

128

PackagedProgram program) throws Exception;

129

```

130

131

**Configuration Features:**

132

133

- `loadConfigurationFromClusterConfig()` - Loads Flink configuration from cluster configuration, sets static job ID and savepoint restore settings

134

- `getPackagedProgram()` - Retrieves packaged program from user lib directory using job class name and arguments

135

- `setStaticJobId()` - Sets static job ID in configuration when provided via command line

136

- `configureExecution()` - Applies program configuration to execution environment

137

138

## Types

139

140

### StandaloneApplicationClusterConfiguration

141

142

```java { .api }

143

// Package-private configuration class

144

final class StandaloneApplicationClusterConfiguration extends EntrypointClusterConfiguration {

145

146

// Package-private constructor with all configuration parameters

147

StandaloneApplicationClusterConfiguration(

148

@Nonnull String configDir,

149

@Nonnull Properties dynamicProperties,

150

@Nonnull String[] args,

151

@Nullable String hostname,

152

int restPort,

153

@Nonnull SavepointRestoreSettings savepointRestoreSettings,

154

@Nullable JobID jobId,

155

@Nullable String jobClassName);

156

157

// Access savepoint restore settings (package-private)

158

@Nonnull SavepointRestoreSettings getSavepointRestoreSettings();

159

160

// Access job ID (may be null, package-private)

161

@Nullable JobID getJobId();

162

163

// Access job class name (may be null, package-private)

164

@Nullable String getJobClassName();

165

}

166

```

167

168

### Command Line Options Constants

169

170

```java { .api }

171

// Job class name option

172

private static final Option JOB_CLASS_NAME_OPTION =

173

Option.builder("j").longOpt("job-classname").required(false)

174

.hasArg(true).argName("job class name")

175

.desc("Class name of the job to run.").build();

176

177

// Job ID option

178

private static final Option JOB_ID_OPTION =

179

Option.builder("jid").longOpt("job-id").required(false)

180

.hasArg(true).argName("job id")

181

.desc("Job ID of the job to run.").build();

182

183

// Additional options from base classes

184

// REST_PORT_OPTION: Option.builder("r").longOpt("webui-port")...

185

// HOST_OPTION: Option.builder("h").longOpt("host")...

186

// CONFIG_DIR_OPTION: Option.builder("c").longOpt("configDir")...

187

// DYNAMIC_PROPERTY_OPTION: Option.builder("D")...

188

// SAVEPOINT_PATH_OPTION: Option.builder("s").longOpt("fromSavepoint")...

189

// SAVEPOINT_ALLOW_NON_RESTORED_OPTION: Option.builder("n").longOpt("allowNonRestoredState")...

190

```

191

192

## Error Handling

193

194

The module handles parsing and configuration errors:

195

196

- `FlinkParseException` - Thrown when command line parsing fails (from `createResult()` method)

197

- Invalid job ID format throws `FlinkParseException` with `IllegalArgumentException` cause (from `getJobId()`)

198

- Invalid REST port throws `FlinkParseException` with `NumberFormatException` cause (from `getRestPort()`)

199

- Missing required configuration directory throws parsing exceptions

200

- Program loading failures in `getPackagedProgram()` throw `FlinkException`

201

- Configuration application failures in `configureExecution()` throw generic `Exception`

202

203

## Dependencies

204

205

### Required Dependencies (provided scope)

206

- `org.apache.flink:flink-runtime` - Flink runtime components

207

- `org.apache.flink:flink-clients_${scala.binary.version}` - Flink client libraries

208

209

### Key External Dependencies

210

- Apache Commons CLI for command line parsing

211

- Flink's configuration and job management APIs

212

- Flink's entry point and resource manager frameworks

213

214

## Deployment Context

215

216

This module is designed for containerized deployments where:

217

218

- **Container Orchestration**: Integrates with Kubernetes, Docker Swarm, or other container platforms

219

- **Standalone Mode**: Runs as standalone application clusters (not session clusters)

220

- **Predefined Jobs**: Job location and configuration are specified at container startup

221

- **Reactive Scaling**: Supports dynamic resource scaling based on workload

222

- **Configuration**: Uses external configuration files and command-line parameters

223

- **Savepoint Recovery**: Supports job recovery from savepoints for fault tolerance

224

225

## Architecture

226

227

The module follows Flink's entry point pattern with this main method workflow:

228

229

1. **Environment Setup** - Logs environment information, registers signal handlers, and installs JVM shutdown safeguard

230

2. **Command Line Parsing** - Uses Apache Commons CLI through StandaloneApplicationClusterConfigurationParserFactory

231

3. **Configuration Loading** - Loads Flink configuration via `loadConfigurationFromClusterConfig()` and applies dynamic properties

232

4. **Program Discovery** - Locates packaged programs in user lib directory using `getPackagedProgram()`

233

5. **Execution Configuration** - Applies program configuration through `configureExecution()`

234

6. **Cluster Initialization** - Creates standalone application cluster with StandaloneResourceManagerFactory

235

7. **Lifecycle Management** - Runs cluster entry point with proper error handling and system exit codes

236

237

Key architectural components:

238

- **Entry Point Pattern**: Extends ApplicationClusterEntryPoint for consistent Flink startup behavior

239

- **Configuration Management**: Centralizes all configuration loading, job ID setting, and savepoint restoration

240

- **Resource Management**: Uses StandaloneResourceManagerFactory for standalone deployment mode

241

- **Reactive Scaling**: Built-in support for dynamic resource scaling based on workload