tessl/maven-org-apache-flink--flink-kubernetes

Apache Flink Kubernetes integration module that provides native Kubernetes support for deploying and managing Flink clusters on Kubernetes.

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.flink/flink-kubernetes@2.1.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-kubernetes@2.1.0


Flink Kubernetes

Apache Flink Kubernetes integration module that provides native Kubernetes support for deploying and managing Flink clusters on Kubernetes. This module enables Flink to leverage Kubernetes as a resource manager, supporting both session and application clusters with features like high availability, leader election, checkpoint recovery, and state management using Kubernetes resources.

Package Information

  • Package Name: flink-kubernetes
  • Package Type: maven
  • Language: Java
  • Group ID: org.apache.flink
  • Artifact ID: flink-kubernetes
  • Installation: Add as a Maven dependency to your project
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-kubernetes</artifactId>
    <version>2.1.0</version>
</dependency>

Core Imports

import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesHighAvailabilityOptions;

Basic Usage

This module is typically used through configuration rather than direct API calls. Configure Flink to use Kubernetes as a deployment target:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;

// Typed set(ConfigOption<T>, T) calls are checked against each option's value type
Configuration config = new Configuration();
config.set(DeploymentOptions.TARGET, "kubernetes-session");
config.set(KubernetesConfigOptions.NAMESPACE, "flink");
config.set(KubernetesConfigOptions.CLUSTER_ID, "my-flink-cluster");
config.set(KubernetesConfigOptions.CONTAINER_IMAGE, "flink:2.1.0");

Architecture

The flink-kubernetes module is built around several key components:

  • Configuration System: Public configuration options for customizing Kubernetes deployments
  • Cluster Management: Internal cluster descriptors and client factories for managing Flink clusters
  • Kubernetes Client: Internal abstraction layer over Fabric8 Kubernetes client for API interactions
  • High Availability: Kubernetes-based leader election and state management services
  • Service Provider Interface: Automatic integration via SPI for seamless Flink CLI usage

Important Note: Most implementation classes are marked @Internal and are not intended for direct use by external applications. Users primarily interact with this module through configuration options and the standard Flink client APIs.

Capabilities

Kubernetes Configuration

Configuration options for customizing Kubernetes deployment behavior, cluster resources, and integration settings.

public class KubernetesConfigOptions {
    // Cluster Configuration
    public static final ConfigOption<String> CONTEXT;
    public static final ConfigOption<String> NAMESPACE;           // default: "default"
    public static final ConfigOption<String> CLUSTER_ID;
    public static final ConfigOption<String> CONTAINER_IMAGE;     // dynamic default
    public static final ConfigOption<String> KUBE_CONFIG_FILE;
    
    // Service Configuration  
    public static final ConfigOption<ServiceExposedType> REST_SERVICE_EXPOSED_TYPE; // default: ClusterIP
    public static final ConfigOption<NodePortAddressType> REST_SERVICE_EXPOSED_NODE_PORT_ADDRESS_TYPE; // default: InternalIP
    public static final ConfigOption<String> JOB_MANAGER_SERVICE_ACCOUNT;          // default: "default"
    public static final ConfigOption<String> TASK_MANAGER_SERVICE_ACCOUNT;         // default: "default"
    public static final ConfigOption<String> KUBERNETES_SERVICE_ACCOUNT;          // default: "default"
    
    // Resource Configuration
    public static final ConfigOption<Double> JOB_MANAGER_CPU;                      // default: 1.0
    public static final ConfigOption<Double> JOB_MANAGER_CPU_LIMIT_FACTOR;         // default: 1.0
    public static final ConfigOption<Double> JOB_MANAGER_MEMORY_LIMIT_FACTOR;      // default: 1.0
    public static final ConfigOption<Double> TASK_MANAGER_CPU;                     // default: -1.0 (auto-calculated)
    public static final ConfigOption<Double> TASK_MANAGER_CPU_LIMIT_FACTOR;        // default: 1.0
    public static final ConfigOption<Double> TASK_MANAGER_MEMORY_LIMIT_FACTOR;     // default: 1.0
    public static final ConfigOption<Integer> KUBERNETES_JOBMANAGER_REPLICAS;      // default: 1
    
    // Labels and Annotations
    public static final ConfigOption<Map<String, String>> JOB_MANAGER_LABELS;
    public static final ConfigOption<Map<String, String>> TASK_MANAGER_LABELS;
    public static final ConfigOption<Map<String, String>> JOB_MANAGER_ANNOTATIONS;
    public static final ConfigOption<Map<String, String>> TASK_MANAGER_ANNOTATIONS;
    public static final ConfigOption<Map<String, String>> REST_SERVICE_ANNOTATIONS;
    public static final ConfigOption<Map<String, String>> INTERNAL_SERVICE_ANNOTATIONS;
    
    // Node Selection and Scheduling
    public static final ConfigOption<Map<String, String>> JOB_MANAGER_NODE_SELECTOR;
    public static final ConfigOption<Map<String, String>> TASK_MANAGER_NODE_SELECTOR;
    public static final ConfigOption<List<Map<String, String>>> JOB_MANAGER_TOLERATIONS;
    public static final ConfigOption<List<Map<String, String>>> TASK_MANAGER_TOLERATIONS;
    public static final ConfigOption<List<Map<String, String>>> JOB_MANAGER_OWNER_REFERENCE;
    
    // Container Configuration
    public static final ConfigOption<ImagePullPolicy> CONTAINER_IMAGE_PULL_POLICY; // default: IfNotPresent
    public static final ConfigOption<List<String>> CONTAINER_IMAGE_PULL_SECRETS;
    public static final ConfigOption<String> KUBERNETES_ENTRY_PATH;                // default: "/docker-entrypoint.sh"
    public static final ConfigOption<String> FLINK_CONF_DIR;                       // default: "/opt/flink/conf"
    public static final ConfigOption<String> FLINK_LOG_DIR;
    
    // Secrets and Environment
    public static final ConfigOption<Map<String, String>> KUBERNETES_SECRETS;
    public static final ConfigOption<List<Map<String, String>>> KUBERNETES_ENV_SECRET_KEY_REF;
    
    // Pod Templates
    public static final ConfigOption<String> JOB_MANAGER_POD_TEMPLATE;
    public static final ConfigOption<String> TASK_MANAGER_POD_TEMPLATE;  
    public static final ConfigOption<String> KUBERNETES_POD_TEMPLATE;
    
    // Additional Configuration
    public static final ConfigOption<String> HADOOP_CONF_CONFIG_MAP;
    public static final ConfigOption<String> KUBERNETES_JOBMANAGER_ENTRYPOINT_ARGS; // default: ""
    public static final ConfigOption<String> KUBERNETES_TASKMANAGER_ENTRYPOINT_ARGS; // default: ""
    public static final ConfigOption<Boolean> KUBERNETES_HOSTNETWORK_ENABLED;      // default: false
    public static final ConfigOption<String> KUBERNETES_CLIENT_USER_AGENT;         // default: "flink"
    public static final ConfigOption<Integer> KUBERNETES_CLIENT_IO_EXECUTOR_POOL_SIZE; // default: 4
    
    // Transactional Operations
    public static final ConfigOption<Integer> KUBERNETES_TRANSACTIONAL_OPERATION_MAX_RETRIES; // default: 15
    public static final ConfigOption<Duration> KUBERNETES_TRANSACTIONAL_OPERATION_INITIAL_RETRY_DEALY; // default: 50ms
    public static final ConfigOption<Duration> KUBERNETES_TRANSACTIONAL_OPERATION_MAX_RETRY_DEALY; // default: 1 minute
    
    // Artifact Upload
    public static final ConfigOption<Boolean> LOCAL_UPLOAD_ENABLED;                // default: false
    public static final ConfigOption<Boolean> LOCAL_UPLOAD_OVERWRITE;              // default: false
    public static final ConfigOption<String> LOCAL_UPLOAD_TARGET;
    
    // Decorators
    public static final ConfigOption<Boolean> KUBERNETES_HADOOP_CONF_MOUNT_DECORATOR_ENABLED; // default: true
    public static final ConfigOption<Boolean> KUBERNETES_KERBEROS_MOUNT_DECORATOR_ENABLED;    // default: true
}

// Enums nested in KubernetesConfigOptions (e.g. KubernetesConfigOptions.ServiceExposedType)
public enum ServiceExposedType {
    ClusterIP,           // Internal cluster access only
    NodePort,            // External access via node ports
    LoadBalancer,        // External access via load balancer
    Headless_ClusterIP   // Headless service for direct pod access
}

public enum NodePortAddressType {
    InternalIP,          // Use internal node IP addresses
    ExternalIP           // Use external node IP addresses
}

public enum ImagePullPolicy {
    IfNotPresent,        // Pull image if not present locally
    Always,              // Always pull the latest image
    Never                // Never pull, use local image only
}
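Map- and list-typed options such as JOB_MANAGER_LABELS, JOB_MANAGER_TOLERATIONS and KUBERNETES_ENV_SECRET_KEY_REF are usually written as strings in the Flink configuration. A map is written as comma-separated key:value pairs (for example version:alphav1,deploy:test), and a list of maps is written as such groups separated by semicolons. The sketch below is a hypothetical, simplified parser (FlinkStyleMapParser is not a Flink class) that only illustrates this syntax. It does not support quoting or escaping, so it is not a substitute for Flink's own Configuration parsing.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that illustrates the string syntax of map and list-of-map options. */
final class FlinkStyleMapParser {

    private FlinkStyleMapParser() {}

    /** Parses "k1:v1,k2:v2" into an insertion-ordered map; quoting and escaping are not supported. */
    static Map<String, String> parseMap(String value) {
        Map<String, String> result = new LinkedHashMap<>();
        if (value == null || value.isBlank()) {
            return result;
        }
        for (String entry : value.split(",")) {
            // Each entry must contain exactly one ':' separating key and value
            String[] parts = entry.split(":", -1);
            if (parts.length != 2 || parts[0].isBlank()) {
                throw new IllegalArgumentException("Expected key:value but got '" + entry + "'");
            }
            result.put(parts[0].trim(), parts[1].trim());
        }
        return result;
    }

    /** Parses "k1:v1,k2:v2;k3:v3" into one map per ';'-separated group (e.g. one per toleration). */
    static List<Map<String, String>> parseListOfMaps(String value) {
        List<Map<String, String>> result = new ArrayList<>();
        if (value == null || value.isBlank()) {
            return result;
        }
        for (String group : value.split(";")) {
            if (!group.isBlank()) {
                result.add(parseMap(group));
            }
        }
        return result;
    }
}
```

For example, a tolerations string such as key:key1,operator:Equal,value:value1,effect:NoSchedule;key:key2,operator:Exists,effect:NoExecute,tolerationSeconds:6000 would yield two maps, one per toleration.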

Usage Example:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.ServiceExposedType;

Configuration config = new Configuration();

// Basic cluster configuration
config.set(KubernetesConfigOptions.NAMESPACE, "flink-jobs");
config.set(KubernetesConfigOptions.CLUSTER_ID, "analytics-cluster");
config.set(KubernetesConfigOptions.CONTAINER_IMAGE, "my-registry/flink:2.1.0");

// Resource allocation (CPU cores requested per pod)
config.set(KubernetesConfigOptions.JOB_MANAGER_CPU, 1.0);
config.set(KubernetesConfigOptions.TASK_MANAGER_CPU, 2.0);

// Service account configuration
config.set(KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT, "flink-jobmanager");
config.set(KubernetesConfigOptions.TASK_MANAGER_SERVICE_ACCOUNT, "flink-taskmanager");

// Service exposure: make the REST endpoint reachable through a load balancer
config.set(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE, ServiceExposedType.LoadBalancer);
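The CPU and memory options become Kubernetes container requests and limits. According to the option descriptions, a limit equals the request multiplied by the matching *_LIMIT_FACTOR. A TASK_MANAGER_CPU of -1.0 (the default) means the TaskManager CPU request falls back to the number of slots per TaskManager (taskmanager.numberOfTaskSlots). The JDK-only sketch below reproduces that arithmetic. PodResourceSketch and its methods are hypothetical names, and any rounding or validation done by Flink's own pod decorators is not modelled.

```java
/** Hypothetical sketch of the request/limit arithmetic for Flink pods on Kubernetes. */
final class PodResourceSketch {

    private PodResourceSketch() {}

    /**
     * CPU request for a TaskManager pod. A non-positive configured value (the default is -1.0)
     * falls back to the number of task slots per TaskManager.
     */
    static double taskManagerCpuRequest(double configuredCpu, int numberOfTaskSlots) {
        return configuredCpu > 0 ? configuredCpu : numberOfTaskSlots;
    }

    /** Container limit derived from a request and a *_LIMIT_FACTOR option: limit = request * factor. */
    static double limit(double request, double limitFactor) {
        return request * limitFactor;
    }
}
```

With the usage example's TASK_MANAGER_CPU of 2.0 and a hypothetical TASK_MANAGER_CPU_LIMIT_FACTOR of 1.5, a TaskManager container would request 2 CPUs and be limited to 3. With the default factor of 1.0, request and limit are equal.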

High Availability Configuration

Configuration options for Kubernetes-based high availability features including leader election and state management.

public class KubernetesHighAvailabilityOptions {
    // Leader Election Configuration
    public static final ConfigOption<Duration> KUBERNETES_LEASE_DURATION;    // default: 15 seconds
    public static final ConfigOption<Duration> KUBERNETES_RENEW_DEADLINE;    // default: 15 seconds  
    public static final ConfigOption<Duration> KUBERNETES_RETRY_PERIOD;      // default: 5 seconds
}

Usage Example:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.kubernetes.configuration.KubernetesHighAvailabilityOptions;
import java.time.Duration;

Configuration config = new Configuration();

// Enable Kubernetes HA; HA metadata also needs a durable storage directory
config.set(HighAvailabilityOptions.HA_MODE, "kubernetes");
config.set(HighAvailabilityOptions.HA_STORAGE_PATH, "s3://my-bucket/flink/recovery"); // example path

// Configure leader election timing (these are the default values)
config.set(KubernetesHighAvailabilityOptions.KUBERNETES_LEASE_DURATION, Duration.ofSeconds(15));
config.set(KubernetesHighAvailabilityOptions.KUBERNETES_RENEW_DEADLINE, Duration.ofSeconds(15));
config.set(KubernetesHighAvailabilityOptions.KUBERNETES_RETRY_PERIOD, Duration.ofSeconds(5));
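The three leader-election options interact roughly as follows. The leader periodically renews a lease record. A contender treats the leader as alive while renewTime + leaseDuration is still in the future. All contenders retry acquisition or renewal once per retry period. A leader that cannot renew within the renew deadline gives up leadership. The sketch below models only the liveness check, plus a rough worst-case takeover estimate (lease duration plus one retry period) under the assumption that followers check exactly once per retry period. It is a simplification, not Flink's or Fabric8's implementation, and LeaseLivenessSketch is a hypothetical name.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical model of lease-based leader liveness. */
final class LeaseLivenessSketch {

    private LeaseLivenessSketch() {}

    /** The leader counts as alive while renewTime + leaseDuration is still after now. */
    static boolean isLeaderAlive(Instant renewTime, Duration leaseDuration, Instant now) {
        return renewTime.plus(leaseDuration).isAfter(now);
    }

    /**
     * Rough upper bound on the time until a follower can take over after the leader dies
     * right after renewing: the lease must expire, then the next retry must happen.
     */
    static Duration worstCaseTakeover(Duration leaseDuration, Duration retryPeriod) {
        return leaseDuration.plus(retryPeriod);
    }
}
```

With the defaults shown above (15 s lease, 5 s retry period), this estimate gives about 20 seconds. Shorter values speed up failover but increase load on the Kubernetes API server and raise the risk of spurious leadership loss.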

Service Provider Interface Integration

The module registers its factories through Java Service Provider Interface (SPI) files under META-INF/services, which Flink's client discovers with java.util.ServiceLoader. No code changes are needed beyond setting the deployment target.

Registered Services:

  • PipelineExecutorFactory: org.apache.flink.kubernetes.executors.KubernetesSessionClusterExecutorFactory
  • ClusterClientFactory: org.apache.flink.kubernetes.KubernetesClusterClientFactory

Usage:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;

// Setting the deployment target is enough; the SPI-registered factories handle the rest
Configuration config = new Configuration();
config.set(DeploymentOptions.TARGET, "kubernetes-session");
// or, for application mode:
// config.set(DeploymentOptions.TARGET, "kubernetes-application");
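Conceptually, Flink's client loads every registered factory with java.util.ServiceLoader and keeps the one whose isCompatibleWith(Configuration) check accepts the configured deployment target. Finding no compatible factory, or more than one, is an error. The JDK-only sketch below mimics only that selection step. It uses a hypothetical TargetFactory interface, hypothetical factory names ("kubernetes", "yarn") and an in-memory list instead of META-INF/services entries, so it runs without Flink on the classpath.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical stand-in for factories such as ClusterClientFactory or PipelineExecutorFactory. */
interface TargetFactory {
    String name();

    boolean isCompatibleWith(Map<String, String> config);
}

final class FactorySelectionSketch {

    private FactorySelectionSketch() {}

    /** Returns the single factory compatible with the configuration ("exactly one match" rule). */
    static TargetFactory select(List<TargetFactory> factories, Map<String, String> config) {
        List<TargetFactory> compatible = factories.stream()
                .filter(f -> f.isCompatibleWith(config))
                .collect(Collectors.toList());
        String target = config.get("deployment.target");
        if (compatible.isEmpty()) {
            throw new IllegalStateException("No factory found for deployment target " + target);
        }
        if (compatible.size() > 1) {
            throw new IllegalStateException("Multiple compatible factories found for " + target);
        }
        return compatible.get(0);
    }

    /** Builds a factory that accepts a fixed set of deployment.target values. */
    static TargetFactory forTargets(String name, String... targets) {
        List<String> accepted = List.of(targets);
        return new TargetFactory() {
            @Override
            public String name() {
                return name;
            }

            @Override
            public boolean isCompatibleWith(Map<String, String> config) {
                String target = config.get("deployment.target");
                // Guard against null: List.of(...).contains(null) would throw
                return target != null && accepted.contains(target);
            }
        };
    }
}
```

The "exactly one match" rule is why the deployment target value alone is enough to pick the Kubernetes integration: no other registered factory accepts kubernetes-session or kubernetes-application.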

Types

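// Configuration option type: org.apache.flink.configuration.ConfigOption (a class, not an interface)
public class ConfigOption<T> {
    public String key();
    public boolean hasDefaultValue();
    public T defaultValue();
}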

// Timing options use the JDK's java.time.Duration (not a Flink type)
public final class Duration {
    public static Duration ofSeconds(long seconds);
    public static Duration ofMinutes(long minutes);
    public static Duration ofMillis(long millis);
}

Deployment Targets

The module supports two primary deployment modes:

  • kubernetes-session: Deploy a long-running Flink session cluster on Kubernetes
  • kubernetes-application: Deploy a single Flink application directly on Kubernetes

Configure via the deployment.target configuration option.

Dependencies

This module includes shaded dependencies to avoid version conflicts:

  • Fabric8 Kubernetes Client: Shaded to org.apache.flink.kubernetes.shaded.io.fabric8
  • Jackson: Shaded to org.apache.flink.kubernetes.shaded.com.fasterxml.jackson
  • OkHttp: Shaded to org.apache.flink.kubernetes.shaded.okhttp3
  • SnakeYAML: Shaded to org.apache.flink.kubernetes.shaded.org.snakeyaml

Error Handling

The module defines KubernetesException for Kubernetes-specific errors, but this is marked as internal. Errors are typically propagated through standard Flink exception handling mechanisms.

Most errors related to Kubernetes integration will manifest as:

  • Configuration validation errors during cluster startup
  • Resource allocation failures during pod creation
  • Network connectivity issues during cluster communication
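Some Kubernetes operations, for example conditional ConfigMap updates used by the HA services, are transactional and can fail on conflicts. The KUBERNETES_TRANSACTIONAL_OPERATION_* options bound how such failures are retried. The sketch below computes a delay schedule assuming exponential backoff that starts at the initial retry delay, doubles after each attempt and is capped at the maximum delay. The doubling factor is an assumption for illustration, not a documented guarantee, and RetryScheduleSketch is a hypothetical name.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical capped exponential backoff schedule (doubling is an assumption). */
final class RetryScheduleSketch {

    private RetryScheduleSketch() {}

    /** Delay before each retry: initial, 2x, 4x, ... never exceeding maxDelay. */
    static List<Duration> delays(int maxRetries, Duration initialDelay, Duration maxDelay) {
        List<Duration> result = new ArrayList<>();
        Duration current = min(initialDelay, maxDelay);
        for (int i = 0; i < maxRetries; i++) {
            result.add(current);
            // Capping before the next doubling keeps values bounded and avoids overflow
            current = min(current.multipliedBy(2), maxDelay);
        }
        return result;
    }

    private static Duration min(Duration a, Duration b) {
        return a.compareTo(b) <= 0 ? a : b;
    }
}
```

Under this assumption, the defaults listed earlier (15 retries, 50 ms initial delay, 1 minute maximum) give delays of 50 ms, 100 ms, and so on up to 51.2 s, followed by four capped 60 s waits. That totals 342.35 s of waiting before the client gives up.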

Notes

  • Internal APIs: Most classes in this module are marked @Internal and subject to change between versions
  • Configuration-Driven: Primary interaction is through configuration options rather than direct API calls
  • SPI Integration: Automatically integrates with Flink CLI and client APIs through service provider interfaces
  • Shaded Dependencies: The bundled third-party dependencies listed above are relocated to prevent conflicts with user applications