Apache Airflow provider package that provides comprehensive AWS service integrations for orchestrating cloud workflows and data pipelines
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Amazon S3 (Simple Storage Service) integration providing comprehensive bucket and object management capabilities. This includes creation, deletion, copying, transformation, and monitoring of S3 resources within Airflow workflows.
Core S3 client providing low-level AWS S3 API access with authentication and connection management.
class S3Hook(AwsBaseHook):
"""Low-level hook exposing S3 bucket and object operations through an Airflow AWS connection."""
def __init__(self, aws_conn_id: str = 'aws_default', verify: bool | None = None, **kwargs):
"""
Initialize S3 Hook.
Parameters:
- aws_conn_id: AWS connection ID
- verify: SSL certificate verification
"""
def create_bucket(self, bucket_name: str, region_name: str | None = None) -> bool:
"""
Create an S3 bucket.
Parameters:
- bucket_name: Name of the bucket to create
- region_name: AWS region for bucket creation
Returns:
True if bucket created successfully
"""
def delete_bucket(self, bucket_name: str, force_delete: bool = False) -> None:
"""
Delete an S3 bucket.
Parameters:
- bucket_name: Name of the bucket to delete
- force_delete: Delete bucket even if not empty
"""
def list_keys(self, bucket_name: str, prefix: str = '', delimiter: str = '', page_size: int | None = None, max_items: int | None = None) -> list[str]:
"""
List keys in an S3 bucket.
Parameters:
- bucket_name: Name of the bucket
- prefix: Key prefix to filter results
- delimiter: Delimiter for grouping keys
- page_size: Number of items per page
- max_items: Maximum number of items to return
Returns:
List of key names
"""
def list_prefixes(self, bucket_name: str, prefix: str = '', delimiter: str = '', page_size: int | None = None, max_items: int | None = None) -> list[str]:
"""
List prefixes in an S3 bucket.
Parameters:
- bucket_name: Name of the bucket
- prefix: Key prefix to filter results
- delimiter: Delimiter for grouping keys
- page_size: Number of items per page
- max_items: Maximum number of items to return
Returns:
List of prefix names
"""
def check_for_bucket(self, bucket_name: str) -> bool:
"""
Check if S3 bucket exists.
Parameters:
- bucket_name: Name of the bucket to check
Returns:
True if bucket exists
"""
def get_key(self, key: str, bucket_name: str | None = None) -> Any:
"""
Get S3 key object.
Parameters:
- key: S3 key name
- bucket_name: Name of the bucket
Returns:
S3 key object
"""
def check_for_key(self, key: str, bucket_name: str | None = None) -> bool:
"""
Check if S3 key exists.
Parameters:
- key: S3 key name
- bucket_name: Name of the bucket
Returns:
True if key exists
"""
def get_key_size(self, key: str, bucket_name: str | None = None) -> int:
"""
Get size of S3 key in bytes.
Parameters:
- key: S3 key name
- bucket_name: Name of the bucket
Returns:
Size in bytes
"""
def copy_object(self, source_bucket_key: str, dest_bucket_key: str, source_bucket_name: str | None = None, dest_bucket_name: str | None = None, **kwargs) -> None:
"""
Copy an S3 object.
Parameters:
- source_bucket_key: Source S3 key
- dest_bucket_key: Destination S3 key
- source_bucket_name: Source bucket name
- dest_bucket_name: Destination bucket name
"""
def delete_objects(self, bucket: str, keys: list[str]) -> None:
"""
Delete multiple S3 objects.
Parameters:
- bucket: Name of the bucket
- keys: List of key names to delete
"""
def download_file(self, key: str, bucket_name: str, local_path: str, **kwargs) -> str:
"""
Download S3 object to local file.
Parameters:
- key: S3 key name
- bucket_name: Name of the bucket
- local_path: Local file path for download
Returns:
Local file path
"""
def upload_file(self, filename: str, key: str, bucket_name: str | None = None, **kwargs) -> None:
"""
Upload local file to S3.
Parameters:
- filename: Local file path to upload
- key: S3 key name for uploaded file
- bucket_name: Name of the bucket
"""
def load_file(self, filename: str, key: str, bucket_name: str | None = None, replace: bool = True, **kwargs) -> None:
"""
Load local file to S3 key.
Parameters:
- filename: Local file path to load
- key: S3 key name
- bucket_name: Name of the bucket
- replace: Replace existing key if it exists
"""
def load_string(self, string_data: str, key: str, bucket_name: str | None = None, replace: bool = True, **kwargs) -> None:
"""
Load string data to S3 key.
Parameters:
- string_data: String data to upload
- key: S3 key name
- bucket_name: Name of the bucket
- replace: Replace existing key if it exists
"""
def load_bytes(self, bytes_data: bytes, key: str, bucket_name: str | None = None, replace: bool = True, **kwargs) -> None:
"""
Load bytes data to S3 key.
Parameters:
- bytes_data: Bytes data to upload
- key: S3 key name
- bucket_name: Name of the bucket
- replace: Replace existing key if it exists
"""
def read_key(self, key: str, bucket_name: str | None = None) -> str:
"""
Read S3 key content as string.
Parameters:
- key: S3 key name
- bucket_name: Name of the bucket
Returns:
Key content as string
"""
def generate_presigned_url(self, client_method: str, params: dict | None = None, expires_in: int = 3600, http_method: str | None = None) -> str:
"""
Generate presigned URL for S3 operations.
Parameters:
- client_method: S3 client method name
- params: Parameters for the method
- expires_in: URL expiration time in seconds
- http_method: HTTP method for the URL
Returns:
Presigned URL
"""Task implementations for S3 operations that can be used directly in Airflow DAGs.
class S3CreateBucketOperator(BaseOperator):
"""Operator that creates an S3 bucket as a DAG task."""
def __init__(self, bucket_name: str, aws_conn_id: str = 'aws_default', region_name: str | None = None, **kwargs):
"""
Create an S3 bucket.
Parameters:
- bucket_name: Name of the bucket to create
- aws_conn_id: AWS connection ID
- region_name: AWS region for bucket creation
"""
class S3DeleteBucketOperator(BaseOperator):
"""Operator that deletes an S3 bucket as a DAG task."""
def __init__(self, bucket_name: str, force_delete: bool = False, aws_conn_id: str = 'aws_default', **kwargs):
"""
Delete an S3 bucket.
Parameters:
- bucket_name: Name of the bucket to delete
- force_delete: Delete bucket even if not empty
- aws_conn_id: AWS connection ID
"""
class S3DeleteObjectsOperator(BaseOperator):
"""Operator that deletes a batch of S3 objects as a DAG task."""
def __init__(self, bucket: str, keys: list[str], aws_conn_id: str = 'aws_default', **kwargs):
"""
Delete multiple S3 objects.
Parameters:
- bucket: Name of the bucket
- keys: List of key names to delete
- aws_conn_id: AWS connection ID
"""
class S3CopyObjectOperator(BaseOperator):
"""Operator that copies one S3 object to another location as a DAG task."""
def __init__(self, source_bucket_key: str, dest_bucket_key: str, source_bucket_name: str | None = None, dest_bucket_name: str | None = None, aws_conn_id: str = 'aws_default', **kwargs):
"""
Copy an S3 object.
Parameters:
- source_bucket_key: Source S3 key
- dest_bucket_key: Destination S3 key
- source_bucket_name: Source bucket name
- dest_bucket_name: Destination bucket name
- aws_conn_id: AWS connection ID
"""
class S3CreateObjectOperator(BaseOperator):
"""Operator that writes in-memory data to a new S3 object as a DAG task."""
def __init__(self, s3_bucket: str, s3_key: str, data: Any, replace: bool = True, aws_conn_id: str = 'aws_default', **kwargs):
"""
Create an S3 object with provided data.
Parameters:
- s3_bucket: Name of the bucket
- s3_key: S3 key name
- data: Data to write to S3 object (presumably str or bytes — TODO confirm against provider docs)
- replace: Replace existing object if it exists
- aws_conn_id: AWS connection ID
"""
class S3FileTransformOperator(BaseOperator):
"""Operator that downloads an S3 object, applies a transform script, and uploads the result."""
def __init__(self, source_s3_key: str, dest_s3_key: str, transform_script: str, source_aws_conn_id: str = 'aws_default', dest_aws_conn_id: str = 'aws_default', **kwargs):
"""
Transform S3 file using provided script.
Parameters:
- source_s3_key: Source S3 key
- dest_s3_key: Destination S3 key
- transform_script: Transformation script to apply
- source_aws_conn_id: Source AWS connection ID
- dest_aws_conn_id: Destination AWS connection ID
"""
class S3ListOperator(BaseOperator):
"""Operator that lists object keys in an S3 bucket as a DAG task."""
def __init__(self, bucket: str, prefix: str = '', delimiter: str = '', aws_conn_id: str = 'aws_default', **kwargs):
"""
List S3 objects in a bucket.
Parameters:
- bucket: Name of the bucket
- prefix: Key prefix to filter results
- delimiter: Delimiter for grouping keys
- aws_conn_id: AWS connection ID
"""
class S3ListPrefixesOperator(BaseOperator):
"""Operator that lists common prefixes (pseudo-directories) in an S3 bucket as a DAG task."""
def __init__(self, bucket: str, prefix: str = '', delimiter: str = '', aws_conn_id: str = 'aws_default', **kwargs):
"""
List S3 prefixes in a bucket.
Parameters:
- bucket: Name of the bucket
- prefix: Key prefix to filter results
- delimiter: Delimiter for grouping keys
- aws_conn_id: AWS connection ID
"""Monitoring tasks that wait for specific S3 conditions or states.
class S3KeySensor(BaseSensorOperator):
"""Sensor that succeeds once a given S3 key exists."""
def __init__(self, bucket_name: str, bucket_key: str, wildcard_match: bool = False, aws_conn_id: str = 'aws_default', verify: bool | None = None, **kwargs):
"""
Wait for S3 key to exist.
Parameters:
- bucket_name: Name of the bucket
- bucket_key: S3 key name to wait for
- wildcard_match: Use wildcard matching for key name
- aws_conn_id: AWS connection ID
- verify: SSL certificate verification
"""
class S3KeySizeSensor(BaseSensorOperator):
"""Sensor that succeeds once an S3 key's size satisfies a condition."""
def __init__(self, bucket_name: str, bucket_key: str, check_fn: callable = None, aws_conn_id: str = 'aws_default', **kwargs):
"""
Wait for S3 key size condition.
Parameters:
- bucket_name: Name of the bucket
- bucket_key: S3 key name to check
- check_fn: Optional function to check key size condition; behavior when omitted is not shown here
- aws_conn_id: AWS connection ID
"""
class S3KeysUnchangedSensor(BaseSensorOperator):
"""Sensor that succeeds when keys under a prefix stop changing for a given period."""
def __init__(self, bucket_name: str, prefix: str, aws_conn_id: str = 'aws_default', inactivity_period: int = 60*60, min_objects: int = 1, **kwargs):
"""
Wait for S3 keys to be unchanged for specified period.
Parameters:
- bucket_name: Name of the bucket
- prefix: Key prefix to monitor
- aws_conn_id: AWS connection ID
- inactivity_period: Inactivity period in seconds (default one hour)
- min_objects: Minimum number of objects required
"""
class S3PrefixSensor(BaseSensorOperator):
"""Sensor that succeeds once a given S3 prefix exists."""
def __init__(self, bucket_name: str, prefix: str, delimiter: str = '', aws_conn_id: str = 'aws_default', **kwargs):
"""
Wait for S3 prefix to exist.
Parameters:
- bucket_name: Name of the bucket
- prefix: Prefix to wait for
- delimiter: Delimiter for prefix matching
- aws_conn_id: AWS connection ID
"""Asynchronous triggers for efficient S3 monitoring without blocking Airflow workers.
class S3KeyTrigger(BaseTrigger):
"""Deferrable trigger that fires when an S3 key exists."""
def __init__(self, bucket_name: str, bucket_key: str, wildcard_match: bool = False, aws_conn_id: str = 'aws_default', **kwargs):
"""
Asynchronous trigger for S3 key existence.
Parameters:
- bucket_name: Name of the bucket
- bucket_key: S3 key name to wait for
- wildcard_match: Use wildcard matching for key name
- aws_conn_id: AWS connection ID
"""
class S3KeySizeTrigger(BaseTrigger):
"""Deferrable trigger that fires when an S3 key's size satisfies a condition."""
def __init__(self, bucket_name: str, bucket_key: str, check_fn: callable = None, aws_conn_id: str = 'aws_default', **kwargs):
"""
Asynchronous trigger for S3 key size condition.
Parameters:
- bucket_name: Name of the bucket
- bucket_key: S3 key name to check
- check_fn: Optional function to check key size condition; behavior when omitted is not shown here
- aws_conn_id: AWS connection ID
"""from airflow.providers.amazon.aws.hooks.s3 import S3Hook
# Initialize the hook against a configured Airflow AWS connection
s3_hook = S3Hook(aws_conn_id='my_aws_conn')
# Create a bucket in an explicit region
s3_hook.create_bucket('my-data-bucket', region_name='us-east-1')
# Upload a local file to the bucket under the given key
s3_hook.load_file('/local/path/data.csv', 'uploads/data.csv', 'my-data-bucket')
# Check whether the uploaded key now exists
exists = s3_hook.check_for_key('uploads/data.csv', 'my-data-bucket')
# Download the object back to a local path
s3_hook.download_file('uploads/data.csv', 'my-data-bucket', '/local/path/downloaded.csv')
# List all object keys under the 'uploads/' prefix
objects = s3_hook.list_keys('my-data-bucket', prefix='uploads/')
from airflow import DAG
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3CreateObjectOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
# NOTE(review): this snippet also needs `from datetime import datetime` for start_date below — import not shown here.
dag = DAG('s3_workflow', start_date=datetime(2023, 1, 1))
# Create the working bucket for the pipeline
create_bucket = S3CreateBucketOperator(
task_id='create_bucket',
bucket_name='my-processing-bucket',
aws_conn_id='aws_default',
dag=dag
)
# Write a small JSON configuration object into the bucket
upload_config = S3CreateObjectOperator(
task_id='upload_config',
s3_bucket='my-processing-bucket',
s3_key='config/settings.json',
data='{"version": "1.0", "environment": "prod"}',
dag=dag
)
# Block (up to one hour) until the day's input data lands; key is templated with the logical date
wait_for_data = S3KeySensor(
task_id='wait_for_data',
bucket_name='my-processing-bucket',
bucket_key='input/{{ ds }}/data.parquet',
timeout=3600,
dag=dag
)
create_bucket >> upload_config >> wait_for_data
# S3 key and bucket identifiers
BucketName = str
KeyName = str
S3Uri = str # Full object URI in the format: s3://bucket/key
# S3 object metadata
class S3ObjectMetadata:
"""Metadata record describing a single S3 object."""
key: str # object key within the bucket
size: int # object size in bytes
last_modified: datetime # last modification timestamp
etag: str # entity tag identifying the object content
storage_class: str # storage tier name (e.g. STANDARD — TODO confirm allowed values)
# S3 connection configuration
class S3Config:
"""Connection settings for reaching S3: credentials, region, and optional endpoint override."""
aws_access_key_id: str
aws_secret_access_key: str # secret credential — never log or commit this value
region_name: str = 'us-east-1'
endpoint_url: str | None = None # override for S3-compatible endpoints — TODO confirm usage
verify: bool = True
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-amazon