Apache Airflow provider package that provides comprehensive AWS service integrations for orchestrating cloud workflows and data pipelines
—
Quality: Pending — Does it follow best practices?
Impact: Pending — No eval scenarios have been run.
Amazon Athena provides serverless SQL query capabilities for data stored in Amazon S3, enabling interactive analytics and data processing through standard SQL syntax without managing infrastructure.
Execute Trino/Presto SQL queries against data in S3 with comprehensive result management and monitoring.
class AthenaOperator(AwsBaseOperator):
"""
Submit a Trino/Presto query to Amazon Athena.
Parameters:
- query: str - Trino/Presto query to be run on Amazon Athena
- database: str - database to select
- catalog: str - catalog to select
- output_location: str - S3 path to write query results
- client_request_token: str - unique token to avoid duplicate executions
- workgroup: str - Athena workgroup for query execution (default: 'primary')
- query_execution_context: dict - context for query execution
- result_configuration: dict - configuration for results storage and encryption
- sleep_time: int - time in seconds between status checks (default: 30)
- max_polling_attempts: int - number of polling attempts before timeout
- log_query: bool - whether to log query and execution parameters (default: True)
- deferrable: bool - run operator in deferrable mode
- poll_interval: int - polling interval for deferrable mode
- aws_conn_id: str - Airflow connection for AWS credentials
Returns:
str: Query execution ID
"""
def __init__(
self,
*,
query: str,
database: str,
catalog: str = None,
output_location: str = None,
client_request_token: str = None,
workgroup: str = "primary",
query_execution_context: dict[str, str] = None,
result_configuration: dict = None,
sleep_time: int = 30,
max_polling_attempts: int = None,
log_query: bool = True,
deferrable: bool = False,
poll_interval: int = 30,
**kwargs
): ...Monitor Athena query execution status with configurable polling and timeout settings.
class AthenaSensor(BaseSensorOperator):
"""
Wait for an Amazon Athena query to complete.
Parameters:
- query_execution_id: str - Athena query execution ID to monitor
- max_retries: int - maximum number of status check retries
- aws_conn_id: str - Airflow connection for AWS credentials
- sleep_time: int - time between status checks
- poke_interval: int - sensor poke interval
- timeout: int - maximum time to wait for completion
Returns:
bool: True when query completes successfully
"""
def __init__(
self,
query_execution_id: str,
max_retries: int = None,
aws_conn_id: str = 'aws_default',
sleep_time: int = 30,
**kwargs
): ...Low-level Athena service operations for query management and result retrieval.
class AthenaHook(AwsBaseHook):
"""
Hook for Amazon Athena service operations.
Parameters:
- aws_conn_id: str - Airflow connection for AWS credentials
- region_name: str - AWS region name
- verify: bool - whether to verify SSL certificates
- botocore_config: dict - botocore client configuration
"""
def __init__(
self,
aws_conn_id: str = 'aws_default',
region_name: str = None,
verify: bool = None,
botocore_config: dict = None,
**kwargs
): ...
def run_query(
self,
query: str,
query_context: dict,
result_configuration: dict,
client_request_token: str = None,
workgroup: str = 'primary'
) -> str:
"""
Run a query on Amazon Athena.
Parameters:
- query: str - SQL query to execute
- query_context: dict - query execution context
- result_configuration: dict - result storage configuration
- client_request_token: str - unique request token
- workgroup: str - Athena workgroup name
Returns:
str: Query execution ID
"""
...
def check_query_status(self, query_execution_id: str) -> str:
"""
Check the status of a submitted query.
Parameters:
- query_execution_id: str - query execution ID
Returns:
str: Query status ('QUEUED', 'RUNNING', 'SUCCEEDED', 'FAILED', 'CANCELLED')
"""
...
def get_query_results(
self,
query_execution_id: str,
next_token_id: str = None,
max_results: int = 1000
) -> dict:
"""
Get query results from Amazon Athena.
Parameters:
- query_execution_id: str - query execution ID
- next_token_id: str - pagination token
- max_results: int - maximum number of results to return
Returns:
dict: Query results with metadata
"""
...
def get_query_results_paginator(
self,
query_execution_id: str,
max_items: int = None,
page_size: int = None
):
"""
Get paginated query results.
Parameters:
- query_execution_id: str - query execution ID
- max_items: int - maximum items to return
- page_size: int - page size for pagination
Returns:
Iterator of result pages
"""
...
def stop_query(self, query_execution_id: str) -> dict:
"""
Stop/cancel a running query.
Parameters:
- query_execution_id: str - query execution ID to cancel
Returns:
dict: Cancellation response
"""
...
def get_output_location(self, query_execution_id: str) -> str:
"""
Get the S3 output location for query results.
Parameters:
- query_execution_id: str - query execution ID
Returns:
str: S3 URI of query results
"""
...Query and manage AWS Glue Data Catalog resources through Athena SQL interface.
class AthenaCreateDataCatalogOperator(AwsBaseOperator):
    """
    Create a data catalog in Amazon Athena.

    Parameters:
    - catalog_name: str - name of the data catalog
    - catalog_type: str - type of catalog ('HIVE' or 'GLUE')
    - description: str - description of the catalog
    - parameters: dict - catalog configuration parameters
    - tags: dict - tags to apply to the catalog
    - aws_conn_id: str - Airflow connection for AWS credentials

    Returns:
    str: Data catalog ARN
    """

    def __init__(
        self,
        catalog_name: str,
        catalog_type: str,
        # Explicit `X | None` annotations per PEP 484 (implicit Optional
        # via `str = None` is disallowed).
        description: str | None = None,
        parameters: dict | None = None,
        tags: dict | None = None,
        **kwargs,
    ): ...

from airflow.providers.amazon.aws.operators.athena import AthenaOperator
# Execute a simple analytics query
analytics_query = AthenaOperator(
task_id='run_sales_analysis',
query="""
SELECT
region,
SUM(sales_amount) as total_sales,
COUNT(*) as transaction_count
FROM sales_data
WHERE date_column >= date('2023-01-01')
GROUP BY region
ORDER BY total_sales DESC
""",
database='analytics_db',
catalog='AwsDataCatalog',
output_location='s3://my-results-bucket/athena-results/',  # Athena writes result files under this S3 prefix
workgroup='analytics-workgroup',
sleep_time=10,  # check query status every 10 seconds
max_polling_attempts=100,  # stop polling after 100 attempts if the query has not finished
aws_conn_id='aws_default'
)
# Transform and prepare data for analytics
# CTAS statement: creates analytics_db.customer_metrics from the raw order data
data_transform = AthenaOperator(
task_id='transform_customer_data',
query="""
CREATE TABLE analytics_db.customer_metrics AS
SELECT
customer_id,
customer_tier,
date_trunc('month', order_date) as month,
SUM(order_value) as monthly_spend,
COUNT(order_id) as monthly_orders,
AVG(order_value) as avg_order_value
FROM raw_data.orders o
JOIN raw_data.customers c ON o.customer_id = c.id
WHERE order_date >= date('2023-01-01')
GROUP BY customer_id, customer_tier, date_trunc('month', order_date)
""",
database='analytics_db',
output_location='s3://analytics-bucket/transformed-data/',
result_configuration={  # result placement plus SSE-S3 encryption for the query output
'OutputLocation': 's3://analytics-bucket/query-results/',
'EncryptionConfiguration': {
'EncryptionOption': 'SSE_S3'
}
},
workgroup='data-processing',
log_query=True,  # log the rendered query and execution parameters
aws_conn_id='aws_default'
)
# Use templated queries with Airflow variables
# '{{ ds }}' / '{{ ds_nodash }}' are Airflow Jinja macros rendered at run time
# (the operator templates the query, output_location and client_request_token).
parameterized_query = AthenaOperator(
task_id='daily_metrics_report',
query="""
SELECT
'{{ ds }}' as report_date,
product_category,
SUM(revenue) as daily_revenue,
COUNT(DISTINCT customer_id) as unique_customers
FROM sales_fact
WHERE date_column = '{{ ds }}'
GROUP BY product_category
""",
database='reporting',
output_location='s3://reports-bucket/daily-metrics/{{ ds }}/',  # one S3 prefix per execution date
workgroup='reporting-workgroup',
query_execution_context={
'Catalog': 'AwsDataCatalog',
'Database': 'reporting'
},
client_request_token='daily-report-{{ ds_nodash }}',  # dedupe token: retries for the same date reuse the execution
aws_conn_id='aws_default'
)
# Process large datasets efficiently using deferrable execution
# Long-running CTAS aggregation run in deferrable mode to free the worker slot
large_data_processing = AthenaOperator(
task_id='process_large_dataset',
query="""
CREATE TABLE processed_data.yearly_aggregates AS
SELECT
year(transaction_date) as year,
month(transaction_date) as month,
store_id,
product_category,
SUM(amount) as total_amount,
COUNT(*) as transaction_count,
AVG(amount) as avg_amount
FROM raw_data.transactions
WHERE transaction_date >= date('2020-01-01')
GROUP BY year(transaction_date), month(transaction_date), store_id, product_category
""",
database='processed_data',
output_location='s3://processed-data-bucket/yearly-aggregates/',
workgroup='heavy-processing',
deferrable=True, # Use deferrable mode for long-running queries
poll_interval=60, # Check status every minute
result_configuration={
'OutputLocation': 's3://processed-data-bucket/query-results/',
'EncryptionConfiguration': {
'EncryptionOption': 'SSE_KMS',  # encrypt query output with the customer-managed KMS key below
'KmsKey': 'arn:aws:kms:us-west-2:123456789012:key/12345678-1234-1234-1234-123456789012'
}
},
aws_conn_id='aws_default'
)
from airflow.providers.amazon.aws.hooks.athena import AthenaHook
def process_athena_results(**context):
    """
    Process Athena query results with AthenaHook.

    Pulls the query execution ID that the upstream AthenaOperator pushed to
    XCom, fetches the result set, and prints each data row.

    Returns:
    str: Summary message with the number of processed rows.
    """
    athena_hook = AthenaHook(aws_conn_id='aws_default')
    # Get the query execution ID pushed to XCom by the upstream task.
    query_execution_id = context['task_instance'].xcom_pull(task_ids='run_analytics_query')
    # Single-page fetch; use get_query_results_paginator for result sets
    # larger than one page.
    results = athena_hook.get_query_results(query_execution_id=query_execution_id)
    # The first row of an Athena result set is the column-header row, so skip it.
    for row in results['ResultSet']['Rows'][1:]:
        data = [col.get('VarCharValue', '') for col in row['Data']]
        # Process each data row
        print(f"Processing row: {data}")
    return f"Processed {len(results['ResultSet']['Rows']) - 1} rows"

# Use with PythonOperator. In Airflow 2+, the task context is passed to the
# callable automatically; the old provide_context=True flag is deprecated
# and no longer needed.
process_results = PythonOperator(
    task_id='process_results',
    python_callable=process_athena_results,
)
analytics_query >> process_resultsfrom airflow.providers.amazon.aws.operators.athena import (
AthenaOperator,
AthenaCreateDataCatalogOperator
)
from airflow.providers.amazon.aws.sensors.athena import AthenaSensor
from airflow.providers.amazon.aws.hooks.athena import AthenaHookInstall with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-amazon