Python DB API 2.0 (PEP 249) client for Amazon Athena
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Native PyArrow Table support for columnar data processing, providing optimal performance for analytical workloads and seamless integration with the Arrow ecosystem. Ideal for high-performance analytics and data interchange.
pip install PyAthena[Arrow]

ArrowCursor is a cursor that returns query results as PyArrow Tables, providing a columnar data format optimized for analytical operations and memory efficiency.
class ArrowCursor:
    """DB API 2.0 cursor that materializes Athena query results as PyArrow Tables."""

    # Default number of rows returned by fetchmany() when no size is given.
    arraysize: int
    # Column metadata for the last executed query, or None before execute().
    description: Optional[List[Tuple]]
    # Row count of the last operation (DB API 2.0 attribute).
    rowcount: int

    def execute(self, operation: str, parameters=None, **kwargs) -> ArrowCursor:
        """
        Execute a SQL statement with Arrow Table result processing.

        Parameters:
        - operation: SQL query string
        - parameters: Query parameters (dict or sequence)
        - **kwargs: Additional execution options

        Returns:
        Self for method chaining
        """

    def fetchone(self) -> Optional[Table]:
        """
        Fetch the next chunk as an Arrow Table.

        Returns:
        Arrow Table with single chunk or None if no more data
        """

    def fetchmany(self, size: Optional[int] = None) -> Table:
        """
        Fetch multiple rows as an Arrow Table.

        Parameters:
        - size: Number of rows to fetch (default: arraysize)

        Returns:
        Arrow Table containing the requested rows
        """

    def fetchall(self) -> Table:
        """
        Fetch all remaining rows as a single Arrow Table.

        Returns:
        Arrow Table containing all remaining rows
        """

    def as_arrow(self) -> Table:
        """
        Return results as Arrow Table.

        Returns:
        PyArrow Table with all query results
        """

    def cancel(self) -> None:
        """Cancel the currently executing query."""

    def close(self) -> None:
"""Close the cursor and free resources."""Asynchronous version of ArrowCursor for non-blocking operations with Future-based API.
class AsyncArrowCursor:
    """Asynchronous Arrow cursor: execute() returns immediately with a query ID and a Future."""

    def execute(self, operation: str, parameters=None, **kwargs) -> Tuple[str, Future[Table]]:
        """
        Execute query asynchronously returning query ID and Future.

        Parameters:
        - operation: SQL query string
        - parameters: Query parameters

        Returns:
        Tuple of (query_id, Future[Table])
        """

    def cancel(self, query_id: str) -> Future[None]:
        """Cancel query by ID asynchronously."""

    def close(self, wait: bool = False) -> None:
"""Close cursor, optionally waiting for running queries."""Specialized result set class optimized for PyArrow Table creation with efficient columnar data processing.
class AthenaArrowResultSet:
    """Result set specialized for efficient PyArrow Table creation from Athena output."""

    def as_arrow(self) -> Table:
        """Convert result set to PyArrow Table."""

    def fetchone_arrow(self) -> Optional[Table]:
        """Fetch single chunk as Arrow Table."""

    def fetchmany_arrow(self, size: int) -> Table:
        """Fetch multiple rows as Arrow Table."""

    def fetchall_arrow(self) -> Table:
"""Fetch all rows as Arrow Table."""from pyathena import connect
from pyathena.arrow.cursor import ArrowCursor
import pyarrow as pa
# pa.compute is not guaranteed to be importable as an attribute of "pyarrow"
# alone — import the compute module explicitly.
import pyarrow.compute as pc

# Connect with Arrow cursor
conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=ArrowCursor,
)
cursor = conn.cursor()
cursor.execute("SELECT * FROM sales_data WHERE year = 2023")
# Get results as Arrow Table
table = cursor.fetchall()
print(f"Table shape: {table.shape}")
print(f"Columns: {table.column_names}")
print(f"Schema: {table.schema}")
# Access column data
revenue_column = table.column('revenue')
print(f"Revenue column type: {revenue_column.type}")
print(f"Revenue sum: {pc.sum(revenue_column)}")
cursor.close()
conn.close()

from pyathena import connect
from pyathena.arrow.cursor import ArrowCursor
import pyarrow as pa
import pyarrow.compute as pc

conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=ArrowCursor,
)
cursor = conn.cursor()
# Complex analytical query
query = """
SELECT
    product_id,
    sale_date,
    quantity,
    unit_price,
    quantity * unit_price as total_amount,
    customer_segment
FROM sales_data
WHERE sale_date >= DATE '2023-01-01'
"""
cursor.execute(query)
table = cursor.fetchall()
# High-performance columnar operations
print("Performing columnar analytics...")
# Aggregations using Arrow compute functions
total_revenue = pc.sum(table.column('total_amount'))
avg_order_size = pc.mean(table.column('total_amount'))
max_quantity = pc.max(table.column('quantity'))
print(f"Total Revenue: ${total_revenue.as_py():,.2f}")
print(f"Average Order Size: ${avg_order_size.as_py():.2f}")
print(f"Max Quantity: {max_quantity.as_py()}")
# Group by operations: aggregate output columns are named <column>_<aggregation>
grouped = table.group_by('customer_segment').aggregate([
    ('total_amount', 'sum'),
    ('quantity', 'mean'),
    ('product_id', 'count'),
])
print("\nRevenue by Customer Segment:")
# Iterate the key/aggregate columns directly instead of indexing row by row
segments = grouped.column('customer_segment').to_pylist()
revenues = grouped.column('total_amount_sum').to_pylist()
for segment, revenue in zip(segments, revenues):
    print(f"{segment}: ${revenue:,.2f}")
cursor.close()
conn.close()

from pyathena import connect
from pyathena.arrow.cursor import ArrowCursor
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq

def process_sales_data():
    """Extract last-30-day sales from Athena, keep sales over $1,000, and export them.

    Returns:
        pyarrow.Table of the high-value sales rows.
    """
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=ArrowCursor,
    )
    cursor = conn.cursor()
    # Extract data
    cursor.execute("""
    SELECT
        customer_id,
        product_category,
        sale_amount,
        sale_date,
        region
    FROM sales_transactions
    WHERE sale_date >= CURRENT_DATE - INTERVAL '30' DAY
    """)
    table = cursor.fetchall()
    # Data transformations using Arrow.
    # append_column adds at the end without manual index arithmetic.
    table = table.append_column('month', pc.month(table.column('sale_date')))
    # Filter operations: Table.filter(mask) is the documented way to apply
    # a boolean mask to a Table (pc.filter is for array-like inputs).
    high_value_sales = table.filter(
        pc.greater(table.column('sale_amount'), pa.scalar(1000))
    )
    print(f"High value sales count: {len(high_value_sales)}")
    # Export to various formats
    # Parquet
    pq.write_table(high_value_sales, 'high_value_sales.parquet')
    # CSV
    high_value_sales.to_pandas().to_csv('high_value_sales.csv', index=False)
    # JSON
    with open('high_value_sales.json', 'w') as f:
        f.write(high_value_sales.to_pandas().to_json(orient='records'))
    cursor.close()
    conn.close()
    return high_value_sales

# Process data
result_table = process_sales_data()
print(f"Processed {len(result_table)} high-value sales records")from pyathena import connect
from pyathena.arrow.cursor import ArrowCursor
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds
# NOTE(review): pyarrow.flight was imported here but never used; removed.

# Data extraction from Athena
conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=ArrowCursor,
)
cursor = conn.cursor()
cursor.execute("SELECT * FROM customer_analytics")
customer_table = cursor.fetchall()
# Create Arrow dataset for efficient querying
dataset = ds.InMemoryDataset(customer_table)
# Advanced filtering and projection — dataset filters take Expressions built
# from ds.field with the standard comparison and boolean operators.
filtered_data = dataset.to_table(
    filter=(ds.field('age') > 25) & (ds.field('active') == True),
    columns=['customer_id', 'age', 'total_spend', 'last_purchase_date'],
)
print(f"Filtered customers: {len(filtered_data)}")
# Statistical analysis. Wrap the plain-int count in pa.scalar so every value
# in the dict supports .as_py() (len() alone would crash the print loop below).
stats = {
    'mean_age': pc.mean(filtered_data.column('age')),
    'mean_spend': pc.mean(filtered_data.column('total_spend')),
    'total_customers': pa.scalar(len(filtered_data)),
}
for metric, value in stats.items():
    print(f"{metric}: {value.as_py()}")
cursor.close()
conn.close()

from pyathena import connect
from pyathena.arrow.cursor import ArrowCursor
import pyarrow as pa
# pa.compute is not guaranteed to be importable as an attribute of "pyarrow"
# alone — import the compute module explicitly.
import pyarrow.compute as pc

def stream_process_large_dataset():
    """Sum the 'amount' column of a large table in fixed-size batches for memory efficiency."""
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=ArrowCursor,
    )
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM large_transaction_table")
    # Stream processing for memory efficiency
    batch_size = 10000
    total_processed = 0
    running_sum = 0
    while True:
        batch = cursor.fetchmany(batch_size)
        if len(batch) == 0:
            break
        # Process batch
        batch_sum = pc.sum(batch.column('amount')).as_py()
        running_sum += batch_sum
        total_processed += len(batch)
        print(f"Processed {total_processed} rows, running sum: ${running_sum:,.2f}")
    print(f"Final total: ${running_sum:,.2f} from {total_processed} transactions")
    cursor.close()
    conn.close()
stream_process_large_dataset()

import asyncio
from pyathena import connect
from pyathena.arrow.async_cursor import AsyncArrowCursor
import pyarrow as pa
import pyarrow.compute as pc

async def concurrent_arrow_queries():
    """Run several analytical queries concurrently and combine their Arrow results."""
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=AsyncArrowCursor,
    )
    cursor = conn.cursor()
    # Multiple analytical queries
    queries = [
        ("daily_sales", "SELECT sale_date, SUM(amount) as daily_total FROM sales GROUP BY sale_date"),
        ("product_metrics", "SELECT product_id, COUNT(*) as sales_count FROM sales GROUP BY product_id"),
        ("customer_stats", "SELECT customer_segment, AVG(amount) as avg_spend FROM sales GROUP BY customer_segment"),
    ]
    # Execute all queries concurrently
    futures = {}
    for name, query in queries:
        query_id, future = cursor.execute(query)
        futures[name] = future
    # Collect results. AsyncArrowCursor hands back concurrent.futures.Future
    # objects, which are NOT directly awaitable — wrap them for asyncio.
    results = {}
    for name, future in futures.items():
        table = await asyncio.wrap_future(future)
        results[name] = table
        print(f"{name}: {len(table)} rows")
    # Perform cross-table analytics
    total_daily_sales = pc.sum(results['daily_sales'].column('daily_total'))
    unique_products = len(results['product_metrics'])
    print(f"Total sales across all days: ${total_daily_sales.as_py():,.2f}")
    print(f"Unique products sold: {unique_products}")
    cursor.close()
    conn.close()

# Run async processing
asyncio.run(concurrent_arrow_queries())

PyAthena maps Athena data types to appropriate Arrow types:
- boolean → pa.bool_()
- tinyint → pa.int8()
- smallint → pa.int16()
- integer → pa.int32()
- bigint → pa.int64()
- real → pa.float32()
- double → pa.float64()
- decimal → pa.decimal128() or pa.decimal256()
- varchar, char → pa.string()
- date → pa.date32()
- timestamp → pa.timestamp('ns')
- array → pa.list_()
- map → pa.map_()
- row → pa.struct()

Arrow integration provides several performance advantages:
PyAthena's Arrow support enables integration with:
Install with Tessl CLI
npx tessl i tessl/pypi-pyathena