# Amazon Athena Analytics

Amazon Athena provides serverless SQL query capabilities for data stored in Amazon S3, enabling interactive analytics and data processing through standard SQL syntax without managing infrastructure.

## Capabilities

### SQL Query Execution

Execute Trino/Presto SQL queries against data in S3 with comprehensive result management and monitoring.

```python { .api }
class AthenaOperator(AwsBaseOperator):
    """
    Submit a Trino/Presto query to Amazon Athena.

    Parameters:
    - query: str - Trino/Presto query to be run on Amazon Athena
    - database: str - database to select
    - catalog: str - catalog to select
    - output_location: str - S3 path to write query results
    - client_request_token: str - unique token to avoid duplicate executions
    - workgroup: str - Athena workgroup for query execution (default: 'primary')
    - query_execution_context: dict - context for query execution
    - result_configuration: dict - configuration for result storage and encryption
    - sleep_time: int - time in seconds between status checks (default: 30)
    - max_polling_attempts: int - number of polling attempts before timeout
    - log_query: bool - whether to log the query and execution parameters (default: True)
    - deferrable: bool - run the operator in deferrable mode
    - poll_interval: int - polling interval for deferrable mode
    - aws_conn_id: str - Airflow connection for AWS credentials

    Returns:
        str: Query execution ID
    """
    def __init__(
        self,
        *,
        query: str,
        database: str,
        catalog: str | None = None,
        output_location: str | None = None,
        client_request_token: str | None = None,
        workgroup: str = "primary",
        query_execution_context: dict[str, str] | None = None,
        result_configuration: dict | None = None,
        sleep_time: int = 30,
        max_polling_attempts: int | None = None,
        log_query: bool = True,
        deferrable: bool = False,
        poll_interval: int = 30,
        **kwargs,
    ): ...
```

### Query Status Monitoring

Monitor Athena query execution status with configurable polling and timeout settings.

```python { .api }
class AthenaSensor(BaseSensorOperator):
    """
    Wait for an Amazon Athena query to complete.

    Parameters:
    - query_execution_id: str - Athena query execution ID to monitor
    - max_retries: int - maximum number of status check retries
    - aws_conn_id: str - Airflow connection for AWS credentials
    - sleep_time: int - time between status checks
    - poke_interval: int - sensor poke interval
    - timeout: int - maximum time to wait for completion

    Returns:
        bool: True when the query completes successfully
    """
    def __init__(
        self,
        query_execution_id: str,
        max_retries: int | None = None,
        aws_conn_id: str = 'aws_default',
        sleep_time: int = 30,
        **kwargs,
    ): ...
```
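
The operator pushes its query execution ID to XCom, so a downstream `AthenaSensor` can wait on a query that was submitted by another task. A minimal sketch; the task IDs, database, and bucket names are illustrative:

```python
from airflow.providers.amazon.aws.operators.athena import AthenaOperator
from airflow.providers.amazon.aws.sensors.athena import AthenaSensor

# Submit the query; the returned query execution ID is pushed to XCom.
submit_query = AthenaOperator(
    task_id='submit_query',
    query='SELECT 1',
    database='analytics_db',
    output_location='s3://my-results-bucket/athena-results/',
    aws_conn_id='aws_default',
)

# Wait for that execution ID to reach a terminal state, pulling it via Jinja.
wait_for_query = AthenaSensor(
    task_id='wait_for_query',
    query_execution_id="{{ task_instance.xcom_pull(task_ids='submit_query') }}",
    max_retries=60,
    sleep_time=30,
    aws_conn_id='aws_default',
)

submit_query >> wait_for_query
```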

### Athena Service Hook

Low-level Athena service operations for query management and result retrieval.

```python { .api }
class AthenaHook(AwsBaseHook):
    """
    Hook for Amazon Athena service operations.

    Parameters:
    - aws_conn_id: str - Airflow connection for AWS credentials
    - region_name: str - AWS region name
    - verify: bool - whether to verify SSL certificates
    - botocore_config: dict - botocore client configuration
    """
    def __init__(
        self,
        aws_conn_id: str = 'aws_default',
        region_name: str | None = None,
        verify: bool | None = None,
        botocore_config: dict | None = None,
        **kwargs,
    ): ...

    def run_query(
        self,
        query: str,
        query_context: dict,
        result_configuration: dict,
        client_request_token: str | None = None,
        workgroup: str = 'primary',
    ) -> str:
        """
        Run a query on Amazon Athena.

        Parameters:
        - query: str - SQL query to execute
        - query_context: dict - query execution context
        - result_configuration: dict - result storage configuration
        - client_request_token: str - unique request token
        - workgroup: str - Athena workgroup name

        Returns:
            str: Query execution ID
        """
        ...

    def check_query_status(self, query_execution_id: str) -> str:
        """
        Check the status of a submitted query.

        Parameters:
        - query_execution_id: str - query execution ID

        Returns:
            str: Query status ('QUEUED', 'RUNNING', 'SUCCEEDED', 'FAILED', 'CANCELLED')
        """
        ...

    def get_query_results(
        self,
        query_execution_id: str,
        next_token_id: str | None = None,
        max_results: int = 1000,
    ) -> dict:
        """
        Get query results from Amazon Athena.

        Parameters:
        - query_execution_id: str - query execution ID
        - next_token_id: str - pagination token
        - max_results: int - maximum number of results to return

        Returns:
            dict: Query results with metadata
        """
        ...

    def get_query_results_paginator(
        self,
        query_execution_id: str,
        max_items: int | None = None,
        page_size: int | None = None,
    ):
        """
        Get paginated query results.

        Parameters:
        - query_execution_id: str - query execution ID
        - max_items: int - maximum items to return
        - page_size: int - page size for pagination

        Returns:
            Iterator of result pages
        """
        ...

    def stop_query(self, query_execution_id: str) -> dict:
        """
        Stop/cancel a running query.

        Parameters:
        - query_execution_id: str - query execution ID to cancel

        Returns:
            dict: Cancellation response
        """
        ...

    def get_output_location(self, query_execution_id: str) -> str:
        """
        Get the S3 output location for query results.

        Parameters:
        - query_execution_id: str - query execution ID

        Returns:
            str: S3 URI of the query results
        """
        ...
```
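
For result sets that are too large to fetch in a single call, the hook's paginator streams pages of the `GetQueryResults` response. A short sketch; the helper name and connection ID are illustrative:

```python
from airflow.providers.amazon.aws.hooks.athena import AthenaHook

def count_result_rows(query_execution_id: str) -> int:
    """Count rows in a finished query by streaming result pages (illustrative helper)."""
    hook = AthenaHook(aws_conn_id='aws_default')

    # Make sure the query actually finished before reading results.
    if hook.check_query_status(query_execution_id) != 'SUCCEEDED':
        raise ValueError(f"Query {query_execution_id} did not succeed")

    row_count = 0
    # Each page mirrors the GetQueryResults response structure.
    for page in hook.get_query_results_paginator(query_execution_id, page_size=500):
        row_count += len(page['ResultSet']['Rows'])

    # The first page includes a header row.
    return max(row_count - 1, 0)
```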

### Data Catalog Integration

Query and manage AWS Glue Data Catalog resources through the Athena SQL interface.

```python { .api }
class AthenaCreateDataCatalogOperator(AwsBaseOperator):
    """
    Create a data catalog in Amazon Athena.

    Parameters:
    - catalog_name: str - name of the data catalog
    - catalog_type: str - type of catalog ('HIVE' or 'GLUE')
    - description: str - description of the catalog
    - parameters: dict - catalog configuration parameters
    - tags: dict - tags to apply to the catalog
    - aws_conn_id: str - Airflow connection for AWS credentials

    Returns:
        str: Data catalog ARN
    """
    def __init__(
        self,
        catalog_name: str,
        catalog_type: str,
        description: str | None = None,
        parameters: dict | None = None,
        tags: dict | None = None,
        **kwargs,
    ): ...
```
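
Based on the interface documented above, a sketch of registering a Glue-backed catalog; the catalog name, account ID, and tag values are illustrative, and the `'catalog-id'` parameter follows the Athena `CreateDataCatalog` API convention for GLUE catalogs:

```python
from airflow.providers.amazon.aws.operators.athena import AthenaCreateDataCatalogOperator

create_partner_catalog = AthenaCreateDataCatalogOperator(
    task_id='create_partner_catalog',
    catalog_name='partner_data_catalog',
    catalog_type='GLUE',
    description='Glue Data Catalog owned by the partner analytics account',
    # For GLUE catalogs the Athena API expects the owning AWS account ID.
    parameters={'catalog-id': '123456789012'},
    tags={'team': 'analytics'},
    aws_conn_id='aws_default',
)
```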

## Usage Examples

### Basic Query Execution

```python
from airflow.providers.amazon.aws.operators.athena import AthenaOperator

# Execute a simple analytics query
analytics_query = AthenaOperator(
    task_id='run_sales_analysis',
    query="""
        SELECT
            region,
            SUM(sales_amount) as total_sales,
            COUNT(*) as transaction_count
        FROM sales_data
        WHERE date_column >= date('2023-01-01')
        GROUP BY region
        ORDER BY total_sales DESC
    """,
    database='analytics_db',
    catalog='AwsDataCatalog',
    output_location='s3://my-results-bucket/athena-results/',
    workgroup='analytics-workgroup',
    sleep_time=10,
    max_polling_attempts=100,
    aws_conn_id='aws_default'
)
```

### Data Transformation Pipeline

```python
# Transform and prepare data for analytics
data_transform = AthenaOperator(
    task_id='transform_customer_data',
    query="""
        CREATE TABLE analytics_db.customer_metrics AS
        SELECT
            customer_id,
            customer_tier,
            date_trunc('month', order_date) as month,
            SUM(order_value) as monthly_spend,
            COUNT(order_id) as monthly_orders,
            AVG(order_value) as avg_order_value
        FROM raw_data.orders o
        JOIN raw_data.customers c ON o.customer_id = c.id
        WHERE order_date >= date('2023-01-01')
        GROUP BY customer_id, customer_tier, date_trunc('month', order_date)
    """,
    database='analytics_db',
    output_location='s3://analytics-bucket/transformed-data/',
    result_configuration={
        'OutputLocation': 's3://analytics-bucket/query-results/',
        'EncryptionConfiguration': {
            'EncryptionOption': 'SSE_S3'
        }
    },
    workgroup='data-processing',
    log_query=True,
    aws_conn_id='aws_default'
)
```
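
CTAS statements like the one above fail if the target table already exists, so recurring runs usually drop the previous table first (dropping the table does not remove the underlying S3 data, which may need separate cleanup). A sketch reusing the names from the example above:

```python
# Illustrative companion task: clear the previous CTAS table before recreating it.
drop_previous_metrics = AthenaOperator(
    task_id='drop_previous_metrics',
    query='DROP TABLE IF EXISTS analytics_db.customer_metrics',
    database='analytics_db',
    output_location='s3://analytics-bucket/query-results/',
    workgroup='data-processing',
    aws_conn_id='aws_default'
)

drop_previous_metrics >> data_transform
```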

### Parameterized Query with Template

```python
# Use templated queries with Airflow variables
parameterized_query = AthenaOperator(
    task_id='daily_metrics_report',
    query="""
        SELECT
            '{{ ds }}' as report_date,
            product_category,
            SUM(revenue) as daily_revenue,
            COUNT(DISTINCT customer_id) as unique_customers
        FROM sales_fact
        WHERE date_column = '{{ ds }}'
        GROUP BY product_category
    """,
    database='reporting',
    output_location='s3://reports-bucket/daily-metrics/{{ ds }}/',
    workgroup='reporting-workgroup',
    query_execution_context={
        'Catalog': 'AwsDataCatalog',
        'Database': 'reporting'
    },
    client_request_token='daily-report-{{ ds_nodash }}',
    aws_conn_id='aws_default'
)
```
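
Besides built-in macros such as `{{ ds }}`, ad-hoc values can be injected through the task's `params` dictionary, since the operator's `query` field is templated. A small sketch; the table, column, and threshold names are illustrative:

```python
# Illustrative: render a user-supplied threshold into the templated query.
high_value_orders = AthenaOperator(
    task_id='high_value_orders',
    query="""
        SELECT order_id, customer_id, revenue
        FROM sales_fact
        WHERE date_column = '{{ ds }}'
          AND revenue > {{ params.revenue_threshold }}
    """,
    params={'revenue_threshold': 1000},
    database='reporting',
    output_location='s3://reports-bucket/high-value-orders/{{ ds }}/',
    workgroup='reporting-workgroup',
    aws_conn_id='aws_default'
)
```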

### Large Dataset Processing with Deferrable Mode

```python
# Process large datasets efficiently using deferrable execution
large_data_processing = AthenaOperator(
    task_id='process_large_dataset',
    query="""
        CREATE TABLE processed_data.yearly_aggregates AS
        SELECT
            year(transaction_date) as year,
            month(transaction_date) as month,
            store_id,
            product_category,
            SUM(amount) as total_amount,
            COUNT(*) as transaction_count,
            AVG(amount) as avg_amount
        FROM raw_data.transactions
        WHERE transaction_date >= date('2020-01-01')
        GROUP BY year(transaction_date), month(transaction_date), store_id, product_category
    """,
    database='processed_data',
    output_location='s3://processed-data-bucket/yearly-aggregates/',
    workgroup='heavy-processing',
    deferrable=True,  # Use deferrable mode for long-running queries
    poll_interval=60,  # Check status every minute
    result_configuration={
        'OutputLocation': 's3://processed-data-bucket/query-results/',
        'EncryptionConfiguration': {
            'EncryptionOption': 'SSE_KMS',
            'KmsKey': 'arn:aws:kms:us-west-2:123456789012:key/12345678-1234-1234-1234-123456789012'
        }
    },
    aws_conn_id='aws_default'
)
```

### Query Result Processing

```python
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.athena import AthenaHook


def process_athena_results(**context):
    """Custom function to process Athena query results."""
    athena_hook = AthenaHook(aws_conn_id='aws_default')

    # Get the query execution ID pushed to XCom by the previous task
    query_execution_id = context['task_instance'].xcom_pull(task_ids='run_sales_analysis')

    # Get query results
    results = athena_hook.get_query_results(query_execution_id=query_execution_id)

    # Process results, skipping the header row
    for row in results['ResultSet']['Rows'][1:]:
        data = [col.get('VarCharValue', '') for col in row['Data']]
        # Process each data row
        print(f"Processing row: {data}")

    return f"Processed {len(results['ResultSet']['Rows']) - 1} rows"


# Use with PythonOperator; the task context is passed automatically in Airflow 2+
process_results = PythonOperator(
    task_id='process_results',
    python_callable=process_athena_results
)

analytics_query >> process_results
```
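
For large result sets it is often easier to read the CSV file that Athena writes to the configured output location than to page through the API. A sketch combining `get_output_location` with `S3Hook`; the helper name and connection ID are illustrative:

```python
from urllib.parse import urlparse

from airflow.providers.amazon.aws.hooks.athena import AthenaHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def fetch_result_csv(query_execution_id: str) -> str:
    """Return the raw CSV that Athena wrote for this query (illustrative helper)."""
    athena_hook = AthenaHook(aws_conn_id='aws_default')
    s3_hook = S3Hook(aws_conn_id='aws_default')

    # e.g. s3://my-results-bucket/athena-results/<query_execution_id>.csv
    output_uri = athena_hook.get_output_location(query_execution_id)
    parsed = urlparse(output_uri)

    # Read the object body as a string from the results bucket.
    return s3_hook.read_key(key=parsed.path.lstrip('/'), bucket_name=parsed.netloc)
```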

## Import Statements

```python
from airflow.providers.amazon.aws.operators.athena import (
    AthenaOperator,
    AthenaCreateDataCatalogOperator
)
from airflow.providers.amazon.aws.sensors.athena import AthenaSensor
from airflow.providers.amazon.aws.hooks.athena import AthenaHook
```