Apache Airflow provider package delivering comprehensive AWS service integrations for orchestrating cloud workflows and data pipelines.
This provider offers comprehensive data movement between AWS services and external systems. It includes operators for transferring data between S3, Redshift, databases, FTP/SFTP servers, and other data sources, with support for various formats and transformation options.
Transfer data from S3 to Redshift tables with support for COPY command options and data transformation.
class S3ToRedshiftOperator(BaseOperator):
def __init__(self, schema: str, table: str, s3_bucket: str, s3_key: str, redshift_conn_id: str = 'redshift_default', aws_conn_id: str = 'aws_default', verify: bool = None, wildcard_match: bool = False, copy_options: list = None, autocommit: bool = True, parameters: dict = None, **kwargs):
"""
Transfer data from S3 to Redshift table.
Parameters:
- schema: Redshift schema name
- table: Redshift table name
- s3_bucket: S3 bucket containing source data
- s3_key: S3 key path for source data
- redshift_conn_id: Redshift connection ID
- aws_conn_id: AWS connection ID for S3 access
- verify: SSL certificate verification
- wildcard_match: Use wildcard matching for S3 keys
- copy_options: Additional COPY command options
- autocommit: Auto-commit the transaction
- parameters: Additional parameters for the operation
"""Export data from Redshift tables to S3 with support for UNLOAD command options and parallel processing.
class RedshiftToS3Operator(BaseOperator):
def __init__(self, s3_bucket: str, s3_key: str, schema: str = None, table: str = None, select_query: str = None, redshift_conn_id: str = 'redshift_default', aws_conn_id: str = 'aws_default', verify: bool = None, unload_options: list = None, autocommit: bool = True, include_header: bool = False, **kwargs):
"""
Transfer data from Redshift to S3.
Parameters:
- s3_bucket: S3 bucket for destination data
- s3_key: S3 key path for destination data
- schema: Redshift schema name (if using table)
- table: Redshift table name (if using table)
- select_query: Custom SELECT query (alternative to schema/table)
- redshift_conn_id: Redshift connection ID
- aws_conn_id: AWS connection ID for S3 access
- verify: SSL certificate verification
- unload_options: Additional UNLOAD command options
- autocommit: Auto-commit the transaction
- include_header: Include column headers in output
"""Transfer SQL query results from any database to S3 with support for various file formats and partitioning.
class SqlToS3Operator(BaseOperator):
def __init__(self, query: str, s3_bucket: str, s3_key: str, sql_conn_id: str, aws_conn_id: str = 'aws_default', verify: bool = None, replace: bool = True, pd_kwargs: dict = None, index: bool = True, **kwargs):
"""
Transfer SQL query results to S3.
Parameters:
- query: SQL query to execute
- s3_bucket: S3 bucket for destination data
- s3_key: S3 key path for destination data
- sql_conn_id: SQL database connection ID
- aws_conn_id: AWS connection ID for S3 access
- verify: SSL certificate verification
- replace: Replace existing S3 object
- pd_kwargs: Additional pandas parameters for data processing
- index: Include DataFrame index in output
"""Transfer data from S3 to SQL databases with support for data type inference and table creation.
class S3ToSqlOperator(BaseOperator):
def __init__(self, s3_bucket: str, s3_key: str, table: str, sql_conn_id: str, aws_conn_id: str = 'aws_default', verify: bool = None, wildcard_match: bool = False, copy_options: dict = None, **kwargs):
"""
Transfer data from S3 to SQL database table.
Parameters:
- s3_bucket: S3 bucket containing source data
- s3_key: S3 key path for source data
- table: Destination table name
- sql_conn_id: SQL database connection ID
- aws_conn_id: AWS connection ID for S3 access
- verify: SSL certificate verification
- wildcard_match: Use wildcard matching for S3 keys
- copy_options: Additional copy options
"""Export DynamoDB table data to S3 with support for parallel scans and data filtering.
class DynamoDbToS3Operator(BaseOperator):
def __init__(self, dynamodb_table_name: str, s3_bucket_name: str, s3_key: str, aws_conn_id: str = 'aws_default', dynamodb_scan_kwargs: dict = None, s3_key_type: str = 'table_name', **kwargs):
"""
Export DynamoDB table to S3.
Parameters:
- dynamodb_table_name: DynamoDB table name
- s3_bucket_name: S3 bucket for destination data
- s3_key: S3 key path for destination data
- aws_conn_id: AWS connection ID
- dynamodb_scan_kwargs: Additional DynamoDB scan parameters
- s3_key_type: S3 key generation strategy
"""Transfer operations between S3 and local file systems or remote file servers.
class LocalFilesystemToS3Operator(BaseOperator):
def __init__(self, filename: str, dest_key: str, dest_bucket: str = None, aws_conn_id: str = 'aws_default', verify: bool = None, replace: bool = True, **kwargs):
"""
Transfer local file to S3.
Parameters:
- filename: Local file path
- dest_key: S3 destination key
- dest_bucket: S3 destination bucket
- aws_conn_id: AWS connection ID
- verify: SSL certificate verification
- replace: Replace existing S3 object
"""
class S3ToFtpOperator(BaseOperator):
def __init__(self, s3_bucket: str, s3_key: str, ftp_path: str, ftp_conn_id: str = 'ftp_default', aws_conn_id: str = 'aws_default', **kwargs):
"""
Transfer S3 object to FTP server.
Parameters:
- s3_bucket: S3 source bucket
- s3_key: S3 source key
- ftp_path: FTP destination path
- ftp_conn_id: FTP connection ID
- aws_conn_id: AWS connection ID
"""
class FtpToS3Operator(BaseOperator):
def __init__(self, ftp_path: str, s3_bucket: str, s3_key: str, ftp_conn_id: str = 'ftp_default', aws_conn_id: str = 'aws_default', **kwargs):
"""
Transfer file from FTP server to S3.
Parameters:
- ftp_path: FTP source path
- s3_bucket: S3 destination bucket
- s3_key: S3 destination key
- ftp_conn_id: FTP connection ID
- aws_conn_id: AWS connection ID
"""
class S3ToSftpOperator(BaseOperator):
def __init__(self, sftp_path: str, s3_bucket: str, s3_key: str, sftp_conn_id: str = 'sftp_default', aws_conn_id: str = 'aws_default', **kwargs):
"""
Transfer S3 object to SFTP server.
Parameters:
- sftp_path: SFTP destination path
- s3_bucket: S3 source bucket
- s3_key: S3 source key
- sftp_conn_id: SFTP connection ID
- aws_conn_id: AWS connection ID
"""
class SftpToS3Operator(BaseOperator):
def __init__(self, s3_bucket: str, s3_key: str, sftp_path: str, sftp_conn_id: str = 'sftp_default', aws_conn_id: str = 'aws_default', **kwargs):
"""
Transfer file from SFTP server to S3.
Parameters:
- s3_bucket: S3 destination bucket
- s3_key: S3 destination key
- sftp_path: SFTP source path
- sftp_conn_id: SFTP connection ID
- aws_conn_id: AWS connection ID
"""Transfer data from web APIs and HTTP sources to S3.
class HttpToS3Operator(BaseOperator):
def __init__(self, endpoint: str, s3_bucket: str, s3_key: str, http_conn_id: str = 'http_default', aws_conn_id: str = 'aws_default', method: str = 'GET', headers: dict = None, **kwargs):
"""
Transfer HTTP response data to S3.
Parameters:
- endpoint: HTTP endpoint URL
- s3_bucket: S3 destination bucket
- s3_key: S3 destination key
- http_conn_id: HTTP connection ID
- aws_conn_id: AWS connection ID
- method: HTTP method (GET, POST, etc.)
- headers: HTTP request headers
"""
class GoogleApiToS3Operator(BaseOperator):
def __init__(self, google_api_service_name: str, google_api_service_version: str, google_api_endpoint_path: str, s3_bucket: str, s3_key: str, google_conn_id: str = 'google_cloud_default', aws_conn_id: str = 'aws_default', **kwargs):
"""
Transfer Google API data to S3.
Parameters:
- google_api_service_name: Google API service name
- google_api_service_version: Google API service version
- google_api_endpoint_path: API endpoint path
- s3_bucket: S3 destination bucket
- s3_key: S3 destination key
- google_conn_id: Google Cloud connection ID
- aws_conn_id: AWS connection ID
"""Transfer data between S3 and various database systems.
class MongoToS3Operator(BaseOperator):
def __init__(self, mongo_collection: str, s3_bucket: str, s3_key: str, mongo_conn_id: str = 'mongo_default', aws_conn_id: str = 'aws_default', mongo_query: dict = None, **kwargs):
"""
Transfer MongoDB collection data to S3.
Parameters:
- mongo_collection: MongoDB collection name
- s3_bucket: S3 destination bucket
- s3_key: S3 destination key
- mongo_conn_id: MongoDB connection ID
- aws_conn_id: AWS connection ID
- mongo_query: MongoDB query filter
"""from airflow import DAG
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
dag = DAG('s3_to_redshift_pipeline', start_date=datetime(2023, 1, 1))
# Wait for data file in S3
wait_for_data = S3KeySensor(
task_id='wait_for_data',
bucket_name='data-lake-bucket',
bucket_key='raw-data/{{ ds }}/sales_data.csv',
timeout=3600,
dag=dag
)
# Load data into Redshift
load_to_redshift = S3ToRedshiftOperator(
task_id='load_to_redshift',
schema='analytics',
table='daily_sales',
s3_bucket='data-lake-bucket',
s3_key='raw-data/{{ ds }}/sales_data.csv',
redshift_conn_id='redshift_prod',
aws_conn_id='aws_default',
copy_options=[
"CSV",
"IGNOREHEADER 1",
"DATEFORMAT 'YYYY-MM-DD'",
"TRUNCATECOLUMNS"
],
dag=dag
)
wait_for_data >> load_to_redshift

from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
# Export PostgreSQL query results to S3
export_sales_data = SqlToS3Operator(
task_id='export_sales_data',
query="""
SELECT
date_trunc('day', order_date) as order_day,
customer_id,
product_id,
quantity,
price,
total_amount
FROM orders
WHERE order_date >= '{{ ds }}'
AND order_date < '{{ next_ds }}'
""",
s3_bucket='analytics-exports',
s3_key='exports/sales/{{ ds }}/daily_sales.parquet',
sql_conn_id='postgres_prod',
aws_conn_id='aws_default',
pd_kwargs={
'dtype': {
'customer_id': 'int64',
'product_id': 'int64',
'quantity': 'int32',
'price': 'float64'
}
},
dag=dag
)

from airflow.providers.amazon.aws.transfers.ftp_to_s3 import FtpToS3Operator
from airflow.providers.amazon.aws.transfers.http_to_s3 import HttpToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
# Download files from multiple sources
ftp_download = FtpToS3Operator(
task_id='download_from_ftp',
ftp_path='/data/exports/customer_data.csv',
s3_bucket='integration-staging',
s3_key='sources/ftp/{{ ds }}/customer_data.csv',
ftp_conn_id='partner_ftp',
dag=dag
)
api_download = HttpToS3Operator(
task_id='download_from_api',
endpoint='/api/v1/products?date={{ ds }}',
s3_bucket='integration-staging',
s3_key='sources/api/{{ ds }}/products.json',
http_conn_id='partner_api',
headers={'Authorization': 'Bearer {{ var.value.api_token }}'},
dag=dag
)
# Load all data into warehouse
load_customers = S3ToRedshiftOperator(
task_id='load_customers',
schema='staging',
table='customers',
s3_bucket='integration-staging',
s3_key='sources/ftp/{{ ds }}/',
wildcard_match=True,
copy_options=["CSV", "IGNOREHEADER 1"],
dag=dag
)
# Parallel downloads, then load
[ftp_download, api_download] >> load_customers

from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_sftp import S3ToSftpOperator
# Export aggregated data from Redshift
export_summary = RedshiftToS3Operator(
task_id='export_summary',
select_query="""
SELECT
region,
product_category,
DATE_TRUNC('month', sale_date) as month,
SUM(revenue) as total_revenue,
COUNT(*) as transaction_count
FROM fact_sales
WHERE sale_date >= DATE_TRUNC('month', CURRENT_DATE - INTERVAL '1 month')
AND sale_date < DATE_TRUNC('month', CURRENT_DATE)
GROUP BY region, product_category, month
""",
s3_bucket='analytics-exports',
s3_key='reports/monthly_summary_{{ ds }}.csv',
redshift_conn_id='redshift_prod',
unload_options=[
"HEADER",
"DELIMITER ','",
"NULL AS ''"
],
include_header=True,
dag=dag
)
# Send report to external partner
send_to_partner = S3ToSftpOperator(
task_id='send_to_partner',
s3_bucket='analytics-exports',
s3_key='reports/monthly_summary_{{ ds }}.csv000',  # UNLOAD writes part files with a numeric suffix appended to the key prefix
sftp_path='/uploads/monthly_summary_{{ ds }}.csv',
sftp_conn_id='partner_sftp',
dag=dag
)
export_summary >> send_to_partner

from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDbToS3Operator
# Daily backup of DynamoDB table
backup_user_sessions = DynamoDbToS3Operator(
task_id='backup_user_sessions',
dynamodb_table_name='user_sessions',
s3_bucket_name='dynamodb-backups',
s3_key='backups/user_sessions/{{ ds }}/sessions.json',
aws_conn_id='aws_default',
dynamodb_scan_kwargs={
'FilterExpression': 'attribute_exists(session_id)',
'ProjectionExpression': 'session_id, user_id, created_at, last_active'
},
dag=dag
)

# Transfer configuration types
class TransferConfig:
source_conn_id: str
dest_conn_id: str
batch_size: int = 1000
parallel: bool = False
verify_data: bool = True
# S3 transfer options
class S3TransferOptions:
multipart_threshold: int = 8 * 1024 * 1024 # 8MB
max_concurrency: int = 10
multipart_chunksize: int = 8 * 1024 * 1024
use_threads: bool = True
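These S3 transfer options mirror the tuning knobs exposed by boto3's transfer configuration. The sketch below shows one hedged way to apply them through the provider's S3Hook and its underlying boto3 client; the bucket, key, and file names are illustrative, and using the hook's raw client this way is an assumption about applying these settings outside the transfer operators.

# Illustrative only: apply multipart/concurrency settings via boto3's
# TransferConfig when uploading through the S3Hook's underlying client.
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from boto3.s3.transfer import TransferConfig

transfer_config = TransferConfig(
    multipart_threshold=8 * 1024 * 1024,   # switch to multipart uploads above 8MB
    max_concurrency=10,                    # parallel upload threads
    multipart_chunksize=8 * 1024 * 1024,   # size of each uploaded part
    use_threads=True,
)

s3_client = S3Hook(aws_conn_id='aws_default').get_conn()
s3_client.upload_file(
    Filename='/tmp/large_export.parquet',  # hypothetical local file
    Bucket='analytics-exports',
    Key='exports/large_export.parquet',
    Config=transfer_config,
)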
# Redshift COPY options
class RedshiftCopyOptions:
format: str = 'CSV'
delimiter: str = ','
quote_character: str = '"'
escape_character: str = None
null_as: str = ''
ignore_header: int = 0
date_format: str = 'auto'
time_format: str = 'auto'
ignore_blank_lines: bool = True
truncate_columns: bool = False
fill_record: bool = False
blanks_as_null: bool = True
empty_as_null: bool = True
explicit_ids: bool = False
acceptanydate: bool = False
acceptinvchars: str = None
max_error: int = 0
compupdate: bool = True
statupdate: bool = True
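These fields correspond to parameters of the Redshift COPY command. S3ToRedshiftOperator, shown earlier, takes them as raw option strings via copy_options; an illustrative mapping of the fields above (values are examples, not defaults enforced by the operator) might look like this:

# Equivalent raw COPY option strings for the fields above (illustrative values).
copy_options = [
    "CSV",
    "IGNOREHEADER 1",
    "DATEFORMAT 'auto'",
    "TIMEFORMAT 'auto'",
    "BLANKSASNULL",
    "EMPTYASNULL",
    "TRUNCATECOLUMNS",
    "MAXERROR 0",
]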
# File format types
class FileFormat:
CSV = 'csv'
JSON = 'json'
PARQUET = 'parquet'
AVRO = 'avro'
ORC = 'orc'
TSV = 'tsv'
# Compression types
class CompressionType:
NONE = None
GZIP = 'gzip'
BZIP2 = 'bz2'
LZOP = 'lzop'
ZSTD = 'zstd'
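As a hedged example of combining a file format with compression, SqlToS3Operator's pd_kwargs can carry options for the underlying pandas writer; whether a given option is honoured depends on the output format, so treat the values below (and the connection IDs, bucket, and `dag` object) as illustrative.

# Illustrative: gzip-compress a CSV export by forwarding a pandas to_csv option.
export_compressed = SqlToS3Operator(
    task_id='export_compressed',
    query="SELECT * FROM orders WHERE order_date = '{{ ds }}'",
    s3_bucket='analytics-exports',
    s3_key='exports/orders/{{ ds }}/orders.csv.gz',
    sql_conn_id='postgres_prod',
    aws_conn_id='aws_default',
    pd_kwargs={'compression': 'gzip'},
    dag=dag
)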
# Transfer status
class TransferStatus:
PENDING = 'pending'
RUNNING = 'running'
SUCCESS = 'success'
FAILED = 'failed'
CANCELLED = 'cancelled'

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-amazon