tessl/pypi-pydruid

A Python connector for Apache Druid with synchronous and asynchronous clients, database API support, and comprehensive query building utilities.

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/pydruid@0.6.x

To install, run

npx @tessl/cli install tessl/pypi-pydruid@0.6.0


PyDruid

A comprehensive Python connector for Apache Druid that provides both synchronous and asynchronous clients for creating, executing, and analyzing Druid queries. PyDruid integrates Druid with the SciPy stack by parsing query results into pandas DataFrame objects, supports Druid's native query types (including topN, timeseries, groupBy, and search), implements the Python DB API 2.0 specification with an SQLAlchemy dialect, and can export results to TSV files or DataFrames.

Package Information

  • Package Name: pydruid
  • Language: Python
  • Installation: pip install pydruid
  • Optional Dependencies: pip install pydruid[async,pandas,sqlalchemy,cli]

Core Imports

from pydruid.client import PyDruid
from pydruid.async_client import AsyncPyDruid

For query building utilities:

from pydruid.utils.aggregators import doublesum, count, longsum
from pydruid.utils.filters import Dimension, Filter
from pydruid.utils.postaggregator import Field

For database API:

from pydruid.db import connect

For SQLAlchemy:

from sqlalchemy import create_engine

Basic Usage

from pydruid.client import PyDruid
from pydruid.utils.aggregators import doublesum
from pydruid.utils.filters import Dimension

# Create client
query = PyDruid('http://localhost:8082', 'druid/v2/')

# Execute a timeseries query
ts = query.timeseries(
    datasource='twitterstream',
    granularity='day',
    intervals='2014-02-02/p4w',
    aggregations={'tweet_count': doublesum('count')},
    filter=Dimension('first_hashtag') == 'sochi2014'
)

# Export results to a pandas DataFrame (requires the pandas extra)
df = ts.export_pandas()
print(df)

# Or export to TSV
ts.export_tsv('results.tsv')

Architecture

PyDruid provides three main interaction patterns:

  • Native Druid Queries: Direct query construction using PyDruid/AsyncPyDruid clients with utility classes for aggregations, filters, and post-aggregations
  • Database API: Standard Python DB API 2.0 interface for SQL-based interaction with Druid
  • SQLAlchemy Integration: Full SQLAlchemy dialect support enabling ORM and Core usage patterns

The utility modules (aggregators, filters, dimensions, having, postaggregator) provide declarative query building capabilities that generate the appropriate JSON structures for Druid's native query format.
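To make the generated JSON concrete, the following standalone sketch uses plain dictionaries to build a native timeseries query equivalent in spirit to the one in Basic Usage. It does not import pydruid; the helpers selector and build_timeseries are hypothetical, and the exact dict pydruid emits may differ in key order or optional fields.

```python
import json

# Standalone sketch: plain dicts in Druid's native JSON shapes.
# These helpers are hypothetical stand-ins, not pydruid's own functions.
def doublesum(raw_metric):
    return {"type": "doubleSum", "fieldName": raw_metric}


def selector(dimension, value):
    # Druid's selector filter: dimension == value
    return {"type": "selector", "dimension": dimension, "value": value}


def build_timeseries(datasource, granularity, intervals, aggregations, filter=None):
    """Assemble a native timeseries query dict."""
    query = {
        "queryType": "timeseries",
        "dataSource": datasource,
        "granularity": granularity,
        # Druid's docs show intervals as a list of ISO-8601 strings
        "intervals": [intervals] if isinstance(intervals, str) else list(intervals),
        # {name: spec} mapping -> list of specs, each carrying its output name
        "aggregations": [dict(spec, name=name) for name, spec in aggregations.items()],
    }
    if filter is not None:
        query["filter"] = filter
    return query


query = build_timeseries(
    datasource="twitterstream",
    granularity="day",
    intervals="2014-02-02/p4w",
    aggregations={"tweet_count": doublesum("count")},
    filter=selector("first_hashtag", "sochi2014"),
)
print(json.dumps(query, indent=2))
```

The {name: spec} input mirrors the aggregations=... keyword used in Basic Usage, while the output list matches the shape Druid expects on the wire.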

Capabilities

Synchronous Client

Primary client for executing Druid's native queries synchronously (topN, timeseries, groupBy, search, and others), with HTTP basic authentication, proxy configuration, and result export.

class PyDruid:
    def __init__(self, url: str, endpoint: str, cafile: str = None) -> None: ...
    def set_basic_auth_credentials(self, username: str, password: str) -> None: ...
    def set_proxies(self, proxies: dict) -> None: ...
    def topn(self, **kwargs) -> Query: ...
    def timeseries(self, **kwargs) -> Query: ...
    def groupby(self, **kwargs) -> Query: ...
    def search(self, **kwargs) -> Query: ...
    def subquery(self, **kwargs) -> dict: ...
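At the HTTP level, basic authentication amounts to an Authorization: Basic header carrying base64("user:password"), and proxy support amounts to routing the POST through a proxy-aware opener. The stdlib sketch below shows that idea; prepare_request is a hypothetical helper, not pydruid code. With pydruid itself you would call set_basic_auth_credentials(username, password) and set_proxies({...}) on the client before running a query.

```python
import base64
import json
import urllib.request


def prepare_request(url, endpoint, query, username=None, password=None, proxies=None):
    """Hypothetical helper: build a Druid POST request plus an opener."""
    headers = {"Content-Type": "application/json"}
    if username is not None and password is not None:
        # HTTP basic auth: base64("user:password") in the Authorization header
        token = base64.b64encode(f"{username}:{password}".encode()).decode()
        headers["Authorization"] = f"Basic {token}"
    request = urllib.request.Request(
        url.rstrip("/") + "/" + endpoint.lstrip("/"),
        data=json.dumps(query).encode("utf-8"),
        headers=headers,
        method="POST",
    )
    # An explicit ProxyHandler routes traffic through the given proxies
    handlers = [urllib.request.ProxyHandler(proxies)] if proxies else []
    opener = urllib.request.build_opener(*handlers)
    return request, opener


request, opener = prepare_request(
    "http://localhost:8082",
    "druid/v2/",
    {"queryType": "timeBoundary", "dataSource": "twitterstream"},
    username="alice",
    password="secret",
    proxies={"http": "http://proxy.example:3128"},
)
# Sending would be opener.open(request); it is not executed here.
```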

Full reference: synchronous-client.md

Asynchronous Client

Tornado-based asynchronous client for non-blocking query execution, suitable for high-concurrency applications and async frameworks.

class AsyncPyDruid:
    def __init__(self, url: str, endpoint: str, defaults: dict = None, http_client: str = None) -> None: ...
    async def topn(self, **kwargs) -> Query: ...
    async def timeseries(self, **kwargs) -> Query: ...
    async def groupby(self, **kwargs) -> Query: ...
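Because the query methods are coroutines, several queries can be in flight at once. This sketch uses a hypothetical StubAsyncClient in place of AsyncPyDruid so it runs without a broker, and it drives the coroutines with asyncio. That the real client works under asyncio is an assumption, based on Tornado 5 and later running on the asyncio event loop.

```python
import asyncio


class StubAsyncClient:
    """Hypothetical stand-in for AsyncPyDruid: same call shape, no network."""

    async def timeseries(self, **kwargs):
        await asyncio.sleep(0.01)  # simulate broker latency
        return {"datasource": kwargs["datasource"], "result": []}


async def run_many(client, datasources):
    # Start one timeseries query per datasource and await them together;
    # asyncio.gather returns results in the order the awaitables were given.
    return await asyncio.gather(*(
        client.timeseries(datasource=ds, granularity="day", intervals="2014-02-02/p4w")
        for ds in datasources
    ))


results = asyncio.run(run_many(StubAsyncClient(), ["twitterstream", "wikipedia"]))
print(results)
```

With the real client, AsyncPyDruid('http://localhost:8082', 'druid/v2/') would replace the stub, aggregations would be added to each call, and each result would be a Query object rather than a dict.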

Full reference: asynchronous-client.md

Query Building Utilities

Comprehensive utilities for constructing Druid queries including aggregations, filters, dimensions, having clauses, and post-aggregations.

# Aggregations
def doublesum(raw_metric: str) -> dict: ...
def count(raw_metric: str) -> dict: ...
def longsum(raw_metric: str) -> dict: ...

# Filters
class Dimension:
    def __init__(self, dim: str) -> None: ...
    def __eq__(self, other) -> Filter: ...

class Filter:
    def __and__(self, other: 'Filter') -> 'Filter': ...
    def __or__(self, other: 'Filter') -> 'Filter': ...
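The operator overloading behind Dimension and Filter can be illustrated with a self-contained sketch. The classes below are hypothetical simplifications that emit Druid's selector, and, or, and not filter JSON directly; pydruid's real classes build an intermediate representation and may serialize slightly differently. The ~ (not) operator is included as an assumption beyond the & and | listed above.

```python
# Hypothetical minimal model of operator-built filters; not pydruid's classes.
class Filter:
    def __init__(self, spec):
        self.spec = spec  # the Druid filter JSON as a dict

    def __and__(self, other):
        return Filter({"type": "and", "fields": [self.spec, other.spec]})

    def __or__(self, other):
        return Filter({"type": "or", "fields": [self.spec, other.spec]})

    def __invert__(self):
        return Filter({"type": "not", "field": self.spec})


class Dimension:
    def __init__(self, dim):
        self.dim = dim

    def __eq__(self, value):
        # Returns a Filter instead of a bool: dimension == value -> selector filter
        return Filter({"type": "selector", "dimension": self.dim, "value": value})


f = (Dimension("first_hashtag") == "sochi2014") & ~(Dimension("lang") == "en")
g = (Dimension("first_hashtag") == "sochi2014") | (Dimension("first_hashtag") == "olympics")
print(f.spec)
```

The parentheses around each comparison are required: in Python, & and | bind more tightly than ==, so an unparenthesized expression would not group as intended. The same applies when combining pydruid's own Filter objects.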

Full reference: query-utilities.md

Database API

Python DB API 2.0 compliant interface for SQL-based interaction with Druid, providing standard database connectivity patterns.

def connect(
    host: str = "localhost",
    port: int = 8082,
    path: str = "/druid/v2/sql/",
    scheme: str = "http",
    user: str = None,
    password: str = None,
    context: dict = None,
    header: bool = False,
    ssl_verify_cert: bool = True,
    ssl_client_cert: str = None,
    proxies: dict = None,
    jwt: str = None
) -> Connection: ...
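Because connect() returns a DB API 2.0 connection, generic cursor code works unchanged. The helper fetch_dicts below is hypothetical and relies only on cursor(), execute(), description, fetchall(), and close(); sqlite3 stands in for Druid so the sketch runs offline. Against a broker you would pass connect(host='localhost', port=8082, path='/druid/v2/sql/', scheme='http') instead and query a real datasource.

```python
import sqlite3


def fetch_dicts(connection, sql):
    """Run a query on any DB-API 2.0 connection and return rows as dicts."""
    cursor = connection.cursor()
    try:
        cursor.execute(sql)
        # description holds one sequence per column; item 0 is the column name
        columns = [col[0] for col in cursor.description]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]
    finally:
        cursor.close()


# sqlite3 stands in for pydruid.db.connect(...) so the example runs offline
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE twitterstream (first_hashtag TEXT, cnt INTEGER)")
conn.executemany(
    "INSERT INTO twitterstream VALUES (?, ?)",
    [("sochi2014", 3), ("olympics", 5)],
)
rows = fetch_dicts(conn, "SELECT first_hashtag, cnt FROM twitterstream ORDER BY cnt DESC")
conn.close()
print(rows)
```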

Full reference: database-api.md

SQLAlchemy Integration

Full SQLAlchemy dialect support enabling both Core and ORM usage patterns with Druid as a backend database.

class DruidDialect:
    name = "druid"
    def dbapi(self) -> type: ...
    def create_connect_args(self, url) -> tuple: ...
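A dialect's create_connect_args translates an SQLAlchemy URL such as druid://localhost:8082/druid/v2/sql/ (as passed to create_engine) into arguments for the DB API connect() function. The following stdlib sketch illustrates that translation; connect_kwargs_from_url is hypothetical, and its handling of druid+https and of a header query parameter is an assumption, not a copy of pydruid's dialect code.

```python
from urllib.parse import parse_qs, urlsplit


def connect_kwargs_from_url(url):
    """Hypothetical sketch: map a druid:// URL onto connect()-style kwargs."""
    parts = urlsplit(url)
    query = parse_qs(parts.query)
    # Assumed convention: "druid+https" selects HTTPS, plain "druid" means HTTP
    scheme = "https" if parts.scheme.endswith("https") else "http"
    return {
        "host": parts.hostname or "localhost",
        "port": parts.port or 8082,
        "path": parts.path or "/druid/v2/sql/",
        "scheme": scheme,
        "user": parts.username,
        "password": parts.password,
        # Query-string flag turned into the boolean header argument
        "header": query.get("header", ["false"])[0] == "true",
    }


print(connect_kwargs_from_url("druid://localhost:8082/druid/v2/sql/"))
```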

Full reference: sqlalchemy-integration.md

Command Line Interface

Interactive command-line tool for executing SQL queries against Druid with syntax highlighting and autocompletion.

def main() -> None: ...

Full reference: command-line-interface.md

Types

class Query:
    """
    Query result wrapper with export capabilities.
    
    All query methods return Query objects that provide access to results
    and export functionality. Query objects wrap the raw result list and
    implement the MutableSequence interface.
    """
    result: list = None  # Query result parsed into list of dicts
    result_json: str = None  # Raw JSON response from Druid
    query_type: str = None  # Type of query executed (topN, timeseries, etc.)
    query_dict: dict = None  # JSON representation of the query
    
    def __init__(self, query_dict: dict, query_type: str) -> None:
        """Initialize Query object with query metadata."""
    
    def parse(self, data: str) -> None:
        """
        Parse raw JSON response data into result list.
        
        Parameters:
        - data: Raw JSON response from Druid
        
        Raises:
        IOError: If parsing fails
        """
    
    def export_tsv(self, dest_path: str) -> None:
        """
        Export query results to TSV file.
        
        Parameters:
        - dest_path: File path to write TSV data
        """
    
    def export_pandas(self) -> 'pandas.DataFrame':
        """
        Export query results to pandas DataFrame.
        
        Returns:
        pandas DataFrame with query results
        
        Requires:
        pandas package to be installed
        
        Note:
        The client-level PyDruid.export_pandas() is deprecated; call
        export_pandas() on the Query object returned by a query method.
        """
    
    # MutableSequence interface methods
    def __len__(self) -> int: ...
    def __getitem__(self, index: int): ...
    def __setitem__(self, index: int, value): ...
    def __delitem__(self, index: int): ...
    def insert(self, index: int, value): ...
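The list-like behavior of Query comes from collections.abc.MutableSequence: implementing the five methods above yields append, extend, pop, iteration, index, and the other mixins for free. The sketch below is a hypothetical ResultRows class, not pydruid's Query; its to_tsv method only handles timeseries-shaped rows ({"timestamp": ..., "result": {...}}) and writes to a text stream rather than a file path.

```python
import csv
import io
from collections.abc import MutableSequence


class ResultRows(MutableSequence):
    """Hypothetical stand-in for Query: a list-like wrapper over result dicts."""

    def __init__(self, result=None):
        self.result = list(result or [])

    # The five abstract methods MutableSequence requires
    def __len__(self):
        return len(self.result)

    def __getitem__(self, index):
        return self.result[index]

    def __setitem__(self, index, value):
        self.result[index] = value

    def __delitem__(self, index):
        del self.result[index]

    def insert(self, index, value):
        self.result.insert(index, value)

    def to_tsv(self, stream):
        """Write timeseries rows as TSV: a timestamp column plus one per metric."""
        metrics = sorted(self.result[0]["result"]) if self.result else []
        writer = csv.writer(stream, delimiter="\t", lineterminator="\n")
        writer.writerow(["timestamp"] + metrics)
        for row in self.result:
            writer.writerow([row["timestamp"]] + [row["result"][m] for m in metrics])


rows = ResultRows([{"timestamp": "2014-02-02T00:00:00.000Z", "result": {"tweet_count": 10.0}}])
rows.append({"timestamp": "2014-02-03T00:00:00.000Z", "result": {"tweet_count": 7.0}})  # mixin
buf = io.StringIO()
rows.to_tsv(buf)
print(buf.getvalue())
```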

class Connection:
    """DB API 2.0 connection object."""
    def close(self) -> None: ...
    def commit(self) -> None: ...
    def cursor(self) -> 'Cursor': ...
    def execute(self, operation: str, parameters: dict = None) -> 'Cursor': ...

class Cursor:
    """DB API 2.0 cursor object."""
    rowcount: int
    description: list
    arraysize: int
    
    def close(self) -> None: ...
    def execute(self, operation: str, parameters: dict = None) -> None: ...
    def fetchone(self) -> tuple: ...
    def fetchmany(self, size: int = None) -> list: ...
    def fetchall(self) -> list: ...
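fetchmany() together with arraysize lets a client consume a large result in fixed-size batches instead of materializing everything with fetchall(). The generic DB-API sketch below (iter_batches is a hypothetical helper) uses sqlite3 as a stand-in cursor; a cursor from pydruid's Connection.cursor() is assumed to behave the same way, since it exposes the same methods and attribute.

```python
import sqlite3


def iter_batches(cursor, size=None):
    """Yield lists of rows from a DB-API cursor, size rows at a time."""
    size = size or cursor.arraysize  # fall back to the cursor's default batch size
    while True:
        batch = cursor.fetchmany(size)
        if not batch:
            break
        yield batch


conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE t (n INTEGER)")
cur.executemany("INSERT INTO t VALUES (?)", [(i,) for i in range(5)])
cur.execute("SELECT n FROM t ORDER BY n")
batches = [[row[0] for row in batch] for batch in iter_batches(cur, size=2)]
conn.close()
print(batches)
```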