Apache Airflow provider package that provides comprehensive AWS service integrations for orchestrating cloud workflows and data pipelines
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Centralized AWS authentication and connection management providing secure, configurable access to AWS services. Handles credentials, sessions, and cross-service authentication patterns with support for multiple authentication methods including IAM roles, access keys, and temporary credentials.
Foundation class for all AWS service hooks providing common authentication and session management functionality.
class AwsBaseHook(BaseHook):
    """Foundation class for all AWS service hooks.

    Provides common authentication, session management, and client/resource
    factory methods on top of an Airflow AWS connection.
    """

    def __init__(self, aws_conn_id: str = 'aws_default', verify: bool | str | None = None, region_name: str | None = None, client_type: str | None = None, resource_type: str | None = None, config: dict | None = None):
        """
        Initialize AWS Base Hook.

        Parameters:
        - aws_conn_id: Airflow connection ID for AWS credentials
        - verify: SSL certificate verification (True/False/path to CA bundle)
        - region_name: AWS region name
        - client_type: AWS service client type (e.g., 's3', 'lambda')
        - resource_type: AWS service resource type
        - config: Additional configuration for AWS client
        """

    def get_client_type(self, client_type: str | None = None, region_name: str | None = None, config: dict | None = None) -> Any:
        """
        Get AWS service client.

        Parameters:
        - client_type: AWS service type (e.g., 's3', 'ec2', 'lambda')
        - region_name: AWS region name
        - config: Client configuration options

        Returns:
        Boto3 client instance
        """

    def get_resource_type(self, resource_type: str, region_name: str | None = None, config: dict | None = None) -> Any:
        """
        Get AWS service resource.

        Parameters:
        - resource_type: AWS service type (e.g., 's3', 'ec2', 'dynamodb')
        - region_name: AWS region name
        - config: Resource configuration options

        Returns:
        Boto3 resource instance
        """

    def get_session(self, region_name: str | None = None) -> Any:
        """
        Get AWS session with configured credentials.

        Parameters:
        - region_name: AWS region name

        Returns:
        Boto3 session instance
        """

    def get_credentials(self, region_name: str | None = None) -> dict:
        """
        Get AWS credentials for the configured connection.

        Parameters:
        - region_name: AWS region name

        Returns:
        Credentials dictionary with access keys and tokens
        """

    def expand_role(self, role: str, region_name: str | None = None) -> str:
        """
        Expand role ARN if needed.

        Parameters:
        - role: Role name or ARN
        - region_name: AWS region name

        Returns:
        Full role ARN
        """

    @staticmethod
    def retry(should_retry: callable):
        """
        Decorator for implementing retry logic on AWS API calls.

        Parameters:
        - should_retry: Function to determine if retry should occur

        Returns:
        Decorator function
        """

Connection configuration classes for managing AWS authentication settings.
class AwsGenericHook(AwsBaseHook):
    """Generic AWS hook usable with any AWS service client type."""

    def __init__(self, aws_conn_id: str = 'aws_default', client_type: str | None = None, **kwargs):
        """
        Generic AWS hook for any AWS service.

        Parameters:
        - aws_conn_id: Airflow connection ID for AWS credentials
        - client_type: AWS service client type
        """

    def get_conn(self) -> Any:
        """
        Get AWS service client connection.

        Returns:
        Configured AWS service client
        """

Utility classes and functions for connection management and configuration.
class ConnectionWrapper:
    """Wrapper around an Airflow connection exposing AWS-specific accessors."""

    def __init__(self, conn: Any, region_name: str | None = None):
        """
        Wrapper for Airflow connections with AWS-specific enhancements.

        Parameters:
        - conn: Airflow connection object
        - region_name: AWS region name
        """

    @property
    def extra_config(self) -> dict:
        """
        Get connection extra configuration.

        Returns:
        Extra configuration dictionary
        """

    @property
    def aws_access_key_id(self) -> str:
        """
        Get AWS access key ID.

        Returns:
        AWS access key ID
        """

    @property
    def aws_secret_access_key(self) -> str:
        """
        Get AWS secret access key.

        Returns:
        AWS secret access key
        """

    @property
    def aws_session_token(self) -> str:
        """
        Get AWS session token.

        Returns:
        AWS session token for temporary credentials
        """

    @property
    def role_arn(self) -> str:
        """
        Get IAM role ARN for assuming roles.

        Returns:
        IAM role ARN
        """

    @property
    def region_name(self) -> str:
        """
        Get AWS region name.

        Returns:
        AWS region name
        """

    @property
    def external_id(self) -> str:
        """
        Get external ID for role assumption.

        Returns:
        External ID for cross-account role assumption
        """

    @property
    def config_kwargs(self) -> dict:
        """
        Get configuration arguments for AWS clients.

        Returns:
        Configuration dictionary for boto3 clients
        """
def trim_none_values(config: dict) -> dict:
    """
    Remove None values from configuration dictionary.

    Parameters:
    - config: Configuration dictionary

    Returns:
    Dictionary with None values removed
    """

from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
# Using default connection
hook = AwsBaseHook(aws_conn_id='aws_default')
# Get S3 client
s3_client = hook.get_client_type('s3', region_name='us-east-1')
# Get credentials for manual use
credentials = hook.get_credentials()
print(f"Access Key: {credentials['aws_access_key_id']}")
print(f"Region: {credentials['region_name']}")

# Using specific connection with custom config
hook = AwsBaseHook(
aws_conn_id='my_aws_prod',
region_name='us-west-2',
config={
'retries': {
'max_attempts': 10,
'mode': 'adaptive'
},
'max_pool_connections': 50
}
)
# Get Lambda client with custom configuration
lambda_client = hook.get_client_type('lambda')

from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.hooks.lambda_function import LambdaHook
# Both hooks use the same authentication
s3_hook = S3Hook(aws_conn_id='aws_production')
lambda_hook = LambdaHook(aws_conn_id='aws_production')
# Upload file to S3
s3_hook.load_file('/local/data.json', 'processed/data.json', 'my-bucket')
# Trigger Lambda function to process the file
lambda_hook.invoke_lambda(
function_name='data-processor',
payload='{"bucket": "my-bucket", "key": "processed/data.json"}'
)

# Hook configured to assume cross-account role
hook = AwsBaseHook(aws_conn_id='cross_account_role')
# The hook will automatically assume the role specified in the connection
# and use temporary credentials for all API calls
ec2_client = hook.get_client_type('ec2', region_name='us-east-1')
# List instances in the cross-account environment
instances = ec2_client.describe_instances()

# Example connection configuration via Airflow UI or environment variables
# Standard IAM User credentials:
# Connection ID: aws_default
# Connection Type: Amazon Web Services
# Login: AKIA... (AWS Access Key ID)
# Password: ... (AWS Secret Access Key)
# Extra: {"region_name": "us-east-1"}
# IAM Role assumption:
# Connection ID: aws_role
# Connection Type: Amazon Web Services
# Extra: {
# "role_arn": "arn:aws:iam::123456789012:role/AirflowExecutionRole",
# "region_name": "us-east-1",
# "external_id": "unique-external-id"
# }
# Temporary credentials:
# Connection ID: aws_temp
# Connection Type: Amazon Web Services
# Login: ASIA... (Temporary Access Key ID)
# Password: ... (Temporary Secret Access Key)
# Extra: {
# "aws_session_token": "...",
# "region_name": "us-east-1"
# }

from airflow.providers.amazon.aws.utils.connection_wrapper import ConnectionWrapper
# Manual connection wrapper usage
from airflow.models import Connection
conn = Connection(
conn_id='manual_aws',
conn_type='aws',
login='AKIA...',
password='...',
extra='{"region_name": "eu-west-1", "role_arn": "arn:aws:iam::123456789012:role/DataProcessingRole"}'
)
wrapper = ConnectionWrapper(conn)
config = wrapper.config_kwargs
# Use configuration with boto3 directly
import boto3
session = boto3.Session(**config)
s3 = session.client('s3')

from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from botocore.exceptions import ClientError
class MyCustomHook(AwsBaseHook):
    """Example hook demonstrating the AwsBaseHook.retry decorator."""

    # Retry only on transient, throttling-style errors; everything else propagates.
    @AwsBaseHook.retry(lambda e: e.response['Error']['Code'] in ['Throttling', 'ServiceUnavailable'])
    def my_api_call(self):
        """API call with automatic retry on specific errors."""
        client = self.get_client_type('s3')
        try:
            return client.list_buckets()
        except ClientError as e:
            if e.response['Error']['Code'] == 'AccessDenied':
                # Permanent failure: retrying will not help, so surface it immediately.
                self.log.error("Access denied - check IAM permissions")
                raise
            else:
                self.log.warning(f"API call failed: {e}")
                raise

The provider supports the following connection types in Airflow:
# Connection type identifiers
AWS_CONNECTION_TYPE = 'aws'
REDSHIFT_CONNECTION_TYPE = 'redshift'
EMR_CONNECTION_TYPE = 'emr'

# Connection configuration keys
class ConnectionConfigKeys:
    """Keys recognized in an AWS connection's extra configuration."""

    REGION_NAME = 'region_name'
    ROLE_ARN = 'role_arn'
    EXTERNAL_ID = 'external_id'
    AWS_ACCESS_KEY_ID = 'aws_access_key_id'
    AWS_SECRET_ACCESS_KEY = 'aws_secret_access_key'
    AWS_SESSION_TOKEN = 'aws_session_token'
    ENDPOINT_URL = 'endpoint_url'
    CONFIG_KWARGS = 'config_kwargs'

# AWS credential types
class AwsCredentials:
    """Resolved AWS credential values."""

    aws_access_key_id: str
    aws_secret_access_key: str
    aws_session_token: str | None = None  # present only for temporary credentials
    region_name: str = 'us-east-1'

# Connection configuration
class AwsConnectionConfig:
    """Configuration describing how a hook connects to AWS."""

    aws_conn_id: str = 'aws_default'
    region_name: str | None = None
    role_arn: str | None = None
    external_id: str | None = None
    verify: bool = True
    endpoint_url: str | None = None
    config: dict | None = None

# Session configuration
class SessionConfig:
    """Keyword arguments accepted when building a boto3 session."""

    aws_access_key_id: str | None = None
    aws_secret_access_key: str | None = None
    aws_session_token: str | None = None
    region_name: str | None = None
    botocore_session: Any = None
    profile_name: str | None = None

# Client configuration
class ClientConfig:
    """Keyword arguments accepted when building a boto3 client."""

    region_name: str | None = None
    api_version: str | None = None
    use_ssl: bool = True
    verify: bool | None = None
    endpoint_url: str | None = None
    aws_access_key_id: str | None = None
    aws_secret_access_key: str | None = None
    aws_session_token: str | None = None
    config: Any = None  # botocore.config.Config

# Authentication methods
class AuthMethod:
    """Supported AWS authentication method identifiers."""

    IAM_USER = 'iam_user'
    IAM_ROLE = 'iam_role'
    INSTANCE_PROFILE = 'instance_profile'
    CONTAINER_CREDENTIALS = 'container_credentials'
    EXTERNAL_ID = 'external_id'
    WEB_IDENTITY_TOKEN = 'web_identity_token'

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-amazon