tessl/pypi-pyathena

Python DB API 2.0 (PEP 249) client for Amazon Athena

docs/async-operations.md

Asynchronous Operations

Non-blocking query execution with a Future-based API built on Python's concurrent.futures, enabling concurrent query processing and integration with async frameworks. Well suited to applications that need high concurrency and responsive user interfaces.

Capabilities

Async Cursor

Asynchronous cursor providing non-blocking query execution with Future-based result handling, allowing multiple concurrent queries and integration with async/await patterns.

class AsyncCursor:
    arraysize: int
    max_workers: int
    
    def execute(
        self,
        operation: str,
        parameters: Optional[Union[Dict[str, Any], List[str]]] = None,
        work_group: Optional[str] = None,
        s3_staging_dir: Optional[str] = None,
        cache_size: Optional[int] = 0,
        cache_expiration_time: Optional[int] = 0,
        result_reuse_enable: Optional[bool] = None,
        result_reuse_minutes: Optional[int] = None,
        paramstyle: Optional[str] = None,
        **kwargs
    ) -> Tuple[str, Future[Union[AthenaResultSet, Any]]]:
        """
        Execute a SQL statement asynchronously.
        
        Parameters:
        - operation: SQL query string
        - parameters: Query parameters (dict or sequence)
        - work_group: Athena workgroup for execution
        - s3_staging_dir: S3 location for query results
        - cache_size: Query result cache size
        - cache_expiration_time: Cache expiration time in seconds
        - result_reuse_enable: Enable query result reuse
        - result_reuse_minutes: Result reuse duration in minutes
        - paramstyle: Parameter substitution style
        - **kwargs: Additional execution options
        
        Returns:
        Tuple of (query_id, Future[AthenaResultSet]) for result handling
        """
    
    def executemany(
        self,
        operation: str,
        seq_of_parameters: List[Optional[Union[Dict[str, Any], List[str]]]],
        **kwargs
    ) -> None:
        """
        Execute multiple statements (not supported).
        
        Raises:
        NotSupportedError: Always raised as async executemany is not supported
        """
    
    def cancel(self, query_id: str) -> Future[None]:
        """
        Cancel a running query by query ID.
        
        Parameters:
        - query_id: ID of query to cancel
        
        Returns:
        Future that completes when cancellation is processed
        """
    
    def description(self, query_id: str) -> Future[Optional[List[Tuple[str, str, None, None, int, int, str]]]]:
        """
        Get column description for a query asynchronously.
        
        Parameters:
        - query_id: Query execution ID
        
        Returns:
        Future containing column metadata as list of tuples with
        (name, type_code, display_size, internal_size, precision, scale, null_ok)
        """
    
    def query_execution(self, query_id: str) -> Future[AthenaQueryExecution]:
        """
        Get query execution metadata asynchronously.
        
        Parameters:
        - query_id: Query execution ID
        
        Returns:
        Future[AthenaQueryExecution] with execution details
        """
    
    def poll(self, query_id: str) -> Future[AthenaQueryExecution]:
        """
        Poll query execution status asynchronously.
        
        Parameters:
        - query_id: Query execution ID
        
        Returns:
        Future[AthenaQueryExecution] with current status
        """
    
    def close(self, wait: bool = False) -> None:
        """
        Close cursor and shutdown thread pool executor.
        
        Parameters:
        - wait: If True, wait for all running queries to complete before shutdown
        """

Async Dict Cursor

Asynchronous cursor variant that returns results as dictionaries with column names as keys.

class AsyncDictCursor(AsyncCursor):
    dict_type: Type[Dict] = dict
    
    def execute(
        self,
        operation: str,
        parameters: Optional[Union[Dict[str, Any], List[str]]] = None,
        **kwargs
    ) -> Tuple[str, Future[Union[AthenaResultSet, Any]]]:
        """
        Execute query asynchronously returning dictionary results.
        
        Parameters:
        - operation: SQL query string
        - parameters: Query parameters (dict or sequence)
        - **kwargs: Additional execution options
        
        Returns:
        Tuple of (query_id, Future[AthenaDictResultSet])
        """

Future-based Result Handling

PyAthena uses Python's concurrent.futures.Future for asynchronous result handling, providing standard async patterns.

class Future[T]:
    def result(self, timeout: Optional[float] = None) -> T:
        """Get result, blocking until available or timeout."""
    
    def add_done_callback(self, fn: Callable[[Future[T]], None]) -> None:
        """Add callback function called when Future completes."""
    
    def cancel(self) -> bool:
        """Attempt to cancel the Future."""
    
    def cancelled(self) -> bool:
        """Return True if Future was cancelled."""
    
    def done(self) -> bool:
        """Return True if Future is done (completed or cancelled)."""
    
    def exception(self, timeout: Optional[float] = None) -> Optional[BaseException]:
        """Return the exception raised by the call, or None if it succeeded."""
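
These are the standard `concurrent.futures.Future` methods, so the behavior can be previewed locally without touching Athena. In the sketch below, a `time.sleep` call stands in for a slow query; the `done()`/`result()`/`exception()` calls are exactly what you would use on a future returned by `AsyncCursor.execute`:

```python
from concurrent.futures import ThreadPoolExecutor
import time

def slow_query(seconds, value):
    """Stand-in for a query: sleeps, then returns a value."""
    time.sleep(seconds)
    return value

with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(slow_query, 0.1, 42)

    print(future.done())        # typically False: still "running"
    result = future.result()    # blocks until available, like a query result
    print(result)               # 42
    print(future.done())        # True
    print(future.exception())   # None: the call succeeded
```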

Usage Examples

Basic Async Query Execution

from pyathena import connect
from pyathena.async_cursor import AsyncCursor
import time

# Connect with async cursor
conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=AsyncCursor
)

cursor = conn.cursor()

# Start query asynchronously
query_id, future = cursor.execute("SELECT COUNT(*) FROM large_table")
print(f"Query started with ID: {query_id}")

# Do other work while query runs
print("Doing other work while query executes...")
time.sleep(2)

# Check if query is complete
if future.done():
    result_set = future.result()
    print("Query completed!")
    print(result_set.fetchall())
else:
    print("Query still running...")
    result_set = future.result()  # Wait for completion
    print("Query completed!")
    print(result_set.fetchall())

cursor.close()
conn.close()

Concurrent Query Execution

from pyathena import connect
from pyathena.async_cursor import AsyncCursor
from concurrent.futures import as_completed
import time

def run_concurrent_queries():
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=AsyncCursor
    )
    
    cursor = conn.cursor()
    
    # Multiple queries to run concurrently
    queries = [
        ("user_count", "SELECT COUNT(DISTINCT user_id) FROM users"),
        ("daily_revenue", "SELECT DATE(order_date), SUM(amount) FROM orders GROUP BY DATE(order_date)"),
        ("top_products", "SELECT product_id, COUNT(*) as sales FROM orders GROUP BY product_id ORDER BY sales DESC LIMIT 10"),
        ("monthly_stats", "SELECT MONTH(order_date), AVG(amount), COUNT(*) FROM orders GROUP BY MONTH(order_date)")
    ]
    
    # Start all queries
    running_queries = {}
    for name, query in queries:
        query_id, future = cursor.execute(query)
        running_queries[name] = (query_id, future)
        print(f"Started {name} query (ID: {query_id})")
    
    # Process results as they complete, mapping each future back to its name
    future_to_name = {future: name for name, (_, future) in running_queries.items()}
    
    for future in as_completed(future_to_name):
        completed_name = future_to_name[future]
        
        try:
            result_set = future.result()
            results = result_set.fetchall()
            print(f"\n{completed_name} completed with {len(results)} rows:")
            for row in results[:5]:  # Show first 5 rows
                print(f"  {row}")
        except Exception as e:
            print(f"{completed_name} failed: {e}")
    
    cursor.close()
    conn.close()

run_concurrent_queries()

Async Query with Callbacks

from pyathena import connect
from pyathena.async_cursor import AsyncCursor
import time

def query_callback(future):
    """Callback function called when query completes."""
    try:
        result_set = future.result()
        results = result_set.fetchall()
        print(f"Query completed with {len(results)} rows")
        for row in results:
            print(f"  {row}")
    except Exception as e:
        print(f"Query failed: {e}")

def async_with_callback():
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=AsyncCursor
    )
    
    cursor = conn.cursor()
    
    # Execute query with callback
    query_id, future = cursor.execute("SELECT * FROM products LIMIT 5")
    future.add_done_callback(query_callback)
    
    print(f"Query {query_id} started, callback will be called when complete")
    
    # Do other work
    for i in range(5):
        print(f"Doing other work... {i+1}")
        time.sleep(1)
    
    # Ensure query is complete before closing
    if not future.done():
        print("Waiting for query to complete...")
        future.result()  # Wait for completion
    
    cursor.close()
    conn.close()

async_with_callback()

Query Status Monitoring

from pyathena import connect
from pyathena.async_cursor import AsyncCursor
import time

def monitor_query_execution():
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=AsyncCursor
    )
    
    cursor = conn.cursor()
    
    # Start a long-running query
    query_id, future = cursor.execute("""
        SELECT 
            customer_id,
            COUNT(*) as order_count,
            SUM(amount) as total_spent,
            AVG(amount) as avg_order_value
        FROM orders 
        GROUP BY customer_id
        HAVING COUNT(*) > 10
        ORDER BY total_spent DESC
    """)
    
    print(f"Started query {query_id}")
    
    # Monitor execution status
    while not future.done():
        # Get current execution status
        status_future = cursor.poll(query_id)
        execution = status_future.result()
        
        print(f"Query state: {execution.state}")
        if execution.data_scanned_in_bytes is not None:
            data_scanned = execution.data_scanned_in_bytes
            print(f"Data scanned: {data_scanned / 1024 / 1024:.2f} MB")
        
        if execution.state in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
            break
        
        time.sleep(2)  # Poll every 2 seconds
    
    # Get final results
    if future.done():
        try:
            result_set = future.result()
            results = result_set.fetchall()
            print(f"Query completed successfully with {len(results)} rows")
        except Exception as e:
            print(f"Query failed: {e}")
    
    cursor.close()
    conn.close()

monitor_query_execution()

Async Error Handling

from pyathena import connect
from pyathena.async_cursor import AsyncCursor
from pyathena.error import OperationalError, ProgrammingError
from concurrent.futures import TimeoutError

def async_error_handling():
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=AsyncCursor
    )
    
    cursor = conn.cursor()
    
    # Test queries with different error conditions
    test_queries = [
        ("valid_query", "SELECT COUNT(*) FROM users"),
        ("syntax_error", "SELCT COUNT(*) FROM users"),  # Intentional typo
        ("missing_table", "SELECT * FROM nonexistent_table"),
        ("timeout_query", "SELECT * FROM very_large_table")  # May timeout
    ]
    
    for name, query in test_queries:
        try:
            print(f"\nExecuting {name}...")
            query_id, future = cursor.execute(query)
            
            # Set timeout for demonstration
            result_set = future.result(timeout=30)  # 30 second timeout
            results = result_set.fetchall()
            print(f"✓ {name} succeeded with {len(results)} rows")
            
        except ProgrammingError as e:
            print(f"✗ {name} failed with syntax error: {e}")
        except OperationalError as e:
            print(f"✗ {name} failed with operational error: {e}")
        except TimeoutError:
            print(f"✗ {name} timed out")
            # Cancel the timed-out query
            try:
                cancel_future = cursor.cancel(query_id)
                cancel_future.result(timeout=10)
                print(f"  Query {query_id} cancelled")
            except Exception as cancel_error:
                print(f"  Failed to cancel query: {cancel_error}")
        except Exception as e:
            print(f"✗ {name} failed with unexpected error: {e}")
    
    cursor.close()
    conn.close()

async_error_handling()

Integration with asyncio

import asyncio
from pyathena import connect
from pyathena.async_cursor import AsyncCursor
from concurrent.futures import ThreadPoolExecutor

async def athena_with_asyncio():
    """Example integrating PyAthena async cursor with asyncio."""
    
    # Create connection in thread pool (connection setup is synchronous)
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as executor:
        conn = await loop.run_in_executor(
            executor,
            lambda: connect(
                s3_staging_dir="s3://my-bucket/athena-results/",
                region_name="us-west-2",
                cursor_class=AsyncCursor
            )
        )
    
    cursor = conn.cursor()
    
    # Execute multiple queries concurrently using asyncio
    async def execute_query(name, query):
        query_id, future = cursor.execute(query)
        print(f"Started {name} (Query ID: {query_id})")
        
        # Convert Future to asyncio-compatible awaitable
        result_set = await loop.run_in_executor(None, future.result)
        results = result_set.fetchall()
        
        print(f"Completed {name}: {len(results)} rows")
        return name, results
    
    # Define queries
    queries = [
        ("user_stats", "SELECT COUNT(*) as user_count FROM users"),
        ("order_stats", "SELECT COUNT(*) as order_count FROM orders"),
        ("product_stats", "SELECT COUNT(*) as product_count FROM products")
    ]
    
    # Execute all queries concurrently
    tasks = [execute_query(name, query) for name, query in queries]
    results = await asyncio.gather(*tasks)
    
    # Process results
    for name, data in results:
        print(f"{name}: {data}")
    
    # Cleanup
    cursor.close()
    conn.close()

# Run with asyncio
asyncio.run(athena_with_asyncio())
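
The `run_in_executor(None, future.result)` bridge above occupies a worker thread while it waits. Because `AsyncCursor.execute` returns a standard `concurrent.futures.Future`, `asyncio.wrap_future` can await it directly instead. A minimal, locally runnable sketch (a thread-pool task stands in for the Athena query):

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def slow_task():
    """Stand-in for an Athena query running in the cursor's thread pool."""
    time.sleep(0.05)
    return "done"

async def main():
    pool = ThreadPoolExecutor(max_workers=1)
    # AsyncCursor.execute() would hand you a Future just like this one
    concurrent_future = pool.submit(slow_task)
    # Await the concurrent.futures.Future without blocking the event loop
    result = await asyncio.wrap_future(concurrent_future)
    pool.shutdown()
    return result

print(asyncio.run(main()))  # prints "done"
```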

Async Query Pipeline

from pyathena import connect
from pyathena.async_cursor import AsyncCursor
import time

class AsyncQueryPipeline:
    """Pipeline for managing multiple dependent async queries."""
    
    def __init__(self, connection):
        self.conn = connection
        self.cursor = connection.cursor()
        self.results = {}
    
    def execute_stage(self, stage_name, query, dependencies=None):
        """Execute a query stage, optionally waiting for dependencies."""
        # Wait for dependencies if specified
        if dependencies:
            for dep in dependencies:
                if dep in self.results:
                    future = self.results[dep]['future']
                    if not future.done():
                        print(f"Waiting for dependency {dep}...")
                        future.result()
        
        print(f"Starting stage: {stage_name}")
        query_id, future = self.cursor.execute(query)
        
        self.results[stage_name] = {
            'query_id': query_id,
            'future': future,
            'start_time': time.time()
        }
        
        return query_id, future
    
    def wait_for_all(self):
        """Wait for all stages to complete."""
        for stage_name, stage_info in self.results.items():
            future = stage_info['future']
            if not future.done():
                print(f"Waiting for {stage_name}...")
                future.result()
            
            duration = time.time() - stage_info['start_time']
            print(f"{stage_name} completed in {duration:.2f} seconds")
    
    def get_results(self, stage_name):
        """Get results for a specific stage."""
        future = self.results[stage_name]['future']
        result_set = future.result()
        return result_set.fetchall()
    
    def close(self):
        """Close the cursor and connection."""
        self.cursor.close()
        self.conn.close()

# Example usage
def run_async_pipeline():
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=AsyncCursor
    )
    
    pipeline = AsyncQueryPipeline(conn)
    
    # Stage 1: Data preparation
    pipeline.execute_stage(
        'data_prep',
        """
        CREATE TABLE temp_user_summary AS
        SELECT 
            user_id,
            COUNT(*) as order_count,
            SUM(amount) as total_spent
        FROM orders
        GROUP BY user_id
        """
    )
    
    # Stage 2: Analysis (depends on data_prep)
    pipeline.execute_stage(
        'user_analysis',
        """
        SELECT 
            CASE 
                WHEN total_spent > 1000 THEN 'High Value'
                WHEN total_spent > 500 THEN 'Medium Value'
                ELSE 'Low Value'
            END as customer_segment,
            COUNT(*) as customer_count,
            AVG(total_spent) as avg_spent
        FROM temp_user_summary
        GROUP BY 1
        """,
        dependencies=['data_prep']
    )
    
    # Stage 3: Additional metrics (depends on data_prep)
    pipeline.execute_stage(
        'retention_metrics',
        """
        SELECT 
            order_count,
            COUNT(*) as customers_with_count,
            AVG(total_spent) as avg_spent_for_count
        FROM temp_user_summary
        GROUP BY order_count
        ORDER BY order_count
        """,
        dependencies=['data_prep']
    )
    
    # Wait for all stages to complete
    pipeline.wait_for_all()
    
    # Process results
    user_analysis = pipeline.get_results('user_analysis')
    retention_metrics = pipeline.get_results('retention_metrics')
    
    print("\nUser Analysis Results:")
    for row in user_analysis:
        print(f"  {row}")
    
    print("\nRetention Metrics:")
    for row in retention_metrics:
        print(f"  {row}")
    
    pipeline.close()

run_async_pipeline()

Performance Considerations

  • Use async cursors when you need to execute multiple queries concurrently
  • Async operations are particularly beneficial for I/O-bound workloads
  • Consider using connection pooling for high-concurrency applications
  • Monitor query execution status to handle long-running queries appropriately
  • Use appropriate timeouts to prevent hanging operations
  • Async cursors work well with web frameworks like FastAPI, aiohttp, and Tornado

Best Practices

  1. Resource Management: Always close cursors and connections properly
  2. Error Handling: Use try/except blocks with specific exception types
  3. Timeout Handling: Set appropriate timeouts for query execution
  4. Concurrent Limits: Don't exceed Athena's concurrent query limits
  5. Query Monitoring: Monitor long-running queries and provide user feedback
  6. Memory Management: Be mindful of memory usage with large result sets
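
Points 3 and 4 can be combined into a small submission helper that caps in-flight queries with a semaphore and applies a per-query timeout. This is a sketch, not part of PyAthena: `execute_fn` is a stand-in for `cursor.execute` (same `(query_id, future)` return shape), the demo drives it with a local thread pool, and the `max_in_flight` value is a placeholder that should reflect your account's Athena concurrency quota.

```python
import threading
import uuid
from concurrent.futures import ThreadPoolExecutor

def run_with_limit(execute_fn, queries, max_in_flight=5, timeout=60):
    """Submit (name, sql) pairs, keeping at most max_in_flight running."""
    semaphore = threading.Semaphore(max_in_flight)
    submitted = []
    for name, sql in queries:
        semaphore.acquire()  # blocks until a slot frees up
        query_id, future = execute_fn(sql)
        future.add_done_callback(lambda f: semaphore.release())
        submitted.append((name, query_id, future))
    # Collect results, enforcing a per-query timeout
    return {name: future.result(timeout=timeout)
            for name, _, future in submitted}

# Demo with a fake execute_fn; with PyAthena you would pass cursor.execute
pool = ThreadPoolExecutor(max_workers=10)

def fake_execute(sql):
    return str(uuid.uuid4()), pool.submit(lambda: f"rows for {sql}")

results = run_with_limit(
    fake_execute,
    [("q1", "SELECT 1"), ("q2", "SELECT 2")],
    max_in_flight=2,
)
print(results)  # {'q1': 'rows for SELECT 1', 'q2': 'rows for SELECT 2'}
pool.shutdown()
```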

Install with Tessl CLI

npx tessl i tessl/pypi-pyathena
