CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-pydruid

A Python connector for Apache Druid with synchronous and asynchronous clients, database API support, and comprehensive query building utilities.

Pending
Overview
Eval results
Files

database-api.mddocs/

Database API

Python DB API 2.0 compliant interface for SQL-based interaction with Druid, providing standard database connectivity patterns that work with existing Python database tools and frameworks.

Capabilities

Connection Management

Establish and manage database connections to Druid.

def connect(
    host: str = "localhost",
    port: int = 8082,
    path: str = "/druid/v2/sql/",
    scheme: str = "http",
    user: str = None,
    password: str = None,
    context: dict = None,
    header: bool = False,
    ssl_verify_cert: bool = True,
    ssl_client_cert: str = None,
    proxies: dict = None,
    jwt: str = None
) -> Connection:
    """
    Create a connection to Druid database.
    
    Parameters:
    - host: Druid broker hostname
    - port: Druid broker port  
    - path: SQL endpoint path
    - scheme: Connection scheme ('http' or 'https')
    - user: Username for authentication (optional)
    - password: Password for authentication (optional)
    - context: Query context parameters (optional)
    - header: Whether to request column headers in responses
    - ssl_verify_cert: Whether to verify SSL certificates
    - ssl_client_cert: Path to client SSL certificate (optional)
    - proxies: Proxy configuration dictionary (optional)
    - jwt: JWT token for authentication (optional)
    
    Returns:
    Connection object for executing queries
    """

Connection Interface

DB API 2.0 compliant connection object.

class Connection:
    """Database connection implementing DB API 2.0 interface."""
    
    def close(self) -> None:
        """Close the connection and free resources."""
    
    def commit(self) -> None:
        """
        Commit pending transactions.
        Note: Druid does not support transactions, so this is a no-op.
        """
    
    def cursor(self) -> 'Cursor':
        """Create a new cursor object for executing queries."""
    
    def execute(self, operation: str, parameters: dict = None) -> 'Cursor':
        """
        Execute SQL query directly on connection.
        
        Parameters:
        - operation: SQL query string
        - parameters: Query parameters for parameterized queries (optional)
        
        Returns:
        Cursor object with query results
        """

Cursor Interface

DB API 2.0 compliant cursor for query execution and result retrieval.

class Cursor:
    """Database cursor implementing DB API 2.0 interface."""
    
    # Properties
    rowcount: int = None  # Number of rows affected by last operation
    description: list = None  # Column description tuples
    arraysize: int = 1  # Default number of rows to fetch with fetchmany()
    
    def close(self) -> None:
        """Close the cursor and free resources."""
    
    def execute(self, operation: str, parameters: dict = None) -> None:
        """
        Execute SQL query.
        
        Parameters:
        - operation: SQL query string  
        - parameters: Query parameters for parameterized queries (optional)
                     Uses pyformat style: {'param': value}
        """
    
    def fetchone(self) -> tuple | None:
        """
        Fetch next row from query results.
        
        Returns:
        Tuple representing single row, or None if no more rows
        """
    
    def fetchmany(self, size: int = None) -> list:
        """
        Fetch multiple rows from query results.
        
        Parameters:
        - size: Number of rows to fetch (defaults to arraysize)
        
        Returns:
        List of tuples representing rows
        """
    
    def fetchall(self) -> list:
        """
        Fetch all remaining rows from query results.
        
        Returns:
        List of tuples representing all remaining rows
        """
    
    def __iter__(self):
        """Allow iteration over cursor results."""
        return self
    
    def __next__(self):
        """Get next row when iterating over cursor."""

Data Types

Data type constants for column description.

class Type:
    """Data type constants for DB API compatibility."""
    STRING: int = 1
    NUMBER: int = 2  
    BOOLEAN: int = 3

Exception Hierarchy

DB API 2.0 compliant exception hierarchy.

# Base exception
class Error(Exception):
    """Base class for all database exceptions."""

# Warning
class Warning(Exception):
    """Exception for important warnings."""

# Interface errors
class InterfaceError(Error):
    """Exception for database interface errors."""

# Compile errors
class CompileError(Error):
    """Exception for query compilation errors."""

# Database errors
class DatabaseError(Error):
    """Exception for database-related errors."""

class InternalError(DatabaseError):
    """Exception for internal database errors."""

class OperationalError(DatabaseError):
    """Exception for operational database errors."""

class ProgrammingError(DatabaseError):
    """Exception for programming errors in SQL."""

class IntegrityError(DatabaseError):
    """Exception for data integrity errors."""

class DataError(DatabaseError):
    """Exception for data processing errors."""

class NotSupportedError(CompileError):
    """Exception for unsupported database operations."""

Authentication Support

Additional authentication mechanisms.

class BearerAuth:
    """Bearer token authentication for HTTP requests."""
    
    def __init__(self, token: str) -> None:
        """
        Initialize bearer authentication.
        
        Parameters:
        - token: Bearer token string
        """

Usage Examples

Basic Usage

from pydruid.db import connect

# Connect to Druid
conn = connect(host='localhost', port=8082, path='/druid/v2/sql/', scheme='http')
cursor = conn.cursor()

# Execute SQL query
cursor.execute("""
    SELECT place,
           CAST(REGEXP_EXTRACT(place, '(.*),', 1) AS FLOAT) AS lat,
           CAST(REGEXP_EXTRACT(place, ',(.*)', 1) AS FLOAT) AS lon
      FROM places
     LIMIT 10
""")

# Fetch results
for row in cursor:
    print(row)

# Or fetch all at once
cursor.execute("SELECT COUNT(*) FROM places")
result = cursor.fetchone()
print(f"Total places: {result[0]}")

# Close resources
cursor.close()
conn.close()

Parameterized Queries

from pydruid.db import connect

conn = connect(host='localhost', port=8082)
cursor = conn.cursor()

# Use parameterized queries (pyformat style)
cursor.execute("""
    SELECT user_name, COUNT(*) as tweet_count
      FROM twitterstream
     WHERE user_lang = %(language)s
       AND __time >= %(start_time)s
     GROUP BY user_name
     ORDER BY tweet_count DESC
     LIMIT %(limit)s
""", {
    'language': 'en',
    'start_time': '2014-03-01',
    'limit': 10
})

results = cursor.fetchall()
for row in results:
    print(f"User: {row[0]}, Tweets: {row[1]}")

Error Handling

from pydruid.db import connect, DatabaseError, ProgrammingError

try:
    conn = connect(host='localhost', port=8082)
    cursor = conn.cursor()
    
    cursor.execute("SELECT * FROM nonexistent_table")
    results = cursor.fetchall()
    
except ProgrammingError as e:
    print(f"SQL error: {e}")
except DatabaseError as e:
    print(f"Database error: {e}")
finally:
    if cursor:
        cursor.close()
    if conn:
        conn.close()

Connection with Authentication

from pydruid.db import connect

# Basic authentication
conn = connect(
    host='druid.example.com',
    port=8082,
    scheme='https',
    user='username',
    password='password',
    ssl_verify_cert=True
)

# JWT authentication  
conn = connect(
    host='druid.example.com',
    port=8082,
    scheme='https',
    jwt='your-jwt-token-here'
)

# With query context
conn = connect(
    host='localhost',
    port=8082,
    context={'timeout': 60000, 'maxScatterGatherBytes': 1000000}
)

DB API Compliance

This implementation follows the Python Database API Specification v2.0 (PEP 249):

  • apilevel: "2.0" - Supports DB API 2.0
  • threadsafety: 2 - Threads may share the module and connections
  • paramstyle: "pyformat" - Uses %(name)s parameter style

The implementation provides standard database connectivity that works with existing Python database tools, ORMs, and frameworks that expect DB API 2.0 compliant interfaces.

Install with Tessl CLI

npx tessl i tessl/pypi-pydruid

docs

asynchronous-client.md

command-line-interface.md

database-api.md

index.md

query-utilities.md

sqlalchemy-integration.md

synchronous-client.md

tile.json