CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-apache-airflow-providers-amazon

Apache Airflow provider package that provides comprehensive AWS service integrations for orchestrating cloud workflows and data pipelines

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

redshift-warehouse.mddocs/

Redshift Data Warehouse

Amazon Redshift integration for data warehouse operations including cluster management, SQL execution, and data loading. Supports both traditional Redshift connections and the modern Redshift Data API for serverless SQL execution.

Capabilities

Redshift SQL Hook

Hook for executing SQL operations against Redshift clusters using traditional database connections.

class RedshiftSqlHook(AwsBaseHook):
    def __init__(self, redshift_conn_id: str = 'redshift_default', **kwargs):
        """
        Initialize Redshift SQL Hook.
        
        Parameters:
        - redshift_conn_id: Redshift connection ID
        """

    def run(self, sql: str, autocommit: bool = False, parameters: dict = None, handler: callable = None) -> Any:
        """
        Execute SQL statement.
        
        Parameters:
        - sql: SQL statement to execute
        - autocommit: Enable autocommit mode
        - parameters: Query parameters
        - handler: Result handler function
        
        Returns:
        Query results
        """

    def get_records(self, sql: str, parameters: dict = None) -> list:
        """
        Execute SQL and return records.
        
        Parameters:
        - sql: SQL query to execute
        - parameters: Query parameters
        
        Returns:
        List of result records
        """

    def get_first(self, sql: str, parameters: dict = None) -> Any:
        """
        Execute SQL and return first result.
        
        Parameters:
        - sql: SQL query to execute
        - parameters: Query parameters
        
        Returns:
        First result record
        """

    def get_pandas_df(self, sql: str, parameters: dict = None, **kwargs) -> Any:
        """
        Execute SQL and return pandas DataFrame.
        
        Parameters:
        - sql: SQL query to execute
        - parameters: Query parameters
        
        Returns:
        pandas DataFrame with results
        """

    def test_connection(self) -> tuple:
        """
        Test Redshift connection.
        
        Returns:
        Connection test result tuple (success, message)
        """

Redshift Data Hook

Hook for serverless SQL execution using Redshift Data API.

class RedshiftDataHook(AwsBaseHook):
    def __init__(self, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Initialize Redshift Data Hook.
        
        Parameters:
        - aws_conn_id: AWS connection ID
        """

    def execute_query(self, database: str, sql: str, cluster_identifier: str = None, db_user: str = None, parameters: list = None, secret_arn: str = None, statement_name: str = None, with_event: bool = False, wait_for_completion: bool = True, poll_interval: int = 10) -> str:
        """
        Execute SQL using Redshift Data API.
        
        Parameters:
        - database: Database name
        - sql: SQL statement to execute
        - cluster_identifier: Redshift cluster identifier
        - db_user: Database user name
        - parameters: SQL parameters
        - secret_arn: AWS Secrets Manager ARN for credentials
        - statement_name: Statement name for identification
        - with_event: Enable event notifications
        - wait_for_completion: Wait for query completion
        - poll_interval: Polling interval in seconds
        
        Returns:
        Statement ID
        """

    def describe_statement(self, id: str) -> dict:
        """
        Get statement execution details.
        
        Parameters:
        - id: Statement ID
        
        Returns:
        Statement details
        """

    def get_statement_result(self, id: str, next_token: str = None) -> dict:
        """
        Get statement execution results.
        
        Parameters:
        - id: Statement ID
        - next_token: Pagination token
        
        Returns:
        Query results
        """

    def cancel_statement(self, id: str) -> bool:
        """
        Cancel running statement.
        
        Parameters:
        - id: Statement ID
        
        Returns:
        Cancellation success status
        """

    def list_statements(self, status: str = None, statement_name: str = None, max_items: int = 100, next_token: str = None) -> dict:
        """
        List executed statements.
        
        Parameters:
        - status: Filter by statement status
        - statement_name: Filter by statement name
        - max_items: Maximum items to return
        - next_token: Pagination token
        
        Returns:
        List of statements
        """

Redshift Cluster Hook

Hook for Redshift cluster lifecycle management.

class RedshiftHook(AwsBaseHook):
    def __init__(self, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Initialize Redshift Cluster Hook.
        
        Parameters:
        - aws_conn_id: AWS connection ID
        """

    def create_cluster(self, cluster_identifier: str, node_type: str, master_username: str, master_user_password: str, **kwargs) -> dict:
        """
        Create Redshift cluster.
        
        Parameters:
        - cluster_identifier: Unique cluster identifier
        - node_type: Node type (e.g., 'dc2.large')
        - master_username: Master username
        - master_user_password: Master password
        
        Returns:
        Cluster configuration
        """

    def delete_cluster(self, cluster_identifier: str, skip_final_cluster_snapshot: bool = False, final_cluster_snapshot_identifier: str = None) -> dict:
        """
        Delete Redshift cluster.
        
        Parameters:
        - cluster_identifier: Cluster identifier
        - skip_final_cluster_snapshot: Skip final snapshot
        - final_cluster_snapshot_identifier: Final snapshot identifier
        
        Returns:
        Deletion response
        """

    def describe_clusters(self, cluster_identifier: str = None) -> dict:
        """
        Describe Redshift clusters.
        
        Parameters:
        - cluster_identifier: Specific cluster identifier
        
        Returns:
        Cluster descriptions
        """

    def pause_cluster(self, cluster_identifier: str) -> dict:
        """
        Pause Redshift cluster.
        
        Parameters:
        - cluster_identifier: Cluster identifier
        
        Returns:
        Pause response
        """

    def resume_cluster(self, cluster_identifier: str) -> dict:
        """
        Resume paused Redshift cluster.
        
        Parameters:
        - cluster_identifier: Cluster identifier
        
        Returns:
        Resume response
        """

    def get_cluster_status(self, cluster_identifier: str) -> str:
        """
        Get cluster status.
        
        Parameters:
        - cluster_identifier: Cluster identifier
        
        Returns:
        Current cluster status
        """

Redshift Operators

Task implementations for Redshift operations.

class RedshiftSqlOperator(BaseOperator):
    def __init__(self, sql: str, redshift_conn_id: str = 'redshift_default', parameters: dict = None, autocommit: bool = True, **kwargs):
        """
        Execute SQL on Redshift cluster.
        
        Parameters:
        - sql: SQL statement to execute
        - redshift_conn_id: Redshift connection ID
        - parameters: SQL parameters
        - autocommit: Enable autocommit mode
        """

class RedshiftDataOperator(BaseOperator):
    def __init__(self, database: str, sql: str, cluster_identifier: str = None, db_user: str = None, secret_arn: str = None, statement_name: str = None, parameters: list = None, poll_interval: int = 10, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Execute SQL using Redshift Data API.
        
        Parameters:
        - database: Database name
        - sql: SQL statement to execute
        - cluster_identifier: Redshift cluster identifier
        - db_user: Database user name
        - secret_arn: AWS Secrets Manager ARN for credentials
        - statement_name: Statement name
        - parameters: SQL parameters
        - poll_interval: Polling interval in seconds
        - aws_conn_id: AWS connection ID
        """

class RedshiftCreateClusterOperator(BaseOperator):
    def __init__(self, cluster_identifier: str, node_type: str, master_username: str, master_user_password: str, publicly_accessible: bool = True, port: int = 5439, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Create Redshift cluster.
        
        Parameters:
        - cluster_identifier: Cluster identifier
        - node_type: Node type
        - master_username: Master username
        - master_user_password: Master password
        - publicly_accessible: Public accessibility
        - port: Database port
        - aws_conn_id: AWS connection ID
        """

class RedshiftDeleteClusterOperator(BaseOperator):
    def __init__(self, cluster_identifier: str, skip_final_cluster_snapshot: bool = False, final_cluster_snapshot_identifier: str = None, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Delete Redshift cluster.
        
        Parameters:
        - cluster_identifier: Cluster identifier
        - skip_final_cluster_snapshot: Skip final snapshot
        - final_cluster_snapshot_identifier: Final snapshot identifier
        - aws_conn_id: AWS connection ID
        """

class RedshiftPauseClusterOperator(BaseOperator):
    def __init__(self, cluster_identifier: str, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Pause Redshift cluster.
        
        Parameters:
        - cluster_identifier: Cluster identifier
        - aws_conn_id: AWS connection ID
        """

class RedshiftResumeClusterOperator(BaseOperator):
    def __init__(self, cluster_identifier: str, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Resume Redshift cluster.
        
        Parameters:
        - cluster_identifier: Cluster identifier
        - aws_conn_id: AWS connection ID
        """

Redshift Sensors

Monitoring tasks for Redshift cluster states and query execution.

class RedshiftClusterSensor(BaseSensorOperator):
    def __init__(self, cluster_identifier: str, target_status: str = 'available', aws_conn_id: str = 'aws_default', **kwargs):
        """
        Wait for Redshift cluster to reach target status.
        
        Parameters:
        - cluster_identifier: Cluster identifier
        - target_status: Target cluster status
        - aws_conn_id: AWS connection ID
        """

Usage Examples

Data Warehouse Operations

from airflow import DAG
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSqlOperator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator

dag = DAG('redshift_analytics', start_date=datetime(2023, 1, 1))

# Create staging tables
create_staging = RedshiftSqlOperator(
    task_id='create_staging_tables',
    redshift_conn_id='redshift_prod',
    sql="""
        CREATE TABLE IF NOT EXISTS staging.sales_data (
            transaction_id VARCHAR(50),
            customer_id INTEGER,
            product_id INTEGER,
            quantity INTEGER,
            price DECIMAL(10,2),
            transaction_date DATE,
            region VARCHAR(50)
        );
        
        TRUNCATE TABLE staging.sales_data;
    """,
    dag=dag
)

# Load data from S3
load_data = S3ToRedshiftOperator(
    task_id='load_from_s3',
    schema='staging',
    table='sales_data',
    s3_bucket='data-warehouse-staging',
    s3_key='sales/{{ ds }}/',
    redshift_conn_id='redshift_prod',
    copy_options=[
        "CSV",
        "IGNOREHEADER 1",
        "TIMEFORMAT 'YYYY-MM-DD'",
        "TRUNCATECOLUMNS"
    ],
    dag=dag
)

# Transform and load to production tables
transform_data = RedshiftSqlOperator(
    task_id='transform_and_load',
    redshift_conn_id='redshift_prod',
    sql="""
        -- Update dimension tables
        INSERT INTO dim_customers (customer_id, region)
        SELECT DISTINCT customer_id, region
        FROM staging.sales_data s
        WHERE NOT EXISTS (
            SELECT 1 FROM dim_customers d 
            WHERE d.customer_id = s.customer_id
        );
        
        -- Insert fact data
        INSERT INTO fact_sales (
            transaction_id, customer_id, product_id, 
            quantity, price, transaction_date
        )
        SELECT 
            transaction_id, customer_id, product_id,
            quantity, price, transaction_date
        FROM staging.sales_data;
        
        -- Update statistics
        ANALYZE fact_sales;
        ANALYZE dim_customers;
    """,
    dag=dag
)

create_staging >> load_data >> transform_data

Redshift Data API Usage

from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator

# Execute query using Data API
analyze_sales = RedshiftDataOperator(
    task_id='analyze_sales_data',
    database='analytics',
    cluster_identifier='analytics-cluster',
    sql="""
        SELECT 
            region,
            DATE_TRUNC('month', transaction_date) as month,
            SUM(quantity * price) as revenue,
            COUNT(*) as transaction_count
        FROM fact_sales 
        WHERE transaction_date >= CURRENT_DATE - INTERVAL '3 months'
        GROUP BY region, month
        ORDER BY region, month;
    """,
    statement_name='monthly_sales_analysis',
    aws_conn_id='aws_default',
    dag=dag
)

Cluster Lifecycle Management

from airflow.providers.amazon.aws.operators.redshift_cluster import (
    RedshiftCreateClusterOperator,
    RedshiftPauseClusterOperator,
    RedshiftResumeClusterOperator
)
from airflow.providers.amazon.aws.sensors.redshift_cluster import RedshiftClusterSensor

# Resume cluster for processing
resume_cluster = RedshiftResumeClusterOperator(
    task_id='resume_cluster',
    cluster_identifier='analytics-cluster',
    dag=dag
)

# Wait for cluster to be available
wait_for_cluster = RedshiftClusterSensor(
    task_id='wait_for_available',
    cluster_identifier='analytics-cluster',
    target_status='available',
    timeout=1800,  # 30 minutes
    dag=dag
)

# Run analytics workload
run_analytics = RedshiftSqlOperator(
    task_id='run_analytics',
    sql='CALL analytics.run_monthly_reports();',
    redshift_conn_id='redshift_prod',
    dag=dag
)

# Pause cluster to save costs
pause_cluster = RedshiftPauseClusterOperator(
    task_id='pause_cluster',
    cluster_identifier='analytics-cluster',
    dag=dag
)

resume_cluster >> wait_for_cluster >> run_analytics >> pause_cluster

Types

# Redshift cluster states
class RedshiftClusterState:
    AVAILABLE = 'available'
    CREATING = 'creating'
    DELETING = 'deleting'
    FINAL_SNAPSHOT = 'final-snapshot'
    HARDWARE_FAILURE = 'hardware-failure'
    INCOMPATIBLE_HSMS = 'incompatible-hsm'
    INCOMPATIBLE_NETWORK = 'incompatible-network'
    INCOMPATIBLE_PARAMETERS = 'incompatible-parameters'
    INCOMPATIBLE_RESTORE = 'incompatible-restore'
    MODIFYING = 'modifying'
    PAUSED = 'paused'
    REBOOTING = 'rebooting'
    RENAMING = 'renaming'
    RESIZING = 'resizing'
    ROTATING_KEYS = 'rotating-keys'
    STORAGE_FULL = 'storage-full'
    UPDATING_HSMS = 'updating-hsms'

# Redshift node types
class RedshiftNodeType:
    DC2_LARGE = 'dc2.large'
    DC2_8XLARGE = 'dc2.8xlarge'
    DS2_XLARGE = 'ds2.xlarge'
    DS2_8XLARGE = 'ds2.8xlarge'
    RA3_XLPLUS = 'ra3.xlplus'
    RA3_4XLARGE = 'ra3.4xlarge'
    RA3_16XLARGE = 'ra3.16xlarge'

# Statement status for Data API
class StatementStatus:
    SUBMITTED = 'SUBMITTED'
    PICKED = 'PICKED'
    STARTED = 'STARTED'
    FINISHED = 'FINISHED'
    ABORTED = 'ABORTED'
    FAILED = 'FAILED'

# Cluster configuration
class RedshiftClusterConfig:
    cluster_identifier: str
    node_type: str
    master_username: str
    db_name: str = None
    port: int = 5439
    cluster_type: str = 'multi-node'
    number_of_nodes: int = 1
    publicly_accessible: bool = True
    encrypted: bool = False
    hsm_client_certificate_identifier: str = None
    hsm_configuration_identifier: str = None
    elastic_ip: str = None
    cluster_parameter_group_name: str = None
    cluster_subnet_group_name: str = None
    availability_zone: str = None
    preferred_maintenance_window: str = None
    cluster_version: str = None
    allow_version_upgrade: bool = True
    automated_snapshot_retention_period: int = 1
    manual_snapshot_retention_period: int = None
    snapshot_identifier: str = None
    snapshot_cluster_identifier: str = None
    owner_account: str = None
    additional_info: str = None
    kms_key_id: str = None
    enhanced_vpc_routing: bool = False
    iam_roles: list = None
    maintenance_track_name: str = None
    snapshot_schedule_identifier: str = None
    aqua_configuration_status: str = None
    default_iam_role_arn: str = None
    tags: list = None

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-amazon

docs

athena-analytics.md

authentication.md

batch-processing.md

data-transfers.md

dms-migration.md

dynamodb-nosql.md

ecs-containers.md

eks-kubernetes.md

emr-clusters.md

glue-processing.md

index.md

lambda-functions.md

messaging-sns-sqs.md

rds-databases.md

redshift-warehouse.md

s3-storage.md

sagemaker-ml.md

tile.json