tessl/pypi-apache-airflow-providers-amazon

Apache Airflow provider package offering comprehensive AWS service integrations for orchestrating cloud workflows and data pipelines


docs/data-transfers.md

Data Transfer Operations

Comprehensive data movement between AWS services and external systems: operators for transferring data between S3, Redshift, relational databases, FTP/SFTP servers, and other data sources, with support for various file formats and transformation options.

Capabilities

S3 to Redshift Transfer

Transfer data from S3 to Redshift tables with support for COPY command options and data transformation.

class S3ToRedshiftOperator(BaseOperator):
    def __init__(self, schema: str, table: str, s3_bucket: str, s3_key: str, redshift_conn_id: str = 'redshift_default', aws_conn_id: str = 'aws_default', verify: bool | None = None, wildcard_match: bool = False, copy_options: list | None = None, autocommit: bool = True, parameters: dict | None = None, **kwargs):
        """
        Transfer data from S3 to Redshift table.
        
        Parameters:
        - schema: Redshift schema name
        - table: Redshift table name
        - s3_bucket: S3 bucket containing source data
        - s3_key: S3 key path for source data
        - redshift_conn_id: Redshift connection ID
        - aws_conn_id: AWS connection ID for S3 access
        - verify: SSL certificate verification
        - wildcard_match: Use wildcard matching for S3 keys
        - copy_options: Additional COPY command options
        - autocommit: Auto-commit the transaction
        - parameters: Additional parameters for the operation
        """

Redshift to S3 Transfer

Export data from Redshift tables to S3 with support for UNLOAD command options and parallel processing.

class RedshiftToS3Operator(BaseOperator):
    def __init__(self, s3_bucket: str, s3_key: str, schema: str | None = None, table: str | None = None, select_query: str | None = None, redshift_conn_id: str = 'redshift_default', aws_conn_id: str = 'aws_default', verify: bool | None = None, unload_options: list | None = None, autocommit: bool = True, include_header: bool = False, **kwargs):
        """
        Transfer data from Redshift to S3.
        
        Parameters:
        - s3_bucket: S3 bucket for destination data
        - s3_key: S3 key path for destination data
        - schema: Redshift schema name (if using table)
        - table: Redshift table name (if using table)
        - select_query: Custom SELECT query (alternative to schema/table)
        - redshift_conn_id: Redshift connection ID
        - aws_conn_id: AWS connection ID for S3 access
        - verify: SSL certificate verification
        - unload_options: Additional UNLOAD command options
        - autocommit: Auto-commit the transaction
        - include_header: Include column headers in output
        """

SQL to S3 Transfer

Transfer SQL query results from any database to S3 with support for various file formats and partitioning.

class SqlToS3Operator(BaseOperator):
    def __init__(self, query: str, s3_bucket: str, s3_key: str, sql_conn_id: str, aws_conn_id: str = 'aws_default', verify: bool | None = None, replace: bool = True, file_format: str = 'csv', pd_kwargs: dict | None = None, index: bool = True, **kwargs):
        """
        Transfer SQL query results to S3.
        
        Parameters:
        - query: SQL query to execute
        - s3_bucket: S3 bucket for destination data
        - s3_key: S3 key path for destination data
        - sql_conn_id: SQL database connection ID
        - aws_conn_id: AWS connection ID for S3 access
        - verify: SSL certificate verification
        - replace: Replace existing S3 object
        - file_format: Output format: 'csv' (default), 'json', or 'parquet'
        - pd_kwargs: Additional pandas parameters for data processing
        - index: Include DataFrame index in output
        """

S3 to SQL Transfer

Transfer data from S3 to SQL databases with support for data type inference and table creation.

class S3ToSqlOperator(BaseOperator):
    def __init__(self, s3_bucket: str, s3_key: str, table: str, sql_conn_id: str, aws_conn_id: str = 'aws_default', verify: bool | None = None, wildcard_match: bool = False, copy_options: dict | None = None, **kwargs):
        """
        Transfer data from S3 to SQL database table.
        
        Parameters:
        - s3_bucket: S3 bucket containing source data
        - s3_key: S3 key path for source data
        - table: Destination table name
        - sql_conn_id: SQL database connection ID
        - aws_conn_id: AWS connection ID for S3 access
        - verify: SSL certificate verification
        - wildcard_match: Use wildcard matching for S3 keys
        - copy_options: Additional copy options
        """

DynamoDB to S3 Transfer

Export DynamoDB table data to S3 with support for parallel scans and data filtering.

class DynamoDbToS3Operator(BaseOperator):
    def __init__(self, dynamodb_table_name: str, s3_bucket_name: str, s3_key: str, aws_conn_id: str = 'aws_default', dynamodb_scan_kwargs: dict | None = None, s3_key_type: str = 'table_name', **kwargs):
        """
        Export DynamoDB table to S3.
        
        Parameters:
        - dynamodb_table_name: DynamoDB table name
        - s3_bucket_name: S3 bucket for destination data
        - s3_key: S3 key path for destination data
        - aws_conn_id: AWS connection ID
        - dynamodb_scan_kwargs: Additional DynamoDB scan parameters
        - s3_key_type: S3 key generation strategy
        """

File System Transfers

Transfer operations between S3 and local file systems or remote file servers.

class LocalFilesystemToS3Operator(BaseOperator):
    def __init__(self, filename: str, dest_key: str, dest_bucket: str | None = None, aws_conn_id: str = 'aws_default', verify: bool | None = None, replace: bool = True, **kwargs):
        """
        Transfer local file to S3.
        
        Parameters:
        - filename: Local file path
        - dest_key: S3 destination key
        - dest_bucket: S3 destination bucket
        - aws_conn_id: AWS connection ID
        - verify: SSL certificate verification
        - replace: Replace existing S3 object
        """

class S3ToFtpOperator(BaseOperator):
    def __init__(self, s3_bucket: str, s3_key: str, ftp_path: str, ftp_conn_id: str = 'ftp_default', aws_conn_id: str = 'aws_default', **kwargs):
        """
        Transfer S3 object to FTP server.
        
        Parameters:
        - s3_bucket: S3 source bucket
        - s3_key: S3 source key
        - ftp_path: FTP destination path
        - ftp_conn_id: FTP connection ID
        - aws_conn_id: AWS connection ID
        """

class FtpToS3Operator(BaseOperator):
    def __init__(self, ftp_path: str, s3_bucket: str, s3_key: str, ftp_conn_id: str = 'ftp_default', aws_conn_id: str = 'aws_default', **kwargs):
        """
        Transfer file from FTP server to S3.
        
        Parameters:
        - ftp_path: FTP source path
        - s3_bucket: S3 destination bucket
        - s3_key: S3 destination key
        - ftp_conn_id: FTP connection ID
        - aws_conn_id: AWS connection ID
        """

class S3ToSftpOperator(BaseOperator):
    def __init__(self, sftp_path: str, s3_bucket: str, s3_key: str, sftp_conn_id: str = 'sftp_default', aws_conn_id: str = 'aws_default', **kwargs):
        """
        Transfer S3 object to SFTP server.
        
        Parameters:
        - sftp_path: SFTP destination path
        - s3_bucket: S3 source bucket
        - s3_key: S3 source key
        - sftp_conn_id: SFTP connection ID
        - aws_conn_id: AWS connection ID
        """

class SftpToS3Operator(BaseOperator):
    def __init__(self, s3_bucket: str, s3_key: str, sftp_path: str, sftp_conn_id: str = 'sftp_default', aws_conn_id: str = 'aws_default', **kwargs):
        """
        Transfer file from SFTP server to S3.
        
        Parameters:
        - s3_bucket: S3 destination bucket
        - s3_key: S3 destination key
        - sftp_path: SFTP source path
        - sftp_conn_id: SFTP connection ID
        - aws_conn_id: AWS connection ID
        """

Web and API Transfers

Transfer data from web APIs and HTTP sources to S3.

class HttpToS3Operator(BaseOperator):
    def __init__(self, endpoint: str, s3_bucket: str, s3_key: str, http_conn_id: str = 'http_default', aws_conn_id: str = 'aws_default', method: str = 'GET', headers: dict | None = None, **kwargs):
        """
        Transfer HTTP response data to S3.
        
        Parameters:
        - endpoint: HTTP endpoint URL
        - s3_bucket: S3 destination bucket
        - s3_key: S3 destination key
        - http_conn_id: HTTP connection ID
        - aws_conn_id: AWS connection ID
        - method: HTTP method (GET, POST, etc.)
        - headers: HTTP request headers
        """

class GoogleApiToS3Operator(BaseOperator):
    def __init__(self, google_api_service_name: str, google_api_service_version: str, google_api_endpoint_path: str, s3_bucket: str, s3_key: str, google_conn_id: str = 'google_cloud_default', aws_conn_id: str = 'aws_default', **kwargs):
        """
        Transfer Google API data to S3.
        
        Parameters:
        - google_api_service_name: Google API service name
        - google_api_service_version: Google API service version
        - google_api_endpoint_path: API endpoint path
        - s3_bucket: S3 destination bucket
        - s3_key: S3 destination key
        - google_conn_id: Google Cloud connection ID
        - aws_conn_id: AWS connection ID
        """

Database Transfers

Transfer data between S3 and various database systems.

class MongoToS3Operator(BaseOperator):
    def __init__(self, mongo_collection: str, s3_bucket: str, s3_key: str, mongo_conn_id: str = 'mongo_default', aws_conn_id: str = 'aws_default', mongo_query: dict | None = None, **kwargs):
        """
        Transfer MongoDB collection data to S3.
        
        Parameters:
        - mongo_collection: MongoDB collection name
        - s3_bucket: S3 destination bucket
        - s3_key: S3 destination key
        - mongo_conn_id: MongoDB connection ID
        - aws_conn_id: AWS connection ID
        - mongo_query: MongoDB query filter
        """

Usage Examples

S3 to Redshift Data Pipeline

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

dag = DAG('s3_to_redshift_pipeline', start_date=datetime(2023, 1, 1))

# Wait for data file in S3
wait_for_data = S3KeySensor(
    task_id='wait_for_data',
    bucket_name='data-lake-bucket',
    bucket_key='raw-data/{{ ds }}/sales_data.csv',
    timeout=3600,
    dag=dag
)

# Load data into Redshift
load_to_redshift = S3ToRedshiftOperator(
    task_id='load_to_redshift',
    schema='analytics',
    table='daily_sales',
    s3_bucket='data-lake-bucket',
    s3_key='raw-data/{{ ds }}/sales_data.csv',
    redshift_conn_id='redshift_prod',
    aws_conn_id='aws_default',
    copy_options=[
        "CSV",
        "IGNOREHEADER 1",
        "DATEFORMAT 'YYYY-MM-DD'",
        "TRUNCATECOLUMNS"
    ],
    dag=dag
)

wait_for_data >> load_to_redshift

Database Export to S3

from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator

# Export PostgreSQL query results to S3
export_sales_data = SqlToS3Operator(
    task_id='export_sales_data',
    query="""
        SELECT 
            date_trunc('day', order_date) as order_day,
            customer_id,
            product_id,
            quantity,
            price,
            total_amount
        FROM orders 
        WHERE order_date >= '{{ ds }}' 
        AND order_date < '{{ next_ds }}'
    """,
    s3_bucket='analytics-exports',
    s3_key='exports/sales/{{ ds }}/daily_sales.parquet',
    sql_conn_id='postgres_prod',
    aws_conn_id='aws_default',
    file_format='parquet',  # match the .parquet key; the default output format is CSV
    pd_kwargs={
        'dtype': {
            'customer_id': 'int64',
            'product_id': 'int64',
            'quantity': 'int32',
            'price': 'float64'
        }
    },
    dag=dag
)

Multi-Source Data Integration

from airflow.providers.amazon.aws.transfers.ftp_to_s3 import FtpToS3Operator
from airflow.providers.amazon.aws.transfers.http_to_s3 import HttpToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator

# Download files from multiple sources
ftp_download = FtpToS3Operator(
    task_id='download_from_ftp',
    ftp_path='/data/exports/customer_data.csv',
    s3_bucket='integration-staging',
    s3_key='sources/ftp/{{ ds }}/customer_data.csv',
    ftp_conn_id='partner_ftp',
    dag=dag
)

api_download = HttpToS3Operator(
    task_id='download_from_api',
    endpoint='/api/v1/products?date={{ ds }}',
    s3_bucket='integration-staging',
    s3_key='sources/api/{{ ds }}/products.json',
    http_conn_id='partner_api',
    headers={'Authorization': 'Bearer {{ var.value.api_token }}'},
    dag=dag
)

# Load all data into warehouse
load_customers = S3ToRedshiftOperator(
    task_id='load_customers',
    schema='staging',
    table='customers',
    s3_bucket='integration-staging',
    s3_key='sources/ftp/{{ ds }}/',
    wildcard_match=True,
    copy_options=["CSV", "IGNOREHEADER 1"],
    dag=dag
)

# Parallel downloads, then load
[ftp_download, api_download] >> load_customers

Data Export Pipeline

from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_sftp import S3ToSftpOperator

# Export aggregated data from Redshift
export_summary = RedshiftToS3Operator(
    task_id='export_summary',
    select_query="""
        SELECT 
            region,
            product_category,
            DATE_TRUNC('month', sale_date) as month,
            SUM(revenue) as total_revenue,
            COUNT(*) as transaction_count
        FROM fact_sales
        WHERE sale_date >= DATE_TRUNC('month', CURRENT_DATE - INTERVAL '1 month')
        AND sale_date < DATE_TRUNC('month', CURRENT_DATE)
        GROUP BY region, product_category, month
    """,
    s3_bucket='analytics-exports',
    s3_key='reports/monthly_summary_{{ ds }}.csv',
    redshift_conn_id='redshift_prod',
    unload_options=[
        "DELIMITER ','",
        "NULL AS ''"
    ],
    include_header=True,  # adds HEADER to the UNLOAD, so it is not repeated in unload_options
    dag=dag
)

# Send report to external partner
send_to_partner = S3ToSftpOperator(
    task_id='send_to_partner',
    s3_bucket='analytics-exports',
    s3_key='reports/monthly_summary_{{ ds }}.csv000',  # UNLOAD appends a slice suffix (e.g. 000) to the unloaded file
    sftp_path='/uploads/monthly_summary_{{ ds }}.csv',
    sftp_conn_id='partner_sftp',
    dag=dag
)

export_summary >> send_to_partner

DynamoDB Backup to S3

from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDbToS3Operator

# Daily backup of DynamoDB table
backup_user_sessions = DynamoDbToS3Operator(
    task_id='backup_user_sessions',
    dynamodb_table_name='user_sessions',
    s3_bucket_name='dynamodb-backups',
    s3_key='backups/user_sessions/{{ ds }}/sessions.json',
    aws_conn_id='aws_default',
    dynamodb_scan_kwargs={
        'FilterExpression': 'attribute_exists(session_id)',
        'ProjectionExpression': 'session_id, user_id, created_at, last_active'
    },
    dag=dag
)

Types

# Transfer configuration types
class TransferConfig:
    source_conn_id: str
    dest_conn_id: str
    batch_size: int = 1000
    parallel: bool = False
    verify_data: bool = True

# S3 transfer options
class S3TransferOptions:
    multipart_threshold: int = 8 * 1024 * 1024  # 8MB
    max_concurrency: int = 10
    multipart_chunksize: int = 8 * 1024 * 1024
    use_threads: bool = True

# Redshift COPY options
class RedshiftCopyOptions:
    format: str = 'CSV'
    delimiter: str = ','
    quote_character: str = '"'
    escape_character: str | None = None
    null_as: str = ''
    ignore_header: int = 0
    date_format: str = 'auto'
    time_format: str = 'auto'
    ignore_blank_lines: bool = True
    truncate_columns: bool = False
    fill_record: bool = False
    blanks_as_null: bool = True
    empty_as_null: bool = True
    explicit_ids: bool = False
    acceptanydate: bool = False
    acceptinvchars: str | None = None
    max_error: int = 0
    compupdate: bool = True
    statupdate: bool = True

# File format types
class FileFormat:
    CSV = 'csv'
    JSON = 'json'
    PARQUET = 'parquet'
    AVRO = 'avro'
    ORC = 'orc'
    TSV = 'tsv'

# Compression types
class CompressionType:
    NONE = None
    GZIP = 'gzip'
    BZIP2 = 'bz2'
    LZOP = 'lzop'
    ZSTD = 'zstd'

# Transfer status
class TransferStatus:
    PENDING = 'pending'
    RUNNING = 'running'
    SUCCESS = 'success'
    FAILED = 'failed'
    CANCELLED = 'cancelled'
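Settings like those in RedshiftCopyOptions ultimately have to reach S3ToRedshiftOperator as a list of COPY clause strings. A small, hypothetical helper (not part of the provider) showing one way to render such settings into that list form:

```python
def build_copy_options(fmt: str = 'CSV', delimiter: str = ',',
                       ignore_header: int = 0, date_format: str = 'auto',
                       truncate_columns: bool = False, max_error: int = 0) -> list:
    """Render COPY settings into the list-of-strings form copy_options expects."""
    opts = [fmt, f"DELIMITER '{delimiter}'"]
    if ignore_header:
        opts.append(f"IGNOREHEADER {ignore_header}")
    if date_format != 'auto':
        opts.append(f"DATEFORMAT '{date_format}'")
    if truncate_columns:
        opts.append('TRUNCATECOLUMNS')
    if max_error:
        opts.append(f'MAXERROR {max_error}')
    return opts

# Reproduces the copy_options list used in the S3-to-Redshift example above.
print(build_copy_options(ignore_header=1, date_format='YYYY-MM-DD',
                         truncate_columns=True))
```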

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-amazon
