tessl/pypi-apache-airflow-providers-amazon

Apache Airflow provider package that provides comprehensive AWS service integrations for orchestrating cloud workflows and data pipelines

Authentication and Connection Management

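Centralized AWS authentication and connection management that gives hooks secure, configurable access to AWS services. Credentials are read from an Airflow connection and turned into boto3 sessions, clients, and resources. Supported methods include IAM user access keys, temporary credentials (session tokens), and IAM role assumption. If the connection supplies no explicit credentials, boto3's default credential chain (for example, an instance profile or container credentials) is used.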

Capabilities

AWS Base Hook

Foundation class for all AWS service hooks providing common authentication and session management functionality.

class AwsBaseHook(BaseHook):
    def __init__(self, aws_conn_id: str | None = 'aws_default', verify: bool | str | None = None, region_name: str | None = None, client_type: str | None = None, resource_type: str | None = None, config: dict | None = None):
        """
        Initialize AWS Base Hook.
        
        Parameters:
        - aws_conn_id: Airflow connection ID for AWS credentials; if None or empty, boto3's default credential chain is used
        - verify: SSL certificate verification (True/False/path to CA bundle)
        - region_name: AWS region name; overrides the region configured in the connection
        - client_type: AWS service client type (e.g., 's3', 'lambda'); set either this or resource_type
        - resource_type: AWS service resource type (e.g., 's3', 'dynamodb')
        - config: Additional botocore client configuration (a dict or botocore.config.Config)
        """

    def get_client_type(self, region_name: str | None = None, config: Config | None = None) -> Any:
        """
        Get a boto3 client for the service selected by the hook's client_type.
        
        Parameters:
        - region_name: AWS region name (defaults to the hook or connection region)
        - config: botocore.config.Config overriding the hook's config
        
        Returns:
        Boto3 client instance
        """

    def get_resource_type(self, region_name: str | None = None, config: Config | None = None) -> Any:
        """
        Get a boto3 resource for the service selected by the hook's resource_type.
        
        Parameters:
        - region_name: AWS region name (defaults to the hook or connection region)
        - config: botocore.config.Config overriding the hook's config
        
        Returns:
        Boto3 resource instance
        """

    def get_session(self, region_name: str = None) -> Any:
        """
        Get AWS session with configured credentials.
        
        Parameters:
        - region_name: AWS region name
        
        Returns:
        Boto3 session instance
        """

    def get_credentials(self, region_name: str | None = None) -> ReadOnlyCredentials:
        """
        Get AWS credentials for the configured connection.
        
        Parameters:
        - region_name: AWS region name
        
        Returns:
        Frozen botocore ReadOnlyCredentials named tuple (access_key, secret_key, token)
        """

    def expand_role(self, role: str, region_name: str = None) -> str:
        """
        Expand role ARN if needed.
        
        Parameters:
        - role: Role name or ARN
        - region_name: AWS region name
        
        Returns:
        Full role ARN
        """

    @staticmethod
    def retry(should_retry: Callable[[Exception], bool]):
        """
        Decorator for implementing retry logic on AWS API calls.
        
        Parameters:
        - should_retry: Predicate that receives the raised exception and returns True if the call should be retried
        
        Returns:
        Decorator function
        """

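As a rough model of what expand_role does, the stdlib-only sketch below assumes that a value containing "/" (a full ARN or role path) is returned unchanged and that a bare role name is expanded into an IAM role ARN. `expand_role_sketch` is a hypothetical helper: it takes the account ID and partition as arguments, whereas the hook derives them from its AWS session.

```python
def expand_role_sketch(role: str, account_id: str, partition: str = "aws") -> str:
    """Hypothetical helper: turn a bare IAM role name into a full role ARN."""
    # Assumption: anything containing "/" is already an ARN or role path; keep it.
    if "/" in role:
        return role
    return f"arn:{partition}:iam::{account_id}:role/{role}"


print(expand_role_sketch("AirflowExecutionRole", "123456789012"))
# arn:aws:iam::123456789012:role/AirflowExecutionRole
```

Passing `partition="aws-cn"` or `partition="aws-us-gov"` yields ARNs for the China or GovCloud partitions.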
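AWS Generic Hook

Generic hook for any AWS service. AwsBaseHook is a subclass of AwsGenericHook and inherits its session, client, resource, and credential methods.

class AwsGenericHook(BaseHook):
    def __init__(self, aws_conn_id: str | None = 'aws_default', client_type: str | None = None, resource_type: str | None = None, **kwargs):
        """
        Generic AWS hook for any AWS service.
        
        Parameters:
        - aws_conn_id: Airflow connection ID for AWS credentials
        - client_type: AWS service client type (e.g., 's3')
        - resource_type: AWS service resource type (e.g., 'dynamodb'); set either this or client_type, not both
        - kwargs: verify, region_name, and config, with the same meaning as for AwsBaseHook
        """

    def get_conn(self) -> Any:
        """
        Get AWS service client connection.
        
        Returns:
        Configured boto3 client (when client_type is set) or resource (when resource_type is set)
        """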

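Because a generic hook can return either a client or a resource, it needs exactly one of client_type or resource_type. The sketch below models that choice with a hypothetical `resolve_conn_kind` helper; treating both-set and neither-set as errors is an assumption of this sketch, not a description of every provider version.

```python
from typing import Optional, Tuple


def resolve_conn_kind(client_type: Optional[str], resource_type: Optional[str]) -> Tuple[str, str]:
    """Hypothetical helper: decide whether a generic hook builds a client or a resource."""
    # Exactly one of the two must be set (assumption made for this sketch).
    if client_type and resource_type:
        raise ValueError("Provide either client_type or resource_type, not both.")
    if client_type:
        return ("client", client_type)
    if resource_type:
        return ("resource", resource_type)
    raise ValueError("Either client_type or resource_type must be set.")


print(resolve_conn_kind("s3", None))        # ('client', 's3')
print(resolve_conn_kind(None, "dynamodb"))  # ('resource', 'dynamodb')
```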
Connection Utilities

Utility classes and functions for connection management and configuration.

class AwsConnectionWrapper:
    def __init__(self, conn: Any, region_name: str = None):
        """
        Wrapper for Airflow connections with AWS-specific enhancements.
        
        Parameters:
        - conn: Airflow connection object
        - region_name: AWS region name
        """

    @property
    def extra_config(self) -> dict:
        """
        Get connection extra configuration.
        
        Returns:
        Extra configuration dictionary
        """

    @property
    def aws_access_key_id(self) -> str:
        """
        Get AWS access key ID.
        
        Returns:
        AWS access key ID
        """

    @property
    def aws_secret_access_key(self) -> str:
        """
        Get AWS secret access key.
        
        Returns:
        AWS secret access key
        """

    @property
    def aws_session_token(self) -> str:
        """
        Get AWS session token.
        
        Returns:
        AWS session token for temporary credentials
        """

    @property
    def role_arn(self) -> str:
        """
        Get IAM role ARN for assuming roles.
        
        Returns:
        IAM role ARN
        """

    @property
    def region_name(self) -> str:
        """
        Get AWS region name.
        
        Returns:
        AWS region name
        """

    @property
    def external_id(self) -> str:
        """
        Get external ID for role assumption.
        
        Returns:
        External ID for cross-account role assumption
        """

    @property
    def config_kwargs(self) -> dict:
        """
        Get botocore client configuration options from the connection's config_kwargs extra.
        
        Returns:
        Keyword arguments for botocore.config.Config
        """

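The wrapper's properties are read from the Airflow connection's Login, Password, and JSON Extra fields. The stdlib-only sketch below illustrates that mapping; `parse_aws_connection` is hypothetical, and its precedence rule (Login/Password win over the key fields in Extra) is an assumption made for illustration.

```python
import json
from typing import Any, Dict, Optional


def parse_aws_connection(login: Optional[str], password: Optional[str], extra: str) -> Dict[str, Any]:
    """Hypothetical parser: combine Login/Password with the JSON Extra field."""
    extra_config: Dict[str, Any] = json.loads(extra) if extra else {}
    # Assumption: Login/Password, when set, win over the key fields in Extra.
    if login:
        access_key, secret_key = login, password
    else:
        access_key = extra_config.get("aws_access_key_id")
        secret_key = extra_config.get("aws_secret_access_key")
    return {
        "aws_access_key_id": access_key,
        "aws_secret_access_key": secret_key,
        "aws_session_token": extra_config.get("aws_session_token"),
        "region_name": extra_config.get("region_name"),
        "role_arn": extra_config.get("role_arn"),
        "external_id": extra_config.get("external_id"),
        "config_kwargs": extra_config.get("config_kwargs", {}),
    }


parsed = parse_aws_connection(
    "AKIAEXAMPLE",
    "example-secret",
    '{"region_name": "eu-west-1", "role_arn": "arn:aws:iam::123456789012:role/DataProcessingRole"}',
)
print(parsed["region_name"], parsed["role_arn"])
```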
def trim_none_values(config: dict) -> dict:
    """
    Remove None values from configuration dictionary.
    
    Parameters:
    - config: Configuration dictionary
    
    Returns:
    Dictionary with None values removed
    """

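A minimal sketch of the same idea, assuming shallow behaviour (only top-level keys are inspected); `trim_none_values_sketch` is a hypothetical name. A typical use is dropping unset optional parameters before passing keyword arguments to a boto3 call, because botocore's parameter validation rejects an explicit None for typed parameters.

```python
from typing import Any, Dict


def trim_none_values_sketch(config: Dict[str, Any]) -> Dict[str, Any]:
    """Return a shallow copy of config without the keys whose value is None."""
    # Only None is dropped; other falsy values such as False, 0 or "" are kept.
    return {key: value for key, value in config.items() if value is not None}


print(trim_none_values_sketch({"region_name": "us-east-1", "endpoint_url": None, "verify": False}))
# {'region_name': 'us-east-1', 'verify': False}
```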
Usage Examples

Basic Authentication Setup

from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook

# Using default connection; the AWS service is selected when the hook is created
hook = AwsBaseHook(aws_conn_id='aws_default', client_type='s3')

# Get S3 client (region_name overrides the connection's region)
s3_client = hook.get_client_type(region_name='us-east-1')

# Get frozen credentials for manual use (a botocore ReadOnlyCredentials named tuple)
credentials = hook.get_credentials()
print(f"Access Key ID prefix: {credentials.access_key[:4]}")
print(f"Region: {hook.region_name}")

Custom Connection Configuration

# Using specific connection with custom botocore config
hook = AwsBaseHook(
    aws_conn_id='my_aws_prod',
    client_type='lambda',
    region_name='us-west-2',
    config={
        'retries': {
            'max_attempts': 10,
            'mode': 'adaptive'
        },
        'max_pool_connections': 50
    }
)

# Get Lambda client with custom configuration
lambda_client = hook.get_conn()

Cross-Service Authentication

import json

from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.hooks.lambda_function import LambdaHook

# Both hooks resolve credentials from the same Airflow connection
s3_hook = S3Hook(aws_conn_id='aws_production')
lambda_hook = LambdaHook(aws_conn_id='aws_production')

# Upload file to S3
s3_hook.load_file(filename='/local/data.json', key='processed/data.json', bucket_name='my-bucket')

# Trigger Lambda function to process the file
lambda_hook.invoke_lambda(
    function_name='data-processor',
    payload=json.dumps({"bucket": "my-bucket", "key": "processed/data.json"}),
)

Role Assumption

# Hook configured to assume a cross-account role
hook = AwsBaseHook(aws_conn_id='cross_account_role', client_type='ec2', region_name='us-east-1')

# The hook assumes the role_arn set in the connection's Extra
# and uses the resulting temporary credentials for all API calls
ec2_client = hook.get_conn()

# List instances in the cross-account environment
instances = ec2_client.describe_instances()

Connection Configuration in Airflow

# Example connection configuration via Airflow UI or environment variables

# Standard IAM User credentials:
# Connection ID: aws_default
# Connection Type: Amazon Web Services
# Login: AKIA... (AWS Access Key ID)
# Password: ... (AWS Secret Access Key)
# Extra: {"region_name": "us-east-1"}

# IAM Role assumption:
# Connection ID: aws_role
# Connection Type: Amazon Web Services  
# Extra: {
#   "role_arn": "arn:aws:iam::123456789012:role/AirflowExecutionRole",
#   "region_name": "us-east-1",
#   "external_id": "unique-external-id"
# }

# Temporary credentials:
# Connection ID: aws_temp
# Connection Type: Amazon Web Services
# Login: ASIA... (Temporary Access Key ID)  
# Password: ... (Temporary Secret Access Key)
# Extra: {
#   "aws_session_token": "...",
#   "region_name": "us-east-1"
# }
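The same settings can be supplied without the UI through an `AIRFLOW_CONN_<CONN_ID>` environment variable. Assuming Airflow 2.3 or later, which accepts JSON-serialized connections in these variables (earlier versions need the URI form), this sketch builds the variable for the `aws_role` connection shown above. It only prints a shell `export` line and does not set anything.

```python
import json

# The "aws_role" connection from the comments above, as a JSON-serializable dict.
aws_role_conn = {
    "conn_type": "aws",
    "extra": {
        "role_arn": "arn:aws:iam::123456789012:role/AirflowExecutionRole",
        "region_name": "us-east-1",
        "external_id": "unique-external-id",
    },
}

# Airflow looks up AIRFLOW_CONN_<CONN_ID>, with the connection ID upper-cased.
env_name = "AIRFLOW_CONN_" + "aws_role".upper()
env_value = json.dumps(aws_role_conn)
print(f"export {env_name}='{env_value}'")
```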

Advanced Configuration

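import boto3
from airflow.models import Connection
from airflow.providers.amazon.aws.utils.connection_wrapper import AwsConnectionWrapper

# Manual connection wrapper usage
conn = Connection(
    conn_id='manual_aws',
    conn_type='aws',
    login='AKIA...',
    password='...',
    extra='{"region_name": "eu-west-1", "role_arn": "arn:aws:iam::123456789012:role/DataProcessingRole"}'
)

wrapper = AwsConnectionWrapper(conn)

# Use the parsed credentials with boto3 directly.
# Note: the keys are used as-is; role_arn is NOT assumed here
# (role assumption happens when the session is created through AwsBaseHook).
session = boto3.Session(
    aws_access_key_id=wrapper.aws_access_key_id,
    aws_secret_access_key=wrapper.aws_secret_access_key,
    aws_session_token=wrapper.aws_session_token,
    region_name=wrapper.region_name,
)
s3 = session.client('s3')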

Error Handling and Retries

from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from botocore.exceptions import ClientError

class MyCustomHook(AwsBaseHook):
    def __init__(self, *args, **kwargs):
        # Fix the service for this hook; get_conn() then returns an S3 client
        kwargs['client_type'] = 's3'
        super().__init__(*args, **kwargs)

    @AwsBaseHook.retry(
        lambda e: isinstance(e, ClientError)
        and e.response['Error']['Code'] in ('Throttling', 'ServiceUnavailable')
    )
    def my_api_call(self):
        """API call with automatic retry on throttling or unavailability errors."""
        client = self.get_conn()
        try:
            return client.list_buckets()
        except ClientError as e:
            if e.response['Error']['Code'] == 'AccessDenied':
                self.log.error("Access denied - check IAM permissions")
            else:
                self.log.warning("API call failed: %s", e)
            raise
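The retry pattern can be illustrated without AWS access. The stdlib-only sketch below is not the provider's implementation, which has its own backoff settings. `retry_sketch`, `is_retryable_aws_error`, and `FakeThrottle` are hypothetical names, the error-code set is illustrative rather than exhaustive, and `FakeThrottle` only mimics the `response` attribute of botocore's `ClientError`. The predicate checks for an error code before reading it, so non-AWS exceptions return False instead of raising.

```python
import functools
import time
from typing import Any, Callable

# Illustrative, non-exhaustive set of AWS error codes treated as transient.
RETRYABLE_CODES = {"Throttling", "ThrottlingException", "SlowDown",
                   "RequestLimitExceeded", "ServiceUnavailable"}


def is_retryable_aws_error(exc: BaseException) -> bool:
    """Return True when exc carries a retryable botocore-style error code."""
    # botocore's ClientError exposes a `response` dict; other exceptions may not.
    response: Any = getattr(exc, "response", None)
    if not isinstance(response, dict):
        return False
    return response.get("Error", {}).get("Code") in RETRYABLE_CODES


def retry_sketch(should_retry: Callable[[Exception], bool],
                 max_attempts: int = 3, base_delay: float = 0.0):
    """Hypothetical decorator: re-run the call while should_retry(exc) is True."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    # Re-raise non-retryable errors and the final failed attempt.
                    if attempt == max_attempts or not should_retry(exc):
                        raise
                    time.sleep(base_delay * 2 ** (attempt - 1))  # exponential backoff
        return wrapper
    return decorator


class FakeThrottle(Exception):
    """Stand-in for a botocore ClientError carrying a Throttling code."""
    def __init__(self) -> None:
        super().__init__("Throttling")
        self.response = {"Error": {"Code": "Throttling"}}


calls = {"n": 0}


@retry_sketch(is_retryable_aws_error, max_attempts=3)
def flaky_call() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise FakeThrottle()
    return "ok"


print(flaky_call(), calls["n"])  # ok 3
```

With the real hook, the same predicate could be passed as `@AwsBaseHook.retry(is_retryable_aws_error)`.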

Connection Types

AWS Connection Types

The provider supports the following connection types in Airflow:

# Connection type identifiers
AWS_CONNECTION_TYPE = 'aws'
REDSHIFT_CONNECTION_TYPE = 'redshift'
EMR_CONNECTION_TYPE = 'emr'

# Connection configuration keys
class ConnectionConfigKeys:
    REGION_NAME = 'region_name'
    ROLE_ARN = 'role_arn'
    EXTERNAL_ID = 'external_id'
    AWS_ACCESS_KEY_ID = 'aws_access_key_id'
    AWS_SECRET_ACCESS_KEY = 'aws_secret_access_key'
    AWS_SESSION_TOKEN = 'aws_session_token'
    ENDPOINT_URL = 'endpoint_url'
    CONFIG_KWARGS = 'config_kwargs'
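These keys, together with the optional assume_role_method extra (whose values include assume_role_with_web_identity), indicate which authentication path a connection takes. The helper below is a hypothetical heuristic, not part of the provider. It returns the AuthMethod string values listed under Types, plus a 'default_chain' marker for connections without explicit credentials, because whether an instance profile or container credentials apply is only known at runtime.

```python
from typing import Any, Dict, Optional


def infer_auth_method(login: Optional[str], extra: Dict[str, Any]) -> str:
    """Hypothetical heuristic mapping connection fields to an AuthMethod value."""
    if extra.get("assume_role_method") == "assume_role_with_web_identity":
        return "web_identity_token"
    if extra.get("role_arn"):
        # external_id only qualifies a role assumption, so it is reported as a role.
        return "iam_role"
    if login or extra.get("aws_access_key_id"):
        return "iam_user"
    # No explicit credentials: boto3's default chain decides at runtime.
    return "default_chain"


print(infer_auth_method(None, {"role_arn": "arn:aws:iam::123456789012:role/AirflowExecutionRole"}))
# iam_role
```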

Types

# AWS credential types
class AwsCredentials:
    aws_access_key_id: str
    aws_secret_access_key: str
    aws_session_token: str = None
    region_name: str = 'us-east-1'

# Connection configuration
class AwsConnectionConfig:
    aws_conn_id: str = 'aws_default'
    region_name: str = None
    role_arn: str = None
    external_id: str = None
    verify: bool = True
    endpoint_url: str = None
    config: dict = None

# Session configuration
class SessionConfig:
    aws_access_key_id: str = None
    aws_secret_access_key: str = None
    aws_session_token: str = None
    region_name: str = None
    botocore_session: Any = None
    profile_name: str = None

# Client configuration
class ClientConfig:
    region_name: str = None
    api_version: str = None
    use_ssl: bool = True
    verify: bool = None
    endpoint_url: str = None
    aws_access_key_id: str = None
    aws_secret_access_key: str = None
    aws_session_token: str = None
    config: Any = None  # botocore.config.Config

# Authentication methods
class AuthMethod:
    IAM_USER = 'iam_user'
    IAM_ROLE = 'iam_role'
    INSTANCE_PROFILE = 'instance_profile'
    CONTAINER_CREDENTIALS = 'container_credentials'
    EXTERNAL_ID = 'external_id'
    WEB_IDENTITY_TOKEN = 'web_identity_token'
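SessionConfig's fields match the keyword arguments of boto3.Session (aws_access_key_id, aws_secret_access_key, aws_session_token, region_name, botocore_session, profile_name). A minimal dataclass sketch, using the hypothetical names SessionConfigSketch and to_session_kwargs, shows how only the fields that were set can be turned into keyword arguments. It avoids dataclasses.asdict, which would deep-copy a botocore session object.

```python
from dataclasses import dataclass, fields
from typing import Any, Dict, Optional


@dataclass
class SessionConfigSketch:
    """Hypothetical dataclass mirror of SessionConfig."""
    aws_access_key_id: Optional[str] = None
    aws_secret_access_key: Optional[str] = None
    aws_session_token: Optional[str] = None
    region_name: Optional[str] = None
    botocore_session: Any = None
    profile_name: Optional[str] = None


def to_session_kwargs(cfg: SessionConfigSketch) -> Dict[str, Any]:
    """Collect only the fields that were set, ready for boto3.Session(**kwargs)."""
    # fields() + getattr keeps objects as-is, unlike asdict(), which deep-copies them.
    return {
        f.name: getattr(cfg, f.name)
        for f in fields(cfg)
        if getattr(cfg, f.name) is not None
    }


kwargs = to_session_kwargs(SessionConfigSketch(region_name="eu-west-1", profile_name="analytics"))
print(kwargs)  # {'region_name': 'eu-west-1', 'profile_name': 'analytics'}
```

The result can then be passed as `boto3.Session(**kwargs)`.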

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-amazon
