Apache Flink Kubernetes integration module that provides native Kubernetes support for deploying and managing Flink clusters on Kubernetes.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-kubernetes@2.1.0

Apache Flink Kubernetes integration module that provides native Kubernetes support for deploying and managing Flink clusters on Kubernetes. This module enables Flink to leverage Kubernetes as a resource manager, supporting both session and application clusters with features like high availability, leader election, checkpoint recovery, and state management using Kubernetes resources.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-kubernetes</artifactId>
<version>2.1.0</version>
</dependency>

import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesHighAvailabilityOptions;

This module is typically used through configuration rather than direct API calls. Configure Flink to use Kubernetes as a deployment target:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
Configuration config = new Configuration();
config.setString("deployment.target", "kubernetes-session");
config.set(KubernetesConfigOptions.NAMESPACE, "flink");
config.set(KubernetesConfigOptions.CLUSTER_ID, "my-flink-cluster");
config.set(KubernetesConfigOptions.CONTAINER_IMAGE, "flink:2.1.0");

The flink-kubernetes module is built around several key components:
Important Note: Most implementation classes are marked @Internal and are not intended for direct use by external applications. Users primarily interact with this module through configuration options and the standard Flink client APIs.
Configuration options for customizing Kubernetes deployment behavior, cluster resources, and integration settings.
public class KubernetesConfigOptions {
// Cluster Configuration
public static final ConfigOption<String> CONTEXT;
public static final ConfigOption<String> NAMESPACE; // default: "default"
public static final ConfigOption<String> CLUSTER_ID;
public static final ConfigOption<String> CONTAINER_IMAGE; // dynamic default
public static final ConfigOption<String> KUBE_CONFIG_FILE;
// Service Configuration
public static final ConfigOption<ServiceExposedType> REST_SERVICE_EXPOSED_TYPE; // default: ClusterIP
public static final ConfigOption<NodePortAddressType> REST_SERVICE_EXPOSED_NODE_PORT_ADDRESS_TYPE; // default: InternalIP
public static final ConfigOption<String> JOB_MANAGER_SERVICE_ACCOUNT; // default: "default"
public static final ConfigOption<String> TASK_MANAGER_SERVICE_ACCOUNT; // default: "default"
public static final ConfigOption<String> KUBERNETES_SERVICE_ACCOUNT; // default: "default"
// Resource Configuration
public static final ConfigOption<Double> JOB_MANAGER_CPU; // default: 1.0
public static final ConfigOption<Double> JOB_MANAGER_CPU_LIMIT_FACTOR; // default: 1.0
public static final ConfigOption<Double> JOB_MANAGER_MEMORY_LIMIT_FACTOR; // default: 1.0
public static final ConfigOption<Double> TASK_MANAGER_CPU; // default: -1.0 (auto-calculated)
public static final ConfigOption<Double> TASK_MANAGER_CPU_LIMIT_FACTOR; // default: 1.0
public static final ConfigOption<Double> TASK_MANAGER_MEMORY_LIMIT_FACTOR; // default: 1.0
public static final ConfigOption<Integer> KUBERNETES_JOBMANAGER_REPLICAS; // default: 1
// Labels and Annotations
public static final ConfigOption<Map<String, String>> JOB_MANAGER_LABELS;
public static final ConfigOption<Map<String, String>> TASK_MANAGER_LABELS;
public static final ConfigOption<Map<String, String>> JOB_MANAGER_ANNOTATIONS;
public static final ConfigOption<Map<String, String>> TASK_MANAGER_ANNOTATIONS;
public static final ConfigOption<Map<String, String>> REST_SERVICE_ANNOTATIONS;
public static final ConfigOption<Map<String, String>> INTERNAL_SERVICE_ANNOTATIONS;
// Node Selection and Scheduling
public static final ConfigOption<Map<String, String>> JOB_MANAGER_NODE_SELECTOR;
public static final ConfigOption<Map<String, String>> TASK_MANAGER_NODE_SELECTOR;
public static final ConfigOption<List<Map<String, String>>> JOB_MANAGER_TOLERATIONS;
public static final ConfigOption<List<Map<String, String>>> TASK_MANAGER_TOLERATIONS;
public static final ConfigOption<List<Map<String, String>>> JOB_MANAGER_OWNER_REFERENCE;
// Container Configuration
public static final ConfigOption<ImagePullPolicy> CONTAINER_IMAGE_PULL_POLICY; // default: IfNotPresent
public static final ConfigOption<List<String>> CONTAINER_IMAGE_PULL_SECRETS;
public static final ConfigOption<String> KUBERNETES_ENTRY_PATH; // default: "/docker-entrypoint.sh"
public static final ConfigOption<String> FLINK_CONF_DIR; // default: "/opt/flink/conf"
public static final ConfigOption<String> FLINK_LOG_DIR;
// Secrets and Environment
public static final ConfigOption<Map<String, String>> KUBERNETES_SECRETS;
public static final ConfigOption<List<Map<String, String>>> KUBERNETES_ENV_SECRET_KEY_REF;
// Pod Templates
public static final ConfigOption<String> JOB_MANAGER_POD_TEMPLATE;
public static final ConfigOption<String> TASK_MANAGER_POD_TEMPLATE;
public static final ConfigOption<String> KUBERNETES_POD_TEMPLATE;
// Additional Configuration
public static final ConfigOption<String> HADOOP_CONF_CONFIG_MAP;
public static final ConfigOption<String> KUBERNETES_JOBMANAGER_ENTRYPOINT_ARGS; // default: ""
public static final ConfigOption<String> KUBERNETES_TASKMANAGER_ENTRYPOINT_ARGS; // default: ""
public static final ConfigOption<Boolean> KUBERNETES_HOSTNETWORK_ENABLED; // default: false
public static final ConfigOption<String> KUBERNETES_CLIENT_USER_AGENT; // default: "flink"
public static final ConfigOption<Integer> KUBERNETES_CLIENT_IO_EXECUTOR_POOL_SIZE; // default: 4
// Transactional Operations
public static final ConfigOption<Integer> KUBERNETES_TRANSACTIONAL_OPERATION_MAX_RETRIES; // default: 15
public static final ConfigOption<Duration> KUBERNETES_TRANSACTIONAL_OPERATION_INITIAL_RETRY_DEALY; // default: 50ms
public static final ConfigOption<Duration> KUBERNETES_TRANSACTIONAL_OPERATION_MAX_RETRY_DEALY; // default: 1 minute
// Artifact Upload
public static final ConfigOption<Boolean> LOCAL_UPLOAD_ENABLED; // default: false
public static final ConfigOption<Boolean> LOCAL_UPLOAD_OVERWRITE; // default: false
public static final ConfigOption<String> LOCAL_UPLOAD_TARGET;
// Decorators
public static final ConfigOption<Boolean> KUBERNETES_HADOOP_CONF_MOUNT_DECORATOR_ENABLED; // default: true
public static final ConfigOption<Boolean> KUBERNETES_KERBEROS_MOUNT_DECORATOR_ENABLED; // default: true
}
enum ServiceExposedType {
ClusterIP, // Internal cluster access only
NodePort, // External access via node ports
LoadBalancer, // External access via load balancer
Headless_ClusterIP // Headless service for direct pod access
}
enum NodePortAddressType {
InternalIP, // Use internal node IP addresses
ExternalIP // Use external node IP addresses
}
enum ImagePullPolicy {
IfNotPresent, // Pull image if not present locally
Always, // Always pull the latest image
Never // Never pull, use local image only
}

Usage Example:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
Configuration config = new Configuration();
// Basic cluster configuration
config.set(KubernetesConfigOptions.NAMESPACE, "flink-jobs");
config.set(KubernetesConfigOptions.CLUSTER_ID, "analytics-cluster");
config.set(KubernetesConfigOptions.CONTAINER_IMAGE, "my-registry/flink:2.1.0");
// Resource allocation
config.set(KubernetesConfigOptions.JOB_MANAGER_CPU, 1.0);
config.set(KubernetesConfigOptions.TASK_MANAGER_CPU, 2.0);
// Service account configuration
config.set(KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT, "flink-jobmanager");
config.set(KubernetesConfigOptions.TASK_MANAGER_SERVICE_ACCOUNT, "flink-taskmanager");
// Service exposure
config.set(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE, KubernetesConfigOptions.ServiceExposedType.LoadBalancer);

Configuration options for Kubernetes-based high availability features including leader election and state management.
public class KubernetesHighAvailabilityOptions {
// Leader Election Configuration
public static final ConfigOption<Duration> KUBERNETES_LEASE_DURATION; // default: 15 seconds
public static final ConfigOption<Duration> KUBERNETES_RENEW_DEADLINE; // default: 15 seconds
public static final ConfigOption<Duration> KUBERNETES_RETRY_PERIOD; // default: 5 seconds
}

Usage Example:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesHighAvailabilityOptions;
import java.time.Duration;
Configuration config = new Configuration();
// Enable Kubernetes HA
config.setString("high-availability", "kubernetes");
// Configure leader election timing (these are the default values)
config.set(KubernetesHighAvailabilityOptions.KUBERNETES_LEASE_DURATION, Duration.ofSeconds(15));
config.set(KubernetesHighAvailabilityOptions.KUBERNETES_RENEW_DEADLINE, Duration.ofSeconds(15));
config.set(KubernetesHighAvailabilityOptions.KUBERNETES_RETRY_PERIOD, Duration.ofSeconds(5));

The module automatically registers with Flink's plugin system through Service Provider Interface (SPI) files. This enables seamless integration without requiring explicit code changes.
Registered Services:
- org.apache.flink.kubernetes.executors.KubernetesSessionClusterExecutorFactory
- org.apache.flink.kubernetes.KubernetesClusterClientFactory

Usage:
import org.apache.flink.configuration.Configuration;
// Simply set deployment target - SPI handles the rest
Configuration config = new Configuration();
config.setString("deployment.target", "kubernetes-session");
// or
config.setString("deployment.target", "kubernetes-application");

// Configuration option types
public class ConfigOption<T> {
// Standard Flink ConfigOption class (org.apache.flink.configuration.ConfigOption)
}
public enum ServiceExposedType {
ClusterIP, // Internal cluster access only
NodePort, // External access via node ports
LoadBalancer, // External access via load balancer
Headless_ClusterIP // Headless service for direct pod access
}
public enum NodePortAddressType {
InternalIP, // Use internal node IP addresses
ExternalIP // Use external node IP addresses
}
public enum ImagePullPolicy {
IfNotPresent, // Pull image if not present locally
Always, // Always pull the latest image
Never // Never pull, use local image only
}
// Duration type for timing configuration
public class Duration {
public static Duration ofSeconds(long seconds);
public static Duration ofMinutes(long minutes);
public static Duration ofMillis(long millis);
// Standard Java Duration class
}

The module supports two primary deployment modes: session mode and application mode.
Configure via the deployment.target configuration option ("kubernetes-session" or "kubernetes-application").
This module includes shaded dependencies to avoid version conflicts:
- org.apache.flink.kubernetes.shaded.io.fabric8
- org.apache.flink.kubernetes.shaded.com.fasterxml.jackson
- org.apache.flink.kubernetes.shaded.okhttp3
- org.apache.flink.kubernetes.shaded.org.snakeyaml

The module defines KubernetesException for Kubernetes-specific errors, but this is marked as internal. Errors are typically propagated through standard Flink exception handling mechanisms.
Most errors related to Kubernetes integration will manifest as:
@Internal and subject to change between versions