Python DB API 2.0 (PEP 249) client for Amazon Athena
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Complete SQLAlchemy dialect implementation with custom Athena types, enabling ORM support and integration with existing SQLAlchemy-based applications. Provides seamless database abstraction layer for Athena.
pip install PyAthena[SQLAlchemy]

Multiple dialect implementations optimized for different use cases and result formats.
class AthenaDialect:
    """Base Athena dialect for standard SQLAlchemy operations."""

    # Dialect name used in SQLAlchemy connection URLs (e.g. "awsathena://...").
    name: str = "awsathena"
class AthenaRestDialect(AthenaDialect):
    """REST API-based dialect for standard operations."""

    # URL scheme: "awsathena+rest://..."
    name: str = "awsathena+rest"
class AthenaPandasDialect(AthenaDialect):
    """Pandas-optimized dialect for DataFrame operations."""

    # URL scheme: "awsathena+pandas://..."
    name: str = "awsathena+pandas"
class AthenaArrowDialect(AthenaDialect):
"""Arrow-optimized dialect for columnar operations."""
name: str = "awsathena+arrow"SQLAlchemy type implementations for Athena-specific data types.
class TINYINT(sqltypes.Integer):
"""Athena TINYINT type (8-bit integer)."""
class STRUCT(TypeEngine[Dict]):
"""Athena STRUCT type for nested objects."""
class MAP(TypeEngine[Dict]):
"""Athena MAP type for key-value pairs."""
class ARRAY(TypeEngine[List]):
"""Athena ARRAY type for ordered collections."""
class AthenaTimestamp(TypeEngine[datetime]):
"""Athena TIMESTAMP type with timezone support."""
class AthenaDate(TypeEngine[date]):
"""Athena DATE type."""Custom compilers for translating SQLAlchemy constructs to Athena SQL.
class AthenaStatementCompiler(SQLCompiler):
"""Compiles SQLAlchemy statements to Athena SQL."""
class AthenaDDLCompiler(DDLCompiler):
"""Compiles DDL statements for Athena."""
class AthenaTypeCompiler(GenericTypeCompiler):
"""Compiles SQLAlchemy types to Athena SQL types."""Classes for properly formatting identifiers in different SQL contexts.
# Identifier quoting differs between DML (Presto-style) and DDL (Hive-style)
# statements in Athena, hence two separate preparers.
class AthenaDMLIdentifierPreparer(IdentifierPreparer):
    """Prepares identifiers for DML statements."""


class AthenaDDLIdentifierPreparer(IdentifierPreparer):
    """Prepares identifiers for DDL statements."""

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

# Create engine with Athena dialect.
# Empty user:password means credentials come from the environment/boto3 chain.
engine = create_engine(
    "awsathena+rest://:@athena.us-west-2.amazonaws.com:443/default?"
    "s3_staging_dir=s3://my-bucket/athena-results/"
)

# Test connection
with engine.connect() as conn:
    result = conn.execute(text("SELECT 1 as test_column"))
    print(result.fetchone())

from sqlalchemy import create_engine
from urllib.parse import quote_plus

# Connection string with all parameters.
# quote_plus() URL-encodes values that may contain reserved characters
# (secret keys, s3:// URIs) so the URL parses correctly.
connection_params = {
    "aws_access_key_id": "YOUR_ACCESS_KEY",
    "aws_secret_access_key": quote_plus("YOUR_SECRET_KEY"),
    "region_name": "us-west-2",
    "schema_name": "default",
    "s3_staging_dir": quote_plus("s3://my-bucket/athena-results/"),
    "work_group": "primary",
    "catalog_name": "AwsDataCatalog",
}

# Build connection string
connection_string = (
    f"awsathena+rest://{connection_params['aws_access_key_id']}:"
    f"{connection_params['aws_secret_access_key']}@"
    f"athena.{connection_params['region_name']}.amazonaws.com:443/"
    f"{connection_params['schema_name']}?"
    f"s3_staging_dir={connection_params['s3_staging_dir']}&"
    f"work_group={connection_params['work_group']}&"
    f"catalog_name={connection_params['catalog_name']}"
)
engine = create_engine(connection_string)

from sqlalchemy import Column, Integer, String, DateTime, Numeric, Boolean
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from pyathena.sqlalchemy.types import TINYINT, STRUCT, ARRAY, MAP

Base = declarative_base()


class Customer(Base):
    """ORM model demonstrating Athena scalar and complex column types."""

    __tablename__ = 'customers'

    customer_id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    email = Column(String(255), unique=True)
    age = Column(TINYINT)  # Athena-specific type
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime)
    total_spent = Column(Numeric(10, 2))
    # Complex types
    preferences = Column(MAP(String, String))  # Key-value preferences
    order_history = Column(ARRAY(Integer))  # Array of order IDs
    profile = Column(STRUCT([  # Nested structure
        ('address', String),
        ('phone', String),
        ('preferences', MAP(String, String))
    ]))


class Order(Base):
    """ORM model with an array-of-struct column for line items."""

    __tablename__ = 'orders'

    order_id = Column(Integer, primary_key=True)
    customer_id = Column(Integer, nullable=False)
    order_date = Column(DateTime)
    amount = Column(Numeric(10, 2))
    status = Column(String(20))
    items = Column(ARRAY(STRUCT([  # Array of structured items
        ('product_id', Integer),
        ('quantity', Integer),
        ('price', Numeric(8, 2))
    ])))


# Create engine and session
engine = create_engine("awsathena+rest://...")
Session = sessionmaker(bind=engine)
session = Session()

from sqlalchemy import func, and_, or_
from datetime import datetime, timedelta

# Basic queries
# NOTE: `== True` is an SQLAlchemy column expression, not a Python comparison.
active_customers = session.query(Customer).filter(Customer.is_active == True).all()

# Complex filtering
high_value_customers = session.query(Customer).filter(
    and_(
        Customer.total_spent > 1000,
        Customer.is_active == True,
        Customer.created_at > datetime.now() - timedelta(days=365)
    )
).all()

# Aggregation queries
customer_stats = session.query(
    func.count(Customer.customer_id).label('total_customers'),
    func.avg(Customer.total_spent).label('avg_spent'),
    func.max(Customer.total_spent).label('max_spent'),
    func.min(Customer.age).label('min_age'),
    func.max(Customer.age).label('max_age')
).first()
print(f"Total customers: {customer_stats.total_customers}")
print(f"Average spent: ${customer_stats.avg_spent:.2f}")

# Join queries
recent_orders = session.query(Customer, Order).join(
    Order, Customer.customer_id == Order.customer_id
).filter(
    Order.order_date > datetime.now() - timedelta(days=30)
).all()

# Group by queries — date_format is a Presto/Athena SQL function.
monthly_revenue = session.query(
    func.date_format(Order.order_date, '%Y-%m').label('month'),
    func.sum(Order.amount).label('total_revenue'),
    func.count(Order.order_id).label('order_count')
).group_by(
    func.date_format(Order.order_date, '%Y-%m')
).order_by('month').all()
for row in monthly_revenue:
    print(f"Month: {row.month}, Revenue: ${row.total_revenue:.2f}, Orders: {row.order_count}")

from sqlalchemy import text
# Query with complex type operations: map subscripting, cardinality(),
# and struct field access are Athena/Presto SQL features.
complex_query = text("""
    SELECT
        customer_id,
        name,
        preferences['newsletter'] as newsletter_pref,
        cardinality(order_history) as total_orders,
        profile.address as customer_address
    FROM customers
    WHERE preferences['vip'] = 'true'
      AND cardinality(order_history) > 5
""")
results = session.execute(complex_query).fetchall()
for row in results:
    print(f"Customer: {row.name}, Address: {row.customer_address}, Orders: {row.total_orders}")

# Insert with complex types — dicts map to MAP/STRUCT, lists to ARRAY.
new_customer = Customer(
    customer_id=12345,
    name="John Doe",
    email="john@example.com",
    age=35,
    preferences={
        'newsletter': 'true',
        'vip': 'false',
        'language': 'en'
    },
    order_history=[1001, 1002, 1003],
    profile={
        'address': '123 Main St, Anytown, USA',
        'phone': '+1-555-0123',
        'preferences': {
            'contact_method': 'email',
            'timezone': 'EST'
        }
    }
)
session.add(new_customer)
session.commit()

from sqlalchemy import create_engine
import pandas as pd

# Use pandas dialect for DataFrame operations
engine = create_engine("awsathena+pandas://...")

# Read query results directly into DataFrame
df = pd.read_sql_query("""
    SELECT
        customer_id,
        name,
        total_spent,
        age,
        is_active
    FROM customers
    WHERE total_spent > 500
""", engine)
print(df.head())
print(f"DataFrame shape: {df.shape}")

# Use DataFrame for analysis
customer_analysis = df.groupby('is_active').agg({
    'total_spent': ['mean', 'sum', 'count'],
    'age': ['mean', 'min', 'max']
}).round(2)
print("Customer Analysis by Status:")
print(customer_analysis)

# Write DataFrame back to Athena (via S3)
# Note: This requires additional S3 write permissions
df_to_write = df[df['total_spent'] > 1000]
df_to_write.to_sql(
    'high_value_customers',
    engine,
    if_exists='replace',
    index=False,
    method='multi'  # Batch insert for better performance
)

from sqlalchemy import create_engine
import pyarrow as pa

# Use Arrow dialect for columnar operations
engine = create_engine("awsathena+arrow://...")

# Execute query and get Arrow Table
with engine.connect() as conn:
    result = conn.execute(text("""
        SELECT
            product_category,
            COUNT(*) as order_count,
            SUM(amount) as total_revenue,
            AVG(amount) as avg_order_value
        FROM orders
        GROUP BY product_category
        ORDER BY total_revenue DESC
    """))
    # Build an Arrow Table from the result rows.
    # FIX: pa.Table.from_pandas() requires a DataFrame, not a list of rows;
    # from_pylist() accepts a list of dicts directly.
    arrow_table = pa.Table.from_pylist([dict(m) for m in result.mappings()])

# High-performance columnar operations
total_revenue = pa.compute.sum(arrow_table.column('total_revenue'))
print(f"Total revenue across all categories: ${total_revenue.as_py():,.2f}")

from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, DateTime
from sqlalchemy.schema import CreateTable

engine = create_engine("awsathena+rest://...")
metadata = MetaData()

# Define table structure
analytics_table = Table(
    'user_analytics',
    metadata,
    Column('user_id', Integer, primary_key=True),
    Column('session_date', DateTime),
    Column('page_views', Integer),
    Column('session_duration', Integer),
    Column('user_agent', String(500)),
    Column('referrer', String(200))
)

# Generate CREATE TABLE statement
create_stmt = CreateTable(analytics_table)
print(str(create_stmt.compile(engine)))

# Create table in Athena
with engine.connect() as conn:
    metadata.create_all(conn)
    print("Table created successfully")

# Create external table pointing to S3 data (Hive DDL passed through as text)
external_table_ddl = text("""
    CREATE EXTERNAL TABLE IF NOT EXISTS web_logs (
        timestamp string,
        ip_address string,
        user_agent string,
        request_url string,
        response_code int,
        bytes_sent bigint
    )
    STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION 's3://my-bucket/web-logs/'
    TBLPROPERTIES ('has_encrypted_data'='false')
""")

with engine.connect() as conn:
    conn.execute(external_table_ddl)
    print("External table created")

from sqlalchemy import create_engine, text, bindparam
from sqlalchemy.sql import and_, or_, func

engine = create_engine("awsathena+rest://...")

# Parameterized queries with SQLAlchemy.
# FIX: TextClause has .bindparams() (plural), not .bindparam(), and the
# parameter type must be passed as the type_= keyword — passing Integer
# positionally would set it as the parameter's *value*.
parameterized_query = text("""
    SELECT
        customer_id,
        name,
        total_spent,
        CASE
            WHEN total_spent > :high_threshold THEN 'High Value'
            WHEN total_spent > :medium_threshold THEN 'Medium Value'
            ELSE 'Low Value'
        END as customer_segment
    FROM customers
    WHERE created_at >= :start_date
      AND is_active = :active_status
    ORDER BY total_spent DESC
    LIMIT :limit_count
""").bindparams(
    bindparam('high_threshold', type_=Integer),
    bindparam('medium_threshold', type_=Integer),
    bindparam('start_date', type_=DateTime),
    bindparam('active_status', type_=Boolean),
    bindparam('limit_count', type_=Integer)
)

# Execute with parameters
with engine.connect() as conn:
    result = conn.execute(parameterized_query, {
        'high_threshold': 1000,
        'medium_threshold': 500,
        'start_date': datetime(2023, 1, 1),
        'active_status': True,
        'limit_count': 100
    })
    customers = result.fetchall()
    for customer in customers:
        print(f"{customer.name}: ${customer.total_spent:.2f} ({customer.customer_segment})")

# Window functions
window_query = text("""
    SELECT
        customer_id,
        order_date,
        amount,
        ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY order_date) as order_sequence,
        SUM(amount) OVER (PARTITION BY customer_id ORDER BY order_date
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as running_total,
        LAG(amount, 1) OVER (PARTITION BY customer_id ORDER BY order_date) as prev_order_amount
    FROM orders
    WHERE order_date >= DATE '2023-01-01'
    ORDER BY customer_id, order_date
""")

with engine.connect() as conn:
    result = conn.execute(window_query)
    for row in result:
        print(f"Customer {row.customer_id}: Order {row.order_sequence}, "
              f"Amount: ${row.amount:.2f}, Running Total: ${row.running_total:.2f}")

from sqlalchemy import create_engine, pool
from sqlalchemy.pool import QueuePool

# Configure connection pooling
engine = create_engine(
    "awsathena+rest://...",
    poolclass=QueuePool,
    pool_size=5,          # Number of connections to maintain
    max_overflow=10,      # Additional connections allowed
    pool_pre_ping=True,   # Verify connections before use
    pool_recycle=3600,    # Recycle connections after 1 hour
    connect_args={
        'poll_interval': 1,
        'kill_on_interrupt': True
    }
)


def check_pool_status():
    """Print the current state of the engine's connection pool."""
    # FIX: local name renamed so it no longer shadows the imported `pool` module.
    engine_pool = engine.pool
    print(f"Pool size: {engine_pool.size()}")
    print(f"Checked out connections: {engine_pool.checkedout()}")
    print(f"Overflow connections: {engine_pool.overflow()}")


# Use context managers for proper connection handling
with engine.connect() as conn:
    result = conn.execute(text("SELECT COUNT(*) FROM large_table"))
    count = result.scalar()
    print(f"Table has {count} rows")

check_pool_status()

from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
engine = create_engine("awsathena+rest://...")
# Note: Athena doesn't support traditional transactions
# But SQLAlchemy can handle connection-level operations
def safe_bulk_operation():
with engine.connect() as conn:
try:
# Multiple related operations
conn.execute(text("CREATE TABLE temp_results AS SELECT * FROM source_table WHERE condition = 1"))
conn.execute(text("""
INSERT INTO final_table
SELECT customer_id, SUM(amount) as total
FROM temp_results
GROUP BY customer_id
"""))
conn.execute(text("DROP TABLE temp_results"))
print("Bulk operation completed successfully")
except SQLAlchemyError as e:
print(f"Operation failed: {e}")
# Cleanup if needed
try:
conn.execute(text("DROP TABLE IF EXISTS temp_results"))
except:
pass
raise
safe_bulk_operation()awsathena+rest://aws_access_key_id:aws_secret_access_key@athena.region.amazonaws.com:443/schema_name?s3_staging_dir=s3://bucket/path/awsathena+rest://access_key:secret_key@athena.us-west-2.amazonaws.com:443/default?s3_staging_dir=s3://my-bucket/results/&work_group=primary&catalog_name=AwsDataCatalog®ion_name=us-west-2import os
from sqlalchemy import create_engine
# Set environment variables
os.environ['AWS_ACCESS_KEY_ID'] = 'your_access_key'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'your_secret_key'
os.environ['AWS_DEFAULT_REGION'] = 'us-west-2'
# Simplified connection string
engine = create_engine(
"awsathena+rest://:@athena.us-west-2.amazonaws.com:443/default?"
"s3_staging_dir=s3://my-bucket/athena-results/"
)Each dialect supports the same SQL operations but optimizes result processing for different use cases.
Install with Tessl CLI
npx tessl i tessl/pypi-pyathena