# Athena Query Service

Execute serverless SQL queries against data in S3 using Amazon Athena. The service submits queries, waits for them to finish, retrieves their results, and reports failures and timeouts through dedicated exceptions.

## Capabilities

### Athena Resource

```python { .api }
from typing import Dict, Optional

from dagster_aws.utils import ResourceWithBoto3Configuration


class AthenaResource(ResourceWithBoto3Configuration):
    """
    Resource for executing queries with Amazon Athena.
    """

    work_group: str = "primary"  # Athena workgroup the queries run in
    s3_staging_dir: str  # S3 URI where Athena writes query results

    def execute_query(
        self,
        query: str,
        result_configuration: Optional[Dict] = None,
    ) -> str:
        """
        Submit a SQL query to Athena for execution.

        Parameters:
            query: SQL query to execute
            result_configuration: Optional query result configuration

        Returns:
            str: Query execution ID
        """

    def get_query_results(self, execution_id: str) -> Dict:
        """
        Retrieve results for a completed query.

        Parameters:
            execution_id: Query execution ID

        Returns:
            Dict: Query results and metadata
        """

    def wait_for_query_completion(
        self,
        execution_id: str,
        timeout_seconds: int = 300,
    ) -> bool:
        """
        Wait for a query to finish executing.

        Parameters:
            execution_id: Query execution ID
            timeout_seconds: Maximum time to wait, in seconds

        Returns:
            bool: True if the query completed successfully
        """


def athena_resource(**kwargs) -> AthenaResource:
    """
    Factory function for Athena resource.
    """
```
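
The resource splits a query into three steps: submit, wait, and fetch. As a rough sketch of what the submit and wait steps involve, the code below drives a boto3-style Athena client directly. It assumes the client offers boto3's `start_query_execution` and `get_query_execution` calls with their usual response shapes. `start_query`, `wait_for_completion`, `poll_interval`, the injectable `sleep`/`clock` hooks, and `StubAthenaClient` are hypothetical names for illustration, not part of `AthenaResource`, and the "False on FAILED, CANCELLED, or timeout" behavior is an assumption.

```python
import time
from typing import Any, Callable, List

# Athena reports QUEUED and RUNNING while a query is in flight; these are final.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def start_query(client: Any, query: str, s3_staging_dir: str, work_group: str = "primary") -> str:
    """Hypothetical helper: submit a query and return its execution ID."""
    response = client.start_query_execution(
        QueryString=query,
        WorkGroup=work_group,
        ResultConfiguration={"OutputLocation": s3_staging_dir},
    )
    return response["QueryExecutionId"]


def wait_for_completion(
    client: Any,
    execution_id: str,
    timeout_seconds: float = 300,
    poll_interval: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> bool:
    """Hypothetical helper: poll until a terminal state or the deadline.

    Returns True only for SUCCEEDED; returns False for FAILED, CANCELLED,
    or when the query is still QUEUED/RUNNING at the deadline.
    """
    deadline = clock() + timeout_seconds
    while True:
        response = client.get_query_execution(QueryExecutionId=execution_id)
        state = response["QueryExecution"]["Status"]["State"]
        if state in TERMINAL_STATES:
            return state == "SUCCEEDED"
        if clock() >= deadline:
            return False
        sleep(poll_interval)


class StubAthenaClient:
    """Hypothetical in-memory stand-in that replays a scripted list of states."""

    def __init__(self, states: List[str]):
        self.states = list(states)
        self.calls: List[dict] = []

    def start_query_execution(self, **kwargs: Any) -> dict:
        self.calls.append(kwargs)
        return {"QueryExecutionId": "demo-execution-id"}

    def get_query_execution(self, QueryExecutionId: str) -> dict:
        # Advance through the script, then keep reporting the last state.
        state = self.states.pop(0) if len(self.states) > 1 else self.states[0]
        return {"QueryExecution": {"Status": {"State": state}}}
```

For example, `wait_for_completion(StubAthenaClient(["QUEUED", "RUNNING", "SUCCEEDED"]), "demo-execution-id", sleep=lambda _: None)` returns `True` without sleeping. Injecting `sleep` and `clock` keeps the polling loop testable without real delays.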

### Athena Client

```python { .api }
from dagster import ConfigurableResource


class AthenaClient:
    """
    Direct Athena client wrapper.
    """

    def start_query_execution(self, **kwargs): ...

    def get_query_execution(self, execution_id: str): ...

    def get_query_results(self, execution_id: str): ...


class AthenaClientResource(AthenaClient, ConfigurableResource): ...
```
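
Athena's `GetQueryResults` API returns results one page at a time. The sketch below collects every page and turns each row into a dictionary. It assumes a boto3-style client whose `get_query_results` accepts `QueryExecutionId` and `NextToken` and returns the `ResultSet.Rows[].Data[].VarCharValue` structure with `ResultSetMetadata.ColumnInfo`. It also assumes a SELECT query, whose first row on the first page repeats the column names, and treats a cell without `VarCharValue` as NULL. `iter_result_pages` and `rows_as_dicts` are hypothetical helpers.

```python
from typing import Any, Dict, Iterator, List, Optional


def iter_result_pages(client: Any, execution_id: str) -> Iterator[Dict[str, Any]]:
    """Hypothetical helper: yield every results page, following NextToken."""
    kwargs: Dict[str, Any] = {"QueryExecutionId": execution_id}
    while True:
        page = client.get_query_results(**kwargs)
        yield page
        token = page.get("NextToken")
        if not token:
            return
        kwargs["NextToken"] = token


def rows_as_dicts(client: Any, execution_id: str) -> List[Dict[str, Optional[str]]]:
    """Hypothetical helper: flatten all pages into dicts keyed by column name.

    Assumes a SELECT query, so row 0 of the first page holds the column names.
    Cells without VarCharValue (assumed NULLs) become None.
    """
    records: List[Dict[str, Optional[str]]] = []
    columns: List[str] = []
    for page_number, page in enumerate(iter_result_pages(client, execution_id)):
        result_set = page["ResultSet"]
        if not columns:
            columns = [col["Name"] for col in result_set["ResultSetMetadata"]["ColumnInfo"]]
        rows = result_set["Rows"]
        if page_number == 0:
            rows = rows[1:]  # drop the header row
        for row in rows:
            values = [cell.get("VarCharValue") for cell in row["Data"]]
            records.append(dict(zip(columns, values)))
    return records
```

All values stay strings, as Athena returns them; converting to numbers or dates is left to the caller.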

### Error Handling

```python { .api }
class AthenaError(Exception):
    """
    Exception for Athena-related errors.
    """


class AthenaTimeout(Exception):
    """
    Exception for Athena query timeouts.
    """
```
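
Because `AthenaTimeout` is documented as a direct subclass of `Exception` rather than of `AthenaError`, an `except AthenaError` clause will not catch timeouts. The sketch below shows one hypothetical way to map a query's final status onto the two exceptions. The classes are local stand-ins mirroring the stubs above, and `raise_for_state` is an illustrative helper whose policy (FAILED or CANCELLED raises `AthenaError`; a query still in progress at the deadline raises `AthenaTimeout`) is an assumption. The `State` and `StateChangeReason` keys follow the status structure of Athena's `GetQueryExecution` response.

```python
from typing import Any, Dict, Optional


# Local stand-ins mirroring the documented classes (both subclass Exception).
class AthenaError(Exception):
    """Athena-related error."""


class AthenaTimeout(Exception):
    """Athena query timeout."""


def raise_for_state(execution_id: str, status: Dict[str, Any], timed_out: bool = False) -> None:
    """Hypothetical policy mapping an Athena status dict to an exception.

    `status` is assumed to look like GetQueryExecution's QueryExecution["Status"],
    e.g. {"State": "FAILED", "StateChangeReason": "..."}.
    """
    state = status.get("State")
    if state == "SUCCEEDED":
        return
    if state in ("FAILED", "CANCELLED"):
        reason: Optional[str] = status.get("StateChangeReason")
        raise AthenaError(f"Query {execution_id} {state.lower()}: {reason or 'no reason given'}")
    if timed_out:
        # Still QUEUED or RUNNING when the caller's deadline passed.
        raise AthenaTimeout(f"Query {execution_id} was still {state} at the deadline")
    # Still in progress and no deadline reached: nothing to raise yet.
```

Code that should handle both outcomes in one place can catch `(AthenaError, AthenaTimeout)`.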

### Testing Utilities

```python { .api }
class FakeAthenaClient: ...
class FakeAthenaResource: ...
def fake_athena_resource(**kwargs): ...
```
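
A fake client makes it possible to exercise query-handling code without AWS. To illustrate the general idea, here is a hypothetical in-memory client that follows the `AthenaClient` method names above and answers queries from canned rows. It is not the library's `FakeAthenaClient`; the `QueryString` keyword, the boto3-like response dictionaries, and the "unknown query means FAILED" rule are assumptions chosen for the sketch.

```python
import itertools
from typing import Any, Dict, List


class InMemoryAthenaClient:
    """Hypothetical test double (not the library's FakeAthenaClient).

    Mirrors the AthenaClient method names and answers queries from canned
    rows, so no AWS calls are made.
    """

    def __init__(self, canned_results: Dict[str, List[List[str]]]):
        # Maps exact query text to rows (first row = column names).
        self._canned = canned_results
        self._ids = itertools.count(1)
        self._executions: Dict[str, Dict[str, str]] = {}
        self.queries: List[str] = []  # every submitted query, for assertions

    def start_query_execution(self, **kwargs: Any) -> Dict[str, str]:
        # Assumes a boto3-style QueryString keyword argument.
        query = kwargs["QueryString"]
        self.queries.append(query)
        execution_id = f"fake-{next(self._ids)}"
        # Unknown queries are marked FAILED so tests can cover error paths.
        state = "SUCCEEDED" if query in self._canned else "FAILED"
        self._executions[execution_id] = {"query": query, "state": state}
        return {"QueryExecutionId": execution_id}

    def get_query_execution(self, execution_id: str) -> Dict[str, Any]:
        state = self._executions[execution_id]["state"]
        return {"QueryExecution": {"QueryExecutionId": execution_id, "Status": {"State": state}}}

    def get_query_results(self, execution_id: str) -> Dict[str, Any]:
        execution = self._executions[execution_id]
        if execution["state"] != "SUCCEEDED":
            raise RuntimeError(f"No results for failed execution {execution_id}")
        rows = [
            {"Data": [{"VarCharValue": value} for value in row]}
            for row in self._canned[execution["query"]]
        ]
        return {"ResultSet": {"Rows": rows}}
```

Keying canned results on the exact query text keeps the fake trivial, at the cost of brittleness if tests reformat their SQL.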

## Usage Examples

```python
from dagster import Definitions, OpExecutionContext, job, op
from dagster_aws.athena import AthenaResource, AthenaTimeout


@op
def analyze_web_logs(context: OpExecutionContext, athena: AthenaResource):
    # Daily page views and unique visitors over the last seven days
    query = """
        SELECT
            date(timestamp) AS view_date,
            count(*) AS page_views,
            count(DISTINCT user_id) AS unique_visitors
        FROM web_logs
        WHERE timestamp >= current_date - interval '7' day
        GROUP BY date(timestamp)
        ORDER BY view_date
    """

    try:
        execution_id = athena.execute_query(query)

        if not athena.wait_for_query_completion(execution_id, timeout_seconds=120):
            raise AthenaTimeout(f"Query {execution_id} timed out after 120 seconds")

        results = athena.get_query_results(execution_id)
        return results["ResultSet"]["Rows"]

    except Exception as e:
        # Log with context, then re-raise so Dagster marks the op as failed
        context.log.error(f"Athena query failed: {e}")
        raise


@job(
    resource_defs={
        "athena": AthenaResource(
            s3_staging_dir="s3://my-athena-results/",
            work_group="analytics",
        )
    }
)
def web_analytics_job():
    analyze_web_logs()


defs = Definitions(jobs=[web_analytics_job])
```
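
The op returns Athena's raw row structure, where every value is a string. A downstream step will usually want typed values. The sketch below assumes `get_query_results` returns the boto3 `GetQueryResults` shape the example already indexes into (`ResultSet.Rows[].Data[].VarCharValue`), with the column names as the first row, and that the columns arrive in the query's order (date, page views, unique visitors). `DailyTraffic` and `parse_web_log_rows` are hypothetical names.

```python
import datetime
from typing import Any, Dict, List, NamedTuple


class DailyTraffic(NamedTuple):
    """Hypothetical typed record for one row of the web-log query."""

    view_date: datetime.date
    page_views: int
    unique_visitors: int


def parse_web_log_rows(rows: List[Dict[str, Any]]) -> List[DailyTraffic]:
    """Convert raw Athena rows into typed records.

    Assumes row 0 holds the column names and that each later row has exactly
    three cells in query order. The counts are never NULL, and the WHERE
    clause excludes NULL timestamps, so no None handling is attempted.
    """
    parsed: List[DailyTraffic] = []
    for row in rows[1:]:  # skip the header row
        view_date, page_views, unique_visitors = (cell.get("VarCharValue") for cell in row["Data"])
        parsed.append(
            DailyTraffic(
                view_date=datetime.date.fromisoformat(view_date),  # Athena renders DATE as YYYY-MM-DD
                page_views=int(page_views),
                unique_visitors=int(unique_visitors),
            )
        )
    return parsed
```

Unpacking by position rather than by column name keeps the parser independent of the aliases chosen in the SQL.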