A Python connector for Apache Druid with synchronous and asynchronous clients, database API support, and comprehensive query building utilities.
—
A Python DB API 2.0 (PEP 249) compliant interface for querying Druid with SQL. Because it follows the standard database connectivity patterns, it works with existing Python database tools and frameworks.
Establish and manage database connections to Druid.
def connect(
host: str = "localhost",
port: int = 8082,
path: str = "/druid/v2/sql/",
scheme: str = "http",
user: str = None,
password: str = None,
context: dict = None,
header: bool = False,
ssl_verify_cert: bool = True,
ssl_client_cert: str = None,
proxies: dict = None,
jwt: str = None
) -> Connection:
"""
Create a connection to Druid database.
Parameters:
- host: Druid broker hostname
- port: Druid broker port
- path: SQL endpoint path
- scheme: Connection scheme ('http' or 'https')
- user: Username for authentication (optional)
- password: Password for authentication (optional)
- context: Query context parameters (optional)
- header: Whether to request column headers in responses
- ssl_verify_cert: Whether to verify SSL certificates
- ssl_client_cert: Path to client SSL certificate (optional)
- proxies: Proxy configuration dictionary (optional)
- jwt: JWT token for authentication (optional)
Returns:
Connection object for executing queries
"""DB API 2.0 compliant connection object.
class Connection:
"""Database connection implementing DB API 2.0 interface."""
def close(self) -> None:
"""Close the connection and free resources."""
def commit(self) -> None:
"""
Commit pending transactions.
Note: Druid does not support transactions, so this is a no-op.
"""
def cursor(self) -> 'Cursor':
"""Create a new cursor object for executing queries."""
def execute(self, operation: str, parameters: dict = None) -> 'Cursor':
"""
Execute SQL query directly on connection.
Parameters:
- operation: SQL query string
- parameters: Query parameters for parameterized queries (optional)
Returns:
Cursor object with query results
"""DB API 2.0 compliant cursor for query execution and result retrieval.
class Cursor:
"""Database cursor implementing DB API 2.0 interface."""
# Properties
rowcount: int = None # Number of rows affected by last operation
description: list = None # Column description tuples
arraysize: int = 1 # Default number of rows to fetch with fetchmany()
def close(self) -> None:
"""Close the cursor and free resources."""
def execute(self, operation: str, parameters: dict = None) -> None:
"""
Execute SQL query.
Parameters:
- operation: SQL query string
- parameters: Query parameters for parameterized queries (optional)
Uses pyformat style: {'param': value}
"""
def fetchone(self) -> tuple | None:
"""
Fetch next row from query results.
Returns:
Tuple representing single row, or None if no more rows
"""
def fetchmany(self, size: int = None) -> list:
"""
Fetch multiple rows from query results.
Parameters:
- size: Number of rows to fetch (defaults to arraysize)
Returns:
List of tuples representing rows
"""
def fetchall(self) -> list:
"""
Fetch all remaining rows from query results.
Returns:
List of tuples representing all remaining rows
"""
def __iter__(self):
"""Allow iteration over cursor results."""
return self
def __next__(self):
"""Get next row when iterating over cursor."""Data type constants for column description.
class Type:
    """Data type constants for DB API compatibility (used as column description type codes)."""

    STRING: int = 1
    NUMBER: int = 2
    BOOLEAN: int = 3


# DB API 2.0 compliant exception hierarchy.
# Base exception
class Error(Exception):
    """Base class for all database exceptions."""


# Warning
# The name deliberately follows PEP 249 (and so shadows the builtin Warning
# within this module); per PEP 249 it derives from Exception, not Error.
class Warning(Exception):
    """Exception for important warnings."""


# Interface errors
class InterfaceError(Error):
    """Exception for database interface errors."""


# Compile errors
class CompileError(Error):
    """Exception for query compilation errors."""


# Database errors
class DatabaseError(Error):
    """Exception for database-related errors."""


class InternalError(DatabaseError):
    """Exception for internal database errors."""


class OperationalError(DatabaseError):
    """Exception for operational database errors."""


class ProgrammingError(DatabaseError):
    """Exception for programming errors in SQL."""


class IntegrityError(DatabaseError):
    """Exception for data integrity errors."""


class DataError(DatabaseError):
    """Exception for data processing errors."""


class NotSupportedError(CompileError, DatabaseError):
    """
    Exception for unsupported database operations.

    Still a CompileError (existing handlers keep working), and also a
    DatabaseError as PEP 249 requires, so ``except DatabaseError`` catches it.
    """


# Additional authentication mechanisms.
class BearerAuth:
    """Bearer token authentication for HTTP requests."""

    def __init__(self, token: str) -> None:
        """
        Initialize bearer authentication.

        Parameters:
        - token: Bearer token string
        """
        # Keep the token; without it the auth object has nothing to send.
        self.token = token

    def __call__(self, request):
        """
        Attach the token to an outgoing request as an ``Authorization`` header.

        Follows the callable-auth convention of ``requests``: the auth object
        is called with the prepared request and must return it.

        Parameters:
        - request: object exposing a mutable ``headers`` mapping

        Returns:
        The same request object, with the header set
        """
        request.headers["Authorization"] = f"Bearer {self.token}"
        return request


from pydruid.db import connect
from contextlib import closing

# SQL for the first query
places_query = """
SELECT place,
CAST(REGEXP_EXTRACT(place, '(.*),', 1) AS FLOAT) AS lat,
CAST(REGEXP_EXTRACT(place, ',(.*)', 1) AS FLOAT) AS lon
FROM places
LIMIT 10
"""

# Connect to Druid; closing() calls close() on the cursor and the connection
# when the block exits, even if a query raises
with closing(
    connect(host='localhost', port=8082, path='/druid/v2/sql/', scheme='http')
) as conn, closing(conn.cursor()) as cursor:
    # Execute SQL query
    cursor.execute(places_query)
    # Fetch results by iterating over the cursor
    for row in cursor:
        print(row)
    # Or fetch a single row: COUNT(*) without GROUP BY yields exactly one
    cursor.execute("SELECT COUNT(*) FROM places")
    result = cursor.fetchone()
    print(f"Total places: {result[0]}")

from pydruid.db import connect
from contextlib import closing

# Parameterized query using pyformat placeholders: %(name)s
tweets_query = """
SELECT user_name, COUNT(*) as tweet_count
FROM twitterstream
WHERE user_lang = %(language)s
AND __time >= %(start_time)s
GROUP BY user_name
ORDER BY tweet_count DESC
LIMIT %(limit)s
"""
query_params = {
    'language': 'en',
    'start_time': '2014-03-01',
    'limit': 10,
}

# closing() releases the cursor and connection once the rows are fetched
with closing(connect(host='localhost', port=8082)) as conn, \
        closing(conn.cursor()) as cursor:
    cursor.execute(tweets_query, query_params)
    results = cursor.fetchall()

for row in results:
    print(f"User: {row[0]}, Tweets: {row[1]}")

from pydruid.db import connect, DatabaseError, ProgrammingError
# Pre-bind both names so the finally clause is safe even when connect()
# or cursor() raises before they are assigned.
conn = None
cursor = None
try:
    conn = connect(host='localhost', port=8082)
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM nonexistent_table")
    results = cursor.fetchall()
# ProgrammingError is a DatabaseError subclass, so it must be handled first
except ProgrammingError as e:
    print(f"SQL error: {e}")
except DatabaseError as e:
    print(f"Database error: {e}")
finally:
    if cursor is not None:
        cursor.close()
    if conn is not None:
        conn.close()

from pydruid.db import connect
# Basic authentication
conn = connect(
host='druid.example.com',
port=8082,
scheme='https',
user='username',
password='password',
ssl_verify_cert=True
)
# JWT authentication
conn = connect(
host='druid.example.com',
port=8082,
scheme='https',
jwt='your-jwt-token-here'
)
# With query context
conn = connect(
host='localhost',
port=8082,
context={'timeout': 60000, 'maxScatterGatherBytes': 1000000}
)This implementation follows the Python Database API Specification v2.0 (PEP 249):
The implementation provides standard database connectivity that works with existing Python database tools, ORMs, and frameworks that expect DB API 2.0 compliant interfaces.
Install with Tessl CLI
npx tessl i tessl/pypi-pydruid