Apache Airflow provider package that provides comprehensive AWS service integrations for orchestrating cloud workflows and data pipelines
npx @tessl/cli install tessl/pypi-apache-airflow-providers-amazon@9.12.00
# Apache Airflow Providers Amazon

A comprehensive provider package that enables Apache Airflow to orchestrate and manage Amazon Web Services (AWS) resources from its workflows. It provides hooks, operators, sensors, transfers, and triggers for over 30 AWS services, enabling seamless integration of cloud services within Airflow DAGs. Supported services span compute (Batch, ECS, EKS, Lambda), storage (S3, EFS), databases (RDS, DynamoDB, Redshift), analytics and machine learning (Athena, EMR, Glue, SageMaker), messaging (SNS, SQS), and database migration (DMS).

## Package Information

- **Package Name**: apache-airflow-providers-amazon
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-amazon`
- **Documentation**: https://airflow.apache.org/docs/apache-airflow-providers-amazon/

## Core Imports

Authentication and base functionality:

```python
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
```

Common service imports:

```python
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
```

## Basic Usage

```python
from datetime import datetime
from airflow import DAG
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator
from airflow.providers.amazon.aws.operators.lambda_function import LambdaInvokeFunctionOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

# Define DAG
dag = DAG(
    'aws_example_dag',
    start_date=datetime(2023, 1, 1),
    schedule='@daily',
    catchup=False
)

# Create S3 bucket
create_bucket = S3CreateBucketOperator(
    task_id='create_bucket',
    bucket_name='my-airflow-bucket',
    aws_conn_id='aws_default',
    dag=dag
)

# Wait for a file to appear in S3
wait_for_file = S3KeySensor(
    task_id='wait_for_file',
    bucket_name='my-airflow-bucket',
    bucket_key='data/input.csv',
    aws_conn_id='aws_default',
    timeout=600,
    poke_interval=60,
    dag=dag
)

# Invoke Lambda function
invoke_lambda = LambdaInvokeFunctionOperator(
    task_id='invoke_lambda',
    function_name='process_data',
    payload='{"bucket": "my-airflow-bucket", "key": "data/input.csv"}',
    aws_conn_id='aws_default',
    dag=dag
)

# Define task dependencies
create_bucket >> wait_for_file >> invoke_lambda
```

## Architecture

The provider package is organized around five core component types that integrate with Airflow's execution model:

- **Hooks**: Service clients that provide low-level AWS API access and authentication (usable directly inside Python tasks, as sketched below)
- **Operators**: Task implementations that execute specific AWS operations within DAGs
- **Sensors**: Monitoring tasks that wait for AWS resource states or events
- **Transfers**: Data movement operations between AWS services and external systems
- **Triggers**: Asynchronous operations for efficient resource monitoring

All components inherit from Airflow base classes and support:

- Connection management through Airflow's connection system
- Retry logic and error handling
- Logging and monitoring integration
- Template variable support
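
Hooks are the piece most often used outside of ready-made operators. A minimal sketch of calling a hook from a TaskFlow task (the bucket name and prefix are placeholders):

```python
from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


@task
def list_new_objects() -> list:
    # Credentials are resolved from the 'aws_default' Airflow connection.
    hook = S3Hook(aws_conn_id="aws_default")
    # Return the object keys under a prefix (bucket and prefix are placeholders).
    return hook.list_keys(bucket_name="my-airflow-bucket", prefix="data/")
```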
## Capabilities
### S3 Storage Operations

Complete S3 bucket and object management including create, delete, copy, transform, and list operations. Provides both basic operations and advanced features like multipart uploads and lifecycle management.

```python { .api }
class S3Hook(AwsBaseHook):
    def create_bucket(self, bucket_name: str, region_name: str = None) -> bool: ...
    def delete_bucket(self, bucket_name: str, force_delete: bool = False) -> None: ...
    def copy_object(self, source_bucket_key: str, dest_bucket_key: str, **kwargs) -> None: ...
    def get_key(self, key: str, bucket_name: str = None) -> Any: ...
    def load_file(self, filename: str, key: str, bucket_name: str = None, **kwargs) -> None: ...
```

```python { .api }
class S3CreateBucketOperator(BaseOperator):
    def __init__(self, bucket_name: str, aws_conn_id: str = 'aws_default', **kwargs): ...

class S3DeleteBucketOperator(BaseOperator):
    def __init__(self, bucket_name: str, force_delete: bool = False, **kwargs): ...

class S3KeySensor(BaseSensorOperator):
    def __init__(self, bucket_name: str, bucket_key: str, **kwargs): ...
```

[S3 Storage](./s3-storage.md)
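
As a usage sketch built on the hook above (the file path, bucket, and keys are placeholders), uploading a file and copying it to an archive prefix from inside a task:

```python
from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


@task
def archive_report(local_path: str) -> None:
    hook = S3Hook(aws_conn_id="aws_default")
    # Upload the local file, overwriting any existing object at the key.
    hook.load_file(filename=local_path, key="reports/latest.csv",
                   bucket_name="my-airflow-bucket", replace=True)
    # Copy the uploaded object into an archive prefix within the same bucket.
    hook.copy_object(source_bucket_key="reports/latest.csv",
                     dest_bucket_key="archive/latest.csv",
                     source_bucket_name="my-airflow-bucket",
                     dest_bucket_name="my-airflow-bucket")
```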
### Lambda Function Management

AWS Lambda function creation, invocation, and management operations. Supports both synchronous and asynchronous function execution with payload handling and response processing.

```python { .api }
class LambdaHook(AwsBaseHook):
    def invoke_lambda(self, function_name: str, payload: str = None, **kwargs) -> dict: ...
    def create_lambda(self, function_name: str, runtime: str, role: str, **kwargs) -> dict: ...
```

```python { .api }
class LambdaInvokeFunctionOperator(BaseOperator):
    def __init__(self, function_name: str, payload: str = None, **kwargs): ...
```

[Lambda Functions](./lambda-functions.md)
### EMR Cluster Management

Amazon EMR cluster creation, management, and job execution. Supports both traditional EMR clusters and EMR Serverless applications with comprehensive step management.

```python { .api }
class EmrHook(AwsBaseHook):
    def create_job_flow(self, job_flow_overrides: dict = None, **kwargs) -> str: ...
    def add_job_flow_steps(self, job_flow_id: str, steps: list, **kwargs) -> list: ...
```

```python { .api }
class EmrCreateJobFlowOperator(BaseOperator):
    def __init__(self, job_flow_overrides: dict = None, **kwargs): ...

class EmrAddStepsOperator(BaseOperator):
    def __init__(self, job_flow_id: str, steps: list, **kwargs): ...
```

[EMR Clusters](./emr-clusters.md)
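
A minimal create-then-add-steps sketch; the cluster and step definitions follow the EMR RunJobFlow/AddJobFlowSteps request shapes, and every name, release label, and S3 path below is a placeholder:

```python
from airflow.providers.amazon.aws.operators.emr import EmrAddStepsOperator, EmrCreateJobFlowOperator

create_cluster = EmrCreateJobFlowOperator(
    task_id="create_cluster",
    job_flow_overrides={
        "Name": "airflow-emr-demo",
        "ReleaseLabel": "emr-7.1.0",
        "Instances": {"KeepJobFlowAliveWhenNoSteps": True},
    },
    aws_conn_id="aws_default",
)

add_steps = EmrAddStepsOperator(
    task_id="add_steps",
    # The create task returns the new job flow id, pulled here via XCom templating.
    job_flow_id="{{ ti.xcom_pull(task_ids='create_cluster') }}",
    steps=[{
        "Name": "spark-job",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {"Jar": "command-runner.jar",
                          "Args": ["spark-submit", "s3://my-airflow-bucket/jobs/job.py"]},
    }],
    aws_conn_id="aws_default",
)

create_cluster >> add_steps
```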
### Glue Data Processing

AWS Glue job execution and crawler management for ETL operations. Supports both Glue jobs and Glue DataBrew for data preparation workflows.

```python { .api }
class GlueJobHook(AwsBaseHook):
    def get_job_state(self, job_name: str, run_id: str) -> str: ...
    def initialize_job(self, job_name: str) -> dict: ...
```

```python { .api }
class GlueJobOperator(BaseOperator):
    def __init__(self, job_name: str, script_args: dict = None, **kwargs): ...
```

[Glue Processing](./glue-processing.md)
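
A usage sketch assuming a Glue job named `nightly_etl` already exists; the script argument is a placeholder:

```python
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator

run_glue_job = GlueJobOperator(
    task_id="run_glue_job",
    job_name="nightly_etl",
    # Arguments are passed to the Glue job run as --key value pairs.
    script_args={"--input_path": "s3://my-airflow-bucket/data/"},
    aws_conn_id="aws_default",
)
```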
### RDS Database Operations

Amazon RDS instance management including creation, deletion, snapshot operations, and state management. Supports both traditional RDS instances and Aurora clusters.

```python { .api }
class RdsHook(AwsBaseHook):
    def create_db_instance(self, db_instance_identifier: str, **kwargs) -> dict: ...
    def delete_db_instance(self, db_instance_identifier: str, **kwargs) -> dict: ...
```

```python { .api }
class RdsCreateDbInstanceOperator(BaseOperator):
    def __init__(self, db_instance_identifier: str, **kwargs): ...
```

[RDS Databases](./rds-databases.md)
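
A minimal create-instance sketch; the instance class, engine, and the pass-through settings in `rds_kwargs` follow the operator's signature in recent provider releases, and all values are placeholders:

```python
from airflow.providers.amazon.aws.operators.rds import RdsCreateDbInstanceOperator

create_db = RdsCreateDbInstanceOperator(
    task_id="create_db",
    db_instance_identifier="airflow-demo-db",
    db_instance_class="db.t3.micro",
    engine="postgres",
    # Extra settings are forwarded to the underlying CreateDBInstance API call.
    rds_kwargs={"AllocatedStorage": 20, "MasterUsername": "demo", "ManageMasterUserPassword": True},
    aws_conn_id="aws_default",
)
```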
### Redshift Data Warehouse

Amazon Redshift cluster management and SQL execution through both traditional connections and the Redshift Data API. Supports cluster lifecycle management and query execution.

```python { .api }
class RedshiftSQLHook(AwsBaseHook):
    def run(self, sql: str, autocommit: bool = False, **kwargs) -> Any: ...
    def get_records(self, sql: str, **kwargs) -> list: ...
```

```python { .api }
class RedshiftSQLOperator(BaseOperator):
    def __init__(self, sql: str, redshift_conn_id: str = 'redshift_default', **kwargs): ...
```

[Redshift Warehouse](./redshift-warehouse.md)
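
A sketch of querying through the hook from a task; the connection id and table are placeholders (the hook resolves `redshift_default` from Airflow's connection store):

```python
from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook


@task
def count_orders() -> int:
    hook = RedshiftSQLHook(redshift_conn_id="redshift_default")
    # get_records returns a list of result rows; the table name is a placeholder.
    rows = hook.get_records("SELECT count(*) FROM public.orders")
    return rows[0][0]
```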
### SageMaker Machine Learning

Amazon SageMaker training jobs, model deployment, and endpoint management. Provides comprehensive MLOps integration with support for training, tuning, batch transform, and real-time inference.

```python { .api }
class SageMakerHook(AwsBaseHook):
    def create_training_job(self, config: dict, **kwargs) -> str: ...
    def create_model(self, config: dict, **kwargs) -> str: ...
```

```python { .api }
class SageMakerTrainingOperator(BaseOperator):
    def __init__(self, config: dict, **kwargs): ...
```

[SageMaker ML](./sagemaker-ml.md)
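
A training sketch; the `config` dict mirrors the SageMaker CreateTrainingJob request, and every name, ARN, image URI, and S3 path below is a placeholder:

```python
from airflow.providers.amazon.aws.operators.sagemaker import SageMakerTrainingOperator

train_model = SageMakerTrainingOperator(
    task_id="train_model",
    config={
        "TrainingJobName": "airflow-training-demo",
        "AlgorithmSpecification": {
            "TrainingImage": "123456789012.dkr.ecr.us-east-1.amazonaws.com/my-algo:latest",
            "TrainingInputMode": "File",
        },
        "RoleArn": "arn:aws:iam::123456789012:role/SageMakerExecutionRole",
        "OutputDataConfig": {"S3OutputPath": "s3://my-airflow-bucket/models/"},
        "ResourceConfig": {"InstanceCount": 1, "InstanceType": "ml.m5.large", "VolumeSizeInGB": 10},
        "StoppingCondition": {"MaxRuntimeInSeconds": 3600},
    },
    wait_for_completion=True,
    aws_conn_id="aws_default",
)
```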
### ECS Container Orchestration

Amazon ECS task execution and service management. Supports both Fargate and EC2 launch types with comprehensive task definition and execution capabilities.

```python { .api }
class EcsHook(AwsBaseHook):
    def run_task(self, task_definition: str, cluster: str, **kwargs) -> str: ...
    def describe_tasks(self, cluster: str, tasks: list, **kwargs) -> dict: ...
```

```python { .api }
class EcsRunTaskOperator(BaseOperator):
    def __init__(self, task_definition: str, cluster: str, **kwargs): ...
```

[ECS Containers](./ecs-containers.md)
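
A Fargate run sketch; the cluster, task definition, container name, and subnet are placeholders:

```python
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator

run_task = EcsRunTaskOperator(
    task_id="run_task",
    cluster="my-ecs-cluster",
    task_definition="my-task:1",
    launch_type="FARGATE",
    # Override the container command for this run only.
    overrides={"containerOverrides": [{"name": "app", "command": ["python", "process.py"]}]},
    network_configuration={
        "awsvpcConfiguration": {"subnets": ["subnet-0123456789abcdef0"], "assignPublicIp": "ENABLED"}
    },
    aws_conn_id="aws_default",
)
```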
### AWS Batch Processing

Managed containerized job execution at scale with comprehensive lifecycle management, monitoring, and automatic resource provisioning.

```python { .api }
class BatchOperator(AwsBaseOperator):
    def __init__(self, job_name: str, job_definition: str, job_queue: str, **kwargs): ...

class BatchCreateComputeEnvironmentOperator(AwsBaseOperator):
    def __init__(self, compute_environment_name: str, environment_type: str, **kwargs): ...
```

[Batch Processing](./batch-processing.md)
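
A submit sketch; the job name, definition, and queue must already exist, and `container_overrides` follows the Batch SubmitJob request shape (all values are placeholders):

```python
from airflow.providers.amazon.aws.operators.batch import BatchOperator

submit_job = BatchOperator(
    task_id="submit_job",
    job_name="nightly-batch-job",
    job_definition="my-job-definition",
    job_queue="my-job-queue",
    # Override the container command for this run; {{ ds }} is the logical date.
    container_overrides={"command": ["python", "run.py", "--date", "{{ ds }}"]},
    aws_conn_id="aws_default",
)
```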
### Amazon EKS Kubernetes

Managed Kubernetes cluster operations with support for node groups, Fargate profiles, and pod execution within Airflow workflows.

```python { .api }
class EksCreateClusterOperator(AwsBaseOperator):
    def __init__(self, cluster_name: str, cluster_role_arn: str, resources_vpc_config: dict, **kwargs): ...

class EksPodOperator(KubernetesPodOperator):
    def __init__(self, cluster_name: str, namespace: str, image: str, **kwargs): ...
```

[EKS Kubernetes](./eks-kubernetes.md)
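
A pod-launch sketch, assuming the `cncf.kubernetes` provider (which `EksPodOperator` builds on) is installed; the cluster name, image, and command are placeholders:

```python
from airflow.providers.amazon.aws.operators.eks import EksPodOperator

run_pod = EksPodOperator(
    task_id="run_pod",
    cluster_name="my-eks-cluster",
    pod_name="airflow-demo-pod",
    namespace="default",
    image="amazon/aws-cli:latest",
    cmds=["sh", "-c", "echo hello from EKS"],
    get_logs=True,
    aws_conn_id="aws_default",
)
```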
### Amazon Athena Analytics

Serverless SQL query service for data stored in S3, enabling interactive analytics and data processing through standard SQL syntax.

```python { .api }
class AthenaOperator(AwsBaseOperator):
    def __init__(self, query: str, database: str, output_location: str, **kwargs): ...

class AthenaSensor(BaseSensorOperator):
    def __init__(self, query_execution_id: str, **kwargs): ...
```

[Athena Analytics](./athena-analytics.md)
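
A query sketch; the database, table, and result location are placeholders, and `{{ ds }}` is Airflow's templated logical date:

```python
from airflow.providers.amazon.aws.operators.athena import AthenaOperator

run_query = AthenaOperator(
    task_id="run_query",
    query="SELECT * FROM access_logs WHERE ds = '{{ ds }}' LIMIT 100",
    database="analytics",
    # Query results are written to this S3 location.
    output_location="s3://my-airflow-bucket/athena-results/",
    aws_conn_id="aws_default",
)
```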
### Amazon DynamoDB NoSQL

Managed NoSQL database operations with support for batch data operations, import/export functionality, and seamless S3 integration.

```python { .api }
class DynamoDBHook(AwsBaseHook):
    def write_batch_data(self, items: Iterable) -> bool: ...

class S3ToDynamoDBOperator(BaseOperator):
    def __init__(self, s3_bucket: str, s3_key: str, dynamodb_table: str, **kwargs): ...
```

[DynamoDB NoSQL](./dynamodb-nosql.md)
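
A hook sketch writing a small batch of items from a task; the table name and key are placeholders:

```python
from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook


@task
def record_run(run_id: str) -> None:
    # table_name and table_keys tell the hook which table and key attributes to use.
    hook = DynamoDBHook(aws_conn_id="aws_default",
                        table_name="pipeline_runs", table_keys=["run_id"])
    hook.write_batch_data([{"run_id": run_id, "status": "SUCCESS"}])
```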
### SNS/SQS Messaging

Comprehensive messaging services for event-driven architectures, enabling pub/sub messaging, queuing, and asynchronous communication patterns.

```python { .api }
class SnsPublishOperator(AwsBaseOperator):
    def __init__(self, target_arn: str, message: str, subject: str, **kwargs): ...

class SqsPublishOperator(AwsBaseOperator):
    def __init__(self, sqs_queue: str, message_content: str, **kwargs): ...
```

[Messaging SNS/SQS](./messaging-sns-sqs.md)
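
Publishing sketches for both services; the topic ARN and queue URL are placeholders:

```python
from airflow.providers.amazon.aws.operators.sns import SnsPublishOperator
from airflow.providers.amazon.aws.operators.sqs import SqsPublishOperator

notify = SnsPublishOperator(
    task_id="notify",
    target_arn="arn:aws:sns:us-east-1:123456789012:pipeline-events",
    subject="Pipeline finished",
    message="Run {{ ds }} completed successfully",
    aws_conn_id="aws_default",
)

enqueue = SqsPublishOperator(
    task_id="enqueue",
    sqs_queue="https://sqs.us-east-1.amazonaws.com/123456789012/pipeline-queue",
    message_content='{"run_date": "{{ ds }}"}',
    aws_conn_id="aws_default",
)
```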
### AWS Database Migration Service

Database migration and replication capabilities for transferring data between different database engines and for running continuous replication.

```python { .api }
class DmsCreateTaskOperator(AwsBaseOperator):
    def __init__(self, replication_task_id: str, source_endpoint_arn: str, target_endpoint_arn: str, **kwargs): ...

class DmsTaskCompletedSensor(BaseSensorOperator):
    def __init__(self, replication_task_arn: str, **kwargs): ...
```

[DMS Migration](./dms-migration.md)
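
A create-start-wait sketch; all ARNs and the table mapping are placeholders, and it assumes the create operator pushes the new replication task ARN to XCom, where the later tasks pull it:

```python
from airflow.providers.amazon.aws.operators.dms import DmsCreateTaskOperator, DmsStartTaskOperator
from airflow.providers.amazon.aws.sensors.dms import DmsTaskCompletedSensor

create_task = DmsCreateTaskOperator(
    task_id="create_task",
    replication_task_id="airflow-dms-demo",
    source_endpoint_arn="arn:aws:dms:us-east-1:123456789012:endpoint:SOURCE",
    target_endpoint_arn="arn:aws:dms:us-east-1:123456789012:endpoint:TARGET",
    replication_instance_arn="arn:aws:dms:us-east-1:123456789012:rep:INSTANCE",
    # Replicate every table in the 'public' schema.
    table_mappings={"rules": [{"rule-type": "selection", "rule-id": "1", "rule-name": "1",
                               "object-locator": {"schema-name": "public", "table-name": "%"},
                               "rule-action": "include"}]},
)

start_task = DmsStartTaskOperator(
    task_id="start_task",
    replication_task_arn="{{ ti.xcom_pull(task_ids='create_task') }}",
)

wait_for_task = DmsTaskCompletedSensor(
    task_id="wait_for_task",
    replication_task_arn="{{ ti.xcom_pull(task_ids='create_task') }}",
)

create_task >> start_task >> wait_for_task
```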
### Data Transfer Operations

Comprehensive data movement capabilities between AWS services and external systems. Supports transfers between S3, Redshift, databases, FTP/SFTP, and other data sources.

```python { .api }
class S3ToRedshiftOperator(BaseOperator):
    def __init__(self, schema: str, table: str, s3_bucket: str, s3_key: str, **kwargs): ...

class RedshiftToS3Operator(BaseOperator):
    def __init__(self, s3_bucket: str, s3_key: str, schema: str, table: str, **kwargs): ...
```

[Data Transfers](./data-transfers.md)
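
A load sketch copying staged CSV data from S3 into Redshift; the connection ids, bucket, and table names are placeholders:

```python
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator

load_orders = S3ToRedshiftOperator(
    task_id="load_orders",
    schema="public",
    table="orders",
    s3_bucket="my-airflow-bucket",
    s3_key="exports/orders.csv",
    # Options are appended to the generated Redshift COPY statement.
    copy_options=["CSV", "IGNOREHEADER 1"],
    redshift_conn_id="redshift_default",
    aws_conn_id="aws_default",
)
```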
### Authentication and Connection Management

Centralized AWS authentication and connection management providing secure, configurable access to AWS services with support for multiple authentication methods.

```python { .api }
class AwsBaseHook(BaseHook):
    def __init__(self, aws_conn_id: str = 'aws_default', **kwargs): ...
    def get_credentials(self, region_name: str = None) -> dict: ...
    def get_session(self, region_name: str = None) -> Any: ...
```

[Authentication](./authentication.md)
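
A sketch of using the base hook directly to build an arbitrary boto3 client; `client_type` selects the AWS service, and the connection id is the standard default:

```python
from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook


@task
def who_am_i() -> str:
    # Credentials come from the 'aws_default' connection (or the environment if unset).
    hook = AwsBaseHook(aws_conn_id="aws_default", client_type="sts")
    # get_conn() returns a configured boto3 STS client.
    return hook.get_conn().get_caller_identity()["Arn"]
```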
## Common Types

```python { .api }
# Connection configuration
class AwsConnectionConfig:
    aws_access_key_id: str
    aws_secret_access_key: str
    region_name: str
    session_token: str = None
    role_arn: str = None

# Common AWS resource identifiers
ResourceArn = str
ClusterId = str
JobId = str
InstanceId = str
BucketName = str
KeyName = str

# Task execution states
class TaskExecutionState:
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"
```