Python DB API 2.0 (PEP 249) client for Amazon Athena

npx @tessl/cli install tessl/pypi-pyathena@3.17.0

A comprehensive Python DB API 2.0 (PEP 249) client for Amazon Athena that provides high-performance SQL query execution against data stored in Amazon S3. PyAthena offers multiple cursor types optimized for different data processing workflows, including pandas DataFrames, PyArrow Tables, and asynchronous (Future-based) operations, making it an essential tool for data engineers and analysts working with AWS data lakes.
# Installation (run these in a shell, not in Python).  The extras are quoted
# so that shells such as zsh do not treat the square brackets as glob patterns.
#   pip install PyAthena
#   pip install "PyAthena[SQLAlchemy]"
#   pip install "PyAthena[Pandas]"
#   pip install "PyAthena[Arrow]"
#   pip install "PyAthena[fastparquet]"

# Core imports
import pyathena
from pyathena import connect

# For specialized cursors:
from pyathena.pandas.cursor import PandasCursor
from pyathena.arrow.cursor import ArrowCursor
from pyathena.async_cursor import AsyncCursor
from pyathena.spark.cursor import SparkCursor

# Basic usage: connect to Athena, run a query and print its rows.
conn = connect(
    s3_staging_dir="s3://your-bucket/results/",
    region_name="us-west-2",
)
try:
    # Execute query with standard cursor
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM your_table LIMIT 10")
    # Get column information
    print(cursor.description)
    # Fetch results
    rows = cursor.fetchall()
    for row in rows:
        print(row)
finally:
    # Close the connection even if the query or the fetch raises.
    conn.close()

# PyAthena is built around a flexible cursor architecture supporting multiple
# result formats:
#   - Cursor / DictCursor: rows as tuples / dictionaries (DB API 2.0)
#   - PandasCursor: pandas DataFrames, optionally consumed in chunks
#   - ArrowCursor: PyArrow Tables
#   - AsyncCursor: non-blocking execution returning Futures
#   - SparkCursor: code executed on Athena's Spark engine
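This design enables PyAthena to serve both traditional database applications and modern data science workflows while maintaining DB API 2.0 compliance.

Standard DB API 2.0 compliant database operations: connection management, query execution, and result fetching, plus the DB API `commit`/`rollback` connection methods. Amazon Athena does not support multi-statement transactions, so these methods exist for API compatibility rather than for transactional behavior. Results are available as tuples (`Cursor`) or as dictionaries (`DictCursor`).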
def connect(
    s3_staging_dir: Optional[str] = None,
    region_name: Optional[str] = None,
    schema_name: Optional[str] = "default",
    catalog_name: Optional[str] = "awsdatacatalog",
    work_group: Optional[str] = None,
    poll_interval: float = 1,
    encryption_option: Optional[str] = None,
    kms_key: Optional[str] = None,
    profile_name: Optional[str] = None,
    role_arn: Optional[str] = None,
    role_session_name: str = "PyAthena-session-{timestamp}",
    external_id: Optional[str] = None,
    serial_number: Optional[str] = None,
    duration_seconds: int = 3600,
    converter: Optional[Converter] = None,
    formatter: Optional[Formatter] = None,
    retry_config: Optional[RetryConfig] = None,
    cursor_class: Optional[Type[ConnectionCursor]] = None,
    cursor_kwargs: Optional[Dict[str, Any]] = None,
    kill_on_interrupt: bool = True,
    session: Optional[Session] = None,
    config: Optional[Config] = None,
    result_reuse_enable: bool = False,
    result_reuse_minutes: int = 1440,
    on_start_query_execution: Optional[Callable[[str], None]] = None,
    **kwargs,
) -> Connection[ConnectionCursor]:
    """Open a DB API 2.0 connection to Amazon Athena.

    Every parameter has a default, so a bare ``connect()`` is a valid call.
    Extra ``**kwargs`` are accepted as well (NOTE(review): presumably
    forwarded to the underlying AWS session/client; confirm).

    ``cursor_class`` fixes the ``ConnectionCursor`` type parameter of the
    returned ``Connection``.  ``cursor_kwargs`` presumably supplies default
    options for the cursors it creates.  ``on_start_query_execution``
    receives a string, presumably the query ID, when a query starts.

    Returns:
        A ``Connection`` generic over the chosen ``ConnectionCursor`` type.
    """
from typing import Generic


class Connection(Generic[ConnectionCursor]):
    """DB API 2.0 (PEP 249) connection to Amazon Athena.

    The class is generic over ``ConnectionCursor``, the module-level
    ``TypeVar`` bound to ``BaseCursor``.  It is declared with
    ``Generic[...]`` rather than the PEP 695 form
    ``class Connection[ConnectionCursor]``.  That form would declare a new,
    unbounded type parameter shadowing the TypeVar, and it requires
    Python 3.12.
    """

    session: Session
    client: BaseClient
    retry_config: RetryConfig
    s3_staging_dir: Optional[str]
    region_name: Optional[str]
    schema_name: Optional[str]
    catalog_name: Optional[str]
    work_group: Optional[str]
    poll_interval: float
    encryption_option: Optional[str]
    kms_key: Optional[str]

    def cursor(
        self, cursor: Optional[Type[FunctionalCursor]] = None, **kwargs
    ) -> Union[FunctionalCursor, ConnectionCursor]:
        """Create a cursor on this connection.

        Pass a cursor class as ``cursor`` to get an instance of that class.
        Otherwise, presumably, the connection's ``ConnectionCursor`` class
        (chosen via ``connect(cursor_class=...)``) is used.
        """
        ...

    def close(self) -> None: ...

    # Transaction methods required by PEP 249.
    def commit(self) -> None: ...
    def rollback(self) -> None: ...

    # Context-manager protocol, so the connection can be used in a ``with`` block.
    def __enter__(self) -> Connection[ConnectionCursor]: ...
    def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...
class Cursor:
    """DB API 2.0 (PEP 249) cursor for Athena queries.

    ``fetchone``/``fetchmany``/``fetchall`` and iteration return rows as
    tuples, or as dictionaries when used through ``DictCursor``.
    """

    # Default number of rows for fetchmany() when ``size`` is omitted (PEP 249).
    arraysize: int
    # One 7-item tuple per result column, in PEP 249 order: name, type_code,
    # display_size, internal_size, precision, scale, null_ok.  The annotation
    # shows display_size and internal_size are always None here.
    description: Optional[List[Tuple[str, str, None, None, int, int, str]]]
    rowcount: int
    rownumber: Optional[int]
    query_id: Optional[str]  # presumably the Athena query ID of the last execute()
    result_set: Optional[AthenaResultSet]

    def execute(
        self,
        operation: str,
        parameters: Optional[Union[Dict[str, Any], List[str]]] = None,
        work_group: Optional[str] = None,
        s3_staging_dir: Optional[str] = None,
        cache_size: int = 0,
        cache_expiration_time: int = 0,
        result_reuse_enable: Optional[bool] = None,
        result_reuse_minutes: Optional[int] = None,
        paramstyle: Optional[str] = None,
        on_start_query_execution: Optional[Callable[[str], None]] = None,
        **kwargs,
    ) -> Cursor:
        """Execute the SQL string ``operation`` and return a ``Cursor``.

        ``parameters`` is either a mapping or a list of strings.
        ``paramstyle`` presumably selects how they are bound.  The
        ``work_group``, ``s3_staging_dir`` and ``result_reuse_*`` arguments
        presumably override the connection-level settings of the same names
        for this call only.
        """
        ...

    def executemany(
        self,
        operation: str,
        seq_of_parameters: List[Optional[Union[Dict[str, Any], List[str]]]],
        **kwargs,
    ) -> None:
        """Execute ``operation`` once for each parameter set."""
        ...

    def fetchone(
        self,
    ) -> Optional[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: ...

    def fetchmany(
        self, size: Optional[int] = None
    ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: ...

    def fetchall(
        self,
    ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: ...

    def cancel(self) -> None: ...

    def close(self) -> None: ...

    # Iteration over result rows (PEP 249 optional extension).
    def __iter__(
        self,
    ) -> Iterator[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: ...

    def __next__(
        self,
    ) -> Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]: ...
class DictCursor(Cursor):
    """Cursor that returns results as dictionaries with column names as keys."""

    # Presumably the mapping type used to build each row; override it (e.g.
    # with collections.OrderedDict) to change the row container.
    dict_type: Type[Dict] = dict


# High-performance integration with pandas DataFrames, enabling direct query
# result processing as DataFrames with support for chunked processing of large
# datasets.
class PandasCursor:
    """Cursor that exposes Athena query results as pandas DataFrames.

    Besides the DB API ``fetch*`` methods, the result is available as a whole
    through ``as_pandas`` or piece by piece through ``iter_chunks``.
    """

    arraysize: int
    description: Optional[List[Tuple]]
    rowcount: int
    query_id: Optional[str]
    result_set: Optional[AthenaPandasResultSet]

    def execute(
        self,
        operation: str,
        parameters: Optional[Union[Dict[str, Any], List[str]]] = None,
        work_group: Optional[str] = None,
        s3_staging_dir: Optional[str] = None,
        cache_size: Optional[int] = 0,
        cache_expiration_time: Optional[int] = 0,
        result_reuse_enable: Optional[bool] = None,
        result_reuse_minutes: Optional[int] = None,
        paramstyle: Optional[str] = None,
        keep_default_na: bool = False,
        na_values: Optional[Iterable[str]] = ("",),
        quoting: int = 1,
        on_start_query_execution: Optional[Callable[[str], None]] = None,
        **kwargs,
    ) -> PandasCursor:
        """Execute ``operation`` and prepare its result for pandas access.

        ``keep_default_na``, ``na_values`` and ``quoting`` mirror
        ``pandas.read_csv`` options (NOTE(review): presumably applied when
        the CSV result is parsed; confirm).  The default ``quoting=1``
        equals ``csv.QUOTE_ALL``.  The other parameters share their names
        with ``Cursor.execute``.
        """
        ...

    def executemany(
        self,
        operation: str,
        seq_of_parameters: List[Optional[Union[Dict[str, Any], List[str]]]],
        **kwargs,
    ) -> None: ...

    def fetchone(
        self,
    ) -> Optional[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: ...

    def fetchmany(
        self, size: Optional[int] = None
    ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: ...

    def fetchall(
        self,
    ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: ...

    def as_pandas(self) -> Union[DataFrame, DataFrameIterator]:
        """Return the result as a ``DataFrame``, or as a ``DataFrameIterator``
        (presumably when the result is read in chunks)."""
        ...

    def iter_chunks(self) -> Generator[DataFrame, None, None]:
        """Yield the result as successive ``DataFrame`` chunks."""
        ...

    def cancel(self) -> None: ...

    def close(self) -> None: ...


# Native PyArrow Table support for columnar data processing, providing optimal
# performance for analytical workloads and seamless integration with the Arrow
# ecosystem.
class ArrowCursor:
    """Cursor that returns Athena query results as Arrow tables."""

    def execute(self, operation: str, parameters=None, **kwargs) -> ArrowCursor:
        """Execute ``operation``; further options are passed as ``**kwargs``."""
        ...

    def fetchall(self) -> Table: ...

    def as_arrow(self) -> Table:
        """Return the whole result as a ``Table`` (presumably ``pyarrow.Table``)."""
        ...


# Non-blocking query execution through a Future-based API, enabling concurrent
# query processing.  NOTE: these methods return Future objects, not coroutines,
# so they are not directly awaitable with async/await; to integrate with
# asyncio, wrap them with ``asyncio.wrap_future`` (this assumes they are
# ``concurrent.futures.Future`` instances; confirm).
class AsyncCursor:
    """Cursor whose operations return Futures instead of blocking on results."""

    def execute(
        self, operation: str, parameters=None, **kwargs
    ) -> Tuple[str, Future[AthenaResultSet]]:
        """Start ``operation`` and return ``(query_id, future)``.

        The second element is a Future that resolves to the result set.
        """
        ...

    def cancel(self, query_id: str) -> Future[None]:
        """Request cancellation of the query identified by ``query_id``."""
        ...

    def poll(self, query_id: str) -> Future[AthenaQueryExecution]:
        """Return a Future resolving to the execution status of ``query_id``."""
        ...

    def close(self, wait: bool = False) -> None:
        """Close the cursor; ``wait`` presumably controls whether pending work is awaited."""
        ...


# Integration with Athena's Spark execution engine for distributed processing,
# Jupyter notebook compatibility, and advanced analytics workloads.
class SparkCursor:
    """Cursor that submits code to Athena's Spark execution engine."""

    def execute(self, code: str, **kwargs) -> SparkCursor:
        """Submit ``code`` (presumably as a Spark calculation) and return a ``SparkCursor``."""
        ...

    @property
    def session_id(self) -> str: ...

    # Optional: presumably None until a calculation has been submitted.
    @property
    def calculation_id(self) -> Optional[str]: ...

    @property
    def state(self) -> Optional[str]: ...


# Complete SQLAlchemy dialect implementation with custom Athena types, enabling
# ORM support and integration with existing SQLAlchemy-based applications.
# SQLAlchemy dialects
class AthenaDialect: ...
class AthenaRestDialect: ...
class AthenaPandasDialect: ...
class AthenaArrowDialect: ...


# Custom Athena types
class TINYINT: ...
class STRUCT: ...
class MAP: ...
class ARRAY: ...


# DB API 2.0 (PEP 249) module globals.
apilevel: str = "2.0"
# PEP 249 level 2: threads may share the module and connections.
threadsafety: int = 2
# PEP 249 "pyformat": Python extended format codes, e.g. WHERE name = %(name)s
paramstyle: str = "pyformat"

# Type objects for DB API 2.0
STRING: DBAPITypeObject
BINARY: DBAPITypeObject
BOOLEAN: DBAPITypeObject
NUMBER: DBAPITypeObject
DATE: DBAPITypeObject
TIME: DBAPITypeObject
DATETIME: DBAPITypeObject
JSON: DBAPITypeObject

# Cursor type variables: ConnectionCursor is the cursor class a Connection is
# parameterized with (connect(cursor_class=...)); FunctionalCursor is the class
# requested for a single call via Connection.cursor(cursor=...).
ConnectionCursor = TypeVar("ConnectionCursor", bound=BaseCursor)
FunctionalCursor = TypeVar("FunctionalCursor", bound=BaseCursor)
from typing import Generic


# NOTE(review): abbreviated duplicate of the fuller Connection stub earlier in
# this reference; if executed, it rebinds the name ``Connection``.
class Connection(Generic[ConnectionCursor]):
    """Connection generic over ``ConnectionCursor`` (the TypeVar bound to
    ``BaseCursor``), written with ``Generic[...]`` instead of PEP 695 syntax,
    which would declare a new, unbounded type parameter."""

    session: Session
    client: BaseClient
    retry_config: RetryConfig
class Converter:
    """Base converter class for data type conversion.

    An instance can be passed to ``connect(converter=...)``.
    """


class Formatter:
    """Base formatter class for parameter formatting.

    An instance can be passed to ``connect(formatter=...)``.
    """


class DefaultParameterFormatter(Formatter):
    """Default parameter formatter implementation."""
class RetryConfig:
    """Configuration for API retry logic.

    Args:
        exceptions: Names of the errors that trigger a retry (by default the
            AWS throttling errors ``ThrottlingException`` and
            ``TooManyRequestsException``).
        attempt: Maximum number of attempts.
        multiplier: Backoff multiplier (presumably scales the exponential wait).
        max_delay: Upper bound on the wait between attempts (presumably seconds).
        exponential_base: Base of the exponential backoff.
    """

    def __init__(
        self,
        exceptions: Iterable[str] = ("ThrottlingException", "TooManyRequestsException"),
        attempt: int = 5,
        multiplier: int = 1,
        max_delay: int = 100,
        exponential_base: int = 2,
    ) -> None: ...
class BaseCursor:
    """Base cursor class with common functionality.

    It is the bound of the ``ConnectionCursor`` and ``FunctionalCursor`` type
    variables, so every cursor class a Connection is typed to create derives
    from it.
    """
class AthenaResultSet:
    """Result set wrapper for Athena query results."""


# DB API 2.0 (PEP 249) exception hierarchy.
class Error(Exception): ...
# PEP 249 makes Warning a direct Exception subclass, separate from Error.
# Note that this name shadows the builtin ``Warning`` in this namespace.
class Warning(Exception): ...
class InterfaceError(Error): ...
class DatabaseError(Error): ...
class InternalError(DatabaseError): ...
class OperationalError(DatabaseError): ...
class ProgrammingError(DatabaseError): ...
class DataError(DatabaseError): ...
class NotSupportedError(DatabaseError): ...


class AthenaQueryExecution:
    """Status of an Athena query execution; ``STATE_*`` are its possible states."""

    # State constants
    STATE_QUEUED: str = "QUEUED"
    STATE_RUNNING: str = "RUNNING"
    STATE_SUCCEEDED: str = "SUCCEEDED"
    STATE_FAILED: str = "FAILED"
    STATE_CANCELLED: str = "CANCELLED"
# NOTE(review): this repeats the RetryConfig stub documented earlier.  The
# signature is aligned with it, since the two versions previously disagreed on
# parameter names and defaults.
class RetryConfig:
    """Configuration for API retry logic.

    Args:
        exceptions: Names of the errors that trigger a retry.
        attempt: Maximum number of attempts.
        multiplier: Backoff multiplier (presumably scales the exponential wait).
        max_delay: Upper bound on the wait between attempts (presumably seconds).
        exponential_base: Base of the exponential backoff.
    """

    def __init__(
        self,
        exceptions: Iterable[str] = ("ThrottlingException", "TooManyRequestsException"),
        attempt: int = 5,
        multiplier: int = 1,
        max_delay: int = 100,
        exponential_base: int = 2,
    ) -> None: ...