Python DB API 2.0 (PEP 249) client for Amazon Athena
High-performance pandas integration that returns query results directly as DataFrames, with support for chunked processing of large datasets. Ideal for data analysis workflows and scientific computing applications.
pip install PyAthena[Pandas]

Cursor that returns query results as pandas DataFrames instead of tuples or dictionaries, with full support for Athena's data types and automatic type conversion.
class PandasCursor:
    arraysize: int
    description: Optional[List[Tuple]]
    rowcount: int
    rownumber: Optional[int]
    query_id: Optional[str]
    result_set: Optional[AthenaPandasResultSet]

    def execute(
        self,
        operation: str,
        parameters: Optional[Union[Dict[str, Any], List[str]]] = None,
        work_group: Optional[str] = None,
        s3_staging_dir: Optional[str] = None,
        cache_size: Optional[int] = 0,
        cache_expiration_time: Optional[int] = 0,
        result_reuse_enable: Optional[bool] = None,
        result_reuse_minutes: Optional[int] = None,
        paramstyle: Optional[str] = None,
        keep_default_na: bool = False,
        na_values: Optional[Iterable[str]] = ("",),
        quoting: int = 1,
        on_start_query_execution: Optional[Callable[[str], None]] = None,
        **kwargs
    ) -> PandasCursor:
        """
        Execute a SQL statement with DataFrame result processing.
        Parameters:
        - operation: SQL query string
        - parameters: Query parameters (dict or sequence)
        - work_group: Athena workgroup for execution
        - s3_staging_dir: S3 location for query results
        - cache_size: Query result cache size
        - cache_expiration_time: Cache expiration time in seconds
        - result_reuse_enable: Enable query result reuse
        - result_reuse_minutes: Result reuse duration in minutes
        - paramstyle: Parameter substitution style
        - keep_default_na: Keep pandas default NA values
        - na_values: Additional NA values to recognize
        - quoting: CSV quoting behavior (QUOTE_MINIMAL=0, QUOTE_ALL=1, etc.)
        - on_start_query_execution: Callback invoked with the query ID when execution starts
        - **kwargs: Additional arguments passed to pandas.read_csv
        Returns:
        Self for method chaining
        """

    def executemany(
        self,
        operation: str,
        seq_of_parameters: List[Optional[Union[Dict[str, Any], List[str]]]],
        **kwargs
    ) -> None:
        """Execute a SQL statement multiple times with different parameters."""

    def fetchone(self) -> Optional[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]:
        """
        Fetch the next row from the result set (standard DB API method).
        Returns:
        Single row as a tuple/dict, or None if there are no more rows
        """

    def fetchmany(self, size: Optional[int] = None) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]:
        """
        Fetch multiple rows from the result set (standard DB API method).
        Parameters:
        - size: Number of rows to fetch (default: arraysize)
        Returns:
        List of rows as tuples/dicts
        """

    def fetchall(self) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]:
        """
        Fetch all remaining rows from the result set (standard DB API method).
        Returns:
        List of all remaining rows as tuples/dicts
        """

    def as_pandas(self) -> Union[DataFrame, DataFrameIterator]:
        """
        Return results as a pandas DataFrame, or as an iterator for large datasets.
        Returns:
        DataFrame for small results or when chunksize=None,
        DataFrameIterator for chunked processing when chunksize is set
        """

    def iter_chunks(self) -> Generator[DataFrame, None, None]:
        """
        Iterate over the result set in DataFrame chunks for memory-efficient processing.
        Chunking behavior depends on cursor configuration:
        - If chunksize is explicitly set, uses that value
        - If auto_optimize_chunksize=True and chunksize=None, determines an optimal chunksize
        - If auto_optimize_chunksize=False and chunksize=None, yields the entire DataFrame
        Yields:
        DataFrame chunks when chunking is enabled, or the entire DataFrame as a single chunk
        """

    def cancel(self) -> None:
        """Cancel the currently executing query."""

    def close(self) -> None:
        """Close the cursor and free associated resources."""
Memory-efficient iterator for processing large query results in chunks without loading the entire result set into memory.

class DataFrameIterator:
    def __iter__(self) -> Iterator[DataFrame]:
        """Iterator protocol for DataFrame chunks."""

    def __next__(self) -> DataFrame:
        """Get the next DataFrame chunk."""

    def to_pandas(self) -> DataFrame:
        """
        Concatenate all chunks into a single DataFrame.
        Warning: May consume large amounts of memory for big datasets.
        Returns:
        Single DataFrame containing all data
        """
Specialized result set class optimized for pandas DataFrame creation, with efficient type conversion and memory management.

class AthenaPandasResultSet:
    def as_pandas(self) -> DataFrame:
        """Convert the result set to a pandas DataFrame."""

    def fetchone_pandas(self) -> Optional[DataFrame]:
        """Fetch a single row as a DataFrame."""

    def fetchmany_pandas(self, size: int) -> DataFrame:
        """Fetch multiple rows as a DataFrame."""

    def fetchall_pandas(self) -> DataFrame:
        """Fetch all rows as a DataFrame."""
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor
import pandas as pd

# Connect with the pandas cursor
conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=PandasCursor,
)
cursor = conn.cursor()
cursor.execute("SELECT * FROM sales_data WHERE year = 2023")

# Get results as a DataFrame (fetchall() would return a list of tuples)
df = cursor.as_pandas()
print(df.head())
print(f"Shape: {df.shape}")
print(f"Columns: {df.columns.tolist()}")

cursor.close()
conn.close()
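Because PandasCursor still implements the standard DB API, the row-oriented fetch methods remain available when a full DataFrame is unnecessary. A short sketch, reusing the connection setup from the example above (run before closing the cursor):

cursor.execute("SELECT customer_id, amount FROM sales_data LIMIT 100")
row = cursor.fetchone()       # next row as a tuple, or None when exhausted
batch = cursor.fetchmany(10)  # list of up to 10 rows
rest = cursor.fetchall()      # all remaining rows as a list of tuples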
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor

conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=PandasCursor,
)
cursor = conn.cursor()
cursor.execute("SELECT * FROM large_table")

# Process in chunks to manage memory
total_rows = 0
for chunk in cursor.iter_chunks():
    # Process each chunk
    print(f"Processing chunk with {len(chunk)} rows")
    # Example: calculate statistics per chunk
    chunk_stats = chunk.describe()
    print(chunk_stats)
    total_rows += len(chunk)

print(f"Total rows processed: {total_rows}")
cursor.close()
conn.close()
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor
import pandas as pd

conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=PandasCursor,
)
cursor = conn.cursor()

# Complex analytical query
query = """
SELECT
    customer_id,
    product_category,
    SUM(amount) AS total_amount,
    COUNT(*) AS transaction_count,
    AVG(amount) AS avg_amount
FROM transactions
WHERE date_column >= DATE '2023-01-01'
GROUP BY customer_id, product_category
ORDER BY total_amount DESC
"""
cursor.execute(query)
df = cursor.as_pandas()

# Pandas operations on query results
# Pivot table
pivot_df = df.pivot_table(
    values='total_amount',
    index='customer_id',
    columns='product_category',
    fill_value=0,
)

# Statistical analysis
correlation_matrix = df[['total_amount', 'transaction_count', 'avg_amount']].corr()
print("Correlation Matrix:")
print(correlation_matrix)

# Data visualization preparation
top_customers = df.nlargest(10, 'total_amount')
print("Top 10 Customers:")
print(top_customers[['customer_id', 'total_amount']])

cursor.close()
conn.close()
import asyncio

from pyathena import connect
from pyathena.pandas.async_cursor import AsyncPandasCursor

async def async_query_example():
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=AsyncPandasCursor,
    )
    cursor = conn.cursor()

    # Execute multiple queries concurrently
    queries = [
        "SELECT COUNT(*) as count FROM table1",
        "SELECT AVG(amount) as avg_amount FROM table2",
        "SELECT MAX(date_column) as max_date FROM table3",
    ]

    # Start all queries; execute() returns a (query_id, future) pair
    futures = []
    for query in queries:
        query_id, future = cursor.execute(query)
        futures.append(future)

    # Wait for all results (wrap the concurrent.futures.Future objects for asyncio)
    results = await asyncio.gather(*[asyncio.wrap_future(f) for f in futures])
    for i, result_set in enumerate(results):
        print(f"Query {i+1} result:")
        print(result_set.as_pandas())

    cursor.close()
    conn.close()

# Run the async example
asyncio.run(async_query_example())
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor
import pandas as pd

def process_large_dataset():
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=PandasCursor,
    )
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM very_large_table")

    # Use the iterator to avoid loading the entire dataset
    results_summary = {
        'total_rows': 0,
        'total_revenue': 0,
        'unique_customers': set(),
    }

    # Process chunk by chunk
    for chunk_df in cursor.iter_chunks():
        results_summary['total_rows'] += len(chunk_df)
        results_summary['total_revenue'] += chunk_df['revenue'].sum()
        results_summary['unique_customers'].update(chunk_df['customer_id'].unique())
        # Optional: save intermediate results
        # chunk_df.to_parquet(f'chunk_{chunk_number}.parquet')

    # Final summary
    results_summary['unique_customers'] = len(results_summary['unique_customers'])
    print("Dataset Summary:")
    for key, value in results_summary.items():
        print(f"{key}: {value}")

    cursor.close()
    conn.close()

process_large_dataset()
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

# Data extraction for an ML pipeline
conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=PandasCursor,
)
cursor = conn.cursor()

# Feature engineering query
feature_query = """
SELECT
    customer_id,
    age,
    income,
    account_balance,
    num_transactions_last_month,
    avg_transaction_amount,
    days_since_last_transaction,
    CASE WHEN churned = 'yes' THEN 1 ELSE 0 END AS target
FROM customer_features
WHERE feature_date = CURRENT_DATE - INTERVAL '1' DAY
"""
cursor.execute(feature_query)
df = cursor.as_pandas()

# Prepare features for machine learning
feature_columns = [
    'age', 'income', 'account_balance',
    'num_transactions_last_month', 'avg_transaction_amount',
    'days_since_last_transaction',
]
X = df[feature_columns]
y = df['target']

# Preprocessing
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Train/test split
X_train, X_test, y_train, y_test = train_test_split(
    X_scaled, y, test_size=0.2, random_state=42,
)
print(f"Training set shape: {X_train.shape}")
print(f"Test set shape: {X_test.shape}")
print(f"Feature columns: {feature_columns}")

cursor.close()
conn.close()

PyAthena automatically converts Athena data types to appropriate pandas dtypes:
- boolean → bool
- tinyint, smallint, integer, bigint → int64
- real, double, float → float64
- decimal → float64 or object (for high precision)
- varchar, char → object (string)
- date → datetime64[ns] (date only)
- timestamp → datetime64[ns]
- array, map, row → object (complex types as Python objects)

Use chunked processing (iter_chunks()) for datasets that may not fit in memory, and tune arraysize for optimal chunk sizes.
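A quick way to confirm these conversions for a given table is to inspect df.dtypes on a small sample (sales_data is illustrative):

cursor.execute("SELECT * FROM sales_data LIMIT 10")
df = cursor.as_pandas()
print(df.dtypes)  # e.g. bigint columns appear as int64, timestamps as datetime64[ns]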
Install with Tessl CLI

npx tessl i tessl/pypi-pyathena