tessl/pypi-pyathena

Python DB API 2.0 (PEP 249) client for Amazon Athena

Describes: pkg:pypi/pyathena@3.17.x

To install, run

npx @tessl/cli install tessl/pypi-pyathena@3.17.0


PyAthena

A Python DB API 2.0 (PEP 249) client for Amazon Athena that runs SQL queries against data stored in Amazon S3. PyAthena offers several cursor types for different data processing workflows, including pandas DataFrames, PyArrow Tables, and Future-based asynchronous execution, which makes it well suited to data engineers and analysts working with AWS data lakes.

Package Information

  • Package Name: PyAthena
  • Language: Python
  • Installation: pip install PyAthena
  • Extra Packages:
    • SQLAlchemy: pip install PyAthena[SQLAlchemy]
    • Pandas: pip install PyAthena[Pandas]
    • Arrow: pip install PyAthena[Arrow]
    • fastparquet: pip install PyAthena[fastparquet]

Core Imports

import pyathena
from pyathena import connect

For specialized cursors:

from pyathena.pandas.cursor import PandasCursor
from pyathena.arrow.cursor import ArrowCursor
from pyathena.async_cursor import AsyncCursor
from pyathena.spark.cursor import SparkCursor

Basic Usage

from pyathena import connect

# Connect to Athena
conn = connect(
    s3_staging_dir="s3://your-bucket/results/",
    region_name="us-west-2"
)

# Execute query with standard cursor
cursor = conn.cursor()
cursor.execute("SELECT * FROM your_table LIMIT 10")

# Get column information
print(cursor.description)

# Fetch results
rows = cursor.fetchall()
for row in rows:
    print(row)

# Close connection
conn.close()

Architecture

PyAthena is built around a flexible cursor architecture supporting multiple result formats:

  • Connection Management: Single connection class with pluggable cursor types
  • Cursor Variants: Standard, Dictionary, Pandas, Arrow, Async, and Spark cursors
  • Result Processing: Memory-efficient streaming and chunked processing for large datasets
  • AWS Integration: Native boto3 integration with retry logic and authentication
  • SQLAlchemy Support: Complete dialect implementation with custom Athena types

This design enables PyAthena to serve both traditional database applications and modern data science workflows while maintaining DB API 2.0 compliance.

Capabilities

Core Database Operations

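Standard DB API 2.0 operations: connection management, query execution, and result fetching, with rows returned as tuples (Cursor) or dictionaries (DictCursor). Athena does not support transactions, so the DB API commit() and rollback() methods exist for interface compliance only and provide no transactional guarantees.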

def connect(
    s3_staging_dir: Optional[str] = None,
    region_name: Optional[str] = None,
    schema_name: Optional[str] = "default",
    catalog_name: Optional[str] = "awsdatacatalog",
    work_group: Optional[str] = None,
    poll_interval: float = 1,
    encryption_option: Optional[str] = None,
    kms_key: Optional[str] = None,
    profile_name: Optional[str] = None,
    role_arn: Optional[str] = None,
    role_session_name: str = "PyAthena-session-{timestamp}",
    external_id: Optional[str] = None,
    serial_number: Optional[str] = None,
    duration_seconds: int = 3600,
    converter: Optional[Converter] = None,
    formatter: Optional[Formatter] = None,
    retry_config: Optional[RetryConfig] = None,
    cursor_class: Optional[Type[ConnectionCursor]] = None,
    cursor_kwargs: Optional[Dict[str, Any]] = None,
    kill_on_interrupt: bool = True,
    session: Optional[Session] = None,
    config: Optional[Config] = None,
    result_reuse_enable: bool = False,
    result_reuse_minutes: int = 1440,
    on_start_query_execution: Optional[Callable[[str], None]] = None,
    **kwargs
) -> Connection[ConnectionCursor]: ...
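The on_start_query_execution parameter accepts any callable matching Callable[[str], None]; it receives the Athena query ID once the query has been submitted, which is useful for logging or for cancelling from another thread. The sketch below assumes only that signature; QueryIdRecorder is a hypothetical name, not part of PyAthena.

```python
import threading
from typing import List


class QueryIdRecorder:
    """Hypothetical on_start_query_execution callback that records every query ID."""

    def __init__(self) -> None:
        self._lock = threading.Lock()  # guard the list in case queries start on several threads
        self.query_ids: List[str] = []
        self.started = threading.Event()  # set as soon as the first query ID arrives

    def __call__(self, query_id: str) -> None:
        with self._lock:
            self.query_ids.append(query_id)
        self.started.set()


recorder = QueryIdRecorder()
# With PyAthena this object would be passed as:
#   connect(..., on_start_query_execution=recorder)
# Here it is called directly to show the contract.
recorder("example-query-id")
print(recorder.query_ids, recorder.started.is_set())  # ['example-query-id'] True
```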

class Connection[ConnectionCursor]:
    session: Session
    client: BaseClient
    retry_config: RetryConfig
    s3_staging_dir: Optional[str]
    region_name: Optional[str]
    schema_name: Optional[str]
    catalog_name: Optional[str] 
    work_group: Optional[str]
    poll_interval: float
    encryption_option: Optional[str]
    kms_key: Optional[str]
    
    def cursor(self, cursor: Optional[Type[FunctionalCursor]] = None, **kwargs) -> Union[FunctionalCursor, ConnectionCursor]: ...
    def close(self) -> None: ...
    def commit(self) -> None: ...
    def rollback(self) -> None: ...
    def __enter__(self) -> Connection[ConnectionCursor]: ...
    def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...

class Cursor:
    arraysize: int
    description: Optional[List[Tuple[str, str, None, None, int, int, str]]]
    rowcount: int
    rownumber: Optional[int]
    query_id: Optional[str]
    result_set: Optional[AthenaResultSet]
    
    def execute(self, operation: str, parameters: Optional[Union[Dict[str, Any], List[str]]] = None, work_group: Optional[str] = None, s3_staging_dir: Optional[str] = None, cache_size: int = 0, cache_expiration_time: int = 0, result_reuse_enable: Optional[bool] = None, result_reuse_minutes: Optional[int] = None, paramstyle: Optional[str] = None, on_start_query_execution: Optional[Callable[[str], None]] = None, **kwargs) -> Cursor: ...
    def executemany(self, operation: str, seq_of_parameters: List[Optional[Union[Dict[str, Any], List[str]]]], **kwargs) -> None: ...
    def fetchone(self) -> Optional[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: ...
    def fetchmany(self, size: Optional[int] = None) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: ...
    def fetchall(self) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: ...
    def cancel(self) -> None: ...
    def close(self) -> None: ...
    def __iter__(self) -> Iterator[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: ...
    def __next__(self) -> Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]: ...

class DictCursor(Cursor):
    """Cursor that returns results as dictionaries with column names as keys."""
    dict_type: Type[Dict] = dict
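DictCursor keys each row by column name. The conversion can be sketched in plain Python, assuming (as the Cursor stub states) that the first element of each description entry is the column name. rows_to_dicts is a hypothetical helper for illustration; with PyAthena itself, conn.cursor(DictCursor) (imported from pyathena.cursor) returns dictionaries directly.

```python
from typing import Any, Dict, List, Optional, Sequence, Tuple

# Each description entry: (name, type_code, display_size, internal_size, precision, scale, null_ok)
Description = Sequence[Tuple[Any, ...]]


def rows_to_dicts(
    description: Description,
    rows: List[Tuple[Optional[Any], ...]],
    dict_type: type = dict,
) -> List[Dict[str, Optional[Any]]]:
    """Hypothetical helper: pair column names from `description` with each tuple row."""
    column_names = [column[0] for column in description]
    return [dict_type(zip(column_names, row)) for row in rows]


description = [
    ("id", "integer", None, None, 10, 0, "UNKNOWN"),
    ("name", "varchar", None, None, 2147483647, 0, "UNKNOWN"),
]
rows = [(1, "alice"), (2, None)]
print(rows_to_dicts(description, rows))  # [{'id': 1, 'name': 'alice'}, {'id': 2, 'name': None}]
```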

See core-database.md for the full reference.

Pandas Integration

High-performance integration with pandas DataFrames, enabling direct query result processing as DataFrames with support for chunked processing of large datasets.

class PandasCursor:
    arraysize: int
    description: Optional[List[Tuple]]
    rowcount: int
    query_id: Optional[str]
    result_set: Optional[AthenaPandasResultSet]
    
    def execute(self, operation: str, parameters: Optional[Union[Dict[str, Any], List[str]]] = None, work_group: Optional[str] = None, s3_staging_dir: Optional[str] = None, cache_size: Optional[int] = 0, cache_expiration_time: Optional[int] = 0, result_reuse_enable: Optional[bool] = None, result_reuse_minutes: Optional[int] = None, paramstyle: Optional[str] = None, keep_default_na: bool = False, na_values: Optional[Iterable[str]] = ("",), quoting: int = 1, on_start_query_execution: Optional[Callable[[str], None]] = None, **kwargs) -> PandasCursor: ...
    def executemany(self, operation: str, seq_of_parameters: List[Optional[Union[Dict[str, Any], List[str]]]], **kwargs) -> None: ...
    def fetchone(self) -> Optional[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: ...
    def fetchmany(self, size: Optional[int] = None) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: ...
    def fetchall(self) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: ...
    def as_pandas(self) -> Union[DataFrame, DataFrameIterator]: ...
    def iter_chunks(self) -> Generator[DataFrame, None, None]: ...
    def cancel(self) -> None: ...
    def close(self) -> None: ...
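For results too large to load at once, iter_chunks() yields one DataFrame per chunk, so only one chunk needs to be in memory at a time. Chunking is assumed here to be enabled when the cursor is created, for example conn.cursor(PandasCursor, chunksize=100_000); confirm the keyword in pandas-integration.md. count_rows_in_chunks is a hypothetical helper, and the demo feeds it plain lists instead of DataFrames so it runs without pandas or AWS.

```python
from typing import Iterable, Sized, Tuple


def count_rows_in_chunks(chunks: Iterable[Sized]) -> Tuple[int, int]:
    """Hypothetical helper: consume chunks one at a time and return (chunk_count, row_count).

    With PyAthena, `chunks` would be `cursor.iter_chunks()` on a PandasCursor and each
    chunk a pandas DataFrame, whose len() is its row count.
    """
    chunk_count = 0
    row_count = 0
    for chunk in chunks:
        chunk_count += 1
        row_count += len(chunk)  # process the chunk here; it can be freed before the next one
    return chunk_count, row_count


# Offline stand-in: a generator of three row lists in place of DataFrames.
fake_chunks = ([(i,) for i in range(n)] for n in (3, 3, 1))
print(count_rows_in_chunks(fake_chunks))  # (3, 7)
```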

See pandas-integration.md for the full reference.

PyArrow Integration

Native PyArrow Table support for columnar data processing, providing optimal performance for analytical workloads and seamless integration with the Arrow ecosystem.

class ArrowCursor:
    def execute(self, operation: str, parameters=None, **kwargs) -> ArrowCursor: ...
    def fetchall(self) -> List[Tuple[Optional[Any], ...]]: ...
    def as_arrow(self) -> Table: ...

See arrow-integration.md for the full reference.

Asynchronous Operations

Non-blocking query execution built on concurrent.futures: AsyncCursor.execute() returns a query ID and a Future immediately, so several queries can run concurrently on a thread pool. The API is Future-based rather than asyncio async/await.

class AsyncCursor:
    def execute(self, operation: str, parameters=None, **kwargs) -> Tuple[str, Future[AthenaResultSet]]: ...
    def cancel(self, query_id: str) -> Future[None]: ...
    def poll(self, query_id: str) -> Future[AthenaQueryExecution]: ...
    def close(self, wait: bool = False) -> None: ...
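Because execute() returns a (query_id, future) pair right away, the usual pattern is to submit every query first and collect the results afterwards. The sketch below shows that pattern offline; FakeAsyncCursor is a hypothetical stand-in that mimics the (query_id, Future) contract with a thread pool and resolves to a plain list instead of an AthenaResultSet.

```python
import uuid
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Dict, List, Tuple


class FakeAsyncCursor:
    """Hypothetical stand-in for AsyncCursor: execute() returns (query_id, Future)."""

    def __init__(self, max_workers: int = 4) -> None:
        self._executor = ThreadPoolExecutor(max_workers=max_workers)

    def execute(self, operation: str) -> Tuple[str, "Future[List[str]]"]:
        query_id = str(uuid.uuid4())
        # A real AsyncCursor would wait on Athena here; the fake just echoes the SQL.
        future = self._executor.submit(lambda: [operation])
        return query_id, future

    def close(self, wait: bool = False) -> None:
        self._executor.shutdown(wait=wait)


def run_concurrently(cursor: Any, queries: List[str]) -> Dict[str, Any]:
    """Submit every query first, then block on each future in submission order."""
    pending = [cursor.execute(sql) for sql in queries]
    return {query_id: future.result() for query_id, future in pending}


cursor = FakeAsyncCursor()
results = run_concurrently(cursor, ["SELECT 1", "SELECT 2"])
cursor.close(wait=True)
print(sorted(r[0] for r in results.values()))  # ['SELECT 1', 'SELECT 2']
```

With PyAthena, run_concurrently would receive conn.cursor(AsyncCursor), and each future.result() would be an AthenaResultSet whose fetchall() returns the rows.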

See async-operations.md for the full reference.

Spark Integration

Integration with Athena for Apache Spark: SparkCursor submits PySpark code as calculations in an Athena Spark session, supporting distributed processing and the notebook-style analytics workloads Athena offers for Spark.

class SparkCursor:
    def execute(self, code: str, **kwargs) -> SparkCursor: ...
    @property
    def session_id(self) -> str: ...
    @property
    def calculation_id(self) -> Optional[str]: ...
    @property
    def state(self) -> Optional[str]: ...

See spark-integration.md for the full reference.

SQLAlchemy Integration

Complete SQLAlchemy dialect implementation with custom Athena types, enabling ORM support and integration with existing SQLAlchemy-based applications.

class AthenaDialect: ...
class AthenaRestDialect: ...
class AthenaPandasDialect: ...
class AthenaArrowDialect: ...

# Custom Athena types
class TINYINT: ...
class STRUCT: ...
class MAP: ...
class ARRAY: ...
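The dialect is chosen through the URL scheme (awsathena, awsathena+rest, awsathena+pandas, awsathena+arrow), and the S3 staging directory must be percent-encoded because it contains ':' and '/'. athena_url is a hypothetical helper; the host pattern athena.{region}.amazonaws.com:443 and the empty credentials (leaving boto3 to resolve them) follow PyAthena's published examples but are assumptions to check against sqlalchemy-integration.md.

```python
from urllib.parse import urlencode


def athena_url(
    region_name: str,
    schema_name: str,
    s3_staging_dir: str,
    driver: str = "rest",
    **query: str,
) -> str:
    """Hypothetical helper: build an SQLAlchemy URL for the awsathena+<driver> dialect.

    Credentials are left empty so boto3's default credential chain applies.
    Extra keyword arguments (e.g. work_group) become query-string parameters.
    """
    params = urlencode({"s3_staging_dir": s3_staging_dir, **query})  # encodes ':' and '/'
    return (
        f"awsathena+{driver}://:@athena.{region_name}.amazonaws.com:443/"
        f"{schema_name}?{params}"
    )


url = athena_url("us-west-2", "default", "s3://your-bucket/results/", work_group="primary")
print(url)
# awsathena+rest://:@athena.us-west-2.amazonaws.com:443/default?s3_staging_dir=s3%3A%2F%2Fyour-bucket%2Fresults%2F&work_group=primary
```

The resulting string would then be passed to sqlalchemy.create_engine(url).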

See sqlalchemy-integration.md for the full reference.

Types

DB API Constants

apilevel: str = "2.0"
threadsafety: int = 2
paramstyle: str = "pyformat"

# Type objects for DB API 2.0
STRING: DBAPITypeObject
BINARY: DBAPITypeObject
BOOLEAN: DBAPITypeObject
NUMBER: DBAPITypeObject
DATE: DBAPITypeObject
TIME: DBAPITypeObject
DATETIME: DBAPITypeObject
JSON: DBAPITypeObject
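With paramstyle "pyformat", placeholders are written %(name)s and the values are passed as a dict, e.g. cursor.execute("SELECT * FROM users WHERE name = %(name)s", {"name": "O'Brien"}). PyAthena renders parameters client-side through its Formatter. The sketch below illustrates the idea for a few scalar types only; sql_literal and format_pyformat are hypothetical and simplified, not DefaultParameterFormatter's actual rules, and in this sketch a literal % in the SQL must be written %%.

```python
from typing import Any, Dict, Optional


def sql_literal(value: Optional[Any]) -> str:
    """Render a Python value as an Athena SQL literal (simplified illustration)."""
    if value is None:
        return "null"
    if isinstance(value, bool):  # check bool before int: bool is a subclass of int
        return "true" if value else "false"
    if isinstance(value, (int, float)):
        return str(value)
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"  # escape single quotes by doubling them
    raise TypeError(f"unsupported parameter type: {type(value).__name__}")


def format_pyformat(operation: str, parameters: Dict[str, Any]) -> str:
    """Substitute %(name)s placeholders with escaped SQL literals."""
    return operation % {name: sql_literal(v) for name, v in parameters.items()}


sql = format_pyformat(
    "SELECT * FROM users WHERE name = %(name)s AND age > %(age)s",
    {"name": "O'Brien", "age": 30},
)
print(sql)  # SELECT * FROM users WHERE name = 'O''Brien' AND age > 30
```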

Core Connection Types

ConnectionCursor = TypeVar("ConnectionCursor", bound=BaseCursor)
FunctionalCursor = TypeVar("FunctionalCursor", bound=BaseCursor)

class Connection[ConnectionCursor]:
    session: Session
    client: BaseClient
    retry_config: RetryConfig

class Converter:
    """Base converter class for data type conversion."""

class Formatter:
    """Base formatter class for parameter formatting."""

class DefaultParameterFormatter(Formatter):
    """Default parameter formatter implementation."""

class RetryConfig:
    """Configuration for API retry logic."""
    def __init__(
        self,
        exceptions: Iterable[str] = ("ThrottlingException", "TooManyRequestsException"),
        attempt: int = 5,
        multiplier: int = 1,
        max_delay: int = 100,
        exponential_base: int = 2
    ): ...
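RetryConfig retries Athena API calls that fail with the listed error codes, waiting longer after each failure. PyAthena delegates this to the tenacity library; assuming tenacity's wait_exponential semantics (wait = multiplier * exponential_base ** (n - 1), capped at max_delay, after the n-th failed attempt), the default schedule can be computed as below. backoff_schedule is a hypothetical helper that reproduces only the arithmetic.

```python
from typing import List


def backoff_schedule(
    attempt: int = 5,
    multiplier: float = 1,
    max_delay: float = 100,
    exponential_base: float = 2,
) -> List[float]:
    """Hypothetical helper: seconds waited after each failed attempt, assuming
    tenacity-style exponential backoff. No wait follows the final attempt."""
    return [
        min(multiplier * exponential_base ** (n - 1), max_delay)
        for n in range(1, attempt)  # one wait between each pair of consecutive attempts
    ]


print(backoff_schedule())  # [1, 2, 4, 8]
print(backoff_schedule(attempt=10))  # [1, 2, 4, 8, 16, 32, 64, 100, 100]
```

A custom policy would be passed as connect(..., retry_config=RetryConfig(attempt=10, max_delay=60)).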

class BaseCursor:
    """Base cursor class with common functionality."""

class AthenaResultSet:
    """Result set wrapper for Athena query results."""

Exception Hierarchy

class Error(Exception): ...
class Warning(Exception): ...
class InterfaceError(Error): ...
class DatabaseError(Error): ...
class InternalError(DatabaseError): ...
class OperationalError(DatabaseError): ...
class ProgrammingError(DatabaseError): ...
class DataError(DatabaseError): ...
class NotSupportedError(DatabaseError): ...

Query Execution Models

class AthenaQueryExecution:
    # State constants
    STATE_QUEUED: str = "QUEUED"
    STATE_RUNNING: str = "RUNNING"
    STATE_SUCCEEDED: str = "SUCCEEDED"
    STATE_FAILED: str = "FAILED"
    STATE_CANCELLED: str = "CANCELLED"
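A query moves from QUEUED through RUNNING to one of the terminal states SUCCEEDED, FAILED, or CANCELLED, and the synchronous cursors wait by re-checking the query execution every poll_interval seconds (the connect() parameter). The loop can be sketched as follows; wait_for_query is hypothetical and get_state stands in for an Athena GetQueryExecution call.

```python
import time
from typing import Callable

# State names as listed for AthenaQueryExecution.
STATE_QUEUED = "QUEUED"
STATE_RUNNING = "RUNNING"
STATE_SUCCEEDED = "SUCCEEDED"
STATE_FAILED = "FAILED"
STATE_CANCELLED = "CANCELLED"
TERMINAL_STATES = frozenset({STATE_SUCCEEDED, STATE_FAILED, STATE_CANCELLED})


def wait_for_query(get_state: Callable[[], str], poll_interval: float = 1.0) -> str:
    """Hypothetical helper: poll until the query reaches a terminal state, then return it."""
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        time.sleep(poll_interval)  # still QUEUED or RUNNING: wait before asking again


# Offline demo: a fake query that is queued, runs once, then succeeds.
states = iter([STATE_QUEUED, STATE_RUNNING, STATE_SUCCEEDED])
print(wait_for_query(lambda: next(states), poll_interval=0.01))  # SUCCEEDED
```

In PyAthena, a query that ends in FAILED surfaces to the caller as an OperationalError.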
