Python client for the Impala distributed query engine and HiveServer2 implementations
—
SQLAlchemy dialect support for Impala, enabling SQLAlchemy ORM and Core functionality with Impala and Hive backends. This integration lets developers use familiar SQLAlchemy patterns for database operations.
SQLAlchemy dialects that provide the interface between SQLAlchemy and Impala/Hive.
class ImpalaDialect:
    """
    SQLAlchemy dialect for Impala.

    Provides SQLAlchemy integration for Impala databases, supporting
    core SQLAlchemy functionality including table reflection, query
    construction, and result handling.

    Attributes:
        name (str): Dialect name 'impala'
        driver (str): Driver name 'impyla'
        supports_alter (bool): Whether ALTER statements are supported
        max_identifier_length (int): Maximum identifier length (128)
        supports_sane_rowcount (bool): Whether rowcount is reliable
        supports_sane_multi_rowcount (bool): Whether multi-statement rowcount works
        supports_native_decimal (bool): Whether native decimal is supported
        default_schema_name (str): Default schema name
        supports_default_values (bool): Whether DEFAULT values are supported
        supports_sequences (bool): Whether sequences are supported
        sequences_optional (bool): Whether sequences are optional
        preexecute_autoincrement_sequences (bool): Autoincrement sequence behavior
        postfetch_lastrowid (bool): Whether to fetch last row ID after insert
    """
class Impala4Dialect(ImpalaDialect):
    """
    SQLAlchemy dialect for Impala 4.x.

    Specialized dialect for Impala 4.x versions with enhanced
    features and optimizations specific to newer Impala releases.
    Inherits from ImpalaDialect with additional features.
    """

Impala-specific SQL types for proper data type mapping in SQLAlchemy.
class TINYINT:
    """
    Impala TINYINT data type.

    Represents Impala's TINYINT type (8-bit signed integer, i.e.
    -128 to 127) in SQLAlchemy.
    Maps to Python int with range validation.
    """
class INT:
    """
    Impala INT data type.

    Represents Impala's INT type (32-bit signed integer) in SQLAlchemy.
    Maps to Python int.
    """
class DOUBLE:
    """
    Impala DOUBLE data type.

    Represents Impala's DOUBLE type (64-bit floating point) in SQLAlchemy.
    Maps to Python float.
    """
class STRING:
    """
    Impala STRING data type.

    Represents Impala's STRING type (variable-length string) in SQLAlchemy.
    Maps to Python str.
    """

Supporting classes for the Impala SQLAlchemy dialect implementation.
class ImpalaDDLCompiler:
    """
    DDL compiler for Impala dialect.

    Handles compilation of Data Definition Language (DDL) statements
    specific to Impala's SQL syntax and capabilities.
    """
class ImpalaTypeCompiler:
    """
    Type compiler for Impala dialect.

    Handles compilation of SQLAlchemy types to Impala-specific
    type representations in SQL statements.
    """
class Impala4TypeCompiler(ImpalaTypeCompiler):
    """
    Type compiler for Impala 4.x dialect.

    Enhanced type compiler with support for Impala 4.x specific
    data types and type representations.
    """
class ImpalaIdentifierPreparer:
    """
    Identifier preparer for Impala dialect.

    Handles proper quoting and escaping of identifiers (table names,
    column names, etc.) according to Impala's identifier rules.
    """
class ImpalaExecutionContext:
    """
    Execution context for Impala dialect.

    Manages the execution context for SQLAlchemy operations,
    including parameter binding and result processing.
    """

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
# Build an engine through the impyla dialect; echo=True turns on SQL logging.
engine = create_engine('impala://impala-host:21050/default', echo=True)

# Smoke-test the connection by asking the server for its version.
with engine.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    version = result.fetchone()
    print(f"Connected to: {version[0]}")

from sqlalchemy import create_engine
# Kerberos authentication: driver options are forwarded through connect_args.
engine = create_engine(
    'impala://impala-host:21050/default',
    connect_args=dict(auth_mechanism='GSSAPI', kerberos_service_name='impala'),
)

# LDAP authentication: the credentials are carried in the URL itself.
engine = create_engine(
    'impala://username:password@impala-host:21050/default',
    connect_args=dict(auth_mechanism='LDAP'),
)

# SSL connection, with the CA certificate given by path.
engine = create_engine(
    'impala://impala-host:21050/default',
    connect_args=dict(use_ssl=True, ca_cert='/path/to/ca-cert.pem'),
)

from sqlalchemy import create_engine, MetaData, Table
from sqlalchemy.orm import sessionmaker
engine = create_engine('impala://impala-host:21050/sales_db')

# Load the definitions of the existing tables into a MetaData collection.
metadata = MetaData()
metadata.reflect(bind=engine)

# Reflected tables are looked up by name.
customers = metadata.tables['customers']
orders = metadata.tables['orders']

print("Customers table columns:")
for column in customers.columns:
    print(f" {column.name}: {column.type}")

# Run a query built from the reflected table: the first ten customers.
with engine.connect() as conn:
    result = conn.execute(
        customers.select().limit(10)
    )
    for row in result:
        print(row)

from sqlalchemy import create_engine, Column, Integer, String, DateTime, Float
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
# Declarative base class that the ORM model classes inherit from.
Base = declarative_base()


class Customer(Base):
    """ORM model mapped to the 'customers' table."""

    __tablename__ = 'customers'

    # customer_id is the mapped primary key; the remaining columns are plain attributes.
    customer_id = Column(Integer, primary_key=True)
    name = Column(String)
    email = Column(String)
    created_at = Column(DateTime)
class Order(Base):
    """ORM model mapped to the 'orders' table."""

    __tablename__ = 'orders'

    order_id = Column(Integer, primary_key=True)
    # Plain integer column: no ForeignKey or relationship is declared here.
    customer_id = Column(Integer)
    order_date = Column(DateTime)
    total_amount = Column(Float)
    status = Column(String)
from sqlalchemy import func

# Setup
engine = create_engine('impala://impala-host:21050/ecommerce')
Session = sessionmaker(bind=engine)
session = Session()

# Run the queries inside try/finally so the session (and its connection)
# is released even when one of the queries raises.
try:
    # Query using ORM: the first ten customers created since 2023-01-01.
    recent_customers = (
        session.query(Customer)
        .filter(Customer.created_at >= datetime(2023, 1, 1))
        .limit(10)
        .all()
    )
    for customer in recent_customers:
        print(f"Customer: {customer.name} ({customer.email})")

    # Aggregate queries: total sales and order count per calendar month.
    # The truncation expression is built once so SELECT and GROUP BY agree.
    month_bucket = func.date_trunc('month', Order.order_date)
    monthly_sales = (
        session.query(
            month_bucket.label('month'),
            func.sum(Order.total_amount).label('total_sales'),
            func.count(Order.order_id).label('order_count'),
        )
        .group_by(month_bucket)
        .order_by('month')
        .all()
    )
    for month, sales, count in monthly_sales:
        print(f"{month}: ${sales:,.2f} ({count} orders)")
finally:
    session.close()

from sqlalchemy import create_engine, text, select, func, and_, or_
from sqlalchemy import MetaData, Table
engine = create_engine('impala://impala-host:21050/analytics')

# Reflect tables
metadata = MetaData()
metadata.reflect(bind=engine)
sales = metadata.tables['sales']
products = metadata.tables['products']

with engine.connect() as conn:
    # Complex query with joins: per-category totals for three categories,
    # restricted to sales dated 2023-01-01 or later, largest total first.
    # NOTE(review): select([...]) is the list-style calling form; confirm it
    # matches the SQLAlchemy version this example targets.
    query = (
        select([
            products.c.category,
            func.sum(sales.c.amount).label('total_sales'),
            func.count(sales.c.sale_id).label('sale_count'),
            func.avg(sales.c.amount).label('avg_sale'),
        ])
        .select_from(
            sales.join(products, sales.c.product_id == products.c.product_id)
        )
        .where(
            and_(
                sales.c.sale_date >= '2023-01-01',
                products.c.category.in_(['Electronics', 'Clothing', 'Books']),
            )
        )
        .group_by(products.c.category)
        .order_by(func.sum(sales.c.amount).desc())
    )

    result = conn.execute(query)
    print("Sales by Category:")
    for row in result:
        print(f"{row.category}: ${row.total_sales:,.2f} "
              f"({row.sale_count} sales, avg: ${row.avg_sale:.2f})")

from sqlalchemy import create_engine, Column, Table, MetaData
from impala.sqlalchemy import TINYINT, INT, DOUBLE, STRING
engine = create_engine('impala://impala-host:21050/test_db')
metadata = MetaData()

# Define table with Impala-specific types
sensor_data = Table('sensor_readings', metadata,
    Column('sensor_id', INT, primary_key=True),
    Column('reading_type', TINYINT),  # 8-bit signed: -128 to 127
    Column('temperature', DOUBLE),
    Column('location', STRING),
    Column('notes', STRING)
)

# This snippet does not issue CREATE TABLE; the table must already exist.
with engine.connect() as conn:
    # Note: CREATE TABLE IF NOT EXISTS may need to be done manually
    # as Impala has specific syntax requirements

    # Insert data using the defined types (one multi-row VALUES insert)
    conn.execute(sensor_data.insert().values([
        {'sensor_id': 1, 'reading_type': 1, 'temperature': 23.5,
         'location': 'Building A', 'notes': 'Normal reading'},
        {'sensor_id': 2, 'reading_type': 2, 'temperature': 25.1,
         'location': 'Building B', 'notes': 'Slightly elevated'}
    ]))

    # Query back the data
    result = conn.execute(sensor_data.select())
    for row in result:
        print(f"Sensor {row.sensor_id}: {row.temperature}°C at {row.location}")

from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool

# Connection pooling configuration.
# QueuePool is the pool class that understands pool_size / max_overflow;
# StaticPool keeps a single connection and rejects those arguments, so the
# original combination made create_engine() raise TypeError.
engine = create_engine(
    'impala://impala-host:21050/warehouse',
    poolclass=QueuePool,
    pool_size=10,
    max_overflow=20,
    pool_pre_ping=True,  # Validate connections
    connect_args={
        'timeout': 60,
        'retries': 3,
    },
)

# Partition-aware queries
with engine.connect() as conn:
    # Query with partition pruning: filtering on the year/month columns
    # lets Impala skip partitions that cannot match.
    partitioned_query = text("""
        SELECT
            product_category,
            SUM(sales_amount) as total_sales
        FROM sales_partitioned
        WHERE
            year = :year
            AND month = :month
        GROUP BY product_category
    """)
    # Bind parameters are passed as one dict; the keyword-argument form
    # (execute(stmt, year=..., month=...)) is legacy-only.
    result = conn.execute(partitioned_query, {'year': 2023, 'month': 6})
    for row in result:
        print(f"{row.product_category}: ${row.total_sales:,.2f}")
# Bulk operations
def bulk_insert_with_sqlalchemy(engine, table_name, data_rows):
    """Insert many rows into an existing table with one executemany call.

    Args:
        engine: SQLAlchemy engine connected to the target database.
        table_name: Name of an existing table; it is found by reflection.
        data_rows: List of dicts, one per row, keyed by column name.

    Raises:
        KeyError: If ``table_name`` is not among the reflected tables.
    """
    # Imported here because this example's import header only brings in
    # create_engine and text; without it the function raises NameError.
    from sqlalchemy import MetaData

    metadata = MetaData()
    metadata.reflect(bind=engine)
    table = metadata.tables[table_name]

    # An empty parameter list is treated as "no parameters", which would
    # still run an INSERT; skip the round trip when there is nothing to add.
    if data_rows:
        with engine.connect() as conn:
            # Use bulk insert for better performance
            conn.execute(table.insert(), data_rows)
    print(f"Inserted {len(data_rows)} rows into {table_name}")
# Usage: three sample product rows, expanded from (id, name, price) tuples.
sample_data = [
    {'product_id': product_id, 'name': name, 'price': price}
    for product_id, name, price in (
        (1, 'Widget A', 19.99),
        (2, 'Widget B', 24.99),
        (3, 'Widget C', 29.99),
    )
]
bulk_insert_with_sqlalchemy(engine, 'products', sample_data)

# Basic connection
engine = create_engine('impala://hostname:21050/database')
# With authentication
engine = create_engine('impala://user:pass@hostname:21050/database')
# With SSL
engine = create_engine('impala://hostname:21050/database?use_ssl=true')
# HTTP transport
engine = create_engine(
'impala://hostname:28000/database',
connect_args={
'use_http_transport': True,
'http_path': 'cliservice'
}
)
# Multiple connection parameters
engine = create_engine(
'impala://hostname:21050/database',
connect_args={
'auth_mechanism': 'GSSAPI',
'kerberos_service_name': 'impala',
'use_ssl': True,
'timeout': 120
}
)Install with Tessl CLI
npx tessl i tessl/pypi-impyla