
tessl/pypi-dagster-aws

Package for AWS-specific Dagster framework solid and resource components.


Redshift Data Warehousing

Connect to Amazon Redshift clusters and run SQL queries from Dagster ops and jobs. The package provides a configurable resource, a client wrapper, a dedicated exception type, and fake implementations for testing.

Capabilities

Redshift Resource

Core Redshift resource providing configured database connections and query execution capabilities.

class RedshiftResource(ResourceWithBoto3Configuration):
    """
    Resource for connecting to Amazon Redshift clusters.
    """
    host: str
    port: int = 5439
    database: str
    user: str
    password: str
    
    def get_connection(self):
        """
        Get database connection to Redshift cluster.
        
        Returns:
            Connection: Database connection object
        """
    
    def execute_query(self, query: str) -> List[Dict]:
        """
        Execute SQL query against Redshift.
        
        Parameters:
            query: SQL query to execute
            
        Returns:
            List[Dict]: Query results as list of dictionaries
        """

def redshift_resource(**kwargs) -> RedshiftResource:
    """
    Factory function for Redshift resource.
    
    Returns:
        RedshiftResource: Configured Redshift resource
    """

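The `List[Dict]` return shape documented for `execute_query` can be illustrated without a cluster. The sketch below is not the library's implementation: it uses the standard-library `sqlite3` module as a stand-in for a Redshift connection, and `rows_as_dicts` is a hypothetical helper showing how a DB-API cursor's column names are commonly zipped with each row.

```python
import sqlite3
from typing import Any, Dict, List


def rows_as_dicts(cursor) -> List[Dict[str, Any]]:
    """Hypothetical helper: convert a DB-API cursor's result set to dicts."""
    # cursor.description has one entry per column; item 0 of each is the name.
    columns = [col[0] for col in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]


# sqlite3 stands in for Redshift here so the example runs anywhere.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales_table (product_id INTEGER, sales_amount REAL)")
conn.executemany(
    "INSERT INTO sales_table VALUES (?, ?)",
    [(1, 10.0), (1, 5.0), (2, 7.5)],
)
cursor = conn.execute(
    "SELECT product_id, SUM(sales_amount) AS total_sales "
    "FROM sales_table GROUP BY product_id ORDER BY total_sales DESC"
)
results = rows_as_dicts(cursor)
conn.close()
```

Each element of `results` maps column names to values, which is the shape the `execute_query` docstring describes.
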
Redshift Client Integration

A lower-level client wrapper for running queries and retrieving cluster credentials.

class RedshiftClient:
    """
    Client wrapper for Redshift operations.
    """
    
    def __init__(self, **kwargs): ...
    
    def execute_query(self, query: str): ...
    def get_cluster_credentials(self, cluster_identifier: str): ...

class RedshiftClientResource(RedshiftClient, ConfigurableResource):
    """
    Configurable Redshift client resource.
    """

Error Handling

class RedshiftError(Exception):
    """
    Exception raised for Redshift-related errors.
    """
    
    def __init__(self, message: str, query: Optional[str] = None): ...
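
One way an exception with this constructor shape might be used is to attach the failing SQL to the error so it shows up in logs. The sketch below is an assumption-laden illustration: `RedshiftError` here is a local stand-in with the same signature (not the library class), the `query` attribute is assumed, and `run_with_context` and `broken_execute` are hypothetical names.

```python
from typing import Callable, Optional


class RedshiftError(Exception):
    """Local stand-in mirroring the constructor shown above (not the library class)."""

    def __init__(self, message: str, query: Optional[str] = None):
        super().__init__(message)
        self.query = query  # assumption: the failing SQL is kept for diagnostics


def run_with_context(execute: Callable[[str], object], query: str):
    """Hypothetical wrapper: re-raise any failure as RedshiftError with the SQL attached."""
    try:
        return execute(query)
    except Exception as exc:
        raise RedshiftError(f"Query failed: {exc}", query=query) from exc


def broken_execute(query: str):
    # Simulates a driver-level failure.
    raise RuntimeError('relation "missing_table" does not exist')


try:
    run_with_context(broken_execute, "SELECT * FROM missing_table")
except RedshiftError as err:
    caught = err  # keep a reference so it can be inspected after the block
```

Chaining with `from exc` preserves the original driver error as `__cause__`, so both the SQL and the underlying failure remain available.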

Testing Utilities

class FakeRedshiftClient:
    """
    Mock Redshift client for testing.
    """

class FakeRedshiftClientResource(FakeRedshiftClient, ConfigurableResource): ...
class FakeRedshiftResource(ConfigurableResource): ...

def fake_redshift_resource(**kwargs): ...
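
The behaviour of the fake classes is not documented here, so the following is only a sketch of the general test-double pattern they suggest. `RecordingFakeClient` and `top_product_ids` are hypothetical names; the only assumption carried over from this page is that a client exposes `execute_query(query)`.

```python
from typing import Any, Dict, List


class RecordingFakeClient:
    """Hypothetical test double; not the library's FakeRedshiftClient."""

    def __init__(self, canned_rows: List[Dict[str, Any]]):
        self.canned_rows = canned_rows
        self.queries: List[str] = []

    def execute_query(self, query: str) -> List[Dict[str, Any]]:
        # Record the SQL instead of sending it anywhere, then return fixed rows.
        self.queries.append(query)
        return list(self.canned_rows)


def top_product_ids(client, limit: int) -> List[int]:
    """Code under test: depends only on an object exposing execute_query."""
    rows = client.execute_query(
        f"SELECT product_id FROM sales_table ORDER BY total_sales DESC LIMIT {limit}"
    )
    return [row["product_id"] for row in rows]


fake = RecordingFakeClient([{"product_id": 7}, {"product_id": 3}])
ids = top_product_ids(fake, limit=2)
```

Because the code under test only needs `execute_query`, a real client and a fake can be swapped without touching the business logic.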

Usage Examples

from dagster import op, job, Definitions
from dagster_aws.redshift import RedshiftResource

@op(required_resource_keys={"redshift"})
def query_sales_data(context):
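    # Look up the resource registered under the "redshift" key.
    redshift = context.resources.redshift
    
    # Top 10 products by total sales over the last 30 days.
    query = """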
    SELECT product_id, SUM(sales_amount) as total_sales
    FROM sales_table 
    WHERE sale_date >= CURRENT_DATE - 30
    GROUP BY product_id
    ORDER BY total_sales DESC
    LIMIT 10
    """
    
    results = redshift.execute_query(query)
    context.log.info(f"Retrieved {len(results)} top products")
    return results

@job(
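    resource_defs={
        # Placeholder values; in practice, load credentials from environment
        # variables or a secrets manager rather than hardcoding them.
        "redshift": RedshiftResource(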
            host="my-cluster.redshift.amazonaws.com",
            database="analytics",
            user="dagster_user",
            password="secure_password"
        )
    }
)
def sales_analysis_job():
    query_sales_data()

defs = Definitions(jobs=[sales_analysis_job])
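
The example above hardcodes credentials for brevity. A minimal sketch of reading them from the environment instead is shown below; the helper name `redshift_kwargs_from_env` and the `REDSHIFT_*` variable names are assumptions made for illustration, not part of the package.

```python
import os
from typing import Dict, Mapping, Union


def redshift_kwargs_from_env(
    env: Mapping[str, str] = os.environ,
) -> Dict[str, Union[str, int]]:
    """Hypothetical helper: build the resource fields from environment variables."""
    return {
        "host": env["REDSHIFT_HOST"],
        # Fall back to 5439, the default port listed in the class definition.
        "port": int(env.get("REDSHIFT_PORT", "5439")),
        "database": env["REDSHIFT_DATABASE"],
        "user": env["REDSHIFT_USER"],
        "password": env["REDSHIFT_PASSWORD"],
    }


# A plain dict stands in for the process environment in this demonstration.
example_env = {
    "REDSHIFT_HOST": "my-cluster.redshift.amazonaws.com",
    "REDSHIFT_DATABASE": "analytics",
    "REDSHIFT_USER": "dagster_user",
    "REDSHIFT_PASSWORD": "secure_password",
}
kwargs = redshift_kwargs_from_env(example_env)
```

Assuming the constructor accepts the fields listed in the class definition, the result could be unpacked as `RedshiftResource(**redshift_kwargs_from_env())`.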

Install with Tessl CLI

npx tessl i tessl/pypi-dagster-aws
