Apache Airflow provider package offering comprehensive AWS service integrations for orchestrating cloud workflows and data pipelines
npx @tessl/cli install tessl/pypi-apache-airflow-providers-amazon@9.12.0

A comprehensive provider package that enables Apache Airflow to orchestrate and manage Amazon Web Services (AWS) resources through workflows. This package provides hooks, operators, sensors, transfers, and triggers for over 30 AWS services, enabling seamless integration of cloud services within Airflow DAGs. Supported services include compute (Batch, EKS, ECS, Lambda), storage (S3, EFS), databases (RDS, DynamoDB, Redshift), analytics (Athena, EMR, Glue, SageMaker), messaging (SNS, SQS), and migration (DMS).
pip install apache-airflow-providers-amazon

Authentication and base functionality:
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook

Common service imports:
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator

Example DAG combining S3 and Lambda tasks:

from datetime import datetime
from airflow import DAG
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator
from airflow.providers.amazon.aws.operators.lambda_function import LambdaInvokeFunctionOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
# Define DAG
dag = DAG(
    'aws_example_dag',
    start_date=datetime(2023, 1, 1),
    schedule='@daily',
    catchup=False
)

# Create S3 bucket
create_bucket = S3CreateBucketOperator(
    task_id='create_bucket',
    bucket_name='my-airflow-bucket',
    aws_conn_id='aws_default',
    dag=dag
)

# Wait for a file to appear in S3
wait_for_file = S3KeySensor(
    task_id='wait_for_file',
    bucket_name='my-airflow-bucket',
    bucket_key='data/input.csv',
    aws_conn_id='aws_default',
    timeout=600,
    poke_interval=60,
    dag=dag
)

# Invoke Lambda function
invoke_lambda = LambdaInvokeFunctionOperator(
    task_id='invoke_lambda',
    function_name='process_data',
    payload='{"bucket": "my-airflow-bucket", "key": "data/input.csv"}',
    aws_conn_id='aws_default',
    dag=dag
)

# Define task dependencies
create_bucket >> wait_for_file >> invoke_lambda

The provider package is organized around five core component types (hooks, operators, sensors, transfers, and triggers) that integrate with Airflow's execution model.
All components inherit from Airflow base classes and share a common configuration surface, including the aws_conn_id connection parameter, an optional region_name override, and standard Airflow task arguments, as shown in the sketch below.
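A minimal sketch of that shared configuration, using the S3 hook as an example; the connection id and region shown are illustrative.

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# Every AWS hook accepts an Airflow connection id and an optional region override
hook = S3Hook(aws_conn_id="aws_default", region_name="us-east-1")
s3_client = hook.get_conn()  # boto3 S3 client built from that connection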
Complete S3 bucket and object management including create, delete, copy, transform, and list operations. Provides both basic operations and advanced features like multipart uploads and lifecycle management.
class S3Hook(AwsBaseHook):
    def create_bucket(self, bucket_name: str, region_name: str = None) -> bool: ...
    def delete_bucket(self, bucket_name: str, force_delete: bool = False) -> None: ...
    def copy_object(self, source_bucket_key: str, dest_bucket_key: str, **kwargs) -> None: ...
    def get_key(self, key: str, bucket_name: str = None) -> Any: ...
    def load_file(self, filename: str, key: str, bucket_name: str = None, **kwargs) -> None: ...

class S3CreateBucketOperator(BaseOperator):
    def __init__(self, bucket_name: str, aws_conn_id: str = 'aws_default', **kwargs): ...

class S3DeleteBucketOperator(BaseOperator):
    def __init__(self, bucket_name: str, force_delete: bool = False, **kwargs): ...

class S3KeySensor(BaseSensorOperator):
    def __init__(self, bucket_name: str, bucket_key: str, **kwargs): ...
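A minimal sketch, assuming the bucket shown already exists, of using S3Hook programmatically from a TaskFlow task rather than through a dedicated operator.

from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

@task
def archive_status_marker():
    # Illustrative bucket and key names; credentials come from aws_conn_id
    hook = S3Hook(aws_conn_id="aws_default")
    hook.load_string(
        string_data="report-complete",
        key="reports/latest/status.txt",
        bucket_name="my-airflow-bucket",
        replace=True,
    )
    # Return everything under the reports/ prefix for downstream tasks
    return hook.list_keys(bucket_name="my-airflow-bucket", prefix="reports/")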
AWS Lambda function creation, invocation, and management operations. Supports both synchronous and asynchronous function execution with payload handling and response processing.

class LambdaHook(AwsBaseHook):
    def invoke_lambda(self, function_name: str, payload: str = None, **kwargs) -> dict: ...
    def create_lambda(self, function_name: str, runtime: str, role: str, **kwargs) -> dict: ...

class LambdaInvokeFunctionOperator(BaseOperator):
    def __init__(self, function_name: str, payload: str = None, **kwargs): ...
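A sketch of an asynchronous invocation inside a DAG definition; the function name is illustrative, and invocation_type is assumed to be forwarded to the underlying Lambda Invoke call ("Event" means fire-and-forget).

import json
from airflow.providers.amazon.aws.operators.lambda_function import LambdaInvokeFunctionOperator

notify_async = LambdaInvokeFunctionOperator(
    task_id="notify_async",
    function_name="process_data",               # illustrative function name
    invocation_type="Event",                    # assumed parameter: asynchronous invocation
    payload=json.dumps({"source": "airflow"}),
    aws_conn_id="aws_default",
)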
Amazon EMR cluster creation, management, and job execution. Supports both traditional EMR clusters and EMR Serverless applications with comprehensive step management.

class EmrHook(AwsBaseHook):
    def create_job_flow(self, job_flow_overrides: dict = None, **kwargs) -> str: ...
    def add_job_flow_steps(self, job_flow_id: str, steps: list, **kwargs) -> list: ...

class EmrCreateJobFlowOperator(BaseOperator):
    def __init__(self, job_flow_overrides: dict = None, **kwargs): ...

class EmrAddStepsOperator(BaseOperator):
    def __init__(self, job_flow_id: str, steps: list, **kwargs): ...
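A sketch, inside a DAG definition, of creating a transient cluster and adding a Spark step; the cluster overrides and step definition are illustrative (they follow the boto3 RunJobFlow/AddJobFlowSteps request shapes), and the cluster id is pulled from the create task's XCom.

from airflow.providers.amazon.aws.operators.emr import (
    EmrAddStepsOperator,
    EmrCreateJobFlowOperator,
)

JOB_FLOW_OVERRIDES = {
    "Name": "airflow-emr-demo",
    "ReleaseLabel": "emr-6.15.0",
    "Instances": {"KeepJobFlowAliveWhenNoSteps": True},
}
SPARK_STEPS = [
    {
        "Name": "compute-metrics",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["spark-submit", "s3://my-airflow-bucket/jobs/metrics.py"],
        },
    }
]

create_cluster = EmrCreateJobFlowOperator(
    task_id="create_cluster",
    job_flow_overrides=JOB_FLOW_OVERRIDES,
)
add_step = EmrAddStepsOperator(
    task_id="add_step",
    # The create task returns the new cluster id via XCom
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster') }}",
    steps=SPARK_STEPS,
)
create_cluster >> add_step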
AWS Glue job execution and crawler management for ETL operations. Supports both Glue jobs and Glue DataBrew for data preparation workflows.

class GlueJobHook(AwsBaseHook):
    def get_job_state(self, job_name: str, run_id: str) -> str: ...
    def initialize_job(self, job_name: str) -> dict: ...

class GlueJobOperator(BaseOperator):
    def __init__(self, job_name: str, script_args: dict = None, **kwargs): ...
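A sketch of triggering a pre-existing Glue job from a DAG; the job name and arguments are illustrative, script_args is assumed to map onto the job run's Arguments, and wait_for_completion is assumed to block until the run finishes.

from airflow.providers.amazon.aws.operators.glue import GlueJobOperator

run_etl = GlueJobOperator(
    task_id="run_etl",
    job_name="nightly-etl",
    script_args={"--input_path": "s3://my-airflow-bucket/data/"},
    wait_for_completion=True,
    aws_conn_id="aws_default",
)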
Amazon RDS instance management including creation, deletion, snapshot operations, and state management. Supports both traditional RDS instances and Aurora clusters.

class RdsHook(AwsBaseHook):
    def create_db_instance(self, db_instance_identifier: str, **kwargs) -> dict: ...
    def delete_db_instance(self, db_instance_identifier: str, **kwargs) -> dict: ...

class RdsCreateDbInstanceOperator(BaseOperator):
    def __init__(self, db_instance_identifier: str, **kwargs): ...
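A hedged sketch of provisioning a small PostgreSQL instance; db_instance_class, engine, and rds_kwargs are assumed operator parameters that mirror the boto3 CreateDBInstance call, and all identifiers are placeholders.

from airflow.providers.amazon.aws.operators.rds import RdsCreateDbInstanceOperator

create_db = RdsCreateDbInstanceOperator(
    task_id="create_db",
    db_instance_identifier="airflow-demo-db",
    db_instance_class="db.t3.micro",    # assumed parameter
    engine="postgres",                  # assumed parameter
    rds_kwargs={                        # assumed passthrough to boto3 create_db_instance
        "AllocatedStorage": 20,
        "MasterUsername": "demo_admin",
        "ManageMasterUserPassword": True,
    },
    aws_conn_id="aws_default",
)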
Amazon Redshift cluster management and SQL execution through both traditional connections and the Redshift Data API. Supports cluster lifecycle management and query execution.

class RedshiftSQLHook(DbApiHook):
    def run(self, sql: str, autocommit: bool = False, **kwargs) -> Any: ...
    def get_records(self, sql: str, **kwargs) -> list: ...

class RedshiftSQLOperator(BaseOperator):
    def __init__(self, sql: str, redshift_conn_id: str = 'redshift_default', **kwargs): ...
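A minimal sketch of querying Redshift through the SQL hook from a TaskFlow task; the connection id and query are illustrative.

from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook

@task
def count_sales_rows():
    hook = RedshiftSQLHook(redshift_conn_id="redshift_default")
    # get_records returns a list of result rows
    rows = hook.get_records("SELECT COUNT(*) FROM public.sales")
    return rows[0][0]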
Amazon SageMaker training jobs, model deployment, and endpoint management. Provides comprehensive MLOps integration with support for training, tuning, batch transform, and real-time inference.

class SageMakerHook(AwsBaseHook):
    def create_training_job(self, config: dict, **kwargs) -> str: ...
    def create_model(self, config: dict, **kwargs) -> str: ...

class SageMakerTrainingOperator(BaseOperator):
    def __init__(self, config: dict, **kwargs): ...
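A sketch of launching a training job; the config dict follows the boto3 CreateTrainingJob request shape, every name, image URI, and ARN is a placeholder, and wait_for_completion is assumed to poll until the job ends.

from airflow.providers.amazon.aws.operators.sagemaker import SageMakerTrainingOperator

TRAINING_CONFIG = {
    "TrainingJobName": "airflow-training-{{ ds_nodash }}",
    "AlgorithmSpecification": {"TrainingImage": "<ecr-image-uri>", "TrainingInputMode": "File"},
    "RoleArn": "arn:aws:iam::123456789012:role/sagemaker-execution-role",
    "OutputDataConfig": {"S3OutputPath": "s3://my-airflow-bucket/models/"},
    "ResourceConfig": {"InstanceCount": 1, "InstanceType": "ml.m5.large", "VolumeSizeInGB": 10},
    "StoppingCondition": {"MaxRuntimeInSeconds": 3600},
}

train_model = SageMakerTrainingOperator(
    task_id="train_model",
    config=TRAINING_CONFIG,
    wait_for_completion=True,
)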
Amazon ECS task execution and service management. Supports both Fargate and EC2 launch types with comprehensive task definition and execution capabilities.

class EcsHook(AwsBaseHook):
    def run_task(self, task_definition: str, cluster: str, **kwargs) -> str: ...
    def describe_tasks(self, cluster: str, tasks: list, **kwargs) -> dict: ...

class EcsRunTaskOperator(BaseOperator):
    def __init__(self, task_definition: str, cluster: str, **kwargs): ...
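A sketch of running a one-off Fargate task inside a DAG; the cluster, task definition, container name, and subnet are placeholders.

from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator

run_container = EcsRunTaskOperator(
    task_id="run_container",
    cluster="my-ecs-cluster",
    task_definition="data-processor:1",
    launch_type="FARGATE",
    overrides={
        "containerOverrides": [
            {"name": "processor", "command": ["python", "process.py"]}
        ]
    },
    network_configuration={
        "awsvpcConfiguration": {"subnets": ["subnet-0123456789abcdef0"]}
    },
    aws_conn_id="aws_default",
)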
Managed containerized job execution at scale with comprehensive lifecycle management, monitoring, and automatic resource provisioning.

class BatchOperator(AwsBaseOperator):
    def __init__(self, job_name: str, job_definition: str, job_queue: str, **kwargs): ...

class BatchCreateComputeEnvironmentOperator(AwsBaseOperator):
    def __init__(self, compute_environment_name: str, environment_type: str, **kwargs): ...
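A sketch of submitting a job to an existing Batch queue; the names are placeholders and container_overrides is assumed to mirror the SubmitJob containerOverrides structure.

from airflow.providers.amazon.aws.operators.batch import BatchOperator

submit_job = BatchOperator(
    task_id="submit_job",
    job_name="nightly-aggregation",
    job_definition="aggregation-job:3",
    job_queue="default-queue",
    container_overrides={   # assumed parameter mirroring SubmitJob containerOverrides
        "command": ["python", "aggregate.py", "--date", "{{ ds }}"]
    },
    aws_conn_id="aws_default",
)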
Managed Kubernetes cluster operations with support for node groups, Fargate profiles, and pod execution within Airflow workflows.

class EksCreateClusterOperator(AwsBaseOperator):
    def __init__(self, cluster_name: str, cluster_role_arn: str, resources_vpc_config: dict, **kwargs): ...

class EksPodOperator(KubernetesPodOperator):
    def __init__(self, cluster_name: str, namespace: str, image: str, **kwargs): ...
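A sketch of running a pod on an existing EKS cluster; the cluster, namespace, and image are placeholders, pod_name is assumed to name the launched pod, and the remaining arguments come from the underlying Kubernetes pod operator.

from airflow.providers.amazon.aws.operators.eks import EksPodOperator

run_pod = EksPodOperator(
    task_id="run_pod",
    cluster_name="my-eks-cluster",
    pod_name="airflow-demo-pod",   # assumed parameter
    namespace="default",
    image="amazon/aws-cli:latest",
    cmds=["sh", "-c", "echo hello from EKS"],
    get_logs=True,
    aws_conn_id="aws_default",
)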
Serverless SQL query service for data stored in S3, enabling interactive analytics and data processing through standard SQL syntax.

class AthenaOperator(AwsBaseOperator):
    def __init__(self, query: str, database: str, output_location: str, **kwargs): ...

class AthenaSensor(BaseSensorOperator):
    def __init__(self, query_execution_id: str, **kwargs): ...
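A sketch of running an Athena query and waiting on its execution id; the database, query, and result location are placeholders, and the operator is assumed to push the query execution id to XCom.

from airflow.providers.amazon.aws.operators.athena import AthenaOperator
from airflow.providers.amazon.aws.sensors.athena import AthenaSensor

run_query = AthenaOperator(
    task_id="run_query",
    query="SELECT COUNT(*) FROM web_logs WHERE ds = '{{ ds }}'",
    database="analytics",
    output_location="s3://my-airflow-bucket/athena-results/",
    aws_conn_id="aws_default",
)
wait_for_query = AthenaSensor(
    task_id="wait_for_query",
    query_execution_id="{{ task_instance.xcom_pull(task_ids='run_query') }}",
    aws_conn_id="aws_default",
)
run_query >> wait_for_query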
Managed NoSQL database operations with support for batch data operations, import/export functionality, and seamless S3 integration.

class DynamoDBHook(AwsBaseHook):
    def write_batch_data(self, items: Iterable) -> bool: ...

class S3ToDynamoDBOperator(BaseOperator):
    def __init__(self, s3_bucket: str, s3_key: str, dynamodb_table: str, **kwargs): ...
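A sketch of batch-writing items through the hook from a TaskFlow task; the table name and key are placeholders, and table_name/table_keys are assumed constructor arguments.

from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook

@task
def load_orders():
    hook = DynamoDBHook(
        aws_conn_id="aws_default",
        table_name="orders",        # placeholder table
        table_keys=["order_id"],    # placeholder partition key
    )
    hook.write_batch_data([
        {"order_id": "1001", "status": "shipped"},
        {"order_id": "1002", "status": "pending"},
    ])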
Comprehensive messaging services for event-driven architectures, enabling pub/sub messaging, queuing, and asynchronous communication patterns.

class SnsPublishOperator(AwsBaseOperator):
    def __init__(self, target_arn: str, message: str, subject: str, **kwargs): ...

class SqsPublishOperator(AwsBaseOperator):
    def __init__(self, sqs_queue: str, message_content: str, **kwargs): ...
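A sketch of publishing a notification and a queue message from a DAG; the topic ARN and queue URL are placeholders.

from airflow.providers.amazon.aws.operators.sns import SnsPublishOperator
from airflow.providers.amazon.aws.operators.sqs import SqsPublishOperator

notify = SnsPublishOperator(
    task_id="notify",
    target_arn="arn:aws:sns:us-east-1:123456789012:pipeline-events",
    subject="Pipeline finished",
    message="Run {{ ds }} completed successfully",
)
enqueue = SqsPublishOperator(
    task_id="enqueue",
    sqs_queue="https://sqs.us-east-1.amazonaws.com/123456789012/work-queue",
    message_content='{"run_date": "{{ ds }}"}',
)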
Database migration and replication capabilities for seamless data transfer between different database engines and continuous replication.

class DmsCreateTaskOperator(AwsBaseOperator):
    def __init__(self, replication_task_id: str, source_endpoint_arn: str, target_endpoint_arn: str, **kwargs): ...

class DmsTaskCompletedSensor(BaseSensorOperator):
    def __init__(self, replication_task_arn: str, **kwargs): ...
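A heavily hedged sketch of creating a replication task and waiting for it; every ARN is a placeholder, replication_instance_arn and table_mappings are assumed additional parameters mirroring the CreateReplicationTask API, and the create task is assumed to push the new task's ARN to XCom.

from airflow.providers.amazon.aws.operators.dms import DmsCreateTaskOperator
from airflow.providers.amazon.aws.sensors.dms import DmsTaskCompletedSensor

create_task = DmsCreateTaskOperator(
    task_id="create_task",
    replication_task_id="orders-full-load",
    source_endpoint_arn="arn:aws:dms:us-east-1:123456789012:endpoint:source",
    target_endpoint_arn="arn:aws:dms:us-east-1:123456789012:endpoint:target",
    replication_instance_arn="arn:aws:dms:us-east-1:123456789012:rep:instance",  # assumed parameter
    table_mappings={  # assumed parameter
        "rules": [
            {
                "rule-type": "selection",
                "rule-id": "1",
                "rule-name": "1",
                "object-locator": {"schema-name": "public", "table-name": "orders"},
                "rule-action": "include",
            }
        ]
    },
)
wait_for_task = DmsTaskCompletedSensor(
    task_id="wait_for_task",
    # Assumed: the create task returns the replication task ARN via XCom
    replication_task_arn="{{ task_instance.xcom_pull(task_ids='create_task') }}",
)
create_task >> wait_for_task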
Comprehensive data movement capabilities between AWS services and external systems. Supports transfers between S3, Redshift, databases, FTP/SFTP, and other data sources.

class S3ToRedshiftOperator(BaseOperator):
    def __init__(self, schema: str, table: str, s3_bucket: str, s3_key: str, **kwargs): ...

class RedshiftToS3Operator(BaseOperator):
    def __init__(self, s3_bucket: str, s3_key: str, schema: str, table: str, **kwargs): ...
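A sketch of a COPY-style load from S3 into Redshift; the schema, table, bucket, and connection ids are placeholders.

from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator

load_to_redshift = S3ToRedshiftOperator(
    task_id="load_to_redshift",
    schema="public",
    table="sales",
    s3_bucket="my-airflow-bucket",
    s3_key="data/input.csv",
    copy_options=["CSV", "IGNOREHEADER 1"],
    redshift_conn_id="redshift_default",
    aws_conn_id="aws_default",
)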
Centralized AWS authentication and connection management providing secure, configurable access to AWS services with support for multiple authentication methods.

class AwsBaseHook(BaseHook):
    def __init__(self, aws_conn_id: str = 'aws_default', **kwargs): ...
    def get_credentials(self, region_name: str = None) -> dict: ...
    def get_session(self, region_name: str = None) -> Any: ...
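A sketch of using the base hook directly to reach an arbitrary AWS API; client_type is assumed to select which boto3 client the hook builds, and the region is illustrative.

from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook

hook = AwsBaseHook(aws_conn_id="aws_default", client_type="sts")
session = hook.get_session(region_name="us-east-1")   # boto3 Session for building custom clients
sts_client = hook.get_conn()                          # boto3 client of the configured type
print(sts_client.get_caller_identity()["Account"])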
# Connection configuration
class AwsConnectionConfig:
    aws_access_key_id: str
    aws_secret_access_key: str
    region_name: str
    session_token: str = None
    role_arn: str = None
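A sketch of expressing that configuration as an Airflow connection in code; the credentials are placeholders, and region_name is one of the extra fields the provider reads from the AWS connection.

import json
from airflow.models.connection import Connection

aws_conn = Connection(
    conn_id="aws_default",
    conn_type="aws",
    login="<aws-access-key-id>",          # placeholder credentials; prefer IAM roles in practice
    password="<aws-secret-access-key>",
    extra=json.dumps({"region_name": "us-east-1"}),
)
print(aws_conn.get_uri())  # e.g. export as AIRFLOW_CONN_AWS_DEFAULT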
# Common AWS resource identifiers
ResourceArn = str
ClusterId = str
JobId = str
InstanceId = str
BucketName = str
KeyName = str
# Task execution states
class TaskExecutionState:
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"