tessl/maven-org-apache-flink--flink-kubernetes


- **Workspace**: tessl
- **Visibility**: Public
- **Describes**: `mavenpkg:maven/org.apache.flink/flink-kubernetes@2.1.x`

To install, run `npx @tessl/cli install tessl/maven-org-apache-flink--flink-kubernetes@2.1.0`.

# Flink Kubernetes

The Apache Flink Kubernetes integration module provides native Kubernetes support for deploying and managing Flink clusters. It lets Flink use Kubernetes as its resource manager and supports both session and application clusters, with features such as high availability, leader election, checkpoint recovery, and state management backed by Kubernetes resources.

## Package Information

- **Package Name**: flink-kubernetes
- **Package Type**: maven
- **Language**: Java
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-kubernetes
- **Installation**: Add as a Maven dependency to your project

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-kubernetes</artifactId>
    <version>2.1.0</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesHighAvailabilityOptions;
```

## Basic Usage

This module is typically used through configuration rather than direct API calls. Configure Flink to use Kubernetes as a deployment target:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;

Configuration config = new Configuration();
// "execution.target": deploy to a Kubernetes session cluster
config.set(DeploymentOptions.TARGET, "kubernetes-session");
config.set(KubernetesConfigOptions.NAMESPACE, "flink");
config.set(KubernetesConfigOptions.CLUSTER_ID, "my-flink-cluster");
config.set(KubernetesConfigOptions.CONTAINER_IMAGE, "flink:2.1.0");
```
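The same settings can also be passed to the Flink CLI (for example `./bin/kubernetes-session.sh`) as `-Dkey=value` dynamic properties. The JDK-only sketch below renders a flat key/value map into such arguments. The class `DynamicProperties` and the label values are hypothetical; the `key:value,key:value` encoding for map-typed options such as `kubernetes.jobmanager.labels` follows the format described in Flink's option documentation, but check the configuration reference for your version.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper: renders Flink option keys and values as "-Dkey=value" CLI arguments. */
final class DynamicProperties {

    private DynamicProperties() {}

    /** Encodes a map-typed option value as "k1:v1,k2:v2" (comma-separated key:value pairs). */
    static String encodeMap(Map<String, String> value) {
        return value.entrySet().stream()
                .map(e -> e.getKey() + ":" + e.getValue())
                .collect(Collectors.joining(","));
    }

    /** Turns each entry into one "-Dkey=value" argument, in the map's iteration order. */
    static List<String> toArgs(Map<String, String> options) {
        List<String> args = new ArrayList<>();
        for (Map.Entry<String, String> e : options.entrySet()) {
            args.add("-D" + e.getKey() + "=" + e.getValue());
        }
        return args;
    }

    /** Session-cluster settings similar to the Basic Usage example, plus JobManager labels. */
    static List<String> sessionExample() {
        Map<String, String> labels = new LinkedHashMap<>();
        labels.put("team", "analytics"); // hypothetical label values
        labels.put("env", "dev");

        Map<String, String> options = new LinkedHashMap<>();
        options.put("kubernetes.namespace", "flink");
        options.put("kubernetes.cluster-id", "my-flink-cluster");
        options.put("kubernetes.jobmanager.labels", encodeMap(labels));
        return toArgs(options);
    }
}
```

The resulting list can be appended to a CLI invocation. Values are not shell-quoted here, so quote any value that contains spaces or shell metacharacters.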

## Architecture

The flink-kubernetes module is built around several key components:

- **Configuration System**: Public configuration options for customizing Kubernetes deployments
- **Cluster Management**: Internal cluster descriptors and client factories for managing Flink clusters
- **Kubernetes Client**: Internal abstraction layer over the Fabric8 Kubernetes client for API interactions
- **High Availability**: Kubernetes-based leader election and state management services
- **Service Provider Interface**: Automatic integration via SPI, so the Flink CLI picks up the module without extra setup

**Important Note**: Most implementation classes are marked `@Internal` and are not intended for direct use by external applications. Users primarily interact with this module through configuration options and the standard Flink client APIs.

## Capabilities

### Kubernetes Configuration

Configuration options for customizing Kubernetes deployment behavior, cluster resources, and integration settings.

```java { .api }
public class KubernetesConfigOptions {
    // Cluster Configuration
    public static final ConfigOption<String> CONTEXT;
    public static final ConfigOption<String> NAMESPACE; // default: "default"
    public static final ConfigOption<String> CLUSTER_ID;
    public static final ConfigOption<String> CONTAINER_IMAGE; // default derived from the Flink version
    public static final ConfigOption<String> KUBE_CONFIG_FILE;

    // Service Configuration
    public static final ConfigOption<ServiceExposedType> REST_SERVICE_EXPOSED_TYPE; // default: ClusterIP
    public static final ConfigOption<NodePortAddressType> REST_SERVICE_EXPOSED_NODE_PORT_ADDRESS_TYPE; // default: InternalIP
    public static final ConfigOption<String> JOB_MANAGER_SERVICE_ACCOUNT; // default: "default"
    public static final ConfigOption<String> TASK_MANAGER_SERVICE_ACCOUNT; // default: "default"
    public static final ConfigOption<String> KUBERNETES_SERVICE_ACCOUNT; // default: "default"

    // Resource Configuration
    public static final ConfigOption<Double> JOB_MANAGER_CPU; // default: 1.0
    public static final ConfigOption<Double> JOB_MANAGER_CPU_LIMIT_FACTOR; // default: 1.0
    public static final ConfigOption<Double> JOB_MANAGER_MEMORY_LIMIT_FACTOR; // default: 1.0
    public static final ConfigOption<Double> TASK_MANAGER_CPU; // default: -1.0 (derived from the number of task slots)
    public static final ConfigOption<Double> TASK_MANAGER_CPU_LIMIT_FACTOR; // default: 1.0
    public static final ConfigOption<Double> TASK_MANAGER_MEMORY_LIMIT_FACTOR; // default: 1.0
    public static final ConfigOption<Integer> KUBERNETES_JOBMANAGER_REPLICAS; // default: 1

    // Labels and Annotations
    public static final ConfigOption<Map<String, String>> JOB_MANAGER_LABELS;
    public static final ConfigOption<Map<String, String>> TASK_MANAGER_LABELS;
    public static final ConfigOption<Map<String, String>> JOB_MANAGER_ANNOTATIONS;
    public static final ConfigOption<Map<String, String>> TASK_MANAGER_ANNOTATIONS;
    public static final ConfigOption<Map<String, String>> REST_SERVICE_ANNOTATIONS;
    public static final ConfigOption<Map<String, String>> INTERNAL_SERVICE_ANNOTATIONS;

    // Node Selection and Scheduling
    public static final ConfigOption<Map<String, String>> JOB_MANAGER_NODE_SELECTOR;
    public static final ConfigOption<Map<String, String>> TASK_MANAGER_NODE_SELECTOR;
    public static final ConfigOption<List<Map<String, String>>> JOB_MANAGER_TOLERATIONS;
    public static final ConfigOption<List<Map<String, String>>> TASK_MANAGER_TOLERATIONS;
    public static final ConfigOption<List<Map<String, String>>> JOB_MANAGER_OWNER_REFERENCE;

    // Container Configuration
    public static final ConfigOption<ImagePullPolicy> CONTAINER_IMAGE_PULL_POLICY; // default: IfNotPresent
    public static final ConfigOption<List<String>> CONTAINER_IMAGE_PULL_SECRETS;
    public static final ConfigOption<String> KUBERNETES_ENTRY_PATH; // default: "/docker-entrypoint.sh"
    public static final ConfigOption<String> FLINK_CONF_DIR; // default: "/opt/flink/conf"
    public static final ConfigOption<String> FLINK_LOG_DIR;

    // Secrets and Environment
    public static final ConfigOption<Map<String, String>> KUBERNETES_SECRETS;
    public static final ConfigOption<List<Map<String, String>>> KUBERNETES_ENV_SECRET_KEY_REF;

    // Pod Templates
    public static final ConfigOption<String> JOB_MANAGER_POD_TEMPLATE;
    public static final ConfigOption<String> TASK_MANAGER_POD_TEMPLATE;
    public static final ConfigOption<String> KUBERNETES_POD_TEMPLATE;

    // Additional Configuration
    public static final ConfigOption<String> HADOOP_CONF_CONFIG_MAP;
    public static final ConfigOption<String> KUBERNETES_JOBMANAGER_ENTRYPOINT_ARGS; // default: ""
    public static final ConfigOption<String> KUBERNETES_TASKMANAGER_ENTRYPOINT_ARGS; // default: ""
    public static final ConfigOption<Boolean> KUBERNETES_HOSTNETWORK_ENABLED; // default: false
    public static final ConfigOption<String> KUBERNETES_CLIENT_USER_AGENT; // default: "flink"
    public static final ConfigOption<Integer> KUBERNETES_CLIENT_IO_EXECUTOR_POOL_SIZE; // default: 4

    // Transactional Operations
    public static final ConfigOption<Integer> KUBERNETES_TRANSACTIONAL_OPERATION_MAX_RETRIES; // default: 15
    public static final ConfigOption<Duration> KUBERNETES_TRANSACTIONAL_OPERATION_INITIAL_RETRY_DEALY; // default: 50ms
    public static final ConfigOption<Duration> KUBERNETES_TRANSACTIONAL_OPERATION_MAX_RETRY_DEALY; // default: 1 minute

    // Artifact Upload
    public static final ConfigOption<Boolean> LOCAL_UPLOAD_ENABLED; // default: false
    public static final ConfigOption<Boolean> LOCAL_UPLOAD_OVERWRITE; // default: false
    public static final ConfigOption<String> LOCAL_UPLOAD_TARGET;

    // Decorators
    public static final ConfigOption<Boolean> KUBERNETES_HADOOP_CONF_MOUNT_DECORATOR_ENABLED; // default: true
    public static final ConfigOption<Boolean> KUBERNETES_KERBEROS_MOUNT_DECORATOR_ENABLED; // default: true

    // Nested enums
    public enum ServiceExposedType {
        ClusterIP,          // Internal cluster access only
        NodePort,           // External access via node ports
        LoadBalancer,       // External access via load balancer
        Headless_ClusterIP  // Headless service for direct pod access
    }

    public enum NodePortAddressType {
        InternalIP, // Use internal node IP addresses
        ExternalIP  // Use external node IP addresses
    }

    public enum ImagePullPolicy {
        IfNotPresent, // Pull the image only if it is not already on the node
        Always,       // Pull the image every time a container starts
        Never         // Never pull; use the image already on the node
    }
}
```
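The three `KUBERNETES_TRANSACTIONAL_OPERATION_*` options bound how transactional Kubernetes operations (for example, a check-and-update of a ConfigMap) are retried. Assuming an exponential backoff that doubles from the initial delay and is capped at the maximum delay, the defaults produce the schedule computed below. The doubling factor and the class name `RetrySchedule` are assumptions for illustration, not taken from Flink's source.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: the delay before each retry under an assumed doubling backoff. */
final class RetrySchedule {

    private RetrySchedule() {}

    static List<Duration> delays(int maxRetries, Duration initialDelay, Duration maxDelay) {
        List<Duration> result = new ArrayList<>();
        Duration next = initialDelay;
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            // Never wait longer than the configured maximum delay.
            Duration capped = next.compareTo(maxDelay) > 0 ? maxDelay : next;
            result.add(capped);
            // Double for the next attempt; once capped, the value stays at the cap.
            next = capped.multipliedBy(2);
        }
        return result;
    }

    /** Total waiting time if every retry is used. */
    static Duration total(List<Duration> delays) {
        Duration sum = Duration.ZERO;
        for (Duration d : delays) {
            sum = sum.plus(d);
        }
        return sum;
    }
}
```

Under this assumption, `delays(15, Duration.ofMillis(50), Duration.ofMinutes(1))` grows from 50 ms to 51.2 s over the first eleven retries, hits the one-minute cap for the last four, and sums to 342,350 ms (about 5.7 minutes) before the operation gives up.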

**Usage Example:**

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.ServiceExposedType;

Configuration config = new Configuration();

// Basic cluster configuration
config.set(KubernetesConfigOptions.NAMESPACE, "flink-jobs");
config.set(KubernetesConfigOptions.CLUSTER_ID, "analytics-cluster");
config.set(KubernetesConfigOptions.CONTAINER_IMAGE, "my-registry/flink:2.1.0");

// Resource allocation (CPU cores requested per pod)
config.set(KubernetesConfigOptions.JOB_MANAGER_CPU, 1.0);
config.set(KubernetesConfigOptions.TASK_MANAGER_CPU, 2.0);

// Service account configuration
config.set(KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT, "flink-jobmanager");
config.set(KubernetesConfigOptions.TASK_MANAGER_SERVICE_ACCOUNT, "flink-taskmanager");

// Service exposure (ServiceExposedType is nested in KubernetesConfigOptions)
config.set(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE, ServiceExposedType.LoadBalancer);
```
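The limit-factor options relate Kubernetes resource requests to limits: according to the option descriptions, the container limit is the request multiplied by the factor, so the default of `1.0` yields equal requests and limits. The sketch below models that arithmetic with the JDK only. It assumes that a negative TaskManager CPU value (the `-1.0` default) falls back to the number of task slots per TaskManager, as Flink documents; `ContainerResources` and its parameters are hypothetical.

```java
/** Hypothetical model of the CPU and memory request/limit values for one Flink container. */
final class ContainerResources {

    final double cpuRequest;
    final double cpuLimit;
    final long memoryRequestMb;
    final long memoryLimitMb;

    private ContainerResources(double cpuRequest, double cpuLimit, long memoryRequestMb, long memoryLimitMb) {
        this.cpuRequest = cpuRequest;
        this.cpuLimit = cpuLimit;
        this.memoryRequestMb = memoryRequestMb;
        this.memoryLimitMb = memoryLimitMb;
    }

    /**
     * @param configuredCpu     value of the CPU option; a negative value means "derive automatically"
     * @param numTaskSlots      assumed fallback for a negative CPU value (slots per TaskManager)
     * @param cpuLimitFactor    value of the matching *_CPU_LIMIT_FACTOR option
     * @param memoryMb          container memory request, e.g. the configured process memory size
     * @param memoryLimitFactor value of the matching *_MEMORY_LIMIT_FACTOR option
     */
    static ContainerResources of(double configuredCpu, int numTaskSlots, double cpuLimitFactor,
                                 long memoryMb, double memoryLimitFactor) {
        // Kubernetes rejects a limit below the request, so factors below 1.0 are rejected here.
        if (cpuLimitFactor < 1.0 || memoryLimitFactor < 1.0) {
            throw new IllegalArgumentException("Limit factors must be >= 1.0");
        }
        double cpu = configuredCpu < 0 ? numTaskSlots : configuredCpu;
        long memoryLimit = Math.round(memoryMb * memoryLimitFactor);
        return new ContainerResources(cpu, cpu * cpuLimitFactor, memoryMb, memoryLimit);
    }
}
```

For example, a TaskManager with four slots, the default CPU value, and a CPU limit factor of `1.5` would request 4 cores with a 6-core limit in this model, while a factor of `1.0` keeps request and limit identical.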

### High Availability Configuration

Configuration options for Kubernetes-based high availability features including leader election and state management.

```java { .api }
public class KubernetesHighAvailabilityOptions {
    // Leader Election Configuration
    public static final ConfigOption<Duration> KUBERNETES_LEASE_DURATION; // default: 15 seconds
    public static final ConfigOption<Duration> KUBERNETES_RENEW_DEADLINE; // default: 15 seconds
    public static final ConfigOption<Duration> KUBERNETES_RETRY_PERIOD; // default: 5 seconds
}
```
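As a rough model of the three durations in the usual Kubernetes lease-based election scheme: the leader keeps renewing its lease and steps down if it cannot renew within the renew deadline, while a standby contender retries acquisition every retry period and can only succeed once the lease has expired. Under that model, the worst-case time for a standby to take over from a crashed leader is about the lease duration plus one retry period (roughly 20 seconds with the defaults). This is an approximation for reasoning about failover, not Flink's or Fabric8's implementation; `LeaderElectionTiming` is hypothetical.

```java
import java.time.Duration;

/** Hypothetical back-of-the-envelope model of lease-based leader election timing. */
final class LeaderElectionTiming {

    private LeaderElectionTiming() {}

    /**
     * Rough upper bound on how long a standby waits to take over after the leader stops renewing:
     * the lease must expire, and the standby may have just missed a check, so add one retry period.
     */
    static Duration worstCaseTakeover(Duration leaseDuration, Duration retryPeriod) {
        return leaseDuration.plus(retryPeriod);
    }

    /**
     * Ordering that is sensible for this model (retry period < renew deadline <= lease duration).
     * This is a rule of thumb, not a check that Flink itself performs.
     */
    static boolean looksConsistent(Duration leaseDuration, Duration renewDeadline, Duration retryPeriod) {
        return retryPeriod.compareTo(renewDeadline) < 0 && renewDeadline.compareTo(leaseDuration) <= 0;
    }
}
```

Shortening the lease speeds up failover in this model but leaves less slack for a slow API server before a healthy leader loses its lease.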

**Usage Example:**

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.kubernetes.configuration.KubernetesHighAvailabilityOptions;
import java.time.Duration;

Configuration config = new Configuration();

// Enable Kubernetes HA ("high-availability.type")
config.set(HighAvailabilityOptions.HA_MODE, "kubernetes");

// Durable storage for JobManager recovery metadata (hypothetical example path)
config.set(HighAvailabilityOptions.HA_STORAGE_PATH, "s3://my-bucket/flink/recovery");

// Configure leader election timing (these are the default values)
config.set(KubernetesHighAvailabilityOptions.KUBERNETES_LEASE_DURATION, Duration.ofSeconds(15));
config.set(KubernetesHighAvailabilityOptions.KUBERNETES_RENEW_DEADLINE, Duration.ofSeconds(15));
config.set(KubernetesHighAvailabilityOptions.KUBERNETES_RETRY_PERIOD, Duration.ofSeconds(5));
```
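A missing required HA key usually only surfaces when the cluster starts. Flink's Kubernetes HA documentation lists `high-availability.type` (set to `kubernetes`), `high-availability.storageDir`, and `kubernetes.cluster-id` as required, so a pre-flight check over a flat key/value configuration can catch omissions earlier. The sketch below uses only the JDK; `HaPreflight` is a hypothetical helper, and the key names should be confirmed against the configuration reference for your Flink version.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical pre-flight check for Kubernetes HA settings in a flat key/value config. */
final class HaPreflight {

    private HaPreflight() {}

    /** Returns the required keys that are missing or blank; an empty list means the check passed. */
    static List<String> missingKeys(Map<String, String> conf) {
        List<String> missing = new ArrayList<>();
        // Only validate when Kubernetes HA is actually requested.
        if (!"kubernetes".equalsIgnoreCase(conf.getOrDefault("high-availability.type", ""))) {
            return missing;
        }
        for (String key : new String[] {"high-availability.storageDir", "kubernetes.cluster-id"}) {
            String value = conf.get(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

The check deliberately ignores configurations without Kubernetes HA, so it can run unconditionally before every deployment.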

### Service Provider Interface Integration

The module registers its factories through Java Service Provider Interface (SPI) files under `META-INF/services`, so Flink discovers them on the classpath without any explicit code changes.

**Registered Services:**

- **PipelineExecutorFactory**: `org.apache.flink.kubernetes.executors.KubernetesSessionClusterExecutorFactory`
- **ClusterClientFactory**: `org.apache.flink.kubernetes.KubernetesClusterClientFactory`

**Usage:**

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;

// Setting the execution target ("execution.target") is enough; the SPI-registered factories handle the rest
Configuration config = new Configuration();
config.set(DeploymentOptions.TARGET, "kubernetes-session");
// or
config.set(DeploymentOptions.TARGET, "kubernetes-application");
```
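Conceptually, Flink loads every factory listed in the SPI files and picks the one whose compatibility check accepts the configured execution target. The JDK-only model below imitates that selection step. The `TargetFactory` interface and `FactorySelection` class are hypothetical stand-ins for Flink's `PipelineExecutorFactory` and `ClusterClientFactory`, which inspect a full `Configuration` rather than a bare target string.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Hypothetical stand-in for a factory that Flink would discover through ServiceLoader. */
interface TargetFactory {
    String name();

    boolean isCompatibleWith(String executionTarget);
}

/** Simplified model of picking the one factory that accepts the configured execution target. */
final class FactorySelection {

    private FactorySelection() {}

    /** Creates a factory that accepts exactly the given targets. */
    static TargetFactory of(String factoryName, String... targets) {
        List<String> accepted = Arrays.asList(targets);
        return new TargetFactory() {
            @Override
            public String name() {
                return factoryName;
            }

            @Override
            public boolean isCompatibleWith(String executionTarget) {
                return accepted.contains(executionTarget);
            }
        };
    }

    /** Returns the single compatible factory, empty if none matches, and fails if several match. */
    static Optional<TargetFactory> select(List<TargetFactory> discovered, String executionTarget) {
        TargetFactory match = null;
        for (TargetFactory factory : discovered) {
            if (factory.isCompatibleWith(executionTarget)) {
                if (match != null) {
                    throw new IllegalStateException("Multiple factories match " + executionTarget);
                }
                match = factory;
            }
        }
        return Optional.ofNullable(match);
    }
}
```

In this model, a list of pipeline executors resolves `kubernetes-session` but finds nothing for `kubernetes-application`, which is consistent with only the session executor factory being registered under `PipelineExecutorFactory`; application clusters are instead deployed through the registered `ClusterClientFactory`.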

## Types

```java { .api }
// org.apache.flink.configuration.ConfigOption: standard Flink configuration option type
public class ConfigOption<T> {
    // Carries the option key, default value, description, and fallback keys
}

// ServiceExposedType, NodePortAddressType, and ImagePullPolicy are the nested enums of
// KubernetesConfigOptions listed under Kubernetes Configuration.

// java.time.Duration: standard Java type used for timing options
public final class Duration {
    public static Duration ofSeconds(long seconds);
    public static Duration ofMinutes(long minutes);
    public static Duration ofMillis(long millis);
}
```

## Deployment Targets

The module supports two deployment modes:

- **kubernetes-session**: Deploy a long-running Flink session cluster on Kubernetes
- **kubernetes-application**: Deploy a single Flink application directly on Kubernetes

Select the mode with the `execution.target` option (`DeploymentOptions.TARGET`), which the Flink CLI also sets through `--target`.

## Dependencies

This module includes shaded dependencies to avoid version conflicts:

- **Fabric8 Kubernetes Client**: Shaded to `org.apache.flink.kubernetes.shaded.io.fabric8`
- **Jackson**: Shaded to `org.apache.flink.kubernetes.shaded.com.fasterxml.jackson`
- **OkHttp**: Shaded to `org.apache.flink.kubernetes.shaded.okhttp3`
- **SnakeYAML**: Shaded to `org.apache.flink.kubernetes.shaded.org.snakeyaml`

## Error Handling

The module defines `KubernetesException` for Kubernetes-specific errors, but it is marked as internal. Applications should expect errors to propagate through Flink's standard exception handling instead.

Most errors related to the Kubernetes integration surface as:

- Configuration validation errors during cluster startup
- Resource allocation failures during pod creation
- Network connectivity issues during cluster communication

## Notes

- **Internal APIs**: Most classes in this module are marked `@Internal` and are subject to change between versions
- **Configuration-Driven**: The primary interaction is through configuration options rather than direct API calls
- **SPI Integration**: The module integrates automatically with the Flink CLI and client APIs through service provider interfaces
- **Shaded Dependencies**: The bundled third-party libraries listed under Dependencies are relocated to prevent conflicts with user applications