Apache Airflow provider package that provides comprehensive AWS service integrations for orchestrating cloud workflows and data pipelines
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Amazon EKS enables managed Kubernetes cluster operations with comprehensive support for cluster lifecycle management, node group operations, Fargate profiles, and Kubernetes workload execution through Airflow.
Create, configure, and manage Amazon EKS clusters with full lifecycle support.
class EksCreateClusterOperator(AwsBaseOperator):
    """
    Create an Amazon EKS cluster.

    Only ``cluster_name``, ``cluster_role_arn`` and ``resources_vpc_config``
    are required; every other argument has a default in the signature below.

    Parameters:
    - cluster_name: str - unique name for the EKS cluster
    - cluster_role_arn: str - ARN of the IAM role for the EKS service
    - resources_vpc_config: dict - VPC configuration for the cluster
    - compute: str - compute type ('nodegroup' or 'fargate')
    - create_cluster_kwargs: dict - additional cluster creation parameters
    - nodegroup_name: str - name for the managed node group
    - nodegroup_role_arn: str - ARN of the IAM role for worker nodes
    - create_nodegroup_kwargs: dict - additional node group parameters
    - fargate_profile_name: str - name for the Fargate profile
    - fargate_pod_execution_role_arn: str - ARN for Fargate pod execution role
    - fargate_selectors: list - selectors for Fargate profile
    - create_fargate_profile_kwargs: dict - additional Fargate parameters
    - wait_for_completion: bool - wait for cluster creation to complete
    - waiter_delay: int - time between waiter checks
    - waiter_max_attempts: int - maximum waiter attempts
    - deferrable: bool - run operator in deferrable mode
    - aws_conn_id: str - Airflow connection for AWS credentials
      (not a named argument here; passed through ``**kwargs`` --
      presumably consumed by the base operator, TODO confirm)

    Returns:
        str: Cluster ARN
    """

    def __init__(
        self,
        cluster_name: str,
        cluster_role_arn: str,
        resources_vpc_config: dict,
        compute: str | None = None,
        create_cluster_kwargs: dict | None = None,
        nodegroup_name: str | None = None,
        nodegroup_role_arn: str | None = None,
        create_nodegroup_kwargs: dict | None = None,
        fargate_profile_name: str | None = None,
        fargate_pod_execution_role_arn: str | None = None,
        fargate_selectors: list | None = None,
        create_fargate_profile_kwargs: dict | None = None,
        wait_for_completion: bool = True,
        waiter_delay: int = 30,
        waiter_max_attempts: int = 40,
        deferrable: bool = False,
        **kwargs,
    ): ...


class EksDeleteClusterOperator(AwsBaseOperator):
    """
    Delete an Amazon EKS cluster and associated compute resources.

    Parameters:
    - cluster_name: str - name of the EKS cluster to delete
    - force_delete_compute: bool - force delete attached compute resources
    - wait_for_completion: bool - wait for deletion to complete
    - waiter_delay: int - time between waiter checks
    - waiter_max_attempts: int - maximum waiter attempts
    - deferrable: bool - run operator in deferrable mode
    - aws_conn_id: str - Airflow connection for AWS credentials
      (not a named argument here; passed through ``**kwargs``)

    Returns:
        bool: True if cluster was deleted successfully
    """

    def __init__(
        self,
        cluster_name: str,
        force_delete_compute: bool = False,
        wait_for_completion: bool = True,
        waiter_delay: int = 30,
        waiter_max_attempts: int = 40,
        deferrable: bool = False,
        **kwargs,
    ): ...


# Manage EKS managed node groups for EC2-based worker nodes.
class EksCreateNodegroupOperator(AwsBaseOperator):
    """
    Create an Amazon EKS managed node group.

    Parameters:
    - cluster_name: str - name of the EKS cluster
    - nodegroup_name: str - name for the node group
    - nodegroup_subnets: list - list of subnet IDs for the node group
    - nodegroup_role_arn: str - ARN of the IAM role for worker nodes
    - create_nodegroup_kwargs: dict - additional node group configuration
    - wait_for_completion: bool - wait for node group creation
    - waiter_delay: int - time between waiter checks
    - waiter_max_attempts: int - maximum waiter attempts
    - deferrable: bool - run operator in deferrable mode
    - aws_conn_id: str - Airflow connection for AWS credentials
      (not a named argument here; passed through ``**kwargs``)

    Returns:
        str: Node group ARN
    """

    def __init__(
        self,
        cluster_name: str,
        nodegroup_name: str,
        nodegroup_subnets: list[str],
        nodegroup_role_arn: str,
        create_nodegroup_kwargs: dict | None = None,
        wait_for_completion: bool = True,
        waiter_delay: int = 30,
        waiter_max_attempts: int = 80,
        deferrable: bool = False,
        **kwargs,
    ): ...


class EksDeleteNodegroupOperator(AwsBaseOperator):
    """
    Delete an Amazon EKS managed node group.

    Parameters:
    - cluster_name: str - name of the EKS cluster
    - nodegroup_name: str - name of the node group to delete
    - wait_for_completion: bool - wait for deletion to complete
    - waiter_delay: int - time between waiter checks
    - waiter_max_attempts: int - maximum waiter attempts
    - deferrable: bool - run operator in deferrable mode
    - aws_conn_id: str - Airflow connection for AWS credentials
      (not a named argument here; passed through ``**kwargs``)

    Returns:
        bool: True if node group was deleted successfully
    """

    def __init__(
        self,
        cluster_name: str,
        nodegroup_name: str,
        wait_for_completion: bool = True,
        waiter_delay: int = 30,
        waiter_max_attempts: int = 60,
        deferrable: bool = False,
        **kwargs,
    ): ...


# Configure AWS Fargate profiles for serverless container execution.
class EksCreateFargateProfileOperator(AwsBaseOperator):
    """
    Create an AWS Fargate profile for an EKS cluster.

    Parameters:
    - cluster_name: str - name of the EKS cluster
    - fargate_profile_name: str - name for the Fargate profile
    - fargate_pod_execution_role_arn: str - ARN for pod execution role
    - fargate_selectors: list - selectors for Fargate scheduling
    - create_fargate_profile_kwargs: dict - additional Fargate configuration
    - wait_for_completion: bool - wait for profile creation
    - waiter_delay: int - time between waiter checks
    - waiter_max_attempts: int - maximum waiter attempts
    - deferrable: bool - run operator in deferrable mode
    - aws_conn_id: str - Airflow connection for AWS credentials
      (not a named argument here; passed through ``**kwargs``)

    Returns:
        str: Fargate profile ARN
    """

    def __init__(
        self,
        cluster_name: str,
        fargate_profile_name: str,
        fargate_pod_execution_role_arn: str,
        fargate_selectors: list,
        create_fargate_profile_kwargs: dict | None = None,
        wait_for_completion: bool = True,
        waiter_delay: int = 30,
        waiter_max_attempts: int = 60,
        deferrable: bool = False,
        **kwargs,
    ): ...


class EksDeleteFargateProfileOperator(AwsBaseOperator):
    """
    Delete an AWS Fargate profile from an EKS cluster.

    Parameters:
    - cluster_name: str - name of the EKS cluster
    - fargate_profile_name: str - name of the Fargate profile to delete
    - wait_for_completion: bool - wait for deletion to complete
    - waiter_delay: int - time between waiter checks
    - waiter_max_attempts: int - maximum waiter attempts
    - deferrable: bool - run operator in deferrable mode
    - aws_conn_id: str - Airflow connection for AWS credentials
      (not a named argument here; passed through ``**kwargs``)

    Returns:
        bool: True if Fargate profile was deleted successfully
    """

    def __init__(
        self,
        cluster_name: str,
        fargate_profile_name: str,
        wait_for_completion: bool = True,
        waiter_delay: int = 30,
        waiter_max_attempts: int = 60,
        deferrable: bool = False,
        **kwargs,
    ): ...


# Run Kubernetes pods on EKS clusters with comprehensive configuration options.
class EksPodOperator(KubernetesPodOperator):
    """
    Execute a Kubernetes Pod on an Amazon EKS cluster.

    Only ``cluster_name`` is required; every other argument has a default
    in the signature below.

    Parameters:
    - cluster_name: str - name of the EKS cluster
    - in_cluster: bool - whether running inside the cluster
    - namespace: str - Kubernetes namespace for the pod
    - pod_name: str - name for the pod
    - image: str - container image to run
    - cmds: list - commands to execute in the container
    - arguments: list - arguments for the commands
    - labels: dict - labels to apply to the pod
    - startup_timeout_seconds: int - timeout for pod startup
    - get_logs: bool - whether to retrieve pod logs
    - log_events_on_failure: bool - log events on pod failure
    - is_delete_operator_pod: bool - delete pod after completion
    - on_finish_action: str - action to take when pod finishes
    - aws_conn_id: str - Airflow connection for AWS credentials
    - region: str - AWS region for the EKS cluster

    Returns:
        str: Pod execution result
    """

    def __init__(
        self,
        cluster_name: str,
        in_cluster: bool = False,
        namespace: str = 'default',
        pod_name: str | None = None,
        image: str | None = None,
        cmds: list | None = None,
        arguments: list | None = None,
        labels: dict | None = None,
        startup_timeout_seconds: int = 600,
        get_logs: bool = True,
        log_events_on_failure: bool = False,
        is_delete_operator_pod: bool = True,
        on_finish_action: str = 'delete_pod',
        aws_conn_id: str = 'aws_default',
        region: str | None = None,
        **kwargs,
    ): ...


# Low-level EKS service operations and cluster information retrieval.
class EksHook(AwsBaseHook):
    """
    Hook for Amazon EKS service operations.

    Parameters:
    - aws_conn_id: str - Airflow connection for AWS credentials
    - region_name: str - AWS region name
    """

    def __init__(
        self,
        aws_conn_id: str = 'aws_default',
        region_name: str | None = None,
        **kwargs,
    ): ...

    # --- Cluster operations -------------------------------------------------

    def create_cluster(
        self,
        name: str,
        version: str,
        roleArn: str,
        resourcesVpcConfig: dict,
        **kwargs,
    ) -> dict:
        """Create an EKS cluster."""
        ...

    def delete_cluster(self, name: str) -> dict:
        """Delete an EKS cluster."""
        ...

    def describe_cluster(self, name: str) -> dict:
        """Get information about an EKS cluster."""
        ...

    def list_clusters(
        self,
        maxResults: int | None = None,
        nextToken: str | None = None,
    ) -> dict:
        """List EKS clusters."""
        ...

    # --- Managed node group operations --------------------------------------

    def create_nodegroup(
        self,
        clusterName: str,
        nodegroupName: str,
        subnets: list[str],
        nodeRole: str,
        **kwargs,
    ) -> dict:
        """Create a managed node group."""
        ...

    def delete_nodegroup(self, clusterName: str, nodegroupName: str) -> dict:
        """Delete a managed node group."""
        ...

    def describe_nodegroup(self, clusterName: str, nodegroupName: str) -> dict:
        """Get information about a node group."""
        ...

    # --- Fargate profile operations -----------------------------------------

    def create_fargate_profile(
        self,
        clusterName: str,
        fargateProfileName: str,
        podExecutionRoleArn: str,
        selectors: list,
        **kwargs,
    ) -> dict:
        """Create a Fargate profile."""
        ...

    def delete_fargate_profile(self, clusterName: str, fargateProfileName: str) -> dict:
        """Delete a Fargate profile."""
        ...

    def describe_fargate_profile(self, clusterName: str, fargateProfileName: str) -> dict:
        """Get information about a Fargate profile."""
        ...

    # --- Kubernetes access --------------------------------------------------

    def generate_config_file(
        self,
        eks_cluster_name: str,
        pod_namespace: str,
        pod_username: str = 'aws',
    ) -> str:
        """Generate kubeconfig file for EKS cluster access."""
        ...


from airflow.providers.amazon.aws.operators.eks import EksCreateClusterOperator
# Create an EKS cluster with node group
create_cluster = EksCreateClusterOperator(
    task_id='create_eks_cluster',
    cluster_name='my-data-cluster',
    cluster_role_arn='arn:aws:iam::123456789012:role/eks-service-role',
    resources_vpc_config={
        'subnetIds': ['subnet-12345', 'subnet-67890'],
        'securityGroupIds': ['sg-abcdef'],
        # The EKS API has no 'endpointConfigType' key; public + private
        # endpoint access is requested with these two boolean flags.
        'endpointPublicAccess': True,
        'endpointPrivateAccess': True,
    },
    compute='nodegroup',
    nodegroup_name='worker-nodes',
    nodegroup_role_arn='arn:aws:iam::123456789012:role/NodeInstanceRole',
    create_nodegroup_kwargs={
        'scalingConfig': {
            'minSize': 1,
            'maxSize': 3,
            'desiredSize': 2,
        },
        'instanceTypes': ['t3.medium'],
        'diskSize': 20,
        'amiType': 'AL2_x86_64',
    },
    wait_for_completion=True,
    aws_conn_id='aws_default',
)

from airflow.providers.amazon.aws.operators.eks import EksCreateFargateProfileOperator
# Create a Fargate profile for serverless execution
create_fargate_profile = EksCreateFargateProfileOperator(
    task_id='create_fargate_profile',
    cluster_name='my-data-cluster',
    fargate_profile_name='batch-processing',
    fargate_pod_execution_role_arn='arn:aws:iam::123456789012:role/eks-fargate-pod-execution-role',
    # One selector per namespace; the first one is further narrowed by label.
    fargate_selectors=[
        {
            'namespace': 'batch-jobs',
            'labels': {'compute-type': 'fargate'},
        },
        {
            'namespace': 'data-processing',
        },
    ],
    create_fargate_profile_kwargs={
        'subnets': ['subnet-12345', 'subnet-67890'],
        'tags': {'Environment': 'prod', 'Team': 'data-engineering'},
    },
    aws_conn_id='aws_default',
)

from airflow.providers.amazon.aws.operators.eks import EksPodOperator
# Execute a data processing job on EKS
run_data_job = EksPodOperator(
    task_id='run_spark_job',
    cluster_name='my-data-cluster',
    namespace='data-processing',
    pod_name='spark-data-processor',
    image='my-spark-image:latest',
    cmds=['spark-submit'],
    arguments=[
        '--master', 'k8s://https://my-cluster-endpoint',
        '--deploy-mode', 'cluster',
        's3://my-bucket/spark-app.py',
    ],
    labels={'app': 'spark', 'version': 'v1.0'},
    get_logs=True,
    # Same effect as the deprecated is_delete_operator_pod=True: the pod is
    # removed once it finishes.
    on_finish_action='delete_pod',
    aws_conn_id='aws_default',
    region='us-west-2',
)

# EksPodOperator is listed as well because the lifecycle example below uses it.
from airflow.providers.amazon.aws.operators.eks import (
    EksCreateClusterOperator,
    EksDeleteClusterOperator,
    EksPodOperator,
)
# Complete cluster lifecycle in a DAG
from datetime import datetime

from airflow import DAG

with DAG(
    'eks_cluster_lifecycle',
    schedule='@daily',
    # A start_date is needed for the DAG to be schedulable; catchup is
    # disabled so missed intervals do not each spin up a cluster.
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    # Create cluster
    create = EksCreateClusterOperator(
        task_id='create_cluster',
        cluster_name='temp-processing-cluster',
        cluster_role_arn='arn:aws:iam::123456789012:role/eks-service-role',
        resources_vpc_config={
            'subnetIds': ['subnet-12345', 'subnet-67890'],
        },
        compute='fargate',
        fargate_profile_name='temp-profile',
        fargate_pod_execution_role_arn='arn:aws:iam::123456789012:role/fargate-execution-role',
        fargate_selectors=[{'namespace': 'default'}],
    )

    # Run workload
    process_data = EksPodOperator(
        task_id='process_data',
        cluster_name='temp-processing-cluster',
        image='data-processor:latest',
        cmds=['python', 'process.py'],
    )

    # Clean up cluster
    cleanup = EksDeleteClusterOperator(
        task_id='delete_cluster',
        cluster_name='temp-processing-cluster',
        force_delete_compute=True,
        # Run even when the workload fails so the temporary cluster is
        # never left behind.
        trigger_rule='all_done',
    )

    create >> process_data >> cleanup

from airflow.providers.amazon.aws.operators.eks import (
    EksCreateClusterOperator,
    EksDeleteClusterOperator,
    EksCreateNodegroupOperator,
    EksDeleteNodegroupOperator,
    EksCreateFargateProfileOperator,
    EksDeleteFargateProfileOperator,
    EksPodOperator,
)
from airflow.providers.amazon.aws.hooks.eks import EksHook

# Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-amazon