CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-dagster-aws

Package for AWS-specific Dagster framework solid and resource components.

Pending
Overview
Eval results
Files

docs/athena-queries.md

Athena Query Service

Execute serverless SQL queries against data in S3 using Amazon Athena with result management, query optimization, and comprehensive error handling.

Capabilities

Athena Resource

class AthenaResource(ResourceWithBoto3Configuration):
    """
    Resource for executing queries with Amazon Athena.

    The methods form a submit / wait / fetch workflow: ``execute_query``
    returns an execution ID, ``wait_for_query_completion`` waits on that ID,
    and ``get_query_results`` retrieves the output for it.

    NOTE(review): only signatures are listed here, not implementations.
    Confirm the class name and method set against the installed
    ``dagster_aws.athena`` module before relying on them.
    """
    # Defaults to "primary"; presumably the Athena workgroup queries run in.
    work_group: str = "primary"
    # Required (no default). The usage example passes an S3 URI such as
    # "s3://my-athena-results/"; presumably where query results are staged.
    s3_staging_dir: str
    
    def execute_query(
        self, 
        query: str,
        result_configuration: Optional[Dict] = None
    ) -> str:
        """
        Execute SQL query using Athena.
        
        Parameters:
            query: SQL query text to execute
            result_configuration: Optional query result configuration;
                defaults to None. NOTE(review): the accepted keys are not
                documented here; confirm against the implementation.
            
        Returns:
            str: Query execution ID, to be passed to
                ``wait_for_query_completion`` and ``get_query_results``
        """
    
    def get_query_results(self, execution_id: str) -> Dict:
        """
        Retrieve results for a completed query.
        
        Parameters:
            execution_id: Query execution ID, as returned by
                ``execute_query``
            
        Returns:
            Dict: Query results and metadata. The usage example reads the
                rows from ``['ResultSet']['Rows']``.
        """
    
    def wait_for_query_completion(
        self, 
        execution_id: str,
        timeout_seconds: int = 300
    ) -> bool:
        """
        Wait for query to complete execution.
        
        Parameters:
            execution_id: Query execution ID, as returned by
                ``execute_query``
            timeout_seconds: Maximum wait time in seconds (default 300)
            
        Returns:
            bool: True if completed successfully. NOTE(review): whether
                False means a timeout only, or also a failed query, is not
                stated here; confirm before branching on it.
        """

def athena_resource(**kwargs) -> AthenaResource:
    """
    Factory function for Athena resource.

    Parameters:
        **kwargs: Keyword arguments for the resource. Presumably these are
            the ``AthenaResource`` fields; the body is not shown, so confirm
            the accepted names.

    Returns:
        AthenaResource: The constructed resource
    """

Athena Client

class AthenaClient:
    """
    Direct Athena client wrapper.

    Only the method signatures are listed; bodies are elided with ``...``.
    NOTE(review): the method names match the Athena API operations of the
    same name, so they are presumably thin pass-throughs. Confirm.
    """
    
    # Start a query; accepts arbitrary keyword arguments (not enumerated here).
    def start_query_execution(self, **kwargs): ...
    # Look up a query execution by its execution ID.
    def get_query_execution(self, execution_id: str): ...
    # Fetch the results of a query by its execution ID.
    def get_query_results(self, execution_id: str): ...

# Combines AthenaClient with ConfigurableResource; the body is elided here.
class AthenaClientResource(AthenaClient, ConfigurableResource): ...

Error Handling

class AthenaError(Exception):
    """
    Exception for Athena-related errors.

    Derives directly from ``Exception`` and adds no attributes or methods.
    """

class AthenaTimeout(Exception):
    """
    Exception for Athena query timeouts.

    Derives directly from ``Exception``, not from ``AthenaError``, so an
    ``except AthenaError`` clause does not catch it.
    """

Testing Utilities

# Test doubles listed under "Testing Utilities"; bodies are elided here.
# NOTE(review): presumably stand-ins for AthenaClient, AthenaResource and
# athena_resource respectively. Confirm against the implementation.
class FakeAthenaClient: ...
class FakeAthenaResource: ...
def fake_athena_resource(**kwargs): ...

Usage Examples

from dagster import op, job, Definitions
from dagster_aws.athena import AthenaResource, AthenaTimeout

@op(required_resource_keys={"athena"})
def analyze_web_logs(context):
    # Daily page views and unique visitors for the trailing seven days.
    weekly_traffic_sql = """
    SELECT 
        date(timestamp) as date,
        count(*) as page_views,
        count(distinct user_id) as unique_visitors
    FROM web_logs 
    WHERE timestamp >= current_date - interval '7' day
    GROUP BY date(timestamp)
    ORDER BY date
    """

    client = context.resources.athena

    try:
        # Submit the query, then wait up to two minutes for it to finish.
        query_id = client.execute_query(weekly_traffic_sql)
        finished = client.wait_for_query_completion(query_id, timeout_seconds=120)

        # Guard clause: a falsy wait result is reported as a timeout.
        if not finished:
            raise AthenaTimeout("Query execution timed out")

        payload = client.get_query_results(query_id)
        return payload['ResultSet']['Rows']
    except Exception as e:
        # Op boundary: log any failure (including the timeout above), then
        # re-raise so the run is marked as failed.
        context.log.error(f"Athena query failed: {e}")
        raise

# Job wiring: binds the "athena" resource key required by analyze_web_logs
# to an AthenaResource configured with an S3 staging location and the
# "analytics" work group (overriding the "primary" default).
@job(
    resource_defs={
        "athena": AthenaResource(
            s3_staging_dir="s3://my-athena-results/",
            work_group="analytics"
        )
    }
)
def web_analytics_job():
    # The job consists of the single analyze_web_logs op.
    analyze_web_logs()

# Collect the job into a Definitions object bound to the module-level name
# `defs`.
defs = Definitions(jobs=[web_analytics_job])

Install with Tessl CLI

npx tessl i tessl/pypi-dagster-aws

docs

athena-queries.md

cloudwatch-logging.md

ecr-integration.md

ecs-orchestration.md

emr-processing.md

index.md

parameter-store.md

pipes-orchestration.md

rds-operations.md

redshift-integration.md

s3-storage.md

secrets-management.md

tile.json