0
# Amazon EKS (Elastic Kubernetes Service)
1
2
The Amazon EKS integration for Apache Airflow provides operators and a hook for managing Kubernetes clusters on AWS: cluster lifecycle management, managed node group operations, Fargate profiles, and running Kubernetes pods from Airflow tasks.
3
4
## Capabilities
5
6
### Cluster Management
7
8
Create, configure, and manage Amazon EKS clusters with full lifecycle support.
9
10
```python { .api }
11
class EksCreateClusterOperator(AwsBaseOperator):
12
"""
13
Create an Amazon EKS cluster.
14
15
Parameters:
16
- cluster_name: str - unique name for the EKS cluster
17
- cluster_role_arn: str - ARN of the IAM role for the EKS service
18
- resources_vpc_config: dict - VPC configuration for the cluster
19
- compute: str - compute type ('nodegroup' or 'fargate')
20
- create_cluster_kwargs: dict - additional cluster creation parameters
21
- nodegroup_name: str - name for the managed node group
22
- nodegroup_role_arn: str - ARN of the IAM role for worker nodes
23
- create_nodegroup_kwargs: dict - additional node group parameters
24
- fargate_profile_name: str - name for the Fargate profile
25
- fargate_pod_execution_role_arn: str - ARN for Fargate pod execution role
26
- fargate_selectors: list - selectors for Fargate profile
27
- create_fargate_profile_kwargs: dict - additional Fargate parameters
28
- wait_for_completion: bool - wait for cluster creation to complete
29
- waiter_delay: int - time between waiter checks
30
- waiter_max_attempts: int - maximum waiter attempts
31
- deferrable: bool - run operator in deferrable mode
32
- aws_conn_id: str - Airflow connection for AWS credentials
33
34
Returns:
35
str: Cluster ARN
36
"""
37
def __init__(
38
self,
39
cluster_name: str,
40
cluster_role_arn: str,
41
resources_vpc_config: dict,
42
compute: str = None,
43
create_cluster_kwargs: dict = None,
44
nodegroup_name: str = None,
45
nodegroup_role_arn: str = None,
46
create_nodegroup_kwargs: dict = None,
47
fargate_profile_name: str = None,
48
fargate_pod_execution_role_arn: str = None,
49
fargate_selectors: list = None,
50
create_fargate_profile_kwargs: dict = None,
51
wait_for_completion: bool = True,
52
waiter_delay: int = 30,
53
waiter_max_attempts: int = 40,
54
deferrable: bool = False,
55
**kwargs
56
): ...
57
```
58
59
```python { .api }
60
class EksDeleteClusterOperator(AwsBaseOperator):
61
"""
62
Delete an Amazon EKS cluster and associated compute resources.
63
64
Parameters:
65
- cluster_name: str - name of the EKS cluster to delete
66
- force_delete_compute: bool - force delete attached compute resources
67
- wait_for_completion: bool - wait for deletion to complete
68
- waiter_delay: int - time between waiter checks
69
- waiter_max_attempts: int - maximum waiter attempts
70
- deferrable: bool - run operator in deferrable mode
71
- aws_conn_id: str - Airflow connection for AWS credentials
72
73
Returns:
74
bool: True if cluster was deleted successfully
75
"""
76
def __init__(
77
self,
78
cluster_name: str,
79
force_delete_compute: bool = False,
80
wait_for_completion: bool = True,
81
waiter_delay: int = 30,
82
waiter_max_attempts: int = 40,
83
deferrable: bool = False,
84
**kwargs
85
): ...
86
```
87
88
### Node Group Operations
89
90
Manage EKS managed node groups for EC2-based worker nodes.
91
92
```python { .api }
93
class EksCreateNodegroupOperator(AwsBaseOperator):
94
"""
95
Create an Amazon EKS managed node group.
96
97
Parameters:
98
- cluster_name: str - name of the EKS cluster
99
- nodegroup_name: str - name for the node group
100
- nodegroup_subnets: list - list of subnet IDs for the node group
101
- nodegroup_role_arn: str - ARN of the IAM role for worker nodes
102
- create_nodegroup_kwargs: dict - additional node group configuration
103
- wait_for_completion: bool - wait for node group creation
104
- waiter_delay: int - time between waiter checks
105
- waiter_max_attempts: int - maximum waiter attempts
106
- deferrable: bool - run operator in deferrable mode
107
- aws_conn_id: str - Airflow connection for AWS credentials
108
109
Returns:
110
str: Node group ARN
111
"""
112
def __init__(
113
self,
114
cluster_name: str,
115
nodegroup_name: str,
116
nodegroup_subnets: list[str],
117
nodegroup_role_arn: str,
118
create_nodegroup_kwargs: dict = None,
119
wait_for_completion: bool = True,
120
waiter_delay: int = 30,
121
waiter_max_attempts: int = 80,
122
deferrable: bool = False,
123
**kwargs
124
): ...
125
```
126
127
```python { .api }
128
class EksDeleteNodegroupOperator(AwsBaseOperator):
129
"""
130
Delete an Amazon EKS managed node group.
131
132
Parameters:
133
- cluster_name: str - name of the EKS cluster
134
- nodegroup_name: str - name of the node group to delete
135
- wait_for_completion: bool - wait for deletion to complete
136
- waiter_delay: int - time between waiter checks
137
- waiter_max_attempts: int - maximum waiter attempts
138
- deferrable: bool - run operator in deferrable mode
139
- aws_conn_id: str - Airflow connection for AWS credentials
140
141
Returns:
142
bool: True if node group was deleted successfully
143
"""
144
def __init__(
145
self,
146
cluster_name: str,
147
nodegroup_name: str,
148
wait_for_completion: bool = True,
149
waiter_delay: int = 30,
150
waiter_max_attempts: int = 60,
151
deferrable: bool = False,
152
**kwargs
153
): ...
154
```
155
156
### Fargate Profile Management
157
158
Configure AWS Fargate profiles for serverless container execution.
159
160
```python { .api }
161
class EksCreateFargateProfileOperator(AwsBaseOperator):
162
"""
163
Create an AWS Fargate profile for an EKS cluster.
164
165
Parameters:
166
- cluster_name: str - name of the EKS cluster
167
- fargate_profile_name: str - name for the Fargate profile
168
- fargate_pod_execution_role_arn: str - ARN for pod execution role
169
- fargate_selectors: list - selectors for Fargate scheduling
170
- create_fargate_profile_kwargs: dict - additional Fargate configuration
171
- wait_for_completion: bool - wait for profile creation
172
- waiter_delay: int - time between waiter checks
173
- waiter_max_attempts: int - maximum waiter attempts
174
- deferrable: bool - run operator in deferrable mode
175
- aws_conn_id: str - Airflow connection for AWS credentials
176
177
Returns:
178
str: Fargate profile ARN
179
"""
180
def __init__(
181
self,
182
cluster_name: str,
183
fargate_profile_name: str,
184
fargate_pod_execution_role_arn: str,
185
fargate_selectors: list,
186
create_fargate_profile_kwargs: dict = None,
187
wait_for_completion: bool = True,
188
waiter_delay: int = 30,
189
waiter_max_attempts: int = 60,
190
deferrable: bool = False,
191
**kwargs
192
): ...
193
```
194
195
```python { .api }
196
class EksDeleteFargateProfileOperator(AwsBaseOperator):
197
"""
198
Delete an AWS Fargate profile from an EKS cluster.
199
200
Parameters:
201
- cluster_name: str - name of the EKS cluster
202
- fargate_profile_name: str - name of the Fargate profile to delete
203
- wait_for_completion: bool - wait for deletion to complete
204
- waiter_delay: int - time between waiter checks
205
- waiter_max_attempts: int - maximum waiter attempts
206
- deferrable: bool - run operator in deferrable mode
207
- aws_conn_id: str - Airflow connection for AWS credentials
208
209
Returns:
210
bool: True if Fargate profile was deleted successfully
211
"""
212
def __init__(
213
self,
214
cluster_name: str,
215
fargate_profile_name: str,
216
wait_for_completion: bool = True,
217
waiter_delay: int = 30,
218
waiter_max_attempts: int = 60,
219
deferrable: bool = False,
220
**kwargs
221
): ...
222
```
223
224
### Kubernetes Pod Execution
225
226
Run Kubernetes pods on EKS clusters with comprehensive configuration options.
227
228
```python { .api }
229
class EksPodOperator(KubernetesPodOperator):
230
"""
231
Execute a Kubernetes Pod on an Amazon EKS cluster.
232
233
Parameters:
234
- cluster_name: str - name of the EKS cluster
235
- in_cluster: bool - whether running inside the cluster
236
- namespace: str - Kubernetes namespace for the pod
237
- pod_name: str - name for the pod
238
- image: str - container image to run
239
- cmds: list - commands to execute in the container
240
- arguments: list - arguments for the commands
241
- labels: dict - labels to apply to the pod
242
- startup_timeout_seconds: int - timeout for pod startup
243
- get_logs: bool - whether to retrieve pod logs
244
- log_events_on_failure: bool - log events on pod failure
245
- is_delete_operator_pod: bool - delete pod after completion (deprecated; prefer on_finish_action)
246
- on_finish_action: str - action to take when pod finishes ('delete_pod', 'delete_succeeded_pod', or 'keep_pod')
247
- aws_conn_id: str - Airflow connection for AWS credentials
248
- region: str - AWS region for the EKS cluster
249
250
Returns:
251
str: Pod execution result
252
"""
253
def __init__(
254
self,
255
cluster_name: str,
256
in_cluster: bool = False,
257
namespace: str = 'default',
258
pod_name: str = None,
259
image: str = None,
260
cmds: list = None,
261
arguments: list = None,
262
labels: dict = None,
263
startup_timeout_seconds: int = 600,
264
get_logs: bool = True,
265
log_events_on_failure: bool = False,
266
is_delete_operator_pod: bool = True,
267
on_finish_action: str = 'delete_pod',
268
aws_conn_id: str = 'aws_default',
269
region: str = None,
270
**kwargs
271
): ...
272
```
273
274
### EKS Service Hook
275
276
Low-level EKS service operations and cluster information retrieval.
277
278
```python { .api }
279
class EksHook(AwsBaseHook):
280
"""
281
Hook for Amazon EKS service operations.
282
283
Parameters:
284
- aws_conn_id: str - Airflow connection for AWS credentials
285
- region_name: str - AWS region name
286
"""
287
def __init__(
288
self,
289
aws_conn_id: str = 'aws_default',
290
region_name: str = None,
291
**kwargs
292
): ...
293
294
def create_cluster(
295
self,
296
name: str,
297
version: str,
298
roleArn: str,
299
resourcesVpcConfig: dict,
300
**kwargs
301
) -> dict:
302
"""Create an EKS cluster."""
303
...
304
305
def delete_cluster(self, name: str) -> dict:
306
"""Delete an EKS cluster."""
307
...
308
309
def describe_cluster(self, name: str) -> dict:
310
"""Get information about an EKS cluster."""
311
...
312
313
def list_clusters(self, maxResults: int = None, nextToken: str = None) -> dict:
314
"""List EKS clusters."""
315
...
316
317
def create_nodegroup(
318
self,
319
clusterName: str,
320
nodegroupName: str,
321
subnets: list[str],
322
nodeRole: str,
323
**kwargs
324
) -> dict:
325
"""Create a managed node group."""
326
...
327
328
def delete_nodegroup(self, clusterName: str, nodegroupName: str) -> dict:
329
"""Delete a managed node group."""
330
...
331
332
def describe_nodegroup(self, clusterName: str, nodegroupName: str) -> dict:
333
"""Get information about a node group."""
334
...
335
336
def create_fargate_profile(
337
self,
338
clusterName: str,
339
fargateProfileName: str,
340
podExecutionRoleArn: str,
341
selectors: list,
342
**kwargs
343
) -> dict:
344
"""Create a Fargate profile."""
345
...
346
347
def delete_fargate_profile(self, clusterName: str, fargateProfileName: str) -> dict:
348
"""Delete a Fargate profile."""
349
...
350
351
def describe_fargate_profile(self, clusterName: str, fargateProfileName: str) -> dict:
352
"""Get information about a Fargate profile."""
353
...
354
355
def generate_config_file(
356
self,
357
eks_cluster_name: str,
358
pod_namespace: str,
359
pod_username: str = 'aws'
360
) -> str:
361
"""Generate kubeconfig file for EKS cluster access."""
362
...
363
```
364
365
## Usage Examples
366
367
### Basic Cluster Creation
368
369
```python
370
from airflow.providers.amazon.aws.operators.eks import EksCreateClusterOperator
371
372
# Create an EKS cluster with node group
373
create_cluster = EksCreateClusterOperator(
374
task_id='create_eks_cluster',
375
cluster_name='my-data-cluster',
376
cluster_role_arn='arn:aws:iam::123456789012:role/eks-service-role',
377
resources_vpc_config={
378
'subnetIds': ['subnet-12345', 'subnet-67890'],
379
'securityGroupIds': ['sg-abcdef'],
380
'endpointPublicAccess': True,
'endpointPrivateAccess': True
381
},
382
compute='nodegroup',
383
nodegroup_name='worker-nodes',
384
nodegroup_role_arn='arn:aws:iam::123456789012:role/NodeInstanceRole',
385
create_nodegroup_kwargs={
386
'scalingConfig': {
387
'minSize': 1,
388
'maxSize': 3,
389
'desiredSize': 2
390
},
391
'instanceTypes': ['t3.medium'],
392
'diskSize': 20,
393
'amiType': 'AL2_x86_64'
394
},
395
wait_for_completion=True,
396
aws_conn_id='aws_default'
397
)
398
```
399
400
### Fargate Profile Setup
401
402
```python
403
from airflow.providers.amazon.aws.operators.eks import EksCreateFargateProfileOperator
404
405
# Create a Fargate profile for serverless execution
406
create_fargate_profile = EksCreateFargateProfileOperator(
407
task_id='create_fargate_profile',
408
cluster_name='my-data-cluster',
409
fargate_profile_name='batch-processing',
410
fargate_pod_execution_role_arn='arn:aws:iam::123456789012:role/eks-fargate-pod-execution-role',
411
fargate_selectors=[
412
{
413
'namespace': 'batch-jobs',
414
'labels': {'compute-type': 'fargate'}
415
},
416
{
417
'namespace': 'data-processing'
418
}
419
],
420
create_fargate_profile_kwargs={
421
'subnets': ['subnet-12345', 'subnet-67890'],
422
'tags': {'Environment': 'prod', 'Team': 'data-engineering'}
423
},
424
aws_conn_id='aws_default'
425
)
426
```
427
428
### Running Pods on EKS
429
430
```python
431
from airflow.providers.amazon.aws.operators.eks import EksPodOperator
432
433
# Execute a data processing job on EKS
434
run_data_job = EksPodOperator(
435
task_id='run_spark_job',
436
cluster_name='my-data-cluster',
437
namespace='data-processing',
438
pod_name='spark-data-processor',
439
image='my-spark-image:latest',
440
cmds=['spark-submit'],
441
arguments=[
442
'--master', 'k8s://https://my-cluster-endpoint',
443
'--deploy-mode', 'cluster',
444
's3://my-bucket/spark-app.py'
445
],
446
labels={'app': 'spark', 'version': 'v1.0'},
447
get_logs=True,
448
on_finish_action='delete_pod',
449
aws_conn_id='aws_default',
450
region='us-west-2'
451
)
452
```
453
454
### Cluster Lifecycle Management
455
456
```python
457
from airflow import DAG
from airflow.providers.amazon.aws.operators.eks import (
458
EksCreateClusterOperator,
459
EksDeleteClusterOperator,
EksPodOperator
460
)
461
462
# Complete cluster lifecycle in a DAG
463
with DAG('eks_cluster_lifecycle', schedule='@daily') as dag:
464
465
# Create cluster
466
create = EksCreateClusterOperator(
467
task_id='create_cluster',
468
cluster_name='temp-processing-cluster',
469
cluster_role_arn='arn:aws:iam::123456789012:role/eks-service-role',
470
resources_vpc_config={
471
'subnetIds': ['subnet-12345', 'subnet-67890']
472
},
473
compute='fargate',
474
fargate_profile_name='temp-profile',
475
fargate_pod_execution_role_arn='arn:aws:iam::123456789012:role/fargate-execution-role',
476
fargate_selectors=[{'namespace': 'default'}]
477
)
478
479
# Run workload
480
process_data = EksPodOperator(
481
task_id='process_data',
482
cluster_name='temp-processing-cluster',
483
image='data-processor:latest',
484
cmds=['python', 'process.py']
485
)
486
487
# Clean up cluster
488
cleanup = EksDeleteClusterOperator(
489
task_id='delete_cluster',
490
cluster_name='temp-processing-cluster',
491
force_delete_compute=True
492
)
493
494
create >> process_data >> cleanup
495
```
496
497
## Import Statements
498
499
```python
500
from airflow.providers.amazon.aws.operators.eks import (
501
EksCreateClusterOperator,
502
EksDeleteClusterOperator,
503
EksCreateNodegroupOperator,
504
EksDeleteNodegroupOperator,
505
EksCreateFargateProfileOperator,
506
EksDeleteFargateProfileOperator,
507
EksPodOperator
508
)
509
from airflow.providers.amazon.aws.hooks.eks import EksHook
510
```