tessl/pypi-apache-airflow-providers-postgres

PostgreSQL integration provider for Apache Airflow with database hooks, assets, and dialect support.
AWS Integration and Authentication

AWS IAM authentication for RDS PostgreSQL and Amazon Redshift, with automatic token management, cross-provider integration, and support for both standard Redshift clusters and Redshift Serverless workgroups.

Capabilities

IAM Token Authentication

Generate and use IAM authentication tokens for AWS RDS PostgreSQL and Redshift connections.

def get_iam_token(self, conn: Connection) -> tuple[str, str, int]:
    """
    Get IAM authentication token for AWS RDS/Redshift connection.
    
    Parameters:
    - conn: Connection object with AWS configuration
    
    Returns:
    tuple[str, str, int]: (username, token, port)
    
    Dependencies:
    Requires apache-airflow-providers-amazon package
    
    Raises:
    AirflowOptionalProviderFeatureException: If Amazon provider not installed
    """
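Under the hood, the Amazon provider's boto3 client produces the token. As a rough sketch (not the provider's actual implementation; the client is injected here so the logic runs without AWS credentials), the RDS path maps onto the `(username, token, port)` contract like this:

```python
# Sketch only: `rds_client` is any object exposing boto3's real RDS client
# method generate_db_auth_token(DBHostname=..., Port=..., DBUsername=...).
def rds_iam_token(rds_client, host: str, port: int, user: str) -> tuple:
    token = rds_client.generate_db_auth_token(
        DBHostname=host, Port=port, DBUsername=user
    )
    # Mirrors the documented (username, token, port) return contract.
    return user, token, port
```

With a real client this would be `boto3.client("rds").generate_db_auth_token(...)`; the Redshift paths use cluster-credentials APIs instead (see the IAM permissions section below).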

AWS Connection Configuration

Basic IAM Authentication

Configure PostgreSQL connection for IAM authentication:

{
  "iam": true,
  "aws_conn_id": "aws_default"
}

Redshift Configuration

Configure connection for Amazon Redshift with IAM:

{
  "iam": true,
  "redshift": true,
  "cluster-identifier": "my-redshift-cluster",
  "aws_conn_id": "my_aws_conn"
}

Redshift Serverless Configuration

Configure connection for Redshift Serverless:

{
  "iam": true,
  "redshift-serverless": true,
  "workgroup-name": "my-serverless-workgroup",
  "aws_conn_id": "my_aws_conn"
}

Usage Examples

RDS PostgreSQL with IAM

from airflow.providers.postgres.hooks.postgres import PostgresHook

# Connection configured with IAM authentication
hook = PostgresHook(postgres_conn_id="rds_postgres_iam")

# Hook automatically handles IAM token generation
records = hook.get_records("SELECT current_user, current_database()")

Redshift with IAM

# Redshift connection with IAM
hook = PostgresHook(postgres_conn_id="redshift_iam")

# Execute Redshift-specific queries
hook.run("""
    COPY users FROM 's3://my-bucket/users.csv'
    IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftRole'
    CSV
""")

Redshift Serverless with IAM

# Redshift Serverless connection
hook = PostgresHook(postgres_conn_id="redshift_serverless_iam")

# Query Redshift Serverless
df = hook.get_df("""
    SELECT 
        user_id,
        COUNT(*) as order_count,
        SUM(amount) as total_amount
    FROM orders 
    WHERE date >= CURRENT_DATE - INTERVAL '30 days'
    GROUP BY user_id
""", df_type="pandas")

Connection Configuration Details

Connection Host Format

RDS PostgreSQL:

mydb.cluster-xyz.us-east-1.rds.amazonaws.com

Redshift:

my-cluster.xyz.us-east-1.redshift.amazonaws.com

Redshift Serverless:

my-workgroup.123456789012.us-east-1.redshift-serverless.amazonaws.com
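When `cluster-identifier` or `workgroup-name` is not set in the extras, the identifier is taken from the connection host. A minimal illustration of that derivation (the provider's exact parsing may differ):

```python
def identifier_from_host(host: str) -> str:
    # The first DNS label of the endpoint carries the cluster or
    # workgroup name (see the host formats above).
    return host.split(".")[0]

identifier_from_host("my-cluster.xyz.us-east-1.redshift.amazonaws.com")  # -> "my-cluster"
```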

Extra Parameters

{
  "iam": true,
  "redshift": false,
  "redshift-serverless": false,
  "cluster-identifier": "optional-override",
  "workgroup-name": "optional-override",
  "aws_conn_id": "aws_default",
  "region_name": "us-east-1"
}

Parameter Details

  • iam (bool): Enable IAM authentication
  • redshift (bool): Enable Redshift-specific features
  • redshift-serverless (bool): Enable Redshift Serverless support
  • cluster-identifier (str): Override cluster ID from hostname
  • workgroup-name (str): Override workgroup name from hostname
  • aws_conn_id (str): AWS connection ID for IAM credentials
  • region_name (str): AWS region override

Advanced Configuration

Multi-Region Setup

# Production environment
prod_hook = PostgresHook(postgres_conn_id="prod_redshift")

# Staging environment in different region
staging_hook = PostgresHook(postgres_conn_id="staging_redshift")

# Cross-region data sync
prod_data = prod_hook.get_df("SELECT * FROM processed_data")
staging_hook.insert_rows(
    "processed_data_copy",
    prod_data.values.tolist(),
    target_fields=list(prod_data.columns)
)

Automatic Token Refresh

def long_running_process():
    """Handle long-running processes with token refresh."""
    hook = PostgresHook(postgres_conn_id="rds_iam")
    
    # Process data in batches
    for batch_id in range(100):
        try:
            # Each query gets a fresh token automatically
            process_batch(hook, batch_id)
        except Exception as e:
            if "token" in str(e).lower():
                # Token expired - hook will refresh on next use
                print(f"Token refresh needed for batch {batch_id}")
                continue
            raise

Cross-Provider Integration

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def etl_s3_to_redshift():
    """ETL pipeline using S3 and Redshift with IAM."""
    
    # S3 operations
    s3_hook = S3Hook(aws_conn_id="aws_default")
    
    # Redshift operations with same AWS credentials
    redshift_hook = PostgresHook(postgres_conn_id="redshift_iam")
    
    # List S3 files
    files = s3_hook.list_keys("my-bucket", prefix="data/")
    
    # Process each file
    for file_key in files:
        redshift_hook.run(f"""
            COPY staging_table FROM 's3://my-bucket/{file_key}'
            IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftRole'
            CSV IGNOREHEADER 1
        """)

Performance Considerations

Token Caching

IAM tokens are automatically cached and refreshed:

  • Token Lifetime: 15 minutes (AWS default)
  • Automatic Refresh: Handled transparently by the hook
  • Connection Pooling: Tokens shared across connection pool
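The 15-minute lifetime can be made concrete with a small freshness check. This is a hypothetical helper for illustration only; the hook handles caching and refresh for you:

```python
import time

TOKEN_LIFETIME_S = 15 * 60  # AWS default IAM token lifetime

def token_expired(issued_at, now=None, margin_s=60):
    """Treat a token as expired `margin_s` seconds early to avoid
    failures mid-query."""
    now = time.time() if now is None else now
    return now - issued_at >= TOKEN_LIFETIME_S - margin_s
```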

Redshift Optimization

# Optimize Redshift queries
hook.run("SET enable_result_cache_for_session TO off")  # For ETL
hook.run("SET query_group TO 'etl_jobs'")              # For workload management

# Use COPY for large data loads
hook.run("""
    COPY large_table FROM 's3://bucket/data/'
    IAM_ROLE 'arn:aws:iam::account:role/RedshiftRole'
    FORMAT AS PARQUET
""")

Error Handling

Common IAM Authentication Errors

def handle_iam_errors():
    try:
        hook = PostgresHook(postgres_conn_id="rds_iam")
        records = hook.get_records("SELECT 1")
    except Exception as e:
        error_msg = str(e).lower()
        
        if "amazon provider" in error_msg:
            print("Install: pip install apache-airflow-providers-amazon")
        elif "access denied" in error_msg:
            print("Check IAM permissions for RDS/Redshift access")
        elif "cluster" in error_msg and "not found" in error_msg:
            print("Verify cluster-identifier in connection extras")
        elif "workgroup" in error_msg:
            print("Verify workgroup-name for Redshift Serverless")
        else:
            raise

Connection Validation

def validate_aws_connection(conn_id):
    """Validate AWS IAM connection setup."""
    try:
        hook = PostgresHook(postgres_conn_id=conn_id)
        
        # Test basic connectivity
        result = hook.get_first("SELECT current_user, current_database()")
        print(f"Connected as: {result[0]} to database: {result[1]}")
        
        return True
    except Exception as e:
        print(f"Connection validation failed: {e}")
        return False

# Validate connections
validate_aws_connection("rds_postgres_iam")
validate_aws_connection("redshift_iam")

IAM Permissions Required

RDS PostgreSQL

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "rds-db:connect"
      ],
      "Resource": [
        "arn:aws:rds-db:region:account:dbuser:db-instance-id/db-username"
      ]
    }
  ]
}

Redshift

{
  "Version": "2012-10-17", 
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "redshift:GetClusterCredentials"
      ],
      "Resource": [
        "arn:aws:redshift:region:account:cluster:cluster-name",
        "arn:aws:redshift:region:account:dbuser:cluster-name/username"
      ]
    }
  ]
}

Redshift Serverless

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow", 
      "Action": [
        "redshift-serverless:GetCredentials"
      ],
      "Resource": [
        "arn:aws:redshift-serverless:region:account:workgroup/workgroup-name"
      ]
    }
  ]
}
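The `Resource` ARNs in the policies above follow fixed patterns, so a small formatter helps avoid typos. This helper is illustrative only; region, account, and identifiers are placeholders:

```python
def rds_db_user_arn(region: str, account: str, instance_id: str, db_user: str) -> str:
    # Matches the rds-db:connect Resource pattern shown above.
    return f"arn:aws:rds-db:{region}:{account}:dbuser:{instance_id}/{db_user}"

rds_db_user_arn("us-east-1", "123456789012", "db-INSTANCEID", "etl_user")
```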

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-postgres
