Package for AWS-specific Dagster framework solid and resource components.
—
Execute serverless SQL queries against data in S3 using Amazon Athena with result management, query optimization, and comprehensive error handling.
class AthenaResource(ResourceWithBoto3Configuration):
    """
    Resource for executing queries with Amazon Athena.
    """

    # Workgroup name; defaults to "primary".
    work_group: str = "primary"
    # S3 staging directory, e.g. "s3://my-athena-results/" (required, no default).
    s3_staging_dir: str

    def execute_query(
        self,
        query: str,
        result_configuration: Optional[Dict] = None,
    ) -> str:
        """
        Execute SQL query using Athena.

        Parameters:
            query: SQL query to execute
            result_configuration: Query result configuration

        Returns:
            str: Query execution ID
        """

    def get_query_results(self, execution_id: str) -> Dict:
        """
        Retrieve results for a completed query.

        Parameters:
            execution_id: Query execution ID

        Returns:
            Dict: Query results and metadata
        """

    def wait_for_query_completion(
        self,
        execution_id: str,
        timeout_seconds: int = 300,
    ) -> bool:
        """
        Wait for query to complete execution.

        Parameters:
            execution_id: Query execution ID
            timeout_seconds: Maximum wait time

        Returns:
            bool: True if completed successfully
        """
def athena_resource(**kwargs) -> AthenaResource:
    """
    Factory function for Athena resource.
    """


class AthenaClient:
    """
    Direct Athena client wrapper.
    """

    def start_query_execution(self, **kwargs): ...
    def get_query_execution(self, execution_id: str): ...
    def get_query_results(self, execution_id: str): ...


class AthenaClientResource(AthenaClient, ConfigurableResource): ...


class AthenaError(Exception):
    """
    Exception for Athena-related errors.
    """


class AthenaTimeout(Exception):
    """
    Exception for Athena query timeouts.
    """


# Fake counterparts of the client, resource and factory
# (presumably test doubles -- confirm against the package docs).
class FakeAthenaClient: ...
class FakeAthenaResource: ...
def fake_athena_resource(**kwargs): ...


from dagster import op, job, Definitions
from dagster_aws.athena import AthenaResource, AthenaTimeout
# Example op: runs a 7-day page-view / unique-visitor query through the
# "athena" resource and returns the rows of the result set.
@op(required_resource_keys={"athena"})
def analyze_web_logs(context):
    athena = context.resources.athena
    # The SQL text is kept flush-left so the string sent to Athena is unchanged.
    query = """
SELECT
date(timestamp) as date,
count(*) as page_views,
count(distinct user_id) as unique_visitors
FROM web_logs
WHERE timestamp >= current_date - interval '7' day
GROUP BY date(timestamp)
ORDER BY date
"""
    try:
        execution_id = athena.execute_query(query)
        # A False return from the wait call is reported as a timeout.
        if not athena.wait_for_query_completion(execution_id, timeout_seconds=120):
            raise AthenaTimeout("Query execution timed out")
        results = athena.get_query_results(execution_id)
        return results['ResultSet']['Rows']
    except Exception as e:
        # Log for the run's event log, then re-raise so the op still fails.
        context.log.error(f"Athena query failed: {e}")
        raise
# Example job: binds the "athena" resource key required by analyze_web_logs
# to an AthenaResource configured with a staging directory and workgroup.
@job(
    resource_defs={
        "athena": AthenaResource(
            s3_staging_dir="s3://my-athena-results/",
            work_group="analytics",
        )
    }
)
def web_analytics_job():
    analyze_web_logs()
defs = Definitions(jobs=[web_analytics_job])

Install with Tessl CLI
npx tessl i tessl/pypi-dagster-aws