tessl/pypi-apache-airflow-providers-amazon

Apache Airflow provider package offering comprehensive AWS service integrations for orchestrating cloud workflows and data pipelines

docs/athena-analytics.md

Amazon Athena Analytics

Amazon Athena provides serverless SQL query capabilities for data stored in Amazon S3, enabling interactive analytics and data processing through standard SQL syntax without managing infrastructure.

Capabilities

SQL Query Execution

Execute Trino/Presto SQL queries against data in S3 with comprehensive result management and monitoring.

class AthenaOperator(AwsBaseOperator):
    """
    Submit a Trino/Presto query to Amazon Athena.
    
    Parameters:
    - query: str - Trino/Presto query to be run on Amazon Athena
    - database: str - database to select
    - catalog: str - catalog to select
    - output_location: str - S3 path to write query results
    - client_request_token: str - unique token to avoid duplicate executions
    - workgroup: str - Athena workgroup for query execution (default: 'primary')
    - query_execution_context: dict - context for query execution
    - result_configuration: dict - configuration for results storage and encryption
    - sleep_time: int - time in seconds between status checks (default: 30)
    - max_polling_attempts: int - number of polling attempts before timeout
    - log_query: bool - whether to log query and execution parameters (default: True)
    - deferrable: bool - run operator in deferrable mode
    - poll_interval: int - polling interval for deferrable mode
    - aws_conn_id: str - Airflow connection for AWS credentials
    
    Returns:
    str: Query execution ID
    """
    def __init__(
        self,
        *,
        query: str,
        database: str,
        catalog: str | None = None,
        output_location: str | None = None,
        client_request_token: str | None = None,
        workgroup: str = "primary",
        query_execution_context: dict[str, str] | None = None,
        result_configuration: dict | None = None,
        sleep_time: int = 30,
        max_polling_attempts: int | None = None,
        log_query: bool = True,
        deferrable: bool = False,
        poll_interval: int = 30,
        **kwargs
    ): ...

Query Status Monitoring

Monitor Athena query execution status with configurable polling and timeout settings.

class AthenaSensor(BaseSensorOperator):
    """
    Wait for an Amazon Athena query to complete.
    
    Parameters:
    - query_execution_id: str - Athena query execution ID to monitor
    - max_retries: int - maximum number of status check retries
    - aws_conn_id: str - Airflow connection for AWS credentials
    - sleep_time: int - time between status checks
    - poke_interval: int - sensor poke interval
    - timeout: int - maximum time to wait for completion
    
    Returns:
    bool: True when query completes successfully
    """
    def __init__(
        self,
        query_execution_id: str,
        max_retries: int | None = None,
        aws_conn_id: str = 'aws_default',
        sleep_time: int = 30,
        **kwargs
    ): ...
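The sensor's core logic reduces to a single poke: look up the query state, fail fast on a terminal error, and report done only on success. Below is a minimal stand-alone sketch of that pattern with a stubbed state lookup in place of the real AWS call; the `poke` helper and stub are illustrative, not the sensor's actual implementation.

```python
# Terminal states mirror those reported by Athena's GetQueryExecution API.
FAILURE_STATES = {"FAILED", "CANCELLED"}
SUCCESS_STATES = {"SUCCEEDED"}

def poke(get_state, query_execution_id):
    """Return True when done, False to keep waiting; raise on failure."""
    state = get_state(query_execution_id)
    if state in FAILURE_STATES:
        raise RuntimeError(f"Athena query {query_execution_id} ended in state {state}")
    return state in SUCCESS_STATES

# Stubbed lookups instead of a live Athena client:
print(poke(lambda _qid: "RUNNING", "q-1"))    # False
print(poke(lambda _qid: "SUCCEEDED", "q-1"))  # True
```

Airflow calls a sensor's poke repeatedly on `poke_interval` until it returns True or `timeout` elapses, which is why the happy path returns a boolean rather than blocking.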

Athena Service Hook

Low-level Athena service operations for query management and result retrieval.

class AthenaHook(AwsBaseHook):
    """
    Hook for Amazon Athena service operations.
    
    Parameters:
    - aws_conn_id: str - Airflow connection for AWS credentials
    - region_name: str - AWS region name
    - verify: bool - whether to verify SSL certificates
    - botocore_config: dict - botocore client configuration
    """
    def __init__(
        self,
        aws_conn_id: str = 'aws_default',
        region_name: str | None = None,
        verify: bool | None = None,
        botocore_config: dict | None = None,
        **kwargs
    ): ...
    
    def run_query(
        self,
        query: str,
        query_context: dict,
        result_configuration: dict,
        client_request_token: str | None = None,
        workgroup: str = 'primary'
    ) -> str:
        """
        Run a query on Amazon Athena.
        
        Parameters:
        - query: str - SQL query to execute
        - query_context: dict - query execution context
        - result_configuration: dict - result storage configuration
        - client_request_token: str - unique request token
        - workgroup: str - Athena workgroup name
        
        Returns:
        str: Query execution ID
        """
        ...
    
    def check_query_status(self, query_execution_id: str) -> str:
        """
        Check the status of a submitted query.
        
        Parameters:
        - query_execution_id: str - query execution ID
        
        Returns:
        str: Query status ('QUEUED', 'RUNNING', 'SUCCEEDED', 'FAILED', 'CANCELLED')
        """
        ...
    
    def get_query_results(
        self,
        query_execution_id: str,
        next_token_id: str | None = None,
        max_results: int = 1000
    ) -> dict:
        """
        Get query results from Amazon Athena.
        
        Parameters:
        - query_execution_id: str - query execution ID
        - next_token_id: str - pagination token
        - max_results: int - maximum number of results to return
        
        Returns:
        dict: Query results with metadata
        """
        ...
    
    def get_query_results_paginator(
        self,
        query_execution_id: str,
        max_items: int | None = None,
        page_size: int | None = None
    ):
        """
        Get paginated query results.
        
        Parameters:
        - query_execution_id: str - query execution ID
        - max_items: int - maximum items to return
        - page_size: int - page size for pagination
        
        Returns:
        Iterator of result pages
        """
        ...
    
    def stop_query(self, query_execution_id: str) -> dict:
        """
        Stop/cancel a running query.
        
        Parameters:
        - query_execution_id: str - query execution ID to cancel
        
        Returns:
        dict: Cancellation response
        """
        ...
    
    def get_output_location(self, query_execution_id: str) -> str:
        """
        Get the S3 output location for query results.
        
        Parameters:
        - query_execution_id: str - query execution ID
        
        Returns:
        str: S3 URI of query results
        """
        ...
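Together, run_query and check_query_status support a poll-until-terminal loop like the one AthenaOperator runs internally. Here is a minimal sketch of that loop with a stubbed status function so it runs without AWS credentials; `wait_for_query` and `fake_status` are illustrative names, not part of the provider.

```python
import time
from collections import deque

# Terminal states from Athena's query lifecycle.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}

def wait_for_query(check_status, query_execution_id, sleep_time=0.01, max_attempts=10):
    """Poll a status callable until the query reaches a terminal state."""
    for _attempt in range(max_attempts):
        status = check_status(query_execution_id)
        if status in TERMINAL_STATES:
            return status
        time.sleep(sleep_time)
    raise TimeoutError(
        f"Query {query_execution_id} not finished after {max_attempts} attempts"
    )

# Stubbed status sequence standing in for AthenaHook.check_query_status:
_statuses = deque(["QUEUED", "RUNNING", "SUCCEEDED"])
def fake_status(_qid):
    return _statuses.popleft()

print(wait_for_query(fake_status, "abc-123", sleep_time=0))  # SUCCEEDED
```

With the real hook, `check_status` would be `athena_hook.check_query_status`, and `sleep_time` / `max_attempts` correspond to the operator's sleep_time and max_polling_attempts parameters.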

Data Catalog Integration

Query and manage AWS Glue Data Catalog resources through Athena SQL interface.

class AthenaCreateDataCatalogOperator(AwsBaseOperator):
    """
    Create a data catalog in Amazon Athena.
    
    Parameters:
    - catalog_name: str - name of the data catalog
    - catalog_type: str - type of catalog ('HIVE' or 'GLUE')
    - description: str - description of the catalog
    - parameters: dict - catalog configuration parameters
    - tags: dict - tags to apply to the catalog
    - aws_conn_id: str - Airflow connection for AWS credentials
    
    Returns:
    str: Data catalog ARN
    """
    def __init__(
        self,
        catalog_name: str,
        catalog_type: str,
        description: str | None = None,
        parameters: dict | None = None,
        tags: dict | None = None,
        **kwargs
    ): ...
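Under the hood this maps onto Athena's CreateDataCatalog API, which accepts Name, Type, Description, Parameters, and Tags. The sketch below shows how such a request payload might be assembled; the validation and helper function are illustrative, not the operator's actual code.

```python
# Catalog types accepted by Athena's CreateDataCatalog API.
VALID_CATALOG_TYPES = {"HIVE", "GLUE", "LAMBDA"}

def build_catalog_request(catalog_name, catalog_type,
                          description=None, parameters=None, tags=None):
    """Assemble keyword arguments for a create_data_catalog-style call."""
    if catalog_type not in VALID_CATALOG_TYPES:
        raise ValueError(f"Unsupported catalog type: {catalog_type}")
    request = {"Name": catalog_name, "Type": catalog_type}
    if description:
        request["Description"] = description
    if parameters:
        request["Parameters"] = parameters
    if tags:
        # The API expects tags as a list of {'Key': ..., 'Value': ...} pairs.
        request["Tags"] = [{"Key": k, "Value": v} for k, v in tags.items()]
    return request

req = build_catalog_request("orders_catalog", "GLUE",
                            description="Glue-backed catalog",
                            tags={"team": "analytics"})
print(req["Tags"])  # [{'Key': 'team', 'Value': 'analytics'}]
```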

Usage Examples

Basic Query Execution

from airflow.providers.amazon.aws.operators.athena import AthenaOperator

# Execute a simple analytics query
analytics_query = AthenaOperator(
    task_id='run_sales_analysis',
    query="""
        SELECT 
            region,
            SUM(sales_amount) as total_sales,
            COUNT(*) as transaction_count
        FROM sales_data 
        WHERE date_column >= date('2023-01-01')
        GROUP BY region
        ORDER BY total_sales DESC
    """,
    database='analytics_db',
    catalog='AwsDataCatalog',
    output_location='s3://my-results-bucket/athena-results/',
    workgroup='analytics-workgroup',
    sleep_time=10,
    max_polling_attempts=100,
    aws_conn_id='aws_default'
)

Data Transformation Pipeline

# Transform and prepare data for analytics
data_transform = AthenaOperator(
    task_id='transform_customer_data',
    query="""
        CREATE TABLE analytics_db.customer_metrics AS
        SELECT 
            customer_id,
            customer_tier,
            date_trunc('month', order_date) as month,
            SUM(order_value) as monthly_spend,
            COUNT(order_id) as monthly_orders,
            AVG(order_value) as avg_order_value
        FROM raw_data.orders o
        JOIN raw_data.customers c ON o.customer_id = c.id
        WHERE order_date >= date('2023-01-01')
        GROUP BY customer_id, customer_tier, date_trunc('month', order_date)
    """,
    database='analytics_db',
    output_location='s3://analytics-bucket/transformed-data/',
    result_configuration={
        'OutputLocation': 's3://analytics-bucket/query-results/',
        'EncryptionConfiguration': {
            'EncryptionOption': 'SSE_S3'
        }
    },
    workgroup='data-processing',
    log_query=True,
    aws_conn_id='aws_default'
)

Parameterized Query with Template

# Use templated queries with Airflow variables
parameterized_query = AthenaOperator(
    task_id='daily_metrics_report',
    query="""
        SELECT 
            '{{ ds }}' as report_date,
            product_category,
            SUM(revenue) as daily_revenue,
            COUNT(DISTINCT customer_id) as unique_customers
        FROM sales_fact 
        WHERE date_column = '{{ ds }}'
        GROUP BY product_category
    """,
    database='reporting',
    output_location='s3://reports-bucket/daily-metrics/{{ ds }}/',
    workgroup='reporting-workgroup',
    query_execution_context={
        'Catalog': 'AwsDataCatalog',
        'Database': 'reporting'
    },
    client_request_token='daily-report-{{ ds_nodash }}',
    aws_conn_id='aws_default'
)

Large Dataset Processing with Deferrable Mode

# Process large datasets efficiently using deferrable execution
large_data_processing = AthenaOperator(
    task_id='process_large_dataset',
    query="""
        CREATE TABLE processed_data.yearly_aggregates AS
        SELECT 
            year(transaction_date) as year,
            month(transaction_date) as month,
            store_id,
            product_category, 
            SUM(amount) as total_amount,
            COUNT(*) as transaction_count,
            AVG(amount) as avg_amount
        FROM raw_data.transactions
        WHERE transaction_date >= date('2020-01-01')
        GROUP BY year(transaction_date), month(transaction_date), store_id, product_category
    """,
    database='processed_data',
    output_location='s3://processed-data-bucket/yearly-aggregates/',
    workgroup='heavy-processing',
    deferrable=True,  # Use deferrable mode for long-running queries
    poll_interval=60,  # Check status every minute
    result_configuration={
        'OutputLocation': 's3://processed-data-bucket/query-results/',
        'EncryptionConfiguration': {
            'EncryptionOption': 'SSE_KMS',
            'KmsKey': 'arn:aws:kms:us-west-2:123456789012:key/12345678-1234-1234-1234-123456789012'
        }
    },
    aws_conn_id='aws_default'
)

Query Result Processing

from airflow.providers.amazon.aws.hooks.athena import AthenaHook

def process_athena_results(**context):
    """Custom function to process Athena query results."""
    athena_hook = AthenaHook(aws_conn_id='aws_default')
    
    # Get query execution ID from previous task
    query_execution_id = context['task_instance'].xcom_pull(task_ids='run_analytics_query')
    
    # Fetch the first page of results (up to max_results rows; use
    # get_query_results_paginator for larger result sets)
    results = athena_hook.get_query_results(query_execution_id=query_execution_id)
    
    # Process results
    for row in results['ResultSet']['Rows'][1:]:  # Skip header row
        data = [col.get('VarCharValue', '') for col in row['Data']]
        # Process each data row
        print(f"Processing row: {data}")
    
    return f"Processed {len(results['ResultSet']['Rows']) - 1} rows"

# Use with PythonOperator (Airflow 2+ injects the context automatically,
# so the deprecated provide_context flag is no longer needed)
from airflow.operators.python import PythonOperator

process_results = PythonOperator(
    task_id='process_results',
    python_callable=process_athena_results,
)

analytics_query >> process_results
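The row-parsing above relies on the shape of a GetQueryResults payload: column values come back as strings under 'VarCharValue', and the first row holds the headers. A tiny inline sample makes that structure concrete and testable without AWS; `rows_to_dicts` and the sample data are illustrative only.

```python
# Minimal mock of an Athena GetQueryResults response.
sample = {
    "ResultSet": {
        "Rows": [
            {"Data": [{"VarCharValue": "region"}, {"VarCharValue": "total_sales"}]},
            {"Data": [{"VarCharValue": "us-east-1"}, {"VarCharValue": "1200.50"}]},
            {"Data": [{"VarCharValue": "eu-west-1"}, {"VarCharValue": "980.00"}]},
        ]
    }
}

def rows_to_dicts(response):
    """Turn an Athena result payload into a list of header-keyed dicts."""
    rows = response["ResultSet"]["Rows"]
    headers = [col.get("VarCharValue", "") for col in rows[0]["Data"]]
    return [
        dict(zip(headers, (col.get("VarCharValue", "") for col in row["Data"])))
        for row in rows[1:]  # skip the header row
    ]

print(rows_to_dicts(sample))
# [{'region': 'us-east-1', 'total_sales': '1200.50'},
#  {'region': 'eu-west-1', 'total_sales': '980.00'}]
```

Note that all values arrive as strings; numeric columns must be cast explicitly (e.g. `float(row['total_sales'])`) before aggregation.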

Import Statements

from airflow.providers.amazon.aws.operators.athena import (
    AthenaOperator,
    AthenaCreateDataCatalogOperator
)
from airflow.providers.amazon.aws.sensors.athena import AthenaSensor
from airflow.providers.amazon.aws.hooks.athena import AthenaHook

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-amazon
