
tessl/pypi-apache-airflow-providers-amazon

Apache Airflow provider package delivering comprehensive AWS service integrations for orchestrating cloud workflows and data pipelines


docs/s3-storage.md

S3 Storage Operations

Amazon S3 (Simple Storage Service) integration providing comprehensive bucket and object management capabilities. This includes creation, deletion, copying, transformation, and monitoring of S3 resources within Airflow workflows.

Capabilities

S3 Hook

Core S3 client providing low-level AWS S3 API access with authentication and connection management.

class S3Hook(AwsBaseHook):
    def __init__(self, aws_conn_id: str = 'aws_default', verify: bool = None, **kwargs):
        """
        Initialize S3 Hook.
        
        Parameters:
        - aws_conn_id: AWS connection ID
        - verify: SSL certificate verification
        """

    def create_bucket(self, bucket_name: str, region_name: str = None) -> bool:
        """
        Create an S3 bucket.
        
        Parameters:
        - bucket_name: Name of the bucket to create
        - region_name: AWS region for bucket creation
        
        Returns:
        True if bucket created successfully
        """

    def delete_bucket(self, bucket_name: str, force_delete: bool = False) -> None:
        """
        Delete an S3 bucket.
        
        Parameters:
        - bucket_name: Name of the bucket to delete
        - force_delete: Delete bucket even if not empty
        """

    def list_keys(self, bucket_name: str, prefix: str = '', delimiter: str = '', page_size: int = None, max_items: int = None) -> list:
        """
        List keys in an S3 bucket.
        
        Parameters:
        - bucket_name: Name of the bucket
        - prefix: Key prefix to filter results
        - delimiter: Delimiter for grouping keys
        - page_size: Number of items per page
        - max_items: Maximum number of items to return
        
        Returns:
        List of key names
        """

    def list_prefixes(self, bucket_name: str, prefix: str = '', delimiter: str = '', page_size: int = None, max_items: int = None) -> list:
        """
        List prefixes in an S3 bucket.
        
        Parameters:
        - bucket_name: Name of the bucket
        - prefix: Key prefix to filter results
        - delimiter: Delimiter for grouping keys
        - page_size: Number of items per page
        - max_items: Maximum number of items to return
        
        Returns:
        List of prefix names
        """

    def check_for_bucket(self, bucket_name: str) -> bool:
        """
        Check if S3 bucket exists.
        
        Parameters:
        - bucket_name: Name of the bucket to check
        
        Returns:
        True if bucket exists
        """

    def get_key(self, key: str, bucket_name: str = None) -> Any:
        """
        Get S3 key object.
        
        Parameters:
        - key: S3 key name
        - bucket_name: Name of the bucket
        
        Returns:
        S3 key object
        """

    def check_for_key(self, key: str, bucket_name: str = None) -> bool:
        """
        Check if S3 key exists.
        
        Parameters:
        - key: S3 key name
        - bucket_name: Name of the bucket
        
        Returns:
        True if key exists
        """

    def get_key_size(self, key: str, bucket_name: str = None) -> int:
        """
        Get size of S3 key in bytes.
        
        Parameters:
        - key: S3 key name
        - bucket_name: Name of the bucket
        
        Returns:
        Size in bytes
        """

    def copy_object(self, source_bucket_key: str, dest_bucket_key: str, source_bucket_name: str = None, dest_bucket_name: str = None, **kwargs) -> None:
        """
        Copy an S3 object.
        
        Parameters:
        - source_bucket_key: Source S3 key
        - dest_bucket_key: Destination S3 key
        - source_bucket_name: Source bucket name
        - dest_bucket_name: Destination bucket name
        """

    def delete_objects(self, bucket: str, keys: list) -> None:
        """
        Delete multiple S3 objects.
        
        Parameters:
        - bucket: Name of the bucket
        - keys: List of key names to delete
        """

    def download_file(self, key: str, bucket_name: str, local_path: str, **kwargs) -> str:
        """
        Download S3 object to local file.
        
        Parameters:
        - key: S3 key name
        - bucket_name: Name of the bucket
        - local_path: Local file path for download
        
        Returns:
        Local file path
        """

    def upload_file(self, filename: str, key: str, bucket_name: str = None, **kwargs) -> None:
        """
        Upload local file to S3.
        
        Parameters:
        - filename: Local file path to upload
        - key: S3 key name for uploaded file
        - bucket_name: Name of the bucket
        """

    def load_file(self, filename: str, key: str, bucket_name: str = None, replace: bool = True, **kwargs) -> None:
        """
        Load local file to S3 key.
        
        Parameters:
        - filename: Local file path to load
        - key: S3 key name
        - bucket_name: Name of the bucket
        - replace: Replace existing key if it exists
        """

    def load_string(self, string_data: str, key: str, bucket_name: str = None, replace: bool = True, **kwargs) -> None:
        """
        Load string data to S3 key.
        
        Parameters:
        - string_data: String data to upload
        - key: S3 key name
        - bucket_name: Name of the bucket
        - replace: Replace existing key if it exists
        """

    def load_bytes(self, bytes_data: bytes, key: str, bucket_name: str = None, replace: bool = True, **kwargs) -> None:
        """
        Load bytes data to S3 key.
        
        Parameters:
        - bytes_data: Bytes data to upload
        - key: S3 key name
        - bucket_name: Name of the bucket
        - replace: Replace existing key if it exists
        """

    def read_key(self, key: str, bucket_name: str = None) -> str:
        """
        Read S3 key content as string.
        
        Parameters:
        - key: S3 key name
        - bucket_name: Name of the bucket
        
        Returns:
        Key content as string
        """

    def generate_presigned_url(self, client_method: str, params: dict = None, expires_in: int = 3600, http_method: str = None) -> str:
        """
        Generate presigned URL for S3 operations.
        
        Parameters:
        - client_method: S3 client method name
        - params: Parameters for the method
        - expires_in: URL expiration time in seconds
        - http_method: HTTP method for the URL
        
        Returns:
        Presigned URL
        """

S3 Operators

Task implementations for S3 operations that can be used directly in Airflow DAGs.

class S3CreateBucketOperator(BaseOperator):
    def __init__(self, bucket_name: str, aws_conn_id: str = 'aws_default', region_name: str = None, **kwargs):
        """
        Create an S3 bucket.
        
        Parameters:
        - bucket_name: Name of the bucket to create
        - aws_conn_id: AWS connection ID
        - region_name: AWS region for bucket creation
        """

class S3DeleteBucketOperator(BaseOperator):
    def __init__(self, bucket_name: str, force_delete: bool = False, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Delete an S3 bucket.
        
        Parameters:
        - bucket_name: Name of the bucket to delete
        - force_delete: Delete bucket even if not empty
        - aws_conn_id: AWS connection ID
        """

class S3DeleteObjectsOperator(BaseOperator):
    def __init__(self, bucket: str, keys: list, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Delete multiple S3 objects.
        
        Parameters:
        - bucket: Name of the bucket
        - keys: List of key names to delete
        - aws_conn_id: AWS connection ID
        """

class S3CopyObjectOperator(BaseOperator):
    def __init__(self, source_bucket_key: str, dest_bucket_key: str, source_bucket_name: str = None, dest_bucket_name: str = None, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Copy an S3 object.
        
        Parameters:
        - source_bucket_key: Source S3 key
        - dest_bucket_key: Destination S3 key
        - source_bucket_name: Source bucket name
        - dest_bucket_name: Destination bucket name
        - aws_conn_id: AWS connection ID
        """

class S3CreateObjectOperator(BaseOperator):
    def __init__(self, s3_bucket: str, s3_key: str, data: Any, replace: bool = True, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Create an S3 object with provided data.
        
        Parameters:
        - s3_bucket: Name of the bucket
        - s3_key: S3 key name
        - data: Data to write to S3 object
        - replace: Replace existing object if it exists
        - aws_conn_id: AWS connection ID
        """

class S3FileTransformOperator(BaseOperator):
    def __init__(self, source_s3_key: str, dest_s3_key: str, transform_script: str, source_aws_conn_id: str = 'aws_default', dest_aws_conn_id: str = 'aws_default', **kwargs):
        """
        Transform S3 file using provided script.
        
        Parameters:
        - source_s3_key: Source S3 key
        - dest_s3_key: Destination S3 key
        - transform_script: Transformation script to apply
        - source_aws_conn_id: Source AWS connection ID
        - dest_aws_conn_id: Destination AWS connection ID
        """

class S3ListOperator(BaseOperator):
    def __init__(self, bucket: str, prefix: str = '', delimiter: str = '', aws_conn_id: str = 'aws_default', **kwargs):
        """
        List S3 objects in a bucket.
        
        Parameters:
        - bucket: Name of the bucket
        - prefix: Key prefix to filter results
        - delimiter: Delimiter for grouping keys
        - aws_conn_id: AWS connection ID
        """

class S3ListPrefixesOperator(BaseOperator):
    def __init__(self, bucket: str, prefix: str = '', delimiter: str = '', aws_conn_id: str = 'aws_default', **kwargs):
        """
        List S3 prefixes in a bucket.
        
        Parameters:
        - bucket: Name of the bucket
        - prefix: Key prefix to filter results
        - delimiter: Delimiter for grouping keys
        - aws_conn_id: AWS connection ID
        """

S3 Sensors

Monitoring tasks that wait for specific S3 conditions or states.

class S3KeySensor(BaseSensorOperator):
    def __init__(self, bucket_name: str, bucket_key: str, wildcard_match: bool = False, aws_conn_id: str = 'aws_default', verify: bool = None, **kwargs):
        """
        Wait for S3 key to exist.
        
        Parameters:
        - bucket_name: Name of the bucket
        - bucket_key: S3 key name to wait for
        - wildcard_match: Use wildcard matching for key name
        - aws_conn_id: AWS connection ID
        - verify: SSL certificate verification
        """

class S3KeySizeSensor(BaseSensorOperator):
    def __init__(self, bucket_name: str, bucket_key: str, check_fn: callable = None, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Wait for S3 key size condition.
        
        Parameters:
        - bucket_name: Name of the bucket
        - bucket_key: S3 key name to check
        - check_fn: Function to check key size condition
        - aws_conn_id: AWS connection ID
        """

class S3KeysUnchangedSensor(BaseSensorOperator):
    def __init__(self, bucket_name: str, prefix: str, aws_conn_id: str = 'aws_default', inactivity_period: int = 60*60, min_objects: int = 1, **kwargs):
        """
        Wait for S3 keys to be unchanged for specified period.
        
        Parameters:
        - bucket_name: Name of the bucket
        - prefix: Key prefix to monitor
        - aws_conn_id: AWS connection ID
        - inactivity_period: Inactivity period in seconds
        - min_objects: Minimum number of objects required
        """

class S3PrefixSensor(BaseSensorOperator):
    def __init__(self, bucket_name: str, prefix: str, delimiter: str = '', aws_conn_id: str = 'aws_default', **kwargs):
        """
        Wait for S3 prefix to exist.
        
        Parameters:
        - bucket_name: Name of the bucket
        - prefix: Prefix to wait for
        - delimiter: Delimiter for prefix matching
        - aws_conn_id: AWS connection ID
        """

S3 Triggers

Asynchronous triggers for efficient S3 monitoring without blocking Airflow workers.

class S3KeyTrigger(BaseTrigger):
    def __init__(self, bucket_name: str, bucket_key: str, wildcard_match: bool = False, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Asynchronous trigger for S3 key existence.
        
        Parameters:
        - bucket_name: Name of the bucket
        - bucket_key: S3 key name to wait for
        - wildcard_match: Use wildcard matching for key name
        - aws_conn_id: AWS connection ID
        """

class S3KeySizeTrigger(BaseTrigger):
    def __init__(self, bucket_name: str, bucket_key: str, check_fn: callable = None, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Asynchronous trigger for S3 key size condition.
        
        Parameters:
        - bucket_name: Name of the bucket
        - bucket_key: S3 key name to check
        - check_fn: Function to check key size condition
        - aws_conn_id: AWS connection ID
        """

Usage Examples

Basic S3 Operations

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# Initialize hook
s3_hook = S3Hook(aws_conn_id='my_aws_conn')

# Create bucket
s3_hook.create_bucket('my-data-bucket', region_name='us-east-1')

# Upload file
s3_hook.load_file('/local/path/data.csv', 'uploads/data.csv', 'my-data-bucket')

# Check if file exists
exists = s3_hook.check_for_key('uploads/data.csv', 'my-data-bucket')

# Download file
s3_hook.download_file('uploads/data.csv', 'my-data-bucket', '/local/path/downloaded.csv')

# List objects with prefix
objects = s3_hook.list_keys('my-data-bucket', prefix='uploads/')

S3 DAG Operations

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3CreateObjectOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

dag = DAG('s3_workflow', start_date=datetime(2023, 1, 1))

create_bucket = S3CreateBucketOperator(
    task_id='create_bucket',
    bucket_name='my-processing-bucket',
    aws_conn_id='aws_default',
    dag=dag
)

upload_config = S3CreateObjectOperator(
    task_id='upload_config',
    s3_bucket='my-processing-bucket',
    s3_key='config/settings.json',
    data='{"version": "1.0", "environment": "prod"}',
    dag=dag
)

wait_for_data = S3KeySensor(
    task_id='wait_for_data',
    bucket_name='my-processing-bucket',
    bucket_key='input/{{ ds }}/data.parquet',
    timeout=3600,
    dag=dag
)

create_bucket >> upload_config >> wait_for_data

Types

# S3 key and bucket identifiers
BucketName = str
KeyName = str
S3Uri = str  # Format: s3://bucket/key

# S3 object metadata
class S3ObjectMetadata:
    key: str
    size: int
    last_modified: datetime
    etag: str
    storage_class: str

# S3 connection configuration
class S3Config:
    aws_access_key_id: str
    aws_secret_access_key: str
    region_name: str = 'us-east-1'
    endpoint_url: str = None
    verify: bool = True
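
For working with S3Uri values, the hook provides a parse_s3_url helper that splits a URI into its bucket and key; a quick sketch with a placeholder URI:

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# Split an s3://bucket/key URI into its bucket and key components.
bucket, key = S3Hook.parse_s3_url('s3://my-data-bucket/uploads/data.csv')
print(bucket)  # my-data-bucket
print(key)     # uploads/data.csv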

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-amazon
