Python DB API 2.0 (PEP 249) client for Amazon Athena
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Standard DB API 2.0-compliant database operations for connecting to Amazon Athena, executing SQL queries, and processing results. Results can be returned as tuples or as dictionaries, with full cursor functionality.
Create and manage connections to Amazon Athena with comprehensive configuration options for AWS authentication, S3 staging, and query execution parameters.
def connect(
    s3_staging_dir: Optional[str] = None,
    region_name: Optional[str] = None,
    schema_name: Optional[str] = "default",
    catalog_name: Optional[str] = "awsdatacatalog",
    work_group: Optional[str] = None,
    poll_interval: float = 1,
    encryption_option: Optional[str] = None,
    kms_key: Optional[str] = None,
    profile_name: Optional[str] = None,
    role_arn: Optional[str] = None,
    role_session_name: str = "PyAthena-session-{timestamp}",
    external_id: Optional[str] = None,
    serial_number: Optional[str] = None,
    duration_seconds: int = 3600,
    converter: Optional[Converter] = None,
    formatter: Optional[Formatter] = None,
    retry_config: Optional[RetryConfig] = None,
    cursor_class: Optional[Type[ConnectionCursor]] = None,
    cursor_kwargs: Optional[Dict[str, Any]] = None,
    **kwargs: Any
) -> "Connection[ConnectionCursor]":
    """
    Create a connection to Amazon Athena.

    Parameters:
    - s3_staging_dir: S3 location where query results are stored
      (e.g. 's3://my-bucket/athena-results/'); required for most operations
    - region_name: AWS region name (e.g. 'us-west-2')
    - schema_name: Default database/schema name (default: 'default')
    - catalog_name: Data catalog name (default: 'awsdatacatalog')
    - work_group: Athena workgroup name
    - poll_interval: Seconds between query-status polls (default: 1)
    - encryption_option: S3 encryption option for query results
      ('SSE_S3', 'SSE_KMS', or 'CSE_KMS')
    - kms_key: KMS key ID used for encryption. NOTE(review): presumably only
      meaningful with a KMS-based encryption_option; confirm.
    - profile_name: AWS credentials profile name
    - role_arn: ARN of an IAM role to assume
    - role_session_name: Session name used when assuming role_arn. The default
      shown contains a '{timestamp}' placeholder so each session name embeds
      a timestamp. NOTE(review): confirm how the placeholder is filled in.
    - external_id: External ID supplied when assuming role_arn
    - serial_number: MFA device serial number supplied when assuming role_arn
    - duration_seconds: Lifetime in seconds of the STS assume-role session
      (default: 3600)
    - converter: Type converter instance used when processing results
    - formatter: Parameter formatter instance used to render query parameters
    - retry_config: Retry configuration for API calls
    - cursor_class: Cursor class produced by Connection.cursor()
    - cursor_kwargs: Additional keyword arguments used when creating cursors
    - **kwargs: Additional connection parameters (AWS credentials, etc.)

    Returns:
    Connection object whose cursors are of the specified cursor type
    """
# DB API 2.0-compliant connection object with cursor creation, transaction
# management, and resource cleanup.
class Connection[ConnectionCursor]:
    """
    DB API 2.0 connection to Amazon Athena, as returned by connect().

    Generic over ConnectionCursor, the cursor type that cursor() produces.
    Usable as a context manager; leaving the `with` block cleans up the
    connection.
    """

    session: Session  # AWS session backing the connection. NOTE(review): presumably a boto3 Session; confirm
    client: BaseClient  # AWS service client. NOTE(review): presumably the botocore Athena client; confirm
    retry_config: RetryConfig  # Retry configuration for API calls

    def cursor(self, cursor=None, **kwargs) -> ConnectionCursor:
        """
        Create a new cursor that runs queries on this connection.

        Parameters:
        - cursor: Cursor to configure (optional). NOTE(review): documented as
          an instance, but it may actually be a cursor class; confirm.
        - **kwargs: Additional cursor configuration options

        Returns:
        Cursor object of the connection's cursor type (ConnectionCursor)
        """

    def close(self) -> None:
        """
        Close the connection and release its resources.
        """

    def commit(self) -> None:
        """
        Commit any pending transaction.

        Athena has no transactions, so this is a no-op. It is provided for
        DB API 2.0 compatibility.
        """

    def rollback(self) -> None:
        """
        Roll back a transaction. This is not supported on Athena.

        Athena has no transactions, so there is nothing to roll back.

        Raises:
        NotSupportedError: Athena does not support transactions
        """

    def __enter__(self) -> "Connection[ConnectionCursor]":
        """Context manager entry; provides the connection to the `with` block."""

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Context manager exit with automatic cleanup of the connection."""


# Standard cursor for executing queries and fetching results as tuples,
# providing full DB API 2.0 compliance.
class Cursor:
    """
    Standard DB API 2.0 cursor that fetches result rows as tuples.

    Obtained from Connection.cursor(). Rows can be read with the fetch
    methods or by iterating over the cursor. The cursor also works as a
    context manager (``with conn.cursor() as cursor: ...``), which closes it
    automatically.
    """

    arraysize: int  # Default number of rows returned by fetchmany()
    description: Optional[List[Tuple]]  # Column metadata of the current result set; None when there is none (PEP 249)
    rowcount: int  # Row count of the last execute; -1 when it cannot be determined (PEP 249)
    rownumber: Optional[int]  # 0-based index of the cursor in the result set, or None (PEP 249 extension)

    # Self-references are string annotations: the name `Cursor` is not bound
    # until the class body finishes, so a bare `-> Cursor` raises NameError.
    def execute(self, operation: str, parameters=None, **kwargs) -> "Cursor":
        """
        Execute a SQL statement.

        Parameters:
        - operation: SQL query string
        - parameters: Query parameters, either a dict (for named placeholders
          such as %(name)s) or a sequence
        - **kwargs: Additional execution options

        Returns:
        Self, so calls can be chained (e.g. cursor.execute(sql).fetchall())
        """

    def executemany(self, operation: str, seq_of_parameters, **kwargs) -> None:
        """
        Execute a SQL statement once for each parameter set.

        Parameters:
        - operation: SQL query string
        - seq_of_parameters: Sequence of parameter sets, one per execution
        - **kwargs: Additional execution options
        """

    def fetchone(self) -> Optional[Tuple]:
        """
        Fetch the next row of the current result set.

        Returns:
        Single row as a tuple, or None if no more rows are available
        """

    def fetchmany(self, size: Optional[int] = None) -> List[Tuple]:
        """
        Fetch the next set of rows of the current result set.

        Parameters:
        - size: Number of rows to fetch (default: arraysize)

        Returns:
        List of rows as tuples. Per PEP 249 it may be shorter than size, or
        empty, once the result set is exhausted.
        """

    def fetchall(self) -> List[Tuple]:
        """
        Fetch all remaining rows of the current result set.

        Returns:
        List of all remaining rows as tuples
        """

    def cancel(self) -> None:
        """
        Cancel the currently executing query.
        """

    def close(self) -> None:
        """
        Close the cursor and free associated resources.
        """

    def __enter__(self) -> "Cursor":
        """Context manager entry; provides the cursor to the `with` block."""

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Context manager exit; closes the cursor."""

    def __iter__(self) -> Iterator[Tuple]:
        """Iterate over the remaining result rows, one tuple at a time."""

    def __next__(self) -> Tuple:
        """Return the next row; raises StopIteration when no rows remain."""


# Cursor variant that returns query results as dictionaries with column names
# as keys.
class DictCursor(Cursor):
    """
    Cursor variant whose fetch methods return each row as a dict keyed by
    column name instead of a tuple.

    Select it with connect(cursor_class=DictCursor).
    """

    def fetchone(self) -> Optional[Dict[str, Any]]:
        """
        Fetch the next row as a dictionary.

        Returns:
        Single row as a dict mapping column names to values, or None when no
        more rows are available
        """

    def fetchmany(self, size: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        Fetch the next set of rows as dictionaries.

        Parameters:
        - size: Number of rows to fetch (presumably defaulting to arraysize,
          as in Cursor.fetchmany; confirm)

        Returns:
        List of rows as dicts keyed by column name
        """

    def fetchall(self) -> List[Dict[str, Any]]:
        """
        Fetch all remaining rows as dictionaries.

        Returns:
        List of all remaining rows as dicts keyed by column name
        """


from pyathena import connect
# Open a connection; query results are stored under s3_staging_dir.
connection = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    schema_name="default",
)

# Run the query through a cursor obtained from the connection.
cur = connection.cursor()
cur.execute("SELECT COUNT(*) as row_count FROM my_table")

# fetchone() returns the next row as a tuple; the count is its first column.
row_count = cur.fetchone()[0]
print(f"Row count: {row_count}")

# Release the cursor and the connection explicitly.
cur.close()
connection.close()

from pyathena import connect
from pyathena.cursor import DictCursor

# With cursor_class=DictCursor, conn.cursor() returns DictCursor instances.
conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=DictCursor,
)
dict_cursor = conn.cursor()
dict_cursor.execute("SELECT name, age, city FROM users LIMIT 5")
rows = dict_cursor.fetchall()

# Each row is a dict keyed by column name.
for record in rows:
    name, age, city = record["name"], record["age"], record["city"]
    print(f"Name: {name}, Age: {age}, City: {city}")

dict_cursor.close()
conn.close()

from pyathena import connect
conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
)
cursor = conn.cursor()

# Named placeholders (%(name)s) are filled from a mapping (recommended).
sql = "SELECT * FROM users WHERE age > %(min_age)s AND city = %(city)s"
params = {"min_age": 25, "city": "San Francisco"}
cursor.execute(sql, parameters=params)

matching_users = cursor.fetchall()
print(f"Found {len(matching_users)} users")

cursor.close()
conn.close()

from pyathena import connect
# One `with` statement manages both objects: the cursor is created from the
# connection, and on exit the cursor is cleaned up first, then the connection.
with connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
) as conn, conn.cursor() as cursor:
    cursor.execute("SELECT * FROM my_table LIMIT 10")
    # The cursor is iterable and yields one row per step.
    for record in cursor:
        print(record)

# PyAthena raises standard DB API 2.0 exceptions for different error conditions:
from pyathena import connect
from pyathena.error import OperationalError, ProgrammingError

# `with` closes the cursor and the connection even when execute() raises,
# so the failing query below does not leak either resource.
try:
    with connect(s3_staging_dir="s3://my-bucket/results/") as conn:
        with conn.cursor() as cursor:
            cursor.execute("SELECT * FROM nonexistent_table")
except ProgrammingError as e:
    print(f"SQL Error: {e}")
except OperationalError as e:
    print(f"Execution Error: {e}")

# Install with Tessl CLI
npx tessl i tessl/pypi-pyathena