PostgreSQL integration provider for Apache Airflow with database hooks, assets, and dialect support.
—
Quality: Pending — does it follow best practices?
Impact: Pending — no eval scenarios have been run.
AWS IAM authentication support for RDS PostgreSQL and Amazon Redshift with automatic token management, cross-provider integration, and support for both standard Redshift clusters and Redshift Serverless workgroups.
Generate and use IAM authentication tokens for AWS RDS PostgreSQL and Redshift connections.
def get_iam_token(self, conn: Connection) -> tuple[str, str, int]:
"""
Get IAM authentication token for AWS RDS/Redshift connection.
Parameters:
- conn: Connection object with AWS configuration
Returns:
tuple[str, str, int]: (username, token, port)
Dependencies:
Requires apache-airflow-providers-amazon package
Raises:
AirflowOptionalProviderFeatureException: If Amazon provider not installed
"""Configure PostgreSQL connection for IAM authentication:
{
"iam": true,
"aws_conn_id": "aws_default"
}

Configure connection for Amazon Redshift with IAM:
{
"iam": true,
"redshift": true,
"cluster-identifier": "my-redshift-cluster",
"aws_conn_id": "my_aws_conn"
}

Configure connection for Redshift Serverless:
{
"iam": true,
"redshift-serverless": true,
"workgroup-name": "my-serverless-workgroup",
"aws_conn_id": "my_aws_conn"
}from airflow.providers.postgres.hooks.postgres import PostgresHook
# Connection configured with IAM authentication
hook = PostgresHook(postgres_conn_id="rds_postgres_iam")
# Hook automatically handles IAM token generation
records = hook.get_records("SELECT current_user, current_database()")# Redshift connection with IAM
hook = PostgresHook(postgres_conn_id="redshift_iam")

# Run a Redshift-specific COPY straight from S3.
hook.run("""
COPY users FROM 's3://my-bucket/users.csv'
IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftRole'
CSV
""")

# Redshift Serverless connection
hook = PostgresHook(postgres_conn_id="redshift_serverless_iam")
# Query Redshift Serverless
df = hook.get_df("""
SELECT
user_id,
COUNT(*) as order_count,
SUM(amount) as total_amount
FROM orders
WHERE date >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY user_id
""", df_type="pandas")RDS PostgreSQL:
mydb.cluster-xyz.us-east-1.rds.amazonaws.com

Redshift:
my-cluster.xyz.us-east-1.redshift.amazonaws.com

Redshift Serverless:
my-workgroup.123456789012.us-east-1.redshift-serverless.amazonaws.com

{
"iam": true,
"redshift": false,
"redshift-serverless": false,
"cluster-identifier": "optional-override",
"workgroup-name": "optional-override",
"aws_conn_id": "aws_default",
"region_name": "us-east-1"
}

# Production environment
prod_hook = PostgresHook(postgres_conn_id="prod_redshift")

# Staging environment in a different region
staging_hook = PostgresHook(postgres_conn_id="staging_redshift")

# Cross-region data sync: pull processed rows from prod, push to staging.
prod_data = prod_hook.get_df("SELECT * FROM processed_data")
staging_hook.insert_rows(
    "processed_data_copy",
    prod_data.values.tolist(),
    target_fields=list(prod_data.columns),
)


def long_running_process():
    """Process batches over an IAM connection, tolerating token expiry."""
    hook = PostgresHook(postgres_conn_id="rds_iam")
    # Work through the batches; each query obtains a fresh token automatically.
    for batch_id in range(100):
        try:
            process_batch(hook, batch_id)
        except Exception as e:
            if "token" not in str(e).lower():
                raise
            # Token expired — the hook will refresh it on next use.
            print(f"Token refresh needed for batch {batch_id}")


from airflow.providers.amazon.aws.hooks.s3 import S3Hook
def etl_s3_to_redshift():
"""ETL pipeline using S3 and Redshift with IAM."""
# S3 operations
s3_hook = S3Hook(aws_conn_id="aws_default")
# Redshift operations with same AWS credentials
redshift_hook = PostgresHook(postgres_conn_id="redshift_iam")
# List S3 files
files = s3_hook.list_keys("my-bucket", prefix="data/")
# Process each file
for file_key in files:
redshift_hook.run(f"""
COPY staging_table FROM 's3://my-bucket/{file_key}'
IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftRole'
CSV IGNOREHEADER 1
""")IAM tokens are automatically cached and refreshed:
# Optimize Redshift queries
hook.run("SET enable_result_cache_for_session TO off") # For ETL
hook.run("SET query_group TO 'etl_jobs'") # For workload management
# Use COPY for large data loads
hook.run("""
COPY large_table FROM 's3://bucket/data/'
IAM_ROLE 'arn:aws:iam::account:role/RedshiftRole'
FORMAT AS PARQUET
""")def handle_iam_errors():
try:
hook = PostgresHook(postgres_conn_id="rds_iam")
records = hook.get_records("SELECT 1")
except Exception as e:
error_msg = str(e).lower()
if "amazon provider" in error_msg:
print("Install: pip install apache-airflow-providers-amazon")
elif "access denied" in error_msg:
print("Check IAM permissions for RDS/Redshift access")
elif "cluster" in error_msg and "not found" in error_msg:
print("Verify cluster-identifier in connection extras")
elif "workgroup" in error_msg:
print("Verify workgroup-name for Redshift Serverless")
else:
raisedef validate_aws_connection(conn_id):
"""Validate AWS IAM connection setup."""
try:
hook = PostgresHook(postgres_conn_id=conn_id)
# Test basic connectivity
result = hook.get_first("SELECT current_user, current_database()")
print(f"Connected as: {result[0]} to database: {result[1]}")
return True
except Exception as e:
print(f"Connection validation failed: {e}")
return False
# Validate connections
validate_aws_connection("rds_postgres_iam")
validate_aws_connection("redshift_iam"){
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"rds-db:connect"
],
"Resource": [
"arn:aws:rds-db:region:account:dbuser:db-instance-id/db-username"
]
}
]
}

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"redshift:GetClusterCredentials"
],
"Resource": [
"arn:aws:redshift:region:account:cluster:cluster-name",
"arn:aws:redshift:region:account:dbuser:cluster-name/username"
]
}
]
}

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"redshift-serverless:GetCredentials"
],
"Resource": [
"arn:aws:redshift-serverless:region:account:workgroup/workgroup-name"
]
}
]
}

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-postgres