Apache Airflow provider package that provides comprehensive AWS service integrations for orchestrating cloud workflows and data pipelines
Amazon Redshift integration for data warehouse operations including cluster management, SQL execution, and data loading. Supports both traditional Redshift connections and the modern Redshift Data API for serverless SQL execution.
Hook for executing SQL operations against Redshift clusters using traditional database connections.
class RedshiftSQLHook(DbApiHook):
def __init__(self, redshift_conn_id: str = 'redshift_default', **kwargs):
"""
Initialize Redshift SQL Hook.
Parameters:
- redshift_conn_id: Redshift connection ID
"""
def run(self, sql: str, autocommit: bool = False, parameters: dict = None, handler: callable = None) -> Any:
"""
Execute SQL statement.
Parameters:
- sql: SQL statement to execute
- autocommit: Enable autocommit mode
- parameters: Query parameters
- handler: Result handler function
Returns:
Query results
"""
def get_records(self, sql: str, parameters: dict = None) -> list:
"""
Execute SQL and return records.
Parameters:
- sql: SQL query to execute
- parameters: Query parameters
Returns:
List of result records
"""
def get_first(self, sql: str, parameters: dict = None) -> Any:
"""
Execute SQL and return first result.
Parameters:
- sql: SQL query to execute
- parameters: Query parameters
Returns:
First result record
"""
def get_pandas_df(self, sql: str, parameters: dict = None, **kwargs) -> Any:
"""
Execute SQL and return pandas DataFrame.
Parameters:
- sql: SQL query to execute
- parameters: Query parameters
Returns:
pandas DataFrame with results
"""
def test_connection(self) -> tuple:
"""
Test Redshift connection.
Returns:
Connection test result tuple (success, message)
"""Hook for serverless SQL execution using Redshift Data API.
Hook for serverless SQL execution using the Redshift Data API.
class RedshiftDataHook(AwsBaseHook):
def __init__(self, aws_conn_id: str = 'aws_default', **kwargs):
"""
Initialize Redshift Data Hook.
Parameters:
- aws_conn_id: AWS connection ID
"""
def execute_query(self, database: str, sql: str, cluster_identifier: str = None, db_user: str = None, parameters: list = None, secret_arn: str = None, statement_name: str = None, with_event: bool = False, wait_for_completion: bool = True, poll_interval: int = 10) -> str:
"""
Execute SQL using Redshift Data API.
Parameters:
- database: Database name
- sql: SQL statement to execute
- cluster_identifier: Redshift cluster identifier
- db_user: Database user name
- parameters: SQL parameters
- secret_arn: AWS Secrets Manager ARN for credentials
- statement_name: Statement name for identification
- with_event: Enable event notifications
- wait_for_completion: Wait for query completion
- poll_interval: Polling interval in seconds
Returns:
Statement ID
"""
def describe_statement(self, id: str) -> dict:
"""
Get statement execution details.
Parameters:
- id: Statement ID
Returns:
Statement details
"""
def get_statement_result(self, id: str, next_token: str = None) -> dict:
"""
Get statement execution results.
Parameters:
- id: Statement ID
- next_token: Pagination token
Returns:
Query results
"""
def cancel_statement(self, id: str) -> bool:
"""
Cancel running statement.
Parameters:
- id: Statement ID
Returns:
Cancellation success status
"""
def list_statements(self, status: str = None, statement_name: str = None, max_items: int = 100, next_token: str = None) -> dict:
"""
List executed statements.
Parameters:
- status: Filter by statement status
- statement_name: Filter by statement name
- max_items: Maximum items to return
- next_token: Pagination token
Returns:
List of statements
"""Hook for Redshift cluster lifecycle management.
Hook for Redshift cluster lifecycle management.
class RedshiftHook(AwsBaseHook):
def __init__(self, aws_conn_id: str = 'aws_default', **kwargs):
"""
Initialize Redshift Cluster Hook.
Parameters:
- aws_conn_id: AWS connection ID
"""
def create_cluster(self, cluster_identifier: str, node_type: str, master_username: str, master_user_password: str, **kwargs) -> dict:
"""
Create Redshift cluster.
Parameters:
- cluster_identifier: Unique cluster identifier
- node_type: Node type (e.g., 'dc2.large')
- master_username: Master username
- master_user_password: Master password
Returns:
Cluster configuration
"""
def delete_cluster(self, cluster_identifier: str, skip_final_cluster_snapshot: bool = False, final_cluster_snapshot_identifier: str = None) -> dict:
"""
Delete Redshift cluster.
Parameters:
- cluster_identifier: Cluster identifier
- skip_final_cluster_snapshot: Skip final snapshot
- final_cluster_snapshot_identifier: Final snapshot identifier
Returns:
Deletion response
"""
def describe_clusters(self, cluster_identifier: str = None) -> dict:
"""
Describe Redshift clusters.
Parameters:
- cluster_identifier: Specific cluster identifier
Returns:
Cluster descriptions
"""
def pause_cluster(self, cluster_identifier: str) -> dict:
"""
Pause Redshift cluster.
Parameters:
- cluster_identifier: Cluster identifier
Returns:
Pause response
"""
def resume_cluster(self, cluster_identifier: str) -> dict:
"""
Resume paused Redshift cluster.
Parameters:
- cluster_identifier: Cluster identifier
Returns:
Resume response
"""
def get_cluster_status(self, cluster_identifier: str) -> str:
"""
Get cluster status.
Parameters:
- cluster_identifier: Cluster identifier
Returns:
Current cluster status
"""Task implementations for Redshift operations.
Task implementations for Redshift operations.
class RedshiftSQLOperator(BaseOperator):
def __init__(self, sql: str, redshift_conn_id: str = 'redshift_default', parameters: dict = None, autocommit: bool = True, **kwargs):
"""
Execute SQL on Redshift cluster.
Parameters:
- sql: SQL statement to execute
- redshift_conn_id: Redshift connection ID
- parameters: SQL parameters
- autocommit: Enable autocommit mode
"""
class RedshiftDataOperator(BaseOperator):
def __init__(self, database: str, sql: str, cluster_identifier: str = None, db_user: str = None, secret_arn: str = None, statement_name: str = None, parameters: list = None, poll_interval: int = 10, aws_conn_id: str = 'aws_default', **kwargs):
"""
Execute SQL using Redshift Data API.
Parameters:
- database: Database name
- sql: SQL statement to execute
- cluster_identifier: Redshift cluster identifier
- db_user: Database user name
- secret_arn: AWS Secrets Manager ARN for credentials
- statement_name: Statement name
- parameters: SQL parameters
- poll_interval: Polling interval in seconds
- aws_conn_id: AWS connection ID
"""
class RedshiftCreateClusterOperator(BaseOperator):
def __init__(self, cluster_identifier: str, node_type: str, master_username: str, master_user_password: str, publicly_accessible: bool = True, port: int = 5439, aws_conn_id: str = 'aws_default', **kwargs):
"""
Create Redshift cluster.
Parameters:
- cluster_identifier: Cluster identifier
- node_type: Node type
- master_username: Master username
- master_user_password: Master password
- publicly_accessible: Public accessibility
- port: Database port
- aws_conn_id: AWS connection ID
"""
class RedshiftDeleteClusterOperator(BaseOperator):
def __init__(self, cluster_identifier: str, skip_final_cluster_snapshot: bool = False, final_cluster_snapshot_identifier: str = None, aws_conn_id: str = 'aws_default', **kwargs):
"""
Delete Redshift cluster.
Parameters:
- cluster_identifier: Cluster identifier
- skip_final_cluster_snapshot: Skip final snapshot
- final_cluster_snapshot_identifier: Final snapshot identifier
- aws_conn_id: AWS connection ID
"""
class RedshiftPauseClusterOperator(BaseOperator):
def __init__(self, cluster_identifier: str, aws_conn_id: str = 'aws_default', **kwargs):
"""
Pause Redshift cluster.
Parameters:
- cluster_identifier: Cluster identifier
- aws_conn_id: AWS connection ID
"""
class RedshiftResumeClusterOperator(BaseOperator):
def __init__(self, cluster_identifier: str, aws_conn_id: str = 'aws_default', **kwargs):
"""
Resume Redshift cluster.
Parameters:
- cluster_identifier: Cluster identifier
- aws_conn_id: AWS connection ID
"""Monitoring tasks for Redshift cluster states and query execution.
class RedshiftClusterSensor(BaseSensorOperator):
def __init__(self, cluster_identifier: str, target_status: str = 'available', aws_conn_id: str = 'aws_default', **kwargs):
"""
Wait for Redshift cluster to reach target status.
Parameters:
- cluster_identifier: Cluster identifier
- target_status: Target cluster status
- aws_conn_id: AWS connection ID
"""from airflow import DAG
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
dag = DAG('redshift_analytics', start_date=datetime(2023, 1, 1))
# Create staging tables
create_staging = RedshiftSQLOperator(
task_id='create_staging_tables',
redshift_conn_id='redshift_prod',
sql="""
CREATE TABLE IF NOT EXISTS staging.sales_data (
transaction_id VARCHAR(50),
customer_id INTEGER,
product_id INTEGER,
quantity INTEGER,
price DECIMAL(10,2),
transaction_date DATE,
region VARCHAR(50)
);
TRUNCATE TABLE staging.sales_data;
""",
dag=dag
)
# Load data from S3
load_data = S3ToRedshiftOperator(
task_id='load_from_s3',
schema='staging',
table='sales_data',
s3_bucket='data-warehouse-staging',
s3_key='sales/{{ ds }}/',
redshift_conn_id='redshift_prod',
copy_options=[
"CSV",
"IGNOREHEADER 1",
"TIMEFORMAT 'YYYY-MM-DD'",
"TRUNCATECOLUMNS"
],
dag=dag
)
# Transform and load to production tables
transform_data = RedshiftSQLOperator(
task_id='transform_and_load',
redshift_conn_id='redshift_prod',
sql="""
-- Update dimension tables
INSERT INTO dim_customers (customer_id, region)
SELECT DISTINCT customer_id, region
FROM staging.sales_data s
WHERE NOT EXISTS (
SELECT 1 FROM dim_customers d
WHERE d.customer_id = s.customer_id
);
-- Insert fact data
INSERT INTO fact_sales (
transaction_id, customer_id, product_id,
quantity, price, transaction_date
)
SELECT
transaction_id, customer_id, product_id,
quantity, price, transaction_date
FROM staging.sales_data;
-- Update statistics
ANALYZE fact_sales;
ANALYZE dim_customers;
""",
dag=dag
)
create_staging >> load_data >> transform_data
from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
# Execute query using Data API
analyze_sales = RedshiftDataOperator(
task_id='analyze_sales_data',
database='analytics',
cluster_identifier='analytics-cluster',
sql="""
SELECT
region,
DATE_TRUNC('month', transaction_date) as month,
SUM(quantity * price) as revenue,
COUNT(*) as transaction_count
FROM fact_sales
WHERE transaction_date >= CURRENT_DATE - INTERVAL '3 months'
GROUP BY region, month
ORDER BY region, month;
""",
statement_name='monthly_sales_analysis',
aws_conn_id='aws_default',
dag=dag
)
from airflow.providers.amazon.aws.operators.redshift_cluster import (
RedshiftCreateClusterOperator,
RedshiftPauseClusterOperator,
RedshiftResumeClusterOperator
)
from airflow.providers.amazon.aws.sensors.redshift_cluster import RedshiftClusterSensor
# Resume cluster for processing
resume_cluster = RedshiftResumeClusterOperator(
task_id='resume_cluster',
cluster_identifier='analytics-cluster',
dag=dag
)
# Wait for cluster to be available
wait_for_cluster = RedshiftClusterSensor(
task_id='wait_for_available',
cluster_identifier='analytics-cluster',
target_status='available',
timeout=1800, # 30 minutes
dag=dag
)
# Run analytics workload
run_analytics = RedshiftSQLOperator(
task_id='run_analytics',
sql='CALL analytics.run_monthly_reports();',
redshift_conn_id='redshift_prod',
dag=dag
)
# Pause cluster to save costs
pause_cluster = RedshiftPauseClusterOperator(
task_id='pause_cluster',
cluster_identifier='analytics-cluster',
dag=dag
)
resume_cluster >> wait_for_cluster >> run_analytics >> pause_cluster
# Redshift cluster states
class RedshiftClusterState:
AVAILABLE = 'available'
CREATING = 'creating'
DELETING = 'deleting'
FINAL_SNAPSHOT = 'final-snapshot'
HARDWARE_FAILURE = 'hardware-failure'
INCOMPATIBLE_HSM = 'incompatible-hsm'
INCOMPATIBLE_NETWORK = 'incompatible-network'
INCOMPATIBLE_PARAMETERS = 'incompatible-parameters'
INCOMPATIBLE_RESTORE = 'incompatible-restore'
MODIFYING = 'modifying'
PAUSED = 'paused'
REBOOTING = 'rebooting'
RENAMING = 'renaming'
RESIZING = 'resizing'
ROTATING_KEYS = 'rotating-keys'
STORAGE_FULL = 'storage-full'
UPDATING_HSM = 'updating-hsm'
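
These values are what RedshiftClusterSensor matches against via target_status; for instance, a task confirming that a pause completed might look like this sketch (reusing the cluster identifier and dag from the examples above):

confirm_paused = RedshiftClusterSensor(
    task_id='confirm_paused',
    cluster_identifier='analytics-cluster',
    target_status=RedshiftClusterState.PAUSED,
    dag=dag,
)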
# Redshift node types
class RedshiftNodeType:
DC2_LARGE = 'dc2.large'
DC2_8XLARGE = 'dc2.8xlarge'
DS2_XLARGE = 'ds2.xlarge'
DS2_8XLARGE = 'ds2.8xlarge'
RA3_XLPLUS = 'ra3.xlplus'
RA3_4XLARGE = 'ra3.4xlarge'
RA3_16XLARGE = 'ra3.16xlarge'
# Statement status for Data API
class StatementStatus:
SUBMITTED = 'SUBMITTED'
PICKED = 'PICKED'
STARTED = 'STARTED'
FINISHED = 'FINISHED'
ABORTED = 'ABORTED'
FAILED = 'FAILED'
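
When a statement is submitted with wait_for_completion=False, these states drive the polling loop. A hedged sketch built on RedshiftDataHook.describe_statement from the spec above ('Status' is the key in the Data API response; wait_for_statement is a hypothetical helper):

import time

TERMINAL_STATES = {StatementStatus.FINISHED, StatementStatus.ABORTED, StatementStatus.FAILED}

def wait_for_statement(hook, statement_id: str, poll_interval: int = 10) -> dict:
    # Poll until the statement reaches a terminal state, then return its details
    while True:
        details = hook.describe_statement(id=statement_id)
        if details['Status'] in TERMINAL_STATES:
            return details
        time.sleep(poll_interval)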
# Cluster configuration
class RedshiftClusterConfig:
cluster_identifier: str
node_type: str
master_username: str
db_name: str = None
port: int = 5439
cluster_type: str = 'multi-node'
number_of_nodes: int = 1
publicly_accessible: bool = True
encrypted: bool = False
hsm_client_certificate_identifier: str = None
hsm_configuration_identifier: str = None
elastic_ip: str = None
cluster_parameter_group_name: str = None
cluster_subnet_group_name: str = None
availability_zone: str = None
preferred_maintenance_window: str = None
cluster_version: str = None
allow_version_upgrade: bool = True
automated_snapshot_retention_period: int = 1
manual_snapshot_retention_period: int = None
snapshot_identifier: str = None
snapshot_cluster_identifier: str = None
owner_account: str = None
additional_info: str = None
kms_key_id: str = None
enhanced_vpc_routing: bool = False
iam_roles: list = None
maintenance_track_name: str = None
snapshot_schedule_identifier: str = None
aqua_configuration_status: str = None
default_iam_role_arn: str = None
tags: list = None
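
To show how these options flow into cluster creation, here is a hedged sketch using RedshiftHook.create_cluster from the spec above, with extra options passed through **kwargs; identifiers and the password are placeholders (prefer a secrets backend in real deployments):

from airflow.providers.amazon.aws.hooks.redshift_cluster import RedshiftHook

hook = RedshiftHook(aws_conn_id='aws_default')
response = hook.create_cluster(
    cluster_identifier='dev-cluster',
    node_type='ra3.xlplus',
    master_username='admin',
    master_user_password='REPLACE_ME',  # placeholder; load from Secrets Manager
    cluster_type='single-node',
    publicly_accessible=False,
)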
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-amazon