CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-pyathena

Python DB API 2.0 (PEP 249) client for Amazon Athena

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/sqlalchemy-integration.md

SQLAlchemy Integration

Complete SQLAlchemy dialect implementation with custom Athena types, enabling ORM support and integration with existing SQLAlchemy-based applications. Provides seamless database abstraction layer for Athena.

Installation

pip install PyAthena[SQLAlchemy]

Capabilities

Athena Dialects

Multiple dialect implementations optimized for different use cases and result formats.

# Illustrative stubs of the dialect hierarchy. The `name` attribute doubles
# as the URL scheme accepted by create_engine() (e.g. "awsathena+rest://...").
class AthenaDialect:
    """Base Athena dialect for standard SQLAlchemy operations."""
    # Generic scheme; concrete subclasses append a driver suffix.
    name: str = "awsathena"

class AthenaRestDialect(AthenaDialect):
    """REST API-based dialect for standard operations."""
    name: str = "awsathena+rest"

class AthenaPandasDialect(AthenaDialect):
    """Pandas-optimized dialect for DataFrame operations."""
    name: str = "awsathena+pandas"

class AthenaArrowDialect(AthenaDialect):
    """Arrow-optimized dialect for columnar operations."""
    name: str = "awsathena+arrow"

Custom Athena Types

SQLAlchemy type implementations for Athena-specific data types.

# Illustrative stubs of the Athena-specific SQLAlchemy column types.
# NOTE(review): `sqltypes` and `TypeEngine` presumably come from
# sqlalchemy.types -- confirm against pyathena.sqlalchemy.types.
class TINYINT(sqltypes.Integer):
    """Athena TINYINT type (8-bit integer)."""

class STRUCT(TypeEngine[Dict]):
    """Athena STRUCT type for nested objects."""

class MAP(TypeEngine[Dict]):
    """Athena MAP type for key-value pairs."""

class ARRAY(TypeEngine[List]):
    """Athena ARRAY type for ordered collections."""

class AthenaTimestamp(TypeEngine[datetime]):
    """Athena TIMESTAMP type with timezone support."""

class AthenaDate(TypeEngine[date]):
    """Athena DATE type."""

SQL Compilation

Custom compilers for translating SQLAlchemy constructs to Athena SQL.

# Illustrative stubs of the compiler classes the dialect plugs into
# SQLAlchemy's compilation pipeline (statements, DDL and type rendering).
class AthenaStatementCompiler(SQLCompiler):
    """Compiles SQLAlchemy statements to Athena SQL."""

class AthenaDDLCompiler(DDLCompiler):
    """Compiles DDL statements for Athena."""

class AthenaTypeCompiler(GenericTypeCompiler):
    """Compiles SQLAlchemy types to Athena SQL types."""

Identifier Preparation

Classes for properly formatting identifiers in different SQL contexts.

# Illustrative stubs. NOTE(review): presumably split into two classes
# because identifier quoting rules differ between Athena DML and DDL
# statements -- confirm against the pyathena implementation.
class AthenaDMLIdentifierPreparer(IdentifierPreparer):
    """Prepares identifiers for DML statements."""

class AthenaDDLIdentifierPreparer(IdentifierPreparer):
    """Prepares identifiers for DDL statements."""

Usage Examples

Basic SQLAlchemy Connection

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

# Engine pointing at Athena through the REST dialect; the empty userinfo
# section (":@") means credentials come from the environment/boto3 chain.
engine = create_engine(
    "awsathena+rest://:@athena.us-west-2.amazonaws.com:443/default?"
    "s3_staging_dir=s3://my-bucket/athena-results/"
)

# Smoke-test the connection with a trivial SELECT.
with engine.connect() as connection:
    first_row = connection.execute(text("SELECT 1 as test_column")).fetchone()
    print(first_row)

Advanced Connection Configuration

from sqlalchemy import create_engine
from urllib.parse import quote_plus

# Individual pieces of the Athena connection URL. Values that may contain
# URL-reserved characters (secret key, S3 URI) are percent-encoded up front.
connection_params = {
    "aws_access_key_id": "YOUR_ACCESS_KEY",
    "aws_secret_access_key": quote_plus("YOUR_SECRET_KEY"),
    "region_name": "us-west-2",
    "schema_name": "default",
    "s3_staging_dir": quote_plus("s3://my-bucket/athena-results/"),
    "work_group": "primary",
    "catalog_name": "AwsDataCatalog"
}

# Assemble the SQLAlchemy URL from its userinfo, host, path and query parts.
_p = connection_params
connection_string = (
    f"awsathena+rest://{_p['aws_access_key_id']}:{_p['aws_secret_access_key']}"
    f"@athena.{_p['region_name']}.amazonaws.com:443/{_p['schema_name']}"
    f"?s3_staging_dir={_p['s3_staging_dir']}"
    f"&work_group={_p['work_group']}"
    f"&catalog_name={_p['catalog_name']}"
)

engine = create_engine(connection_string)

ORM Model Definition

from sqlalchemy import Column, Integer, String, DateTime, Numeric, Boolean
# NOTE(review): declarative_base lives in sqlalchemy.orm since 1.4; the
# sqlalchemy.ext.declarative path still works but is deprecated -- confirm
# which SQLAlchemy version this example targets.
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from pyathena.sqlalchemy.types import TINYINT, STRUCT, ARRAY, MAP

Base = declarative_base()

class Customer(Base):
    """Customer table mapping, including Athena complex column types."""
    __tablename__ = 'customers'
    
    customer_id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    email = Column(String(255), unique=True)
    age = Column(TINYINT)  # Athena-specific type
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime)
    total_spent = Column(Numeric(10, 2))
    
    # Complex types
    # NOTE(review): assumes pyathena's MAP takes (key_type, value_type) and
    # STRUCT takes a list of (name, type) pairs -- confirm against
    # pyathena.sqlalchemy.types.
    preferences = Column(MAP(String, String))  # Key-value preferences
    order_history = Column(ARRAY(Integer))     # Array of order IDs
    profile = Column(STRUCT([                  # Nested structure
        ('address', String),
        ('phone', String),
        ('preferences', MAP(String, String))
    ]))

class Order(Base):
    """Order table mapping with an ARRAY-of-STRUCT line-items column."""
    __tablename__ = 'orders'
    
    order_id = Column(Integer, primary_key=True)
    customer_id = Column(Integer, nullable=False)
    order_date = Column(DateTime)
    amount = Column(Numeric(10, 2))
    status = Column(String(20))
    items = Column(ARRAY(STRUCT([           # Array of structured items
        ('product_id', Integer),
        ('quantity', Integer),
        ('price', Numeric(8, 2))
    ])))

# Create engine and session
# NOTE(review): create_engine is not imported in this snippet -- it relies
# on the earlier examples on this page; not standalone.
engine = create_engine("awsathena+rest://...")
Session = sessionmaker(bind=engine)
session = Session()

ORM Queries

from sqlalchemy import func, and_, or_
from datetime import datetime, timedelta

# NOTE(review): `session`, `Customer` and `Order` are assumed to come from
# the ORM model-definition example above -- this snippet is not standalone.

# Basic queries
active_customers = session.query(Customer).filter(Customer.is_active == True).all()

# Complex filtering
high_value_customers = session.query(Customer).filter(
    and_(
        Customer.total_spent > 1000,
        Customer.is_active == True,
        Customer.created_at > datetime.now() - timedelta(days=365)
    )
).all()

# Aggregation queries
customer_stats = session.query(
    func.count(Customer.customer_id).label('total_customers'),
    func.avg(Customer.total_spent).label('avg_spent'),
    func.max(Customer.total_spent).label('max_spent'),
    func.min(Customer.age).label('min_age'),
    func.max(Customer.age).label('max_age')
).first()

print(f"Total customers: {customer_stats.total_customers}")
print(f"Average spent: ${customer_stats.avg_spent:.2f}")

# Join queries
recent_orders = session.query(Customer, Order).join(
    Order, Customer.customer_id == Order.customer_id
).filter(
    Order.order_date > datetime.now() - timedelta(days=30)
).all()

# Group by queries
# `date_format` is reached through the generic func.* namespace; it is a
# Presto/Athena SQL function and not portable to other dialects.
monthly_revenue = session.query(
    func.date_format(Order.order_date, '%Y-%m').label('month'),
    func.sum(Order.amount).label('total_revenue'),
    func.count(Order.order_id).label('order_count')
).group_by(
    func.date_format(Order.order_date, '%Y-%m')
).order_by('month').all()

for row in monthly_revenue:
    print(f"Month: {row.month}, Revenue: ${row.total_revenue:.2f}, Orders: {row.order_count}")

Working with Complex Types

from sqlalchemy import text

# Query with complex type operations (Presto/Athena syntax):
#   map['key']         -- MAP element access
#   cardinality(arr)   -- ARRAY length
#   struct.field       -- STRUCT member access
complex_query = text("""
SELECT 
    customer_id,
    name,
    preferences['newsletter'] as newsletter_pref,
    cardinality(order_history) as total_orders,
    profile.address as customer_address
FROM customers
WHERE preferences['vip'] = 'true'
  AND cardinality(order_history) > 5
""")

# NOTE(review): `session` and `Customer` are assumed to come from the ORM
# example above -- this snippet is not standalone.
results = session.execute(complex_query).fetchall()
for row in results:
    print(f"Customer: {row.name}, Address: {row.customer_address}, Orders: {row.total_orders}")

# Insert with complex types: MAP -> dict, ARRAY -> list, STRUCT -> dict.
new_customer = Customer(
    customer_id=12345,
    name="John Doe", 
    email="john@example.com",
    age=35,
    preferences={
        'newsletter': 'true',
        'vip': 'false',
        'language': 'en'
    },
    order_history=[1001, 1002, 1003],
    profile={
        'address': '123 Main St, Anytown, USA',
        'phone': '+1-555-0123',
        'preferences': {
            'contact_method': 'email',
            'timezone': 'EST'
        }
    }
)

session.add(new_customer)
session.commit()

Pandas Integration with SQLAlchemy

from sqlalchemy import create_engine
import pandas as pd

# The "+pandas" dialect returns results optimized for DataFrame conversion.
engine = create_engine("awsathena+pandas://...")

# Pull the query result straight into a DataFrame.
customers_df = pd.read_sql_query("""
SELECT 
    customer_id,
    name,
    total_spent,
    age,
    is_active
FROM customers
WHERE total_spent > 500
""", engine)

print(customers_df.head())
print(f"DataFrame shape: {customers_df.shape}")

# Summarize spend and age per activity status.
aggregations = {
    'total_spent': ['mean', 'sum', 'count'],
    'age': ['mean', 'min', 'max']
}
customer_analysis = customers_df.groupby('is_active').agg(aggregations).round(2)

print("Customer Analysis by Status:")
print(customer_analysis)

# Persist the high-value slice back through the engine (lands in S3).
# Note: This requires additional S3 write permissions
high_value = customers_df[customers_df['total_spent'] > 1000]
high_value.to_sql(
    'high_value_customers',
    engine,
    if_exists='replace',
    index=False,
    method='multi'  # Batch insert for better performance
)

Arrow Integration with SQLAlchemy

# BUG FIX: `text` was used below but never imported in this snippet, and
# pa.compute requires importing the pyarrow.compute submodule explicitly.
from sqlalchemy import create_engine, text
import pyarrow as pa
import pyarrow.compute as pc

# Use Arrow dialect for columnar operations
engine = create_engine("awsathena+arrow://...")

# Execute the aggregation and materialize the rows as an Arrow Table.
with engine.connect() as conn:
    result = conn.execute(text("""
        SELECT 
            product_category,
            COUNT(*) as order_count,
            SUM(amount) as total_revenue,
            AVG(amount) as avg_order_value
        FROM orders
        GROUP BY product_category
        ORDER BY total_revenue DESC
    """))

    # BUG FIX: pa.Table.from_pandas() requires a pandas DataFrame, but
    # fetchall() returns a list of Row objects. Build the table from
    # per-row dicts instead (Row._mapping is the column->value view,
    # SQLAlchemy 1.4+).
    arrow_table = pa.Table.from_pylist(
        [dict(row._mapping) for row in result.fetchall()]
    )

    # High-performance columnar reduction via pyarrow.compute.
    total_revenue = pc.sum(arrow_table.column('total_revenue'))
    print(f"Total revenue across all categories: ${total_revenue.as_py():,.2f}")

DDL Operations

from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, DateTime
from sqlalchemy.schema import CreateTable

engine = create_engine("awsathena+rest://...")
metadata = MetaData()

# Define table structure
# NOTE(review): Athena does not enforce primary-key constraints; confirm
# how the dialect's DDL compiler renders the primary_key flag.
analytics_table = Table(
    'user_analytics',
    metadata,
    Column('user_id', Integer, primary_key=True),
    Column('session_date', DateTime),
    Column('page_views', Integer),
    Column('session_duration', Integer),
    Column('user_agent', String(500)),
    Column('referrer', String(200))
)

# Generate CREATE TABLE statement
create_stmt = CreateTable(analytics_table)
print(str(create_stmt.compile(engine)))

# Create table in Athena
# NOTE(review): passing a Connection (rather than the Engine) to
# create_all is supported in SQLAlchemy 1.4+ -- confirm target version.
with engine.connect() as conn:
    metadata.create_all(conn)
    print("Table created successfully")

# Create external table pointing to S3 data
# NOTE(review): `text` is not imported in this snippet -- it relies on the
# earlier examples on this page; not standalone.
external_table_ddl = text("""
CREATE EXTERNAL TABLE IF NOT EXISTS web_logs (
    timestamp string,
    ip_address string,
    user_agent string,
    request_url string,
    response_code int,
    bytes_sent bigint
)
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://my-bucket/web-logs/'
TBLPROPERTIES ('has_encrypted_data'='false')
""")

with engine.connect() as conn:
    conn.execute(external_table_ddl)
    print("External table created")

Advanced Query Patterns

# BUG FIX: Integer/DateTime/Boolean and datetime were used below but not
# imported in this snippet; added here so it is standalone.
from sqlalchemy import create_engine, text, bindparam
from sqlalchemy import Integer, DateTime, Boolean
from sqlalchemy.sql import and_, or_, func
from datetime import datetime

engine = create_engine("awsathena+rest://...")

# Parameterized queries with SQLAlchemy.
# BUG FIX: TextClause exposes .bindparams() (plural) for attaching typed
# parameter declarations -- there is no .bindparam() method -- and
# bindparam() takes the type via the `type_` keyword, not as a positional
# value (the second positional argument is the parameter's *value*).
parameterized_query = text("""
SELECT 
    customer_id,
    name,
    total_spent,
    CASE 
        WHEN total_spent > :high_threshold THEN 'High Value'
        WHEN total_spent > :medium_threshold THEN 'Medium Value'
        ELSE 'Low Value'
    END as customer_segment
FROM customers
WHERE created_at >= :start_date
  AND is_active = :active_status
ORDER BY total_spent DESC
LIMIT :limit_count
""").bindparams(
    bindparam('high_threshold', type_=Integer),
    bindparam('medium_threshold', type_=Integer),
    bindparam('start_date', type_=DateTime),
    bindparam('active_status', type_=Boolean),
    bindparam('limit_count', type_=Integer)
)

# Execute with parameters
with engine.connect() as conn:
    result = conn.execute(parameterized_query, {
        'high_threshold': 1000,
        'medium_threshold': 500,
        'start_date': datetime(2023, 1, 1),
        'active_status': True,
        'limit_count': 100
    })

    customers = result.fetchall()
    for customer in customers:
        print(f"{customer.name}: ${customer.total_spent:.2f} ({customer.customer_segment})")

# Window functions: per-customer order sequence, running total, and the
# previous order's amount.
window_query = text("""
SELECT 
    customer_id,
    order_date,
    amount,
    ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY order_date) as order_sequence,
    SUM(amount) OVER (PARTITION BY customer_id ORDER BY order_date 
                     ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as running_total,
    LAG(amount, 1) OVER (PARTITION BY customer_id ORDER BY order_date) as prev_order_amount
FROM orders
WHERE order_date >= DATE '2023-01-01'
ORDER BY customer_id, order_date
""")

with engine.connect() as conn:
    result = conn.execute(window_query)
    for row in result:
        print(f"Customer {row.customer_id}: Order {row.order_sequence}, "
              f"Amount: ${row.amount:.2f}, Running Total: ${row.running_total:.2f}")

Connection Pooling and Performance

# BUG FIX: `text` was used below but never imported in this snippet.
from sqlalchemy import create_engine, pool, text
from sqlalchemy.pool import QueuePool

# Configure connection pooling
engine = create_engine(
    "awsathena+rest://...",
    poolclass=QueuePool,
    pool_size=5,          # Number of connections to maintain
    max_overflow=10,      # Additional connections allowed
    pool_pre_ping=True,   # Verify connections before use
    pool_recycle=3600,    # Recycle connections after 1 hour
    connect_args={
        'poll_interval': 1,
        'kill_on_interrupt': True
    }
)

def check_pool_status():
    """Print size/checked-out/overflow statistics for the engine's pool."""
    # BUG FIX: the local was previously named `pool`, shadowing the
    # `sqlalchemy.pool` module imported above.
    engine_pool = engine.pool
    print(f"Pool size: {engine_pool.size()}")
    print(f"Checked out connections: {engine_pool.checkedout()}")
    print(f"Overflow connections: {engine_pool.overflow()}")

# Use context managers for proper connection handling
with engine.connect() as conn:
    result = conn.execute(text("SELECT COUNT(*) FROM large_table"))
    count = result.scalar()
    print(f"Table has {count} rows")

check_pool_status()

Transaction Handling

from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError

engine = create_engine("awsathena+rest://...")

# Note: Athena doesn't support traditional transactions
# But SQLAlchemy can handle connection-level operations

def safe_bulk_operation():
    """Run a CTAS -> INSERT -> DROP sequence with best-effort cleanup.

    Athena has no rollback, so on failure the intermediate table is
    dropped explicitly and the original error is re-raised.
    """
    with engine.connect() as conn:
        try:
            # Stage matching rows into an intermediate table (CTAS).
            conn.execute(text("CREATE TABLE temp_results AS SELECT * FROM source_table WHERE condition = 1"))

            # Aggregate the staged rows into the destination table.
            conn.execute(text("""
                INSERT INTO final_table 
                SELECT customer_id, SUM(amount) as total
                FROM temp_results 
                GROUP BY customer_id
            """))

            conn.execute(text("DROP TABLE temp_results"))

            print("Bulk operation completed successfully")

        except SQLAlchemyError as e:
            print(f"Operation failed: {e}")
            # BUG FIX: was a bare `except:` which would also swallow
            # KeyboardInterrupt/SystemExit; catch SQLAlchemyError only.
            try:
                conn.execute(text("DROP TABLE IF EXISTS temp_results"))
            except SQLAlchemyError:
                pass
            raise

safe_bulk_operation()

Connection String Formats

Basic Connection String

awsathena+rest://aws_access_key_id:aws_secret_access_key@athena.region.amazonaws.com:443/schema_name?s3_staging_dir=s3://bucket/path/

With All Parameters

awsathena+rest://access_key:secret_key@athena.us-west-2.amazonaws.com:443/default?s3_staging_dir=s3://my-bucket/results/&work_group=primary&catalog_name=AwsDataCatalog&region_name=us-west-2

Using Environment Variables

import os
from sqlalchemy import create_engine

# Supply AWS credentials through the environment so the connection URL
# needs no userinfo section.
os.environ.update({
    'AWS_ACCESS_KEY_ID': 'your_access_key',
    'AWS_SECRET_ACCESS_KEY': 'your_secret_key',
    'AWS_DEFAULT_REGION': 'us-west-2',
})

# Credential-free connection string; pyathena falls back to the env vars.
engine = create_engine(
    "awsathena+rest://:@athena.us-west-2.amazonaws.com:443/default?"
    "s3_staging_dir=s3://my-bucket/athena-results/"
)

Dialect-Specific Features

  • Base Dialect: Standard SQLAlchemy operations with tuple results
  • REST Dialect: Optimized for general-purpose queries
  • Pandas Dialect: Automatic DataFrame conversion for analytical queries
  • Arrow Dialect: Columnar processing for high-performance analytics

Each dialect supports the same SQL operations but optimizes result processing for different use cases.

Install with Tessl CLI

npx tessl i tessl/pypi-pyathena

docs

arrow-integration.md

async-operations.md

core-database.md

index.md

pandas-integration.md

spark-integration.md

sqlalchemy-integration.md

tile.json