tessl/pypi-pyathena

Python DB API 2.0 (PEP 249) client for Amazon Athena

Core Database Operations

DB API 2.0 (PEP 249) compliant operations for connecting to Amazon Athena, executing SQL queries, and fetching results. Rows are returned as tuples by the standard Cursor or as dictionaries keyed by column name by DictCursor.

Capabilities

Connection Management

Create and manage connections to Amazon Athena with comprehensive configuration options for AWS authentication, S3 staging, and query execution parameters.

def connect(
    s3_staging_dir: Optional[str] = None,
    region_name: Optional[str] = None,
    schema_name: Optional[str] = "default",
    catalog_name: Optional[str] = "awsdatacatalog",
    work_group: Optional[str] = None,
    poll_interval: float = 1,
    encryption_option: Optional[str] = None,
    kms_key: Optional[str] = None,
    profile_name: Optional[str] = None,
    role_arn: Optional[str] = None,
    role_session_name: str = "PyAthena-session-{timestamp}",
    external_id: Optional[str] = None,
    serial_number: Optional[str] = None,
    duration_seconds: int = 3600,
    converter: Optional[Converter] = None,
    formatter: Optional[Formatter] = None,
    retry_config: Optional[RetryConfig] = None,
    cursor_class: Optional[Type[ConnectionCursor]] = None,
    cursor_kwargs: Optional[Dict[str, Any]] = None,
    **kwargs
) -> Connection[ConnectionCursor]:
    """
    Create a connection to Amazon Athena.
    
    Parameters:
    - s3_staging_dir: S3 location where Athena writes query results (required unless work_group is given)
    - region_name: AWS region name (e.g., 'us-west-2')
    - schema_name: Default database/schema name (default: 'default')
    - catalog_name: Data catalog name (default: 'awsdatacatalog')
    - work_group: Athena workgroup name
    - poll_interval: Query polling interval in seconds (default: 1)
    - encryption_option: S3 encryption option ('SSE_S3', 'SSE_KMS', 'CSE_KMS')
    - kms_key: KMS key ID for encryption
    - profile_name: AWS profile name for credentials
    - role_arn: IAM role ARN for assume role
    - role_session_name: Session name for assume role (includes timestamp)
    - external_id: External ID for assume role
    - serial_number: MFA serial number for assume role
    - duration_seconds: STS assume role duration in seconds (default: 3600)
    - converter: Type converter instance for result processing
    - formatter: Parameter formatter instance for query formatting
    - retry_config: Configuration for API retry logic
    - cursor_class: Cursor class to use for connections
    - cursor_kwargs: Additional keyword arguments for cursor initialization
    - **kwargs: Additional connection parameters (AWS credentials, etc.)
    
    Returns:
    Connection object with specified cursor type
    """
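
As a minimal sketch of how these parameters might be assembled, the helper below validates a few settings and returns a keyword-argument dict for connect(). The name `build_connect_kwargs` and its validation rules are illustrative assumptions, not part of PyAthena; only the keyword names and the encryption option values come from the signature above.

```python
from typing import Any, Dict, Optional

# Encryption options listed in the parameter documentation above.
VALID_ENCRYPTION_OPTIONS = {"SSE_S3", "SSE_KMS", "CSE_KMS"}


def build_connect_kwargs(
    s3_staging_dir: Optional[str] = None,
    region_name: Optional[str] = None,
    work_group: Optional[str] = None,
    encryption_option: Optional[str] = None,
    kms_key: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: validate settings and return kwargs for connect()."""
    # Assumption: the staging location is always given as an s3:// URI.
    if s3_staging_dir is not None and not s3_staging_dir.startswith("s3://"):
        raise ValueError("s3_staging_dir must be an s3:// URI")
    if encryption_option is not None:
        if encryption_option not in VALID_ENCRYPTION_OPTIONS:
            raise ValueError(f"unknown encryption_option: {encryption_option!r}")
        # Assumption: the KMS-based options need a key to be supplied.
        if encryption_option.endswith("_KMS") and not kms_key:
            raise ValueError("kms_key is required for KMS-based encryption")
    candidates = {
        "s3_staging_dir": s3_staging_dir,
        "region_name": region_name,
        "work_group": work_group,
        "encryption_option": encryption_option,
        "kms_key": kms_key,
    }
    # Drop unset values so connect() falls back to its own defaults.
    return {key: value for key, value in candidates.items() if value is not None}
```

The result can be unpacked directly, for example `connect(**build_connect_kwargs(s3_staging_dir="s3://my-bucket/athena-results/", region_name="us-west-2"))`. Failing early on an inconsistent configuration avoids discovering the problem only when the first query runs.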

Connection Class

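DB API 2.0 compliant connection object that creates cursors and releases resources on close. The transaction methods exist for interface compliance only: Athena does not support transactions, so commit() is a no-op and rollback() raises NotSupportedError.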

class Connection[ConnectionCursor]:
    session: Session
    client: BaseClient
    retry_config: RetryConfig
    
    def cursor(self, cursor=None, **kwargs) -> ConnectionCursor:
        """
        Create a new cursor object using the connection.
        
        Parameters:
        - cursor: Cursor class to instantiate (optional; defaults to the connection's cursor class)
        - **kwargs: Additional cursor configuration options
        
        Returns:
        Cursor object of the requested class, or of the connection's cursor type if none is given
        """
    
    def close(self) -> None:
        """
        Close the connection and release resources.
        """
    
    def commit(self) -> None:
        """
        Commit any pending transaction (no-op for Athena).
        """
    
    def rollback(self) -> None:
        """
        Rollback any pending transaction.
        
        Raises:
        NotSupportedError: Athena does not support transactions
        """
    
    def __enter__(self) -> Connection[ConnectionCursor]:
        """Context manager entry."""
    
    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Context manager exit with automatic cleanup."""
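
Because commit() is a no-op and rollback() raises NotSupportedError, code shared across several DB API drivers may want to treat rollback as optional. The following is a minimal sketch under that assumption. `rollback_if_supported` is a hypothetical helper, and the exception class is passed in so the function does not depend on any one driver; with PyAthena it would be NotSupportedError from pyathena.error.

```python
from typing import Any, Type


def rollback_if_supported(conn: Any, not_supported: Type[Exception]) -> bool:
    """Call conn.rollback(); return False if the driver reports it as unsupported."""
    try:
        conn.rollback()
    except not_supported:
        # The driver has no transactions, so there is nothing to undo.
        return False
    return True
```

A caller can use the return value to decide whether compensating cleanup, such as dropping a partially written table, has to be done explicitly.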

Standard Cursor

Standard cursor for executing queries and fetching results as tuples, providing full DB API 2.0 compliance.

class Cursor:
    arraysize: int
    description: Optional[List[Tuple]]
    rowcount: int
    rownumber: Optional[int]
    
    def execute(self, operation: str, parameters=None, **kwargs) -> Cursor:
        """
        Execute a SQL statement.
        
        Parameters:
        - operation: SQL query string
        - parameters: Query parameters (dict or sequence)
        - **kwargs: Additional execution options
        
        Returns:
        Self for method chaining
        """
    
    def executemany(self, operation: str, seq_of_parameters, **kwargs) -> None:
        """
        Execute a SQL statement multiple times with different parameters.
        
        Parameters:
        - operation: SQL query string
        - seq_of_parameters: Sequence of parameter sets
        - **kwargs: Additional execution options
        """
    
    def fetchone(self) -> Optional[Tuple]:
        """
        Fetch the next row of a query result set.
        
        Returns:
        Single row as tuple or None if no more rows
        """
    
    def fetchmany(self, size: Optional[int] = None) -> List[Tuple]:
        """
        Fetch the next set of rows of a query result set.
        
        Parameters:
        - size: Number of rows to fetch (default: arraysize)
        
        Returns:
        List of rows as tuples
        """
    
    def fetchall(self) -> List[Tuple]:
        """
        Fetch all remaining rows of a query result set.
        
        Returns:
        List of all remaining rows as tuples
        """
    
    def cancel(self) -> None:
        """
        Cancel the currently executing query.
        """
    
    def close(self) -> None:
        """
        Close the cursor and free associated resources.
        """
    
    def __iter__(self) -> Iterator[Tuple]:
        """Iterator protocol support for row-by-row processing."""
    
    def __next__(self) -> Tuple:
        """Iterator protocol implementation."""
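
For large result sets, fetchmany() can be wrapped in a generator so that rows are processed in fixed-size batches instead of being loaded at once with fetchall(). This is a minimal sketch that relies only on the fetchmany() and arraysize members documented above; `iter_batches` is a hypothetical helper name, and the cursor is assumed to have already executed a query.

```python
from typing import Any, Iterator, List, Optional, Tuple


def iter_batches(cursor: Any, size: Optional[int] = None) -> Iterator[List[Tuple]]:
    """Yield lists of rows from an executed cursor until it is exhausted."""
    # Fall back to the cursor's own arraysize when no size is given.
    batch_size = size if size is not None else cursor.arraysize
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        yield rows
```

A typical call would be `for batch in iter_batches(cursor, 1000): ...` after `cursor.execute(...)`.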

Dictionary Cursor

Cursor variant that returns query results as dictionaries with column names as keys.

class DictCursor(Cursor):
    def fetchone(self) -> Optional[Dict[str, Any]]:
        """
        Fetch the next row as a dictionary.
        
        Returns:
        Single row as dict with column names as keys, or None
        """
    
    def fetchmany(self, size: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        Fetch multiple rows as dictionaries.
        
        Parameters:
        - size: Number of rows to fetch (default: arraysize)
        
        Returns:
        List of rows as dictionaries
        """
    
    def fetchall(self) -> List[Dict[str, Any]]:
        """
        Fetch all remaining rows as dictionaries.
        
        Returns:
        List of all remaining rows as dictionaries
        """
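
The relationship between tuple rows and dictionary rows can be illustrated with the standard cursor alone: the column names come from cursor.description, whose entries start with the column name under PEP 249. The sketch below is a conceptual equivalent written against any DB API 2.0 cursor, not PyAthena's actual DictCursor implementation, and `rows_as_dicts` is a hypothetical helper name.

```python
from typing import Any, Dict, Iterator


def rows_as_dicts(cursor: Any) -> Iterator[Dict[str, Any]]:
    """Pair each remaining tuple row with the column names from cursor.description."""
    if cursor.description is None:
        raise ValueError("cursor has no result set; call execute() first")
    # Each description entry is a sequence whose first item is the column name.
    names = [column[0] for column in cursor.description]
    # fetchone() returns None once the result set is exhausted.
    for row in iter(cursor.fetchone, None):
        yield dict(zip(names, row))
```

If two columns share a name, the later one overwrites the earlier one in the resulting dictionary, so aliasing duplicate columns in the query is advisable with either approach.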

Usage Examples

Basic Connection and Query

from pyathena import connect

# Create connection
conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    schema_name="default"
)

# Execute query
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) as row_count FROM my_table")

# Get results
result = cursor.fetchone()
print(f"Row count: {result[0]}")

cursor.close()
conn.close()

Using Dictionary Cursor

from pyathena import connect
from pyathena.cursor import DictCursor

conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=DictCursor
)

cursor = conn.cursor()
cursor.execute("SELECT name, age, city FROM users LIMIT 5")

# Results as dictionaries
for row in cursor.fetchall():
    print(f"Name: {row['name']}, Age: {row['age']}, City: {row['city']}")

cursor.close()
conn.close()

Parameterized Queries

from pyathena import connect

conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2"
)

cursor = conn.cursor()

# Using named parameters (recommended)
cursor.execute(
    "SELECT * FROM users WHERE age > %(min_age)s AND city = %(city)s",
    parameters={'min_age': 25, 'city': 'San Francisco'}
)

results = cursor.fetchall()
print(f"Found {len(results)} users")

cursor.close()
conn.close()

Context Manager Usage

from pyathena import connect

# Automatic resource cleanup
with connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2"
) as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT * FROM my_table LIMIT 10")
        for row in cursor:  # Iterator support
            print(row)

Error Handling

PyAthena raises standard DB API 2.0 exceptions for different error conditions:

from pyathena import connect
from pyathena.error import OperationalError, ProgrammingError

try:
    # The with block closes the connection even if the query fails
    with connect(s3_staging_dir="s3://my-bucket/results/") as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM nonexistent_table")
except ProgrammingError as e:
    print(f"SQL Error: {e}")
except OperationalError as e:
    print(f"Execution Error: {e}")
