CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-impyla

Python client for the Impala distributed query engine and HiveServer2 implementations

Pending
Overview
Eval results
Files

docs/sqlalchemy-integration.md

SQLAlchemy Integration

SQLAlchemy dialect support for Impala, enabling ORM and core SQLAlchemy functionality with Impala and Hive backends. This integration allows developers to use familiar SQLAlchemy patterns for database operations.

Capabilities

Dialect Classes

SQLAlchemy dialects that provide the interface between SQLAlchemy and Impala/Hive.

class ImpalaDialect:
    """
    Impala dialect for SQLAlchemy.

    Connects SQLAlchemy to Impala databases and supports core
    SQLAlchemy functionality: table reflection, query construction,
    and result handling.

    Attributes:
        name (str): Name of the dialect, 'impala'
        driver (str): Name of the driver, 'impyla'
        supports_alter (bool): Indicates if ALTER statements are supported
        max_identifier_length (int): Longest permitted identifier (128)
        supports_sane_rowcount (bool): Indicates if rowcount is reliable
        supports_sane_multi_rowcount (bool): Indicates if multi-statement rowcount works
        supports_native_decimal (bool): Indicates if native decimal is supported
        default_schema_name (str): Name of the default schema
        supports_default_values (bool): Indicates if DEFAULT values are supported
        supports_sequences (bool): Indicates if sequences are supported
        sequences_optional (bool): Indicates if sequences are optional
        preexecute_autoincrement_sequences (bool): Behavior of autoincrement sequences
        postfetch_lastrowid (bool): Indicates if the last row ID is fetched after insert
    """

class Impala4Dialect(ImpalaDialect):
    """
    Impala 4.x dialect for SQLAlchemy.

    Subclass of ImpalaDialect specialized for Impala 4.x, adding the
    enhanced features and optimizations that are specific to newer
    Impala releases.
    """

Custom SQL Types

Impala-specific SQL types for proper data type mapping in SQLAlchemy.

class TINYINT:
    """
    TINYINT data type for Impala.

    SQLAlchemy representation of Impala's TINYINT type, an 8-bit
    signed integer. Values map to Python int with range validation.
    """

class INT:
    """
    INT data type for Impala.

    SQLAlchemy representation of Impala's INT type, a 32-bit signed
    integer. Values map to Python int.
    """

class DOUBLE:
    """
    DOUBLE data type for Impala.

    SQLAlchemy representation of Impala's DOUBLE type, a 64-bit
    floating point number. Values map to Python float.
    """

class STRING:
    """
    STRING data type for Impala.

    SQLAlchemy representation of Impala's STRING type, a
    variable-length string. Values map to Python str.
    """

Dialect Support Classes

Supporting classes for the Impala SQLAlchemy dialect implementation.

class ImpalaDDLCompiler:
    """
    DDL compiler used by the Impala dialect.

    Compiles Data Definition Language (DDL) statements according to
    Impala's SQL syntax and capabilities.
    """

class ImpalaTypeCompiler:
    """
    Type compiler used by the Impala dialect.

    Compiles SQLAlchemy types into the Impala-specific type
    representations that appear in SQL statements.
    """

class Impala4TypeCompiler(ImpalaTypeCompiler):
    """
    Type compiler used by the Impala 4.x dialect.

    Extends ImpalaTypeCompiler with support for the data types and
    type representations specific to Impala 4.x.
    """

class ImpalaIdentifierPreparer:
    """
    Identifier preparer used by the Impala dialect.

    Quotes and escapes identifiers (table names, column names, etc.)
    following Impala's identifier rules.
    """

class ImpalaExecutionContext:
    """
    Execution context used by the Impala dialect.

    Holds the context in which SQLAlchemy operations execute,
    including parameter binding and result processing.
    """

Usage Examples

Basic SQLAlchemy Engine Setup

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

# Build an engine that goes through the impyla dialect
engine = create_engine(
    'impala://impala-host:21050/default',
    echo=True,  # Log every emitted SQL statement
)

# Verify connectivity by asking the server for its version string
with engine.connect() as connection:
    version_row = connection.execute(text("SELECT version()")).fetchone()
    print(f"Connected to: {version_row[0]}")

Authentication with SQLAlchemy

from sqlalchemy import create_engine

# Kerberos: GSSAPI mechanism plus the Kerberos service name
engine = create_engine(
    'impala://impala-host:21050/default',
    connect_args=dict(
        auth_mechanism='GSSAPI',
        kerberos_service_name='impala',
    ),
)

# LDAP: the credentials are carried in the connection URL
engine = create_engine(
    'impala://username:password@impala-host:21050/default',
    connect_args=dict(auth_mechanism='LDAP'),
)

# SSL: enable encryption and point at the CA certificate
engine = create_engine(
    'impala://impala-host:21050/default',
    connect_args=dict(
        use_ssl=True,
        ca_cert='/path/to/ca-cert.pem',
    ),
)

Table Reflection and Metadata

from sqlalchemy import create_engine, MetaData, Table
from sqlalchemy.orm import sessionmaker

engine = create_engine('impala://impala-host:21050/sales_db')

# Load the definitions of the tables that already exist in the database
metadata = MetaData()
metadata.reflect(bind=engine)

# Look the reflected tables up by name
reflected = metadata.tables
customers = reflected['customers']
orders = reflected['orders']

print("Customers table columns:")
for col in customers.columns:
    print(f"  {col.name}: {col.type}")

# Run a query built from the reflected table
with engine.connect() as connection:
    # First ten customers
    first_ten = customers.select().limit(10)
    for record in connection.execute(first_ten):
        print(record)

ORM Usage with Impala

from sqlalchemy import create_engine, Column, Integer, String, DateTime, Float
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime

# Declarative base class that the ORM model classes inherit from
Base = declarative_base()

class Customer(Base):
    """ORM model mapped to the 'customers' table."""

    __tablename__ = 'customers'
    
    customer_id = Column(Integer, primary_key=True)  # primary key column
    name = Column(String)
    email = Column(String)
    created_at = Column(DateTime)  # presumably the row creation time; confirm against the table

class Order(Base):
    """ORM model mapped to the 'orders' table."""

    __tablename__ = 'orders'
    
    order_id = Column(Integer, primary_key=True)  # primary key column
    customer_id = Column(Integer)  # no ForeignKey is declared; presumably a customer key (confirm)
    order_date = Column(DateTime)
    total_amount = Column(Float)
    status = Column(String)

from sqlalchemy import func

# Setup
engine = create_engine('impala://impala-host:21050/ecommerce')
Session = sessionmaker(bind=engine)
session = Session()

# Run every query inside try/finally so the session (and the connection
# it holds) is released even when a query raises.
try:
    # Query using ORM
    recent_customers = session.query(Customer).filter(
        Customer.created_at >= datetime(2023, 1, 1)
    ).limit(10).all()

    for customer in recent_customers:
        print(f"Customer: {customer.name} ({customer.email})")

    # Aggregate queries: one row per month with the sales total and order count
    monthly_sales = session.query(
        func.date_trunc('month', Order.order_date).label('month'),
        func.sum(Order.total_amount).label('total_sales'),
        func.count(Order.order_id).label('order_count')
    ).group_by(
        func.date_trunc('month', Order.order_date)
    ).order_by('month').all()

    for month, sales, count in monthly_sales:
        print(f"{month}: ${sales:,.2f} ({count} orders)")
finally:
    session.close()

Core SQLAlchemy Queries

from sqlalchemy import create_engine, text, select, func, and_, or_
from sqlalchemy import MetaData, Table

engine = create_engine('impala://impala-host:21050/analytics')

# Reflect tables
metadata = MetaData()
metadata.reflect(bind=engine)
sales = metadata.tables['sales']
products = metadata.tables['products']

with engine.connect() as conn:
    # Complex query with joins: sales aggregated per product category.
    # Columns are passed to select() positionally because the legacy list
    # form select([...]) was removed in SQLAlchemy 2.0; the positional
    # form is accepted by SQLAlchemy 1.4 and later.
    query = select(
        products.c.category,
        func.sum(sales.c.amount).label('total_sales'),
        func.count(sales.c.sale_id).label('sale_count'),
        func.avg(sales.c.amount).label('avg_sale')
    ).select_from(
        sales.join(products, sales.c.product_id == products.c.product_id)
    ).where(
        and_(
            sales.c.sale_date >= '2023-01-01',
            products.c.category.in_(['Electronics', 'Clothing', 'Books'])
        )
    ).group_by(
        products.c.category
    ).order_by(
        # Highest-grossing category first
        func.sum(sales.c.amount).desc()
    )

    result = conn.execute(query)

    print("Sales by Category:")
    for row in result:
        print(f"{row.category}: ${row.total_sales:,.2f} "
              f"({row.sale_count} sales, avg: ${row.avg_sale:.2f})")

Custom Types Usage

from sqlalchemy import create_engine, Column, Table, MetaData
from impala.sqlalchemy import TINYINT, INT, DOUBLE, STRING

engine = create_engine('impala://impala-host:21050/test_db')
metadata = MetaData()

# Table definition built from the Impala-specific column types
sensor_data = Table(
    'sensor_readings',
    metadata,
    Column('sensor_id', INT, primary_key=True),
    Column('reading_type', TINYINT),  # 8-bit signed integer: -128 to 127
    Column('temperature', DOUBLE),
    Column('location', STRING),
    Column('notes', STRING),
)

with engine.connect() as connection:
    # The table is expected to exist already: CREATE TABLE IF NOT EXISTS
    # may need to be done manually, as Impala has specific syntax
    # requirements.

    # Insert two readings using the types defined above
    insert_stmt = sensor_data.insert().values([
        dict(sensor_id=1, reading_type=1, temperature=23.5,
             location='Building A', notes='Normal reading'),
        dict(sensor_id=2, reading_type=2, temperature=25.1,
             location='Building B', notes='Slightly elevated'),
    ])
    connection.execute(insert_stmt)

    # Read the rows back and print them
    for reading in connection.execute(sensor_data.select()):
        print(f"Sensor {reading.sensor_id}: {reading.temperature}°C at {reading.location}")

Advanced Features and Optimizations

from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool

# Connection pooling configuration.
# QueuePool is the pool class that accepts pool_size and max_overflow.
# StaticPool maintains a single connection and rejects both arguments,
# which makes create_engine() raise TypeError.
engine = create_engine(
    'impala://impala-host:21050/warehouse',
    poolclass=QueuePool,
    pool_size=10,       # Connections kept open in the pool
    max_overflow=20,    # Extra connections allowed beyond pool_size
    pool_pre_ping=True,  # Validate connections
    connect_args={
        'timeout': 60,
        'retries': 3
    }
)

# Partition-aware queries
with engine.connect() as conn:
    # Query with partition pruning
    partitioned_query = text("""
        SELECT 
            product_category,
            SUM(sales_amount) as total_sales
        FROM sales_partitioned 
        WHERE 
            year = :year 
            AND month = :month
        GROUP BY product_category
    """)
    
    # Bind parameters go in a single dict. Passing them as keyword
    # arguments to execute() was removed in SQLAlchemy 2.0, while the
    # dict form works on both 1.x and 2.x.
    result = conn.execute(partitioned_query, {"year": 2023, "month": 6})
    
    for row in result:
        print(f"{row.product_category}: ${row.total_sales:,.2f}")

# Bulk operations
def bulk_insert_with_sqlalchemy(engine, table_name, data_rows):
    """Efficient bulk insert using SQLAlchemy.

    Reflects ``table_name`` from the database and inserts all of
    ``data_rows`` with a single ``execute`` call.

    Args:
        engine: SQLAlchemy engine bound to the target database.
        table_name: Name of an existing table to insert into.
        data_rows: Sized collection of dicts, one per row, mapping
            column names to values.

    Raises:
        KeyError: If ``table_name`` does not exist in the database.
    """
    if not data_rows:
        # Nothing to insert: skip reflection and avoid executing an
        # INSERT with an empty parameter list.
        print(f"Inserted 0 rows into {table_name}")
        return

    # Imported locally so the helper is self-contained and does not depend
    # on which names the surrounding example happens to import.
    from sqlalchemy import MetaData
    from sqlalchemy.exc import InvalidRequestError

    metadata = MetaData()
    try:
        # Reflect only the target table rather than the whole database
        metadata.reflect(bind=engine, only=[table_name])
    except InvalidRequestError as err:
        # Keep the KeyError contract of a plain metadata.tables lookup
        raise KeyError(table_name) from err
    table = metadata.tables[table_name]

    with engine.connect() as conn:
        # Use bulk insert for better performance
        conn.execute(table.insert(), data_rows)
        print(f"Inserted {len(data_rows)} rows into {table_name}")

# Usage: build three product rows and insert them in one call
sample_data = [
    dict(product_id=product_id, name=name, price=price)
    for product_id, name, price in (
        (1, 'Widget A', 19.99),
        (2, 'Widget B', 24.99),
        (3, 'Widget C', 29.99),
    )
]

bulk_insert_with_sqlalchemy(engine, 'products', sample_data)

Connection URI Examples

from sqlalchemy import create_engine

# Basic connection
engine = create_engine('impala://hostname:21050/database')

# With authentication
engine = create_engine('impala://user:pass@hostname:21050/database')

# With SSL
# NOTE(review): URL query values are strings; confirm the driver treats
# the string 'true' as intended, or pass use_ssl via connect_args instead.
engine = create_engine('impala://hostname:21050/database?use_ssl=true')

# HTTP transport
engine = create_engine(
    'impala://hostname:28000/database',
    connect_args={
        'use_http_transport': True,
        'http_path': 'cliservice'
    }
)

# Multiple connection parameters
engine = create_engine(
    'impala://hostname:21050/database',
    connect_args={
        'auth_mechanism': 'GSSAPI',
        'kerberos_service_name': 'impala',
        'use_ssl': True,
        'timeout': 120
    }
)

Install with Tessl CLI

npx tessl i tessl/pypi-impyla

docs

core-database-api.md

data-utilities.md

error-handling.md

index.md

sqlalchemy-integration.md

tile.json