# Flink Kubernetes

Apache Flink Kubernetes integration module that provides native Kubernetes support for deploying and managing Flink clusters. It lets Flink use Kubernetes as its resource manager for both session and application clusters. It also provides Kubernetes-based high availability: leader election, plus storage of HA metadata (such as the checkpoint pointers used for recovery) in Kubernetes resources.

## Package Information

- **Package Name**: flink-kubernetes
- **Package Type**: maven
- **Language**: Java
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-kubernetes
- **Installation**: Add as a Maven dependency to your project

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-kubernetes</artifactId>
    <version>2.1.0</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesHighAvailabilityOptions;
```

## Basic Usage

This module is typically used through configuration rather than direct API calls. Configure Flink to use Kubernetes as a deployment target:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;

Configuration config = new Configuration();
// DeploymentOptions.TARGET is the "execution.target" option
config.set(DeploymentOptions.TARGET, "kubernetes-session");
config.set(KubernetesConfigOptions.NAMESPACE, "flink");
config.set(KubernetesConfigOptions.CLUSTER_ID, "my-flink-cluster");
config.set(KubernetesConfigOptions.CONTAINER_IMAGE, "flink:2.1.0");
```
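The same settings can also be supplied as `-D` dynamic properties on the Flink command line (for example to `./bin/kubernetes-session.sh`). The JDK-only sketch below builds those arguments from a map. `DynamicProperties` and `toDynamicProperties` are hypothetical names. The string keys are assumed to be the ones behind the constants used above: `kubernetes.namespace`, `kubernetes.cluster-id`, and `kubernetes.container.image.ref` for `CONTAINER_IMAGE` in recent releases.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DynamicProperties {

    // Hypothetical helper: turns key/value pairs into "-Dkey=value" CLI arguments,
    // keeping the map's iteration order so the output is predictable.
    static List<String> toDynamicProperties(Map<String, String> options) {
        List<String> args = new ArrayList<>();
        for (Map.Entry<String, String> e : options.entrySet()) {
            args.add("-D" + e.getKey() + "=" + e.getValue());
        }
        return args;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        // Assumed string keys behind the KubernetesConfigOptions constants
        options.put("kubernetes.namespace", "flink");
        options.put("kubernetes.cluster-id", "my-flink-cluster");
        options.put("kubernetes.container.image.ref", "flink:2.1.0");
        System.out.println(String.join(" ", toDynamicProperties(options)));
    }
}
```

Running `main` prints the three `-D` arguments separated by spaces. Values containing spaces would additionally need shell quoting, which this sketch does not handle.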

## Architecture

The flink-kubernetes module is built around several key components:

- **Configuration System**: Public configuration options for customizing Kubernetes deployments
- **Cluster Management**: Internal cluster descriptors and client factories for managing Flink clusters
- **Kubernetes Client**: Internal abstraction layer over the Fabric8 Kubernetes client for API interactions
- **High Availability**: Kubernetes-based leader election and state management services
- **Service Provider Interface**: Automatic integration via SPI for seamless Flink CLI usage

**Important Note**: Most implementation classes are marked `@Internal` and are not intended for direct use by external applications. Users primarily interact with this module through configuration options and the standard Flink client APIs.

## Capabilities

### Kubernetes Configuration

Configuration options for customizing Kubernetes deployment behavior, cluster resources, and integration settings.

```java { .api }
public class KubernetesConfigOptions {
    // Cluster Configuration
    public static final ConfigOption<String> CONTEXT;
    public static final ConfigOption<String> NAMESPACE; // default: "default"
    public static final ConfigOption<String> CLUSTER_ID;
    public static final ConfigOption<String> CONTAINER_IMAGE; // dynamic default
    public static final ConfigOption<String> KUBE_CONFIG_FILE;

    // Service Configuration
    public static final ConfigOption<ServiceExposedType> REST_SERVICE_EXPOSED_TYPE; // default: ClusterIP
    public static final ConfigOption<NodePortAddressType> REST_SERVICE_EXPOSED_NODE_PORT_ADDRESS_TYPE; // default: InternalIP
    public static final ConfigOption<String> JOB_MANAGER_SERVICE_ACCOUNT; // default: "default"
    public static final ConfigOption<String> TASK_MANAGER_SERVICE_ACCOUNT; // default: "default"
    public static final ConfigOption<String> KUBERNETES_SERVICE_ACCOUNT; // default: "default"

    // Resource Configuration
    public static final ConfigOption<Double> JOB_MANAGER_CPU; // default: 1.0
    public static final ConfigOption<Double> JOB_MANAGER_CPU_LIMIT_FACTOR; // default: 1.0 (CPU limit = request * factor)
    public static final ConfigOption<Double> JOB_MANAGER_MEMORY_LIMIT_FACTOR; // default: 1.0 (memory limit = request * factor)
    public static final ConfigOption<Double> TASK_MANAGER_CPU; // default: -1.0 (falls back to the number of task slots)
    public static final ConfigOption<Double> TASK_MANAGER_CPU_LIMIT_FACTOR; // default: 1.0
    public static final ConfigOption<Double> TASK_MANAGER_MEMORY_LIMIT_FACTOR; // default: 1.0
    public static final ConfigOption<Integer> KUBERNETES_JOBMANAGER_REPLICAS; // default: 1

    // Labels and Annotations
    public static final ConfigOption<Map<String, String>> JOB_MANAGER_LABELS;
    public static final ConfigOption<Map<String, String>> TASK_MANAGER_LABELS;
    public static final ConfigOption<Map<String, String>> JOB_MANAGER_ANNOTATIONS;
    public static final ConfigOption<Map<String, String>> TASK_MANAGER_ANNOTATIONS;
    public static final ConfigOption<Map<String, String>> REST_SERVICE_ANNOTATIONS;
    public static final ConfigOption<Map<String, String>> INTERNAL_SERVICE_ANNOTATIONS;

    // Node Selection and Scheduling
    public static final ConfigOption<Map<String, String>> JOB_MANAGER_NODE_SELECTOR;
    public static final ConfigOption<Map<String, String>> TASK_MANAGER_NODE_SELECTOR;
    public static final ConfigOption<List<Map<String, String>>> JOB_MANAGER_TOLERATIONS;
    public static final ConfigOption<List<Map<String, String>>> TASK_MANAGER_TOLERATIONS;
    public static final ConfigOption<List<Map<String, String>>> JOB_MANAGER_OWNER_REFERENCE;

    // Container Configuration
    public static final ConfigOption<ImagePullPolicy> CONTAINER_IMAGE_PULL_POLICY; // default: IfNotPresent
    public static final ConfigOption<List<String>> CONTAINER_IMAGE_PULL_SECRETS;
    public static final ConfigOption<String> KUBERNETES_ENTRY_PATH; // default: "/docker-entrypoint.sh"
    public static final ConfigOption<String> FLINK_CONF_DIR; // default: "/opt/flink/conf"
    public static final ConfigOption<String> FLINK_LOG_DIR;

    // Secrets and Environment
    public static final ConfigOption<Map<String, String>> KUBERNETES_SECRETS;
    public static final ConfigOption<List<Map<String, String>>> KUBERNETES_ENV_SECRET_KEY_REF;

    // Pod Templates
    public static final ConfigOption<String> JOB_MANAGER_POD_TEMPLATE;
    public static final ConfigOption<String> TASK_MANAGER_POD_TEMPLATE;
    public static final ConfigOption<String> KUBERNETES_POD_TEMPLATE;

    // Additional Configuration
    public static final ConfigOption<String> HADOOP_CONF_CONFIG_MAP;
    public static final ConfigOption<String> KUBERNETES_JOBMANAGER_ENTRYPOINT_ARGS; // default: ""
    public static final ConfigOption<String> KUBERNETES_TASKMANAGER_ENTRYPOINT_ARGS; // default: ""
    public static final ConfigOption<Boolean> KUBERNETES_HOSTNETWORK_ENABLED; // default: false
    public static final ConfigOption<String> KUBERNETES_CLIENT_USER_AGENT; // default: "flink"
    public static final ConfigOption<Integer> KUBERNETES_CLIENT_IO_EXECUTOR_POOL_SIZE; // default: 4

    // Transactional Operations
    public static final ConfigOption<Integer> KUBERNETES_TRANSACTIONAL_OPERATION_MAX_RETRIES; // default: 15
    public static final ConfigOption<Duration> KUBERNETES_TRANSACTIONAL_OPERATION_INITIAL_RETRY_DEALY; // default: 50ms
    public static final ConfigOption<Duration> KUBERNETES_TRANSACTIONAL_OPERATION_MAX_RETRY_DEALY; // default: 1 minute

    // Artifact Upload
    public static final ConfigOption<Boolean> LOCAL_UPLOAD_ENABLED; // default: false
    public static final ConfigOption<Boolean> LOCAL_UPLOAD_OVERWRITE; // default: false
    public static final ConfigOption<String> LOCAL_UPLOAD_TARGET;

    // Decorators
    public static final ConfigOption<Boolean> KUBERNETES_HADOOP_CONF_MOUNT_DECORATOR_ENABLED; // default: true
    public static final ConfigOption<Boolean> KUBERNETES_KERBEROS_MOUNT_DECORATOR_ENABLED; // default: true

    // Nested enums, referenced as KubernetesConfigOptions.ServiceExposedType, etc.
    public enum ServiceExposedType {
        ClusterIP,         // Internal cluster access only
        NodePort,          // External access via node ports
        LoadBalancer,      // External access via load balancer
        Headless_ClusterIP // Headless service for direct pod access
    }

    public enum NodePortAddressType {
        InternalIP, // Use internal node IP addresses
        ExternalIP  // Use external node IP addresses
    }

    public enum ImagePullPolicy {
        IfNotPresent, // Pull image if not present locally
        Always,       // Always pull the image
        Never         // Never pull, use local image only
    }
}
```
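Several options above are typed as maps or lists of maps (labels, annotations, node selectors, tolerations). When passed as `-D` properties, a map is commonly written as comma-separated `key:value` pairs, and a list of maps separates the individual maps with `;` (for example `key:key1,operator:Equal,value:value1,effect:NoSchedule;key:key2,operator:Exists,effect:NoExecute`). The JDK-only sketch below renders Java collections into that string form. `KubernetesOptionFormat`, `formatMap` and `formatListOfMaps` are hypothetical helpers, the property keys `kubernetes.jobmanager.labels` and `kubernetes.taskmanager.tolerations` are assumed to be the keys behind `JOB_MANAGER_LABELS` and `TASK_MANAGER_TOLERATIONS`, and keys and values are assumed not to contain `,`, `:` or `;`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class KubernetesOptionFormat {

    // Hypothetical helper: {app=flink, team=analytics} -> "app:flink,team:analytics"
    static String formatMap(Map<String, String> map) {
        return map.entrySet().stream()
                .map(e -> e.getKey() + ":" + e.getValue())
                .collect(Collectors.joining(","));
    }

    // Hypothetical helper: each map is formatted as above, then the maps are joined with ";"
    static String formatListOfMaps(List<Map<String, String>> maps) {
        return maps.stream()
                .map(KubernetesOptionFormat::formatMap)
                .collect(Collectors.joining(";"));
    }

    public static void main(String[] args) {
        Map<String, String> labels = new LinkedHashMap<>();
        labels.put("app", "flink");
        labels.put("team", "analytics");

        Map<String, String> toleration = new LinkedHashMap<>();
        toleration.put("key", "dedicated");
        toleration.put("operator", "Equal");
        toleration.put("value", "flink");
        toleration.put("effect", "NoSchedule");

        // prints: -Dkubernetes.jobmanager.labels=app:flink,team:analytics
        System.out.println("-Dkubernetes.jobmanager.labels=" + formatMap(labels));
        // prints: -Dkubernetes.taskmanager.tolerations=key:dedicated,operator:Equal,value:flink,effect:NoSchedule
        System.out.println("-Dkubernetes.taskmanager.tolerations=" + formatListOfMaps(List.of(toleration)));
    }
}
```

`LinkedHashMap` is used so the rendered order matches insertion order. Flink itself does not care about key order, but a stable order makes generated command lines easier to diff.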

**Usage Example:**

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.ServiceExposedType;

Configuration config = new Configuration();

// Basic cluster configuration
config.set(KubernetesConfigOptions.NAMESPACE, "flink-jobs");
config.set(KubernetesConfigOptions.CLUSTER_ID, "analytics-cluster");
config.set(KubernetesConfigOptions.CONTAINER_IMAGE, "my-registry/flink:2.1.0");

// Resource allocation (CPU requests; limits are request * limit factor)
config.set(KubernetesConfigOptions.JOB_MANAGER_CPU, 1.0);
config.set(KubernetesConfigOptions.TASK_MANAGER_CPU, 2.0);

// Service account configuration
config.set(KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT, "flink-jobmanager");
config.set(KubernetesConfigOptions.TASK_MANAGER_SERVICE_ACCOUNT, "flink-taskmanager");

// Service exposure (nested enum KubernetesConfigOptions.ServiceExposedType)
config.set(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE, ServiceExposedType.LoadBalancer);
```
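The CPU options interact in two ways that are easy to miss. A `TASK_MANAGER_CPU` of `-1.0` (the default) falls back to the number of task slots per TaskManager, and each limit factor scales the request to obtain the container limit (limit = request × factor). The sketch below reproduces that arithmetic in plain Java. `ResourceSketch`, `effectiveTaskManagerCpu` and `limitFor` are hypothetical names, not Flink APIs. Treating any non-positive value as "use the slot count" is a simplifying assumption, and factors below 1.0 are rejected because Kubernetes does not allow a limit lower than its request.

```java
public class ResourceSketch {

    // Simplified fallback (assumption): a non-positive CPU setting means "use the slot count"
    static double effectiveTaskManagerCpu(double configuredCpu, int numberOfTaskSlots) {
        return configuredCpu > 0 ? configuredCpu : numberOfTaskSlots;
    }

    // Container limit = request * limit factor; 1.0 means the limit equals the request
    static double limitFor(double request, double limitFactor) {
        if (limitFactor < 1.0) {
            throw new IllegalArgumentException("limit factor must be >= 1.0, got " + limitFactor);
        }
        return request * limitFactor;
    }

    public static void main(String[] args) {
        double tmCpu = effectiveTaskManagerCpu(-1.0, 4); // default -1.0 with 4 slots -> 4.0
        double tmLimit = limitFor(tmCpu, 1.5);           // 4.0 * 1.5 -> 6.0
        // prints: TaskManager CPU request=4.0, limit=6.0
        System.out.println("TaskManager CPU request=" + tmCpu + ", limit=" + tmLimit);
    }
}
```

The same multiplication applies to memory through the `*_MEMORY_LIMIT_FACTOR` options, with the memory request taken from Flink's process memory settings.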

### High Availability Configuration

Configuration options for Kubernetes-based high availability features, including leader election and state management.

```java { .api }
public class KubernetesHighAvailabilityOptions {
    // Leader Election Configuration
    public static final ConfigOption<Duration> KUBERNETES_LEASE_DURATION; // default: 15 seconds
    public static final ConfigOption<Duration> KUBERNETES_RENEW_DEADLINE; // default: 15 seconds
    public static final ConfigOption<Duration> KUBERNETES_RETRY_PERIOD; // default: 5 seconds
}
```

**Usage Example:**

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.kubernetes.configuration.KubernetesHighAvailabilityOptions;
import java.time.Duration;

Configuration config = new Configuration();

// Enable Kubernetes HA ("high-availability.type")
config.set(HighAvailabilityOptions.HA_MODE, "kubernetes");
// HA also needs durable storage for job metadata; this path is a placeholder
config.set(HighAvailabilityOptions.HA_STORAGE_PATH, "s3://my-bucket/flink/ha");

// Configure leader election timing (these are the default values)
config.set(KubernetesHighAvailabilityOptions.KUBERNETES_LEASE_DURATION, Duration.ofSeconds(15));
config.set(KubernetesHighAvailabilityOptions.KUBERNETES_RENEW_DEADLINE, Duration.ofSeconds(15));
config.set(KubernetesHighAvailabilityOptions.KUBERNETES_RETRY_PERIOD, Duration.ofSeconds(5));
```
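The three leader-election durations are related. In Kubernetes-style leader election, the current leader keeps renewing a lease: if it cannot renew within the renew deadline it gives up leadership, and candidates retry acquiring the lease every retry period. A rule of thumb from that model, assumed here rather than enforced by Flink, is `retryPeriod < renewDeadline <= leaseDuration`. Flink's defaults (15 s, 15 s, 5 s) satisfy it. `LeaderElectionTiming.validate` is a hypothetical helper that encodes this assumption.

```java
import java.time.Duration;

public class LeaderElectionTiming {

    // Hypothetical sanity check; the ordering rule is an assumption, not a Flink-enforced constraint
    static void validate(Duration leaseDuration, Duration renewDeadline, Duration retryPeriod) {
        if (retryPeriod.compareTo(renewDeadline) >= 0) {
            throw new IllegalArgumentException(
                    "retry period " + retryPeriod + " should be shorter than renew deadline " + renewDeadline);
        }
        if (renewDeadline.compareTo(leaseDuration) > 0) {
            throw new IllegalArgumentException(
                    "renew deadline " + renewDeadline + " should not exceed lease duration " + leaseDuration);
        }
    }

    public static void main(String[] args) {
        // Flink's defaults: lease 15 s, renew deadline 15 s, retry period 5 s
        validate(Duration.ofSeconds(15), Duration.ofSeconds(15), Duration.ofSeconds(5));
        System.out.println("default leader-election timing passes the check");
    }
}
```

A check like this could run before submitting a cluster whose HA timing has been tuned, so that a retry period accidentally set longer than the renew deadline is caught early.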

### Service Provider Interface Integration

The module registers its factories through Java Service Provider Interface (SPI) files under `META-INF/services`, so Flink discovers them with `java.util.ServiceLoader` and no explicit code changes are required.

**Registered Services:**

- **PipelineExecutorFactory**: `org.apache.flink.kubernetes.executors.KubernetesSessionClusterExecutorFactory` (serves the `kubernetes-session` target)
- **ClusterClientFactory**: `org.apache.flink.kubernetes.KubernetesClusterClientFactory` (accepts both `kubernetes-session` and `kubernetes-application`)

**Usage:**

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;

// Setting the deployment target ("execution.target") is enough; SPI discovery selects the matching factories
Configuration config = new Configuration();
config.set(DeploymentOptions.TARGET, "kubernetes-session");
// or, for an application cluster:
// config.set(DeploymentOptions.TARGET, "kubernetes-application");
```
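Discovery amounts to loading every registered factory and keeping the one whose compatibility check accepts the configured `execution.target`. The sketch below imitates that selection step without Flink on the classpath. The `TargetFactory` interface and the factories built by `factory(...)` are hypothetical stand-ins for implementations such as `PipelineExecutorFactory`, and a real loader would obtain them from `ServiceLoader.load(...)` rather than a hard-coded list. Rejecting zero or multiple matches is this sketch's own design choice.

```java
import java.util.List;
import java.util.stream.Collectors;

public class FactorySelection {

    // Hypothetical stand-in for an SPI interface such as PipelineExecutorFactory
    interface TargetFactory {
        String name();
        boolean isCompatibleWith(String target);
    }

    // Builds a factory that accepts exactly one target string
    static TargetFactory factory(String factoryName, String supportedTarget) {
        return new TargetFactory() {
            public String name() { return factoryName; }
            public boolean isCompatibleWith(String target) { return supportedTarget.equals(target); }
        };
    }

    // Keep exactly one factory whose compatibility check accepts the target
    static TargetFactory select(List<TargetFactory> candidates, String target) {
        List<TargetFactory> matches = candidates.stream()
                .filter(f -> f.isCompatibleWith(target))
                .collect(Collectors.toList());
        if (matches.size() != 1) {
            throw new IllegalStateException(
                    matches.size() + " factories are compatible with target '" + target + "'");
        }
        return matches.get(0);
    }

    public static void main(String[] args) {
        // Hard-coded here; discovered through META-INF/services in a real deployment
        List<TargetFactory> candidates = List.of(
                factory("remote-executor", "remote"),
                factory("kubernetes-session-executor", "kubernetes-session"));
        // prints: kubernetes-session-executor
        System.out.println(select(candidates, "kubernetes-session").name());
    }
}
```

Because selection is driven purely by the target string, adding `flink-kubernetes` to the classpath is what makes the Kubernetes targets available. No code references the Kubernetes classes directly.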

## Types

```java { .api }
// org.apache.flink.configuration.ConfigOption: a class, not an interface
public class ConfigOption<T> {
    // Standard Flink ConfigOption (key, value type, default value, description)
}

// The following enums are nested in KubernetesConfigOptions,
// e.g. KubernetesConfigOptions.ServiceExposedType
public enum ServiceExposedType {
    ClusterIP,         // Internal cluster access only
    NodePort,          // External access via node ports
    LoadBalancer,      // External access via load balancer
    Headless_ClusterIP // Headless service for direct pod access
}

public enum NodePortAddressType {
    InternalIP, // Use internal node IP addresses
    ExternalIP  // Use external node IP addresses
}

public enum ImagePullPolicy {
    IfNotPresent, // Pull image if not present locally
    Always,       // Always pull the image
    Never         // Never pull, use local image only
}

// java.time.Duration: the standard JDK type used for timing options
public final class Duration {
    public static Duration ofSeconds(long seconds);
    public static Duration ofMinutes(long minutes);
    public static Duration ofMillis(long millis);
}
```
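`Duration` also drives the transactional-operation options. These control how often Flink retries Kubernetes updates that can conflict with concurrent writers, such as check-and-update operations on ConfigMaps. The sketch below assumes an exponential backoff that doubles the delay after each attempt and caps it at the maximum delay; that strategy is an assumption, not taken from the Flink source. Under it, the defaults (15 retries, 50 ms initial delay, 1 minute maximum delay) give the schedule computed below. `BackoffSchedule` and `delays` are hypothetical names.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class BackoffSchedule {

    // Delay before each retry: start at `initial`, double after every attempt, never exceed `max`
    // (assumed strategy, for illustration only)
    static List<Duration> delays(int maxRetries, Duration initial, Duration max) {
        List<Duration> result = new ArrayList<>();
        Duration current = initial.compareTo(max) > 0 ? max : initial;
        for (int i = 0; i < maxRetries; i++) {
            result.add(current);
            Duration doubled = current.multipliedBy(2);
            current = doubled.compareTo(max) > 0 ? max : doubled;
        }
        return result;
    }

    public static void main(String[] args) {
        // Defaults: 15 retries, 50 ms initial delay, 1 minute maximum delay
        List<Duration> schedule = delays(15, Duration.ofMillis(50), Duration.ofMinutes(1));
        Duration total = Duration.ZERO;
        for (Duration d : schedule) {
            total = total.plus(d);
        }
        // prints: last delay = 60000 ms
        System.out.println("last delay = " + schedule.get(schedule.size() - 1).toMillis() + " ms");
        // prints: worst-case total wait = 342350 ms
        System.out.println("worst-case total wait = " + total.toMillis() + " ms");
    }
}
```

Under this assumption the delay reaches the 1-minute cap after 11 doublings (50 ms to 51.2 s), so most of the roughly 5.7-minute worst case is spent in the last four capped retries.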

## Deployment Targets

The module supports two primary deployment modes:

- **kubernetes-session**: Deploy a long-running Flink session cluster on Kubernetes
- **kubernetes-application**: Deploy a single Flink application directly on Kubernetes

Configure the mode through the `execution.target` configuration option (`DeploymentOptions.TARGET`), or with the `--target` flag of the Flink CLI.

## Dependencies

This module includes shaded dependencies to avoid version conflicts:

- **Fabric8 Kubernetes Client**: Shaded to `org.apache.flink.kubernetes.shaded.io.fabric8`
- **Jackson**: Shaded to `org.apache.flink.kubernetes.shaded.com.fasterxml.jackson`
- **OkHttp**: Shaded to `org.apache.flink.kubernetes.shaded.okhttp3`
- **SnakeYAML**: Shaded to `org.apache.flink.kubernetes.shaded.org.snakeyaml`

## Error Handling

The module defines `KubernetesException` for Kubernetes-specific errors, but it is marked as internal. Errors are typically propagated through Flink's standard exception-handling mechanisms.

Most errors related to the Kubernetes integration manifest as:

- Configuration validation errors during cluster startup
- Resource allocation failures during pod creation
- Network connectivity issues during cluster communication

## Notes

- **Internal APIs**: Most classes in this module are marked `@Internal` and are subject to change between versions
- **Configuration-Driven**: Primary interaction is through configuration options rather than direct API calls
- **SPI Integration**: Automatically integrates with the Flink CLI and client APIs through service provider interfaces
- **Shaded Dependencies**: The bundled third-party dependencies (Fabric8, Jackson, OkHttp, SnakeYAML) are relocated to prevent conflicts with user applications