CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-pydruid

A Python connector for Apache Druid with synchronous and asynchronous clients, database API support, and comprehensive query building utilities.

Pending
Overview
Eval results
Files

sqlalchemy-integration.mddocs/

SQLAlchemy Integration

Full SQLAlchemy dialect support enabling both Core and ORM usage patterns with Druid as a backend database. The integration provides table introspection, query compilation, and type mapping for seamless integration with SQLAlchemy applications.

Capabilities

Dialect Registration

PyDruid automatically registers SQLAlchemy dialects through entry points.

# Available dialect URLs
druid://host:port/path/to/sql/endpoint
druid+http://host:port/path/to/sql/endpoint  
druid+https://host:port/path/to/sql/endpoint

Engine Creation

Create SQLAlchemy engines for Druid connectivity.

from sqlalchemy import create_engine

# Basic HTTP connection
engine = create_engine('druid://localhost:8082/druid/v2/sql/')

# Explicit HTTP (equivalent to above)
engine = create_engine('druid+http://localhost:8082/druid/v2/sql/')

# HTTPS connection
engine = create_engine('druid+https://localhost:8082/druid/v2/sql/')

# With authentication
engine = create_engine('druid://username:password@localhost:8082/druid/v2/sql/')

# With query parameters for configuration
engine = create_engine('druid://localhost:8082/druid/v2/sql/?header=true&timeout=60000')

Dialect Classes

Core dialect implementation classes.

class DruidDialect:
    """Main SQLAlchemy dialect for Druid."""
    
    name: str = "druid"
    scheme: str = "http"
    
    def dbapi(self) -> type:
        """Return the DB API module for this dialect."""
    
    def create_connect_args(self, url) -> tuple:
        """
        Build connection arguments from SQLAlchemy URL.
        
        Parameters:
        - url: SQLAlchemy URL object
        
        Returns:
        Tuple of (args, kwargs) for connect() function
        """
    
    def do_ping(self, dbapi_connection) -> bool:
        """
        Test if connection is alive.
        
        Parameters:
        - dbapi_connection: DB API connection object
        
        Returns:
        True if connection is alive, False otherwise
        """
    
    def get_schema_names(self, connection, **kwargs) -> list:
        """Get list of schema names."""
    
    def get_table_names(self, connection, schema: str = None, **kwargs) -> list:
        """
        Get list of table names in schema.
        
        Parameters:
        - connection: SQLAlchemy connection
        - schema: Schema name (optional)
        
        Returns:
        List of table names
        """
    
    def get_columns(self, connection, table_name: str, schema: str = None, **kwargs) -> list:
        """
        Get column information for a table.
        
        Parameters:
        - connection: SQLAlchemy connection
        - table_name: Name of table to introspect
        - schema: Schema name (optional)
        
        Returns:
        List of column dictionaries with metadata
        """

class DruidHTTPDialect(DruidDialect):
    """HTTP-specific Druid dialect (alias for DruidDialect)."""
    scheme: str = "http"

class DruidHTTPSDialect(DruidDialect):
    """HTTPS-specific Druid dialect."""
    scheme: str = "https"

SQL Compilation

Custom SQL compilation for Druid-specific features.

class DruidCompiler(compiler.SQLCompiler):
    """SQL compiler for Druid dialect (inherits from SQLAlchemy SQLCompiler)."""
    pass

class DruidTypeCompiler(compiler.GenericTypeCompiler):
    """Type compiler for mapping SQLAlchemy types to Druid types."""
    
    def visit_REAL(self, type_, **kwargs) -> str:
        """Map REAL type to Druid DOUBLE."""
        return "DOUBLE"
    
    def visit_NUMERIC(self, type_, **kwargs) -> str:
        """Map NUMERIC type to Druid LONG."""
        return "LONG"
    
    def visit_CHAR(self, type_, **kwargs) -> str:
        """Map CHAR type to Druid STRING."""
        return "STRING"
    
    def visit_DATETIME(self, type_, **kwargs) -> str:
        """Map DATETIME type to Druid LONG."""
        return "LONG"
    
    def visit_BLOB(self, type_, **kwargs) -> str:
        """Map BLOB type to Druid COMPLEX."""
        return "COMPLEX"

class DruidIdentifierPreparer(compiler.IdentifierPreparer):
    """Identifier preparation for Druid SQL with universal reserved words."""
    reserved_words = UniversalSet()

class UniversalSet:
    """Set that contains all items (no reserved words are enforced)."""
    
    def __contains__(self, item) -> bool:
        """Always returns True - no reserved words."""
        return True

Type Mapping

JDBC type mapping for column introspection.

jdbc_type_map: dict = {
    -6: types.BigInteger,    # TINYINT
    -5: types.BigInteger,    # BIGINT
    1: types.String,         # CHAR
    3: types.Float,          # DECIMAL
    4: types.BigInteger,     # INTEGER
    5: types.BigInteger,     # SMALLINT
    6: types.Float,          # FLOAT
    7: types.Float,          # REAL
    8: types.Float,          # DOUBLE
    12: types.String,        # VARCHAR
    16: types.Boolean,       # BOOLEAN
    91: types.DATE,          # DATE
    93: types.TIMESTAMP,     # TIMESTAMP
    1111: types.BLOB,        # OTHER
}

Usage Examples

Basic Engine Usage

from sqlalchemy import create_engine, text

# Create engine
engine = create_engine('druid://localhost:8082/druid/v2/sql/')

# Execute raw SQL
with engine.connect() as conn:
    result = conn.execute(text("SELECT COUNT(*) FROM places"))
    count = result.scalar()
    print(f"Total places: {count}")

Table Reflection

from sqlalchemy import create_engine, MetaData, Table, select

engine = create_engine('druid://localhost:8082/druid/v2/sql/')
metadata = MetaData()

# Reflect existing table structure
places = Table('places', metadata, autoload_with=engine)

# Inspect columns
for column in places.columns:
    print(f"Column: {column.name}, Type: {column.type}")

# Execute query using reflected table
with engine.connect() as conn:
    stmt = select(places.c.place_name).limit(10)
    result = conn.execute(stmt)
    for row in result:
        print(row[0])

Core Usage with Query Builder

from sqlalchemy import create_engine, MetaData, Table, select, func, and_

engine = create_engine('druid://localhost:8082/druid/v2/sql/')
metadata = MetaData()

# Reflect tables
twitterstream = Table('twitterstream', metadata, autoload_with=engine)

# Build complex query
stmt = select(
    twitterstream.c.user_name,
    func.count().label('tweet_count')
).where(
    and_(
        twitterstream.c.user_lang == 'en',
        twitterstream.c.__time >= '2014-03-01'
    )
).group_by(
    twitterstream.c.user_name
).order_by(
    func.count().desc()
).limit(10)

# Execute query
with engine.connect() as conn:
    result = conn.execute(stmt)
    for row in result:
        print(f"User: {row.user_name}, Tweets: {row.tweet_count}")

Connection Configuration

from sqlalchemy import create_engine

# With column headers (recommended for Druid >= 0.13.0)
engine = create_engine('druid://localhost:8082/druid/v2/sql/?header=true')

# With authentication
engine = create_engine('druid://user:pass@localhost:8082/druid/v2/sql/')

# HTTPS with SSL verification
engine = create_engine('druid+https://localhost:8082/druid/v2/sql/')

# With connection pool settings
engine = create_engine(
    'druid://localhost:8082/druid/v2/sql/',
    pool_size=10,
    max_overflow=20,
    pool_timeout=30
)

Advanced Usage with ORM

from sqlalchemy import create_engine, Column, String, Integer, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class TwitterStream(Base):
    __tablename__ = 'twitterstream'
    
    # Define primary key (required for ORM, though Druid doesn't have real PKs)
    __time = Column(DateTime, primary_key=True)
    user_name = Column(String)
    tweet_text = Column(String)
    user_lang = Column(String)

engine = create_engine('druid://localhost:8082/druid/v2/sql/')
Session = sessionmaker(bind=engine)

# Note: ORM usage with Druid is limited due to Druid's nature as an analytical database
# Core usage is generally recommended for most use cases

Error Handling

from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError, DatabaseError

engine = create_engine('druid://localhost:8082/druid/v2/sql/')

try:
    with engine.connect() as conn:
        result = conn.execute(text("SELECT * FROM nonexistent_table"))
        rows = result.fetchall()
except DatabaseError as e:
    print(f"Database error: {e}")
except SQLAlchemyError as e:
    print(f"SQLAlchemy error: {e}")

Configuration Options

URL Parameters

  • header: Set to 'true' to request column headers (recommended for Druid >= 0.13.0)
  • timeout: Query timeout in milliseconds
  • context: JSON-encoded query context parameters

Example:

engine = create_engine(
    'druid://localhost:8082/druid/v2/sql/?header=true&timeout=60000&context={"timeout":60000}'
)

Engine Options

Standard SQLAlchemy engine options apply:

  • pool_size: Size of connection pool
  • max_overflow: Maximum overflow connections
  • pool_timeout: Connection timeout
  • echo: Enable SQL logging

Limitations

Due to Druid's nature as an analytical database:

  • No transactions: COMMIT/ROLLBACK operations are no-ops
  • Read-only: INSERT/UPDATE/DELETE operations are not supported
  • Limited ORM support: Core usage is recommended over ORM
  • No foreign keys: Druid doesn't support relational constraints
  • Time-based partitioning: Tables are typically partitioned by time

The SQLAlchemy integration is optimized for analytical queries and data exploration rather than transactional operations.

Install with Tessl CLI

npx tessl i tessl/pypi-pydruid

docs

asynchronous-client.md

command-line-interface.md

database-api.md

index.md

query-utilities.md

sqlalchemy-integration.md

synchronous-client.md

tile.json