A Python connector for Apache Druid with synchronous and asynchronous clients, database API support, and comprehensive query building utilities.
—
Full SQLAlchemy dialect support, enabling both Core and ORM usage patterns with Druid as the backend database. The dialect provides table introspection, query compilation, and type mapping, so Druid can be used seamlessly from SQLAlchemy applications.
PyDruid registers its SQLAlchemy dialects automatically through package entry points, so `create_engine` understands the `druid` URL schemes without any extra import.
# Available dialect URLs
druid://host:port/path/to/sql/endpoint
druid+http://host:port/path/to/sql/endpoint
druid+https://host:port/path/to/sql/endpoint
Create SQLAlchemy engines for Druid connectivity.
from sqlalchemy import create_engine

# Plain "druid://" talks HTTP to the SQL endpoint at host:port/path.
engine = create_engine('druid://localhost:8082/druid/v2/sql/')

# "druid+http://" spells out the same HTTP transport explicitly.
engine = create_engine('druid+http://localhost:8082/druid/v2/sql/')

# "druid+https://" switches the transport to HTTPS.
engine = create_engine('druid+https://localhost:8082/druid/v2/sql/')

# Credentials travel in the userinfo part of the URL.
engine = create_engine('druid://username:password@localhost:8082/druid/v2/sql/')

# Extra settings can be appended as URL query parameters.
# NOTE(review): confirm which query parameters the dialect actually forwards to
# the DB-API connect(); `header` is the one this page describes as meaningful.
engine = create_engine('druid://localhost:8082/druid/v2/sql/?header=true&timeout=60000')

# Core dialect implementation classes.
class DruidDialect:
    """Main SQLAlchemy dialect for Druid.

    This is an API outline: only signatures and docstrings are shown, the
    method bodies are elided. ``scheme`` selects the transport and is
    overridden by the HTTPS-specific subclass.
    """

    # Dialect name, matching the ``druid`` prefix of engine URLs.
    name: str = "druid"
    # Transport scheme for the SQL endpoint; subclasses override it.
    scheme: str = "http"

    # NOTE(review): the docstring says this returns the DB API *module*, but the
    # annotation says ``type``. SQLAlchemy dialects also usually expose ``dbapi``
    # as a classmethod; confirm both against the library source.
    def dbapi(self) -> type:
        """Return the DB API module for this dialect."""

    def create_connect_args(self, url) -> tuple:
        """
        Build connection arguments from SQLAlchemy URL.

        Parameters:
        - url: SQLAlchemy URL object
        Returns:
        Tuple of (args, kwargs) for connect() function
        """

    def do_ping(self, dbapi_connection) -> bool:
        """
        Test if connection is alive.

        Parameters:
        - dbapi_connection: DB API connection object
        Returns:
        True if connection is alive, False otherwise
        """

    def get_schema_names(self, connection, **kwargs) -> "list[str]":
        """
        Get list of schema names.

        Parameters:
        - connection: SQLAlchemy connection
        Returns:
        List of schema names
        """

    def get_table_names(self, connection, schema: "str | None" = None, **kwargs) -> "list[str]":
        """
        Get list of table names in schema.

        Parameters:
        - connection: SQLAlchemy connection
        - schema: Schema name (optional; None when not given)
        Returns:
        List of table names
        """

    def get_columns(self, connection, table_name: str, schema: "str | None" = None, **kwargs) -> "list[dict]":
        """
        Get column information for a table.

        Parameters:
        - connection: SQLAlchemy connection
        - table_name: Name of table to introspect
        - schema: Schema name (optional; None when not given)
        Returns:
        List of column dictionaries with metadata
        """
class DruidHTTPDialect(DruidDialect):
    """Dialect for Druid reached over plain HTTP.

    Behaves exactly like ``DruidDialect``; the scheme is simply restated.
    """

    # Same value the parent already uses.
    scheme: str = "http"
class DruidHTTPSDialect(DruidDialect):
    """Dialect for Druid reached over HTTPS."""

    # Replaces the parent's "http" transport.
    scheme: str = "https"


# Custom SQL compilation for Druid-specific features.
class DruidCompiler(compiler.SQLCompiler):
    """Statement compiler for the Druid dialect.

    Defines no overrides: statements are compiled exactly as SQLAlchemy's
    base ``SQLCompiler`` compiles them.
    """
class DruidTypeCompiler(compiler.GenericTypeCompiler):
    """Render SQLAlchemy column types as Druid type names.

    NOTE(review): LONG/STRING/COMPLEX are Druid native type names; Druid SQL
    CAST expressions use SQL names such as BIGINT/VARCHAR. Confirm these
    strings render where they are expected.
    """

    # Floating point.
    def visit_REAL(self, type_, **kwargs) -> str:
        """Emit ``DOUBLE`` for REAL."""
        return "DOUBLE"

    # Types rendered as LONG.
    def visit_NUMERIC(self, type_, **kwargs) -> str:
        """Emit ``LONG`` for NUMERIC."""
        return "LONG"

    def visit_DATETIME(self, type_, **kwargs) -> str:
        """Emit ``LONG`` for DATETIME."""
        return "LONG"

    # Text.
    def visit_CHAR(self, type_, **kwargs) -> str:
        """Emit ``STRING`` for CHAR."""
        return "STRING"

    # Binary payloads.
    def visit_BLOB(self, type_, **kwargs) -> str:
        """Emit ``COMPLEX`` for BLOB."""
        return "COMPLEX"
class UniversalSet:
    """Set-like object that reports every item as a member.

    ``item in UniversalSet()`` is ``True`` for any ``item``. It is used as
    ``DruidIdentifierPreparer.reserved_words`` so that *every* word counts as
    reserved. It does not mean "no reserved words".
    """

    def __contains__(self, item) -> bool:
        """Return True for any item: everything is a member."""
        return True


class DruidIdentifierPreparer(compiler.IdentifierPreparer):
    """Identifier preparer for Druid SQL that treats every word as reserved.

    SQLAlchemy's IdentifierPreparer quotes identifiers it finds in
    ``reserved_words``. Because ``UniversalSet`` contains everything, every
    identifier (e.g. ``__time``, ``user_name``) is emitted quoted.
    """

    # UniversalSet must be defined above this class: the class body runs at
    # import time, so referencing UniversalSet before its definition would
    # raise NameError.
    reserved_words = UniversalSet()


# JDBC type mapping for column introspection.
# Maps JDBC type codes (the integer constants of java.sql.Types, presumably as
# reported for each column during introspection) to SQLAlchemy type classes.
# All integral codes widen to BigInteger; DECIMAL/FLOAT/REAL/DOUBLE all become
# Float; CHAR/VARCHAR become String; OTHER (1111) falls back to BLOB.
# NOTE(review): codes without an entry here (e.g. 2 NUMERIC, 92 TIME,
# 2003 ARRAY) are unmapped; confirm how the introspection code handles a
# missing key.
jdbc_type_map: "dict[int, type]" = {
    -6: types.BigInteger,  # TINYINT
    -5: types.BigInteger,  # BIGINT
    1: types.String,  # CHAR
    3: types.Float,  # DECIMAL (mapped to Float, not an exact decimal type)
    4: types.BigInteger,  # INTEGER
    5: types.BigInteger,  # SMALLINT
    6: types.Float,  # FLOAT
    7: types.Float,  # REAL
    8: types.Float,  # DOUBLE
    12: types.String,  # VARCHAR
    16: types.Boolean,  # BOOLEAN
    91: types.DATE,  # DATE
    93: types.TIMESTAMP,  # TIMESTAMP
    1111: types.BLOB,  # OTHER
}
from sqlalchemy import create_engine, text
# Build the engine, then run a raw SQL string wrapped in text().
engine = create_engine('druid://localhost:8082/druid/v2/sql/')
count_sql = text("SELECT COUNT(*) FROM places")
with engine.connect() as conn:
    # scalar() returns the first column of the first row: the count itself.
    count = conn.execute(count_sql).scalar()
    print(f"Total places: {count}")
from sqlalchemy import create_engine, MetaData, Table, select
engine = create_engine('druid://localhost:8082/druid/v2/sql/')
metadata = MetaData()

# Load the definition of the existing "places" table from Druid.
places = Table('places', metadata, autoload_with=engine)

# Show every reflected column together with its mapped SQLAlchemy type.
for col in places.columns:
    print(f"Column: {col.name}, Type: {col.type}")

# Query through the reflected table object: first ten place names.
first_ten = select(places.c.place_name).limit(10)
with engine.connect() as conn:
    for row in conn.execute(first_ten):
        print(row[0])
from sqlalchemy import create_engine, MetaData, Table, select, func, and_
engine = create_engine('druid://localhost:8082/druid/v2/sql/')
metadata = MetaData()
twitterstream = Table('twitterstream', metadata, autoload_with=engine)
cols = twitterstream.c

# Filter: English-language tweets from 2014-03-01 onwards.
english_since_march = and_(
    cols.user_lang == 'en',
    cols['__time'] >= '2014-03-01',
)

# Top 10 users by tweet count, written as a method chain.
stmt = (
    select(cols.user_name, func.count().label('tweet_count'))
    .where(english_since_march)
    .group_by(cols.user_name)
    .order_by(func.count().desc())
    .limit(10)
)

with engine.connect() as conn:
    for row in conn.execute(stmt):
        print(f"User: {row.user_name}, Tweets: {row.tweet_count}")
from sqlalchemy import create_engine
# Ask Druid to return column headers (recommended for Druid >= 0.13.0).
engine = create_engine('druid://localhost:8082/druid/v2/sql/?header=true')

# Username and password go in the userinfo part of the URL.
engine = create_engine('druid://user:pass@localhost:8082/druid/v2/sql/')

# HTTPS (with SSL verification) via the druid+https scheme.
engine = create_engine('druid+https://localhost:8082/druid/v2/sql/')

# Standard SQLAlchemy connection-pool tuning, passed through create_engine().
pool_settings = {'pool_size': 10, 'max_overflow': 20, 'pool_timeout': 30}
engine = create_engine('druid://localhost:8082/druid/v2/sql/', **pool_settings)
from sqlalchemy import create_engine, Column, String, Integer, DateTime
# ``sqlalchemy.orm.declarative_base`` replaces the deprecated
# ``sqlalchemy.ext.declarative.declarative_base`` (SQLAlchemy 1.4+).
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class TwitterStream(Base):
    """ORM mapping for the ``twitterstream`` Druid datasource."""

    __tablename__ = 'twitterstream'

    # Define primary key (required for ORM, though Druid doesn't have real PKs).
    # Druid's timestamp column is literally named "__time", so the name is passed
    # explicitly. A class attribute spelled ``__time`` would be name-mangled by
    # Python to ``_TwitterStream__time``, and SQLAlchemy would then map (and
    # query) a column with that mangled name, which does not exist in Druid.
    time = Column('__time', DateTime, primary_key=True)
    user_name = Column(String)
    tweet_text = Column(String)
    user_lang = Column(String)


engine = create_engine('druid://localhost:8082/druid/v2/sql/')
Session = sessionmaker(bind=engine)
# Note: ORM usage with Druid is limited due to Druid's nature as an analytical database
# Core usage is generally recommended for most use cases
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError, DatabaseError
engine = create_engine('druid://localhost:8082/druid/v2/sql/')

# DatabaseError is a subclass of SQLAlchemyError, so the narrower handler
# has to come first or it would never be reached.
try:
    with engine.connect() as conn:
        result = conn.execute(text("SELECT * FROM nonexistent_table"))
        rows = result.fetchall()
except DatabaseError as e:
    print(f"Database error: {e}")
except SQLAlchemyError as e:
    print(f"SQLAlchemy error: {e}")

# Example:
# Endpoint URL and query-string options written as two adjacent literals that
# Python joins into one URL string.
# NOTE(review): confirm the dialect forwards `timeout`/`context` from the query
# string; only `header` is described elsewhere on this page.
engine = create_engine(
    'druid://localhost:8082/druid/v2/sql/'
    '?header=true&timeout=60000&context={"timeout":60000}'
)

# Standard SQLAlchemy engine options (for example pool_size, max_overflow and
# pool_timeout) also apply.
Because Druid is an analytical database, the SQLAlchemy integration is optimized for analytical queries and data exploration rather than transactional operations.
Install with the Tessl CLI:
npx tessl i tessl/pypi-pydruid