CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-apache-airflow-providers-amazon

Apache Airflow provider package offering comprehensive AWS service integrations for orchestrating cloud workflows and data pipelines.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/eks-kubernetes.md

Amazon EKS (Elastic Kubernetes Service)

Amazon EKS enables managed Kubernetes cluster operations with comprehensive support for cluster lifecycle management, node group operations, Fargate profiles, and Kubernetes workload execution through Airflow.

Capabilities

Cluster Management

Create, configure, and manage Amazon EKS clusters with full lifecycle support.

class EksCreateClusterOperator(AwsBaseOperator):
    """
    Create an Amazon EKS cluster.

    This is an API signature stub: the constructor body is elided, so only
    the parameters and their defaults are documented here.

    Parameters:
    - cluster_name: str - unique name for the EKS cluster (required)
    - cluster_role_arn: str - ARN of the IAM role for the EKS service (required)
    - resources_vpc_config: dict - VPC configuration for the cluster (required)
    - compute: str | None - compute type ('nodegroup' or 'fargate'); default None
    - create_cluster_kwargs: dict | None - additional cluster creation parameters
    - nodegroup_name: str | None - name for the managed node group
    - nodegroup_role_arn: str | None - ARN of the IAM role for worker nodes
    - create_nodegroup_kwargs: dict | None - additional node group parameters
    - fargate_profile_name: str | None - name for the Fargate profile
    - fargate_pod_execution_role_arn: str | None - ARN for Fargate pod execution role
    - fargate_selectors: list | None - selectors for Fargate profile
    - create_fargate_profile_kwargs: dict | None - additional Fargate parameters
    - wait_for_completion: bool - wait for cluster creation to complete (default True)
    - waiter_delay: int - time between waiter checks (default 30; presumably
      seconds - TODO confirm)
    - waiter_max_attempts: int - maximum waiter attempts (default 40)
    - deferrable: bool - run operator in deferrable mode (default False)
    - aws_conn_id: str - Airflow connection for AWS credentials. Not an explicit
      parameter of this signature; presumably accepted through **kwargs by the
      base operator - TODO confirm.

    The nodegroup_* arguments presumably apply when compute='nodegroup' and the
    fargate_* arguments when compute='fargate' - TODO confirm against the
    provider source.

    Returns:
    str: Cluster ARN
    NOTE(review): a constructor returns nothing; this presumably describes the
    operator's execute() result, which is not visible in this stub - verify.
    """
    def __init__(
        self,
        cluster_name: str,
        cluster_role_arn: str,
        resources_vpc_config: dict,
        compute: str | None = None,
        create_cluster_kwargs: dict | None = None,
        nodegroup_name: str | None = None,
        nodegroup_role_arn: str | None = None,
        create_nodegroup_kwargs: dict | None = None,
        fargate_profile_name: str | None = None,
        fargate_pod_execution_role_arn: str | None = None,
        fargate_selectors: list | None = None,
        create_fargate_profile_kwargs: dict | None = None,
        wait_for_completion: bool = True,
        waiter_delay: int = 30,
        waiter_max_attempts: int = 40,
        deferrable: bool = False,
        **kwargs
    ) -> None: ...
class EksDeleteClusterOperator(AwsBaseOperator):
    """
    Delete an Amazon EKS cluster and associated compute resources.

    This is an API signature stub: the constructor body is elided, so only
    the parameters and their defaults are documented here.

    Parameters:
    - cluster_name: str - name of the EKS cluster to delete (required)
    - force_delete_compute: bool - force delete attached compute resources
      (default False)
    - wait_for_completion: bool - wait for deletion to complete (default True)
    - waiter_delay: int - time between waiter checks (default 30; presumably
      seconds - TODO confirm)
    - waiter_max_attempts: int - maximum waiter attempts (default 40)
    - deferrable: bool - run operator in deferrable mode (default False)
    - aws_conn_id: str - Airflow connection for AWS credentials. Not an explicit
      parameter of this signature; presumably accepted through **kwargs by the
      base operator - TODO confirm.

    Returns:
    bool: True if cluster was deleted successfully
    NOTE(review): presumably describes the operator's execute() result, which
    is not visible in this stub - verify.
    """
    def __init__(
        self,
        cluster_name: str,
        force_delete_compute: bool = False,
        wait_for_completion: bool = True,
        waiter_delay: int = 30,
        waiter_max_attempts: int = 40,
        deferrable: bool = False,
        **kwargs
    ) -> None: ...

Node Group Operations

Manage EKS managed node groups for EC2-based worker nodes.

class EksCreateNodegroupOperator(AwsBaseOperator):
    """
    Create an Amazon EKS managed node group.

    This is an API signature stub: the constructor body is elided, so only
    the parameters and their defaults are documented here.

    Parameters:
    - cluster_name: str - name of the EKS cluster (required)
    - nodegroup_name: str - name for the node group (required)
    - nodegroup_subnets: list[str] - list of subnet IDs for the node group (required)
    - nodegroup_role_arn: str - ARN of the IAM role for worker nodes (required)
    - create_nodegroup_kwargs: dict | None - additional node group configuration
    - wait_for_completion: bool - wait for node group creation (default True)
    - waiter_delay: int - time between waiter checks (default 30; presumably
      seconds - TODO confirm)
    - waiter_max_attempts: int - maximum waiter attempts (default 80)
    - deferrable: bool - run operator in deferrable mode (default False)
    - aws_conn_id: str - Airflow connection for AWS credentials. Not an explicit
      parameter of this signature; presumably accepted through **kwargs by the
      base operator - TODO confirm.

    Returns:
    str: Node group ARN
    NOTE(review): presumably describes the operator's execute() result, which
    is not visible in this stub - verify.
    """
    def __init__(
        self,
        cluster_name: str,
        nodegroup_name: str,
        nodegroup_subnets: list[str],
        nodegroup_role_arn: str,
        create_nodegroup_kwargs: dict | None = None,
        wait_for_completion: bool = True,
        waiter_delay: int = 30,
        waiter_max_attempts: int = 80,
        deferrable: bool = False,
        **kwargs
    ) -> None: ...
class EksDeleteNodegroupOperator(AwsBaseOperator):
    """
    Delete an Amazon EKS managed node group.

    This is an API signature stub: the constructor body is elided, so only
    the parameters and their defaults are documented here.

    Parameters:
    - cluster_name: str - name of the EKS cluster (required)
    - nodegroup_name: str - name of the node group to delete (required)
    - wait_for_completion: bool - wait for deletion to complete (default True)
    - waiter_delay: int - time between waiter checks (default 30; presumably
      seconds - TODO confirm)
    - waiter_max_attempts: int - maximum waiter attempts (default 60)
    - deferrable: bool - run operator in deferrable mode (default False)
    - aws_conn_id: str - Airflow connection for AWS credentials. Not an explicit
      parameter of this signature; presumably accepted through **kwargs by the
      base operator - TODO confirm.

    Returns:
    bool: True if node group was deleted successfully
    NOTE(review): presumably describes the operator's execute() result, which
    is not visible in this stub - verify.
    """
    def __init__(
        self,
        cluster_name: str,
        nodegroup_name: str,
        wait_for_completion: bool = True,
        waiter_delay: int = 30,
        waiter_max_attempts: int = 60,
        deferrable: bool = False,
        **kwargs
    ) -> None: ...

Fargate Profile Management

Configure AWS Fargate profiles for serverless container execution.

class EksCreateFargateProfileOperator(AwsBaseOperator):
    """
    Create an AWS Fargate profile for an EKS cluster.

    This is an API signature stub: the constructor body is elided, so only
    the parameters and their defaults are documented here.

    Parameters:
    - cluster_name: str - name of the EKS cluster (required)
    - fargate_profile_name: str - name for the Fargate profile (required)
    - fargate_pod_execution_role_arn: str - ARN for pod execution role (required)
    - fargate_selectors: list - selectors for Fargate scheduling (required);
      the usage example in this document passes a list of dicts with a
      'namespace' key and an optional 'labels' dict
    - create_fargate_profile_kwargs: dict | None - additional Fargate configuration
    - wait_for_completion: bool - wait for profile creation (default True)
    - waiter_delay: int - time between waiter checks (default 30; presumably
      seconds - TODO confirm)
    - waiter_max_attempts: int - maximum waiter attempts (default 60)
    - deferrable: bool - run operator in deferrable mode (default False)
    - aws_conn_id: str - Airflow connection for AWS credentials. Not an explicit
      parameter of this signature; presumably accepted through **kwargs by the
      base operator - TODO confirm.

    Returns:
    str: Fargate profile ARN
    NOTE(review): presumably describes the operator's execute() result, which
    is not visible in this stub - verify.
    """
    def __init__(
        self,
        cluster_name: str,
        fargate_profile_name: str,
        fargate_pod_execution_role_arn: str,
        fargate_selectors: list,
        create_fargate_profile_kwargs: dict | None = None,
        wait_for_completion: bool = True,
        waiter_delay: int = 30,
        waiter_max_attempts: int = 60,
        deferrable: bool = False,
        **kwargs
    ) -> None: ...
class EksDeleteFargateProfileOperator(AwsBaseOperator):
    """
    Delete an AWS Fargate profile from an EKS cluster.

    This is an API signature stub: the constructor body is elided, so only
    the parameters and their defaults are documented here.

    Parameters:
    - cluster_name: str - name of the EKS cluster (required)
    - fargate_profile_name: str - name of the Fargate profile to delete (required)
    - wait_for_completion: bool - wait for deletion to complete (default True)
    - waiter_delay: int - time between waiter checks (default 30; presumably
      seconds - TODO confirm)
    - waiter_max_attempts: int - maximum waiter attempts (default 60)
    - deferrable: bool - run operator in deferrable mode (default False)
    - aws_conn_id: str - Airflow connection for AWS credentials. Not an explicit
      parameter of this signature; presumably accepted through **kwargs by the
      base operator - TODO confirm.

    Returns:
    bool: True if Fargate profile was deleted successfully
    NOTE(review): presumably describes the operator's execute() result, which
    is not visible in this stub - verify.
    """
    def __init__(
        self,
        cluster_name: str,
        fargate_profile_name: str,
        wait_for_completion: bool = True,
        waiter_delay: int = 30,
        waiter_max_attempts: int = 60,
        deferrable: bool = False,
        **kwargs
    ) -> None: ...

Kubernetes Pod Execution

Run Kubernetes pods on EKS clusters with comprehensive configuration options.

class EksPodOperator(KubernetesPodOperator):
    """
    Execute a Kubernetes Pod on an Amazon EKS cluster.

    This is an API signature stub: the constructor body is elided, so only
    the parameters and their defaults are documented here.

    Parameters:
    - cluster_name: str - name of the EKS cluster (required)
    - in_cluster: bool - whether running inside the cluster (default False)
    - namespace: str - Kubernetes namespace for the pod (default 'default')
    - pod_name: str | None - name for the pod
    - image: str | None - container image to run
    - cmds: list | None - commands to execute in the container
    - arguments: list | None - arguments for the commands
    - labels: dict | None - labels to apply to the pod
    - startup_timeout_seconds: int - timeout for pod startup (default 600)
    - get_logs: bool - whether to retrieve pod logs (default True)
    - log_events_on_failure: bool - log events on pod failure (default False)
    - is_delete_operator_pod: bool - delete pod after completion (default True)
    - on_finish_action: str - action to take when pod finishes
      (default 'delete_pod')
    - aws_conn_id: str - Airflow connection for AWS credentials
      (default 'aws_default')
    - region: str | None - AWS region for the EKS cluster

    NOTE(review): is_delete_operator_pod and on_finish_action appear to control
    the same thing. Upstream KubernetesPodOperator is believed to have
    deprecated is_delete_operator_pod in favor of on_finish_action - confirm
    for the provider version being targeted.

    Returns:
    str: Pod execution result
    NOTE(review): presumably describes the operator's execute() result, which
    is not visible in this stub - verify.
    """
    def __init__(
        self,
        cluster_name: str,
        in_cluster: bool = False,
        namespace: str = 'default',
        pod_name: str | None = None,
        image: str | None = None,
        cmds: list | None = None,
        arguments: list | None = None,
        labels: dict | None = None,
        startup_timeout_seconds: int = 600,
        get_logs: bool = True,
        log_events_on_failure: bool = False,
        is_delete_operator_pod: bool = True,
        on_finish_action: str = 'delete_pod',
        aws_conn_id: str = 'aws_default',
        region: str | None = None,
        **kwargs
    ) -> None: ...

EKS Service Hook

Low-level EKS service operations and cluster information retrieval.

class EksHook(AwsBaseHook):
    """
    Hook for Amazon EKS service operations.

    This is an API signature stub: every method body is elided. The camelCase
    argument names presumably mirror the underlying EKS client call arguments,
    and the dict return values are presumably the raw service responses -
    TODO confirm against the provider source.

    NOTE(review): verify these signatures against the EksHook of the installed
    provider version before relying on them; this listing is a documentation
    summary, not the implementation.

    Parameters:
    - aws_conn_id: str - Airflow connection for AWS credentials
      (default 'aws_default')
    - region_name: str | None - AWS region name
    """
    def __init__(
        self,
        aws_conn_id: str = 'aws_default',
        region_name: str | None = None,
        **kwargs
    ) -> None: ...

    def create_cluster(
        self,
        name: str,
        version: str,
        roleArn: str,
        resourcesVpcConfig: dict,
        **kwargs
    ) -> dict:
        """Create an EKS cluster.

        Parameters:
        - name: str - cluster name
        - version: str - presumably the Kubernetes version - TODO confirm
        - roleArn: str - ARN of the IAM role for the cluster
        - resourcesVpcConfig: dict - VPC configuration for the cluster
        - **kwargs - additional creation arguments
        """
        ...

    def delete_cluster(self, name: str) -> dict:
        """Delete the EKS cluster called `name`."""
        ...

    def describe_cluster(self, name: str) -> dict:
        """Get information about the EKS cluster called `name`."""
        ...

    def list_clusters(self, maxResults: int | None = None, nextToken: str | None = None) -> dict:
        """List EKS clusters.

        `maxResults` and `nextToken` are presumably pagination controls -
        TODO confirm.
        """
        ...

    def create_nodegroup(
        self,
        clusterName: str,
        nodegroupName: str,
        subnets: list[str],
        nodeRole: str,
        **kwargs
    ) -> dict:
        """Create a managed node group.

        Parameters:
        - clusterName: str - cluster that owns the node group
        - nodegroupName: str - name for the node group
        - subnets: list[str] - subnet IDs for the node group
        - nodeRole: str - presumably the ARN of the worker-node IAM role -
          TODO confirm
        - **kwargs - additional creation arguments
        """
        ...

    def delete_nodegroup(self, clusterName: str, nodegroupName: str) -> dict:
        """Delete the managed node group `nodegroupName` of cluster `clusterName`."""
        ...

    def describe_nodegroup(self, clusterName: str, nodegroupName: str) -> dict:
        """Get information about node group `nodegroupName` of cluster `clusterName`."""
        ...

    def create_fargate_profile(
        self,
        clusterName: str,
        fargateProfileName: str,
        podExecutionRoleArn: str,
        selectors: list,
        **kwargs
    ) -> dict:
        """Create a Fargate profile.

        Parameters:
        - clusterName: str - cluster that owns the profile
        - fargateProfileName: str - name for the profile
        - podExecutionRoleArn: str - ARN of the pod execution role
        - selectors: list - selectors for the profile
        - **kwargs - additional creation arguments
        """
        ...

    def delete_fargate_profile(self, clusterName: str, fargateProfileName: str) -> dict:
        """Delete Fargate profile `fargateProfileName` of cluster `clusterName`."""
        ...

    def describe_fargate_profile(self, clusterName: str, fargateProfileName: str) -> dict:
        """Get information about Fargate profile `fargateProfileName` of cluster `clusterName`."""
        ...

    def generate_config_file(
        self,
        eks_cluster_name: str,
        pod_namespace: str,
        pod_username: str = 'aws'
    ) -> str:
        """Generate kubeconfig file for EKS cluster access.

        Parameters:
        - eks_cluster_name: str - cluster to generate the kubeconfig for
        - pod_namespace: str - Kubernetes namespace to put in the config
        - pod_username: str - user name to put in the config (default 'aws')

        Returns:
        str: presumably the path of the generated file - TODO confirm
        """
        ...

Usage Examples

Basic Cluster Creation

from airflow.providers.amazon.aws.operators.eks import EksCreateClusterOperator

# Create an EKS cluster together with a managed node group.
create_cluster = EksCreateClusterOperator(
    task_id='create_eks_cluster',
    cluster_name='my-data-cluster',
    cluster_role_arn='arn:aws:iam::123456789012:role/eks-service-role',
    resources_vpc_config={
        'subnetIds': ['subnet-12345', 'subnet-67890'],
        'securityGroupIds': ['sg-abcdef'],
        # The EKS API has no 'endpointConfigType' key; public and private
        # API-server endpoint access are two independent booleans.
        'endpointPublicAccess': True,
        'endpointPrivateAccess': True,
    },
    compute='nodegroup',
    nodegroup_name='worker-nodes',
    nodegroup_role_arn='arn:aws:iam::123456789012:role/NodeInstanceRole',
    # Extra node group settings supplied alongside the explicit arguments above.
    create_nodegroup_kwargs={
        'scalingConfig': {
            'minSize': 1,
            'maxSize': 3,
            'desiredSize': 2,
        },
        'instanceTypes': ['t3.medium'],
        'diskSize': 20,
        'amiType': 'AL2_x86_64',
    },
    wait_for_completion=True,
    aws_conn_id='aws_default',
)

Fargate Profile Setup

from airflow.providers.amazon.aws.operators.eks import EksCreateFargateProfileOperator

# Namespace / label selectors attached to the profile.
profile_selectors = [
    {'namespace': 'batch-jobs', 'labels': {'compute-type': 'fargate'}},
    {'namespace': 'data-processing'},
]

# Additional profile settings passed through to the creation call.
profile_extras = {
    'subnets': ['subnet-12345', 'subnet-67890'],
    'tags': {'Environment': 'prod', 'Team': 'data-engineering'},
}

# Create a Fargate profile for serverless execution.
create_fargate_profile = EksCreateFargateProfileOperator(
    task_id='create_fargate_profile',
    cluster_name='my-data-cluster',
    fargate_profile_name='batch-processing',
    fargate_pod_execution_role_arn='arn:aws:iam::123456789012:role/eks-fargate-pod-execution-role',
    fargate_selectors=profile_selectors,
    create_fargate_profile_kwargs=profile_extras,
    aws_conn_id='aws_default',
)

Running Pods on EKS

from airflow.providers.amazon.aws.operators.eks import EksPodOperator

# Execute a data processing job on EKS.
run_data_job = EksPodOperator(
    task_id='run_spark_job',
    cluster_name='my-data-cluster',
    namespace='data-processing',
    pod_name='spark-data-processor',
    image='my-spark-image:latest',
    cmds=['spark-submit'],
    arguments=[
        '--master', 'k8s://https://my-cluster-endpoint',
        '--deploy-mode', 'cluster',
        's3://my-bucket/spark-app.py',
    ],
    labels={'app': 'spark', 'version': 'v1.0'},
    get_logs=True,
    # Use on_finish_action rather than the legacy is_delete_operator_pod flag;
    # 'delete_pod' removes the pod once it finishes.
    on_finish_action='delete_pod',
    aws_conn_id='aws_default',
    region='us-west-2',
)

Cluster Lifecycle Management

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.eks import (
    EksCreateClusterOperator,
    EksDeleteClusterOperator,
    EksPodOperator,
)

# Complete cluster lifecycle in a DAG: create -> run workload -> delete.
with DAG(
    'eks_cluster_lifecycle',
    schedule='@daily',
    # An explicit start_date lets the scheduler run the DAG; catchup=False
    # avoids back-filling one cluster per missed day.
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    # Create a Fargate-backed cluster.
    create = EksCreateClusterOperator(
        task_id='create_cluster',
        cluster_name='temp-processing-cluster',
        cluster_role_arn='arn:aws:iam::123456789012:role/eks-service-role',
        resources_vpc_config={
            'subnetIds': ['subnet-12345', 'subnet-67890'],
        },
        compute='fargate',
        fargate_profile_name='temp-profile',
        fargate_pod_execution_role_arn='arn:aws:iam::123456789012:role/fargate-execution-role',
        fargate_selectors=[{'namespace': 'default'}],
    )

    # Run the workload in the 'default' namespace (the operator's default),
    # which matches the Fargate selector above.
    process_data = EksPodOperator(
        task_id='process_data',
        cluster_name='temp-processing-cluster',
        image='data-processor:latest',
        cmds=['python', 'process.py'],
    )

    # Clean up the cluster. 'all_done' runs this task even when an upstream
    # task failed, so a failed workload does not leave the cluster running.
    cleanup = EksDeleteClusterOperator(
        task_id='delete_cluster',
        cluster_name='temp-processing-cluster',
        force_delete_compute=True,
        trigger_rule='all_done',
    )

    create >> process_data >> cleanup

Import Statements

from airflow.providers.amazon.aws.operators.eks import (
    EksCreateClusterOperator,
    EksDeleteClusterOperator,
    EksCreateNodegroupOperator,
    EksDeleteNodegroupOperator,
    EksCreateFargateProfileOperator,
    EksDeleteFargateProfileOperator,
    EksPodOperator
)
from airflow.providers.amazon.aws.hooks.eks import EksHook

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-amazon

docs

athena-analytics.md

authentication.md

batch-processing.md

data-transfers.md

dms-migration.md

dynamodb-nosql.md

ecs-containers.md

eks-kubernetes.md

emr-clusters.md

glue-processing.md

index.md

lambda-functions.md

messaging-sns-sqs.md

rds-databases.md

redshift-warehouse.md

s3-storage.md

sagemaker-ml.md

tile.json