Package of AWS-specific solid and resource components for the Dagster framework.
—
Connect to and execute queries against Amazon Redshift clusters with connection pooling, query optimization, and comprehensive error handling for data warehousing operations.
Core Redshift resource providing configured database connections and query execution capabilities.
class RedshiftResource(ResourceWithBoto3Configuration):
    """
    Resource for connecting to Amazon Redshift clusters.

    Connection settings are declared as typed fields and supplied when the
    resource is constructed, e.g.
    ``RedshiftResource(host=..., database=..., user=..., password=...)``.

    NOTE(review): the base class name suggests it also carries boto3 client
    settings (region, credentials, ...); confirm which extra fields it adds.
    Method bodies are omitted in this listing.
    """

    # Hostname of the cluster endpoint to connect to.
    host: str
    # TCP port of the cluster; defaults to 5439 when not supplied.
    port: int = 5439
    # Name of the database inside the cluster.
    database: str
    # Database user to authenticate as.
    user: str
    # Password for `user`; prefer supplying this from the environment rather
    # than hard-coding it.
    password: str

    def get_connection(self):
        """
        Get database connection to Redshift cluster.

        Uses the ``host``/``port``/``database``/``user``/``password`` fields
        configured on this resource.

        Returns:
            Connection: Database connection object
        """

    def execute_query(self, query: str) -> List[Dict]:
        """
        Execute SQL query against Redshift.

        Parameters:
            query: SQL query to execute

        Returns:
            List[Dict]: Query results as list of dictionaries, presumably one
            dictionary per result row keyed by column name (confirm).
        """
def redshift_resource(**kwargs) -> RedshiftResource:
    """
    Factory function for Redshift resource.

    Parameters:
        **kwargs: Configuration for the resource. NOTE(review): the accepted
            keys are not listed here — presumably the same fields as
            RedshiftResource (host, port, database, user, password); confirm.

    Returns:
        ResourceDefinition: Configured Redshift resource
    """
    # NOTE(review): the return annotation says RedshiftResource while the
    # docstring says ResourceDefinition — confirm which type is returned and
    # make the two agree.


# Direct Redshift client integration for advanced cluster management and query operations.
class RedshiftClient:
    """
    Client wrapper for Redshift operations.

    Exposes query execution and cluster-credential lookup. Method bodies are
    omitted in this listing.
    """

    # Accepts arbitrary keyword configuration; the expected keys are not shown
    # here (presumably connection settings — confirm).
    def __init__(self, **kwargs): ...

    # Runs `query` against the cluster; the return value is not shown here.
    def execute_query(self, query: str): ...

    # Looks up credentials for the cluster named `cluster_identifier`.
    def get_cluster_credentials(self, cluster_identifier: str): ...


class RedshiftClientResource(RedshiftClient, ConfigurableResource):
    """
    Configurable Redshift client resource.

    Combines the RedshiftClient operations with ConfigurableResource so the
    client can be configured and bound as a Dagster resource.
    """


class RedshiftError(Exception):
    """
    Exception raised for Redshift-related errors.

    Parameters:
        message: Human-readable description of the failure.
        query: Optional SQL text associated with the error — presumably the
            statement that failed; confirm. Defaults to None.
    """

    def __init__(self, message: str, query: Optional[str] = None): ...


class FakeRedshiftClient:
    """
    Mock Redshift client for testing.
    """


# Testing counterpart of RedshiftClientResource, built on FakeRedshiftClient.
class FakeRedshiftClientResource(FakeRedshiftClient, ConfigurableResource): ...


# Testing counterpart of RedshiftResource — presumably does not contact a real
# cluster; confirm.
class FakeRedshiftResource(ConfigurableResource): ...


# Testing counterpart of redshift_resource; accepted kwargs are not shown here.
def fake_redshift_resource(**kwargs): ...


# --- Usage example ---
from dagster import op, job, Definitions
from dagster import EnvVar
from dagster_aws.redshift import RedshiftResource


@op(required_resource_keys={"redshift"})
def query_sales_data(context):
    """
    Fetch the ten products with the highest total sales in the last 30 days.

    Uses the ``redshift`` resource bound on the enclosing job.

    Returns:
        The rows returned by ``RedshiftResource.execute_query`` for the query.
    """
    redshift = context.resources.redshift
    query = """
        SELECT product_id, SUM(sales_amount) as total_sales
        FROM sales_table
        WHERE sale_date >= CURRENT_DATE - 30
        GROUP BY product_id
        ORDER BY total_sales DESC
        LIMIT 10
    """
    results = redshift.execute_query(query)
    context.log.info(f"Retrieved {len(results)} top products")
    return results


@job(
    resource_defs={
        "redshift": RedshiftResource(
            host="my-cluster.redshift.amazonaws.com",
            database="analytics",
            user="dagster_user",
            # Never hard-code credentials: EnvVar resolves the secret from the
            # REDSHIFT_PASSWORD environment variable when the job runs, so it
            # stays out of source control. That variable must be set in the
            # environment that executes the job.
            password=EnvVar("REDSHIFT_PASSWORD"),
        )
    }
)
def sales_analysis_job():
    """Run ``query_sales_data`` against the configured Redshift cluster."""
    query_sales_data()


defs = Definitions(jobs=[sales_analysis_job])

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-dagster-aws