A Python connector for Apache Druid with synchronous and asynchronous clients, database API support, and comprehensive query building utilities.
npx @tessl/cli install tessl/pypi-pydruid@0.6.0
A comprehensive Python connector for Apache Druid that provides both synchronous and asynchronous clients for creating, executing, and analyzing Druid queries. PyDruid enables tight integration between Druid and the SciPy stack by parsing query results into pandas DataFrame objects, supports multiple query types, implements the Python DB API 2.0 specification with SQLAlchemy dialect support, and provides flexible export capabilities.
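pip install pydruid
To install the optional extras (quote the argument so shells such as zsh do not treat the brackets as a glob pattern):
pip install "pydruid[async,pandas,sqlalchemy,cli]"
from pydruid.client import PyDruid
from pydruid.async_client import AsyncPyDruid
For query building utilities: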
from pydruid.utils.aggregators import doublesum, count, longsum
from pydruid.utils.filters import Dimension, Filter
from pydruid.utils.postaggregator import Field
For database API:
from pydruid.db import connect
For SQLAlchemy:
from sqlalchemy import create_engine

from pydruid.client import PyDruid
from pydruid.utils.aggregators import doublesum
from pydruid.utils.filters import Dimension
# Create a client: base URL of the Druid service plus the endpoint path.
query = PyDruid('http://localhost:8082', 'druid/v2/')

# Execute a timeseries query. The return value is a Query object that wraps
# the result rows and carries its own export methods.
ts = query.timeseries(
    datasource='twitterstream',
    granularity='day',
    intervals='2014-02-02/p4w',
    aggregations={'tweet_count': doublesum('count')},
    filter=Dimension('first_hashtag') == 'sochi2014',
)

# Export results to a pandas DataFrame. Export from the returned Query object;
# the client-level export_pandas()/export_tsv() helpers are deprecated in
# favour of the Query methods.
df = ts.export_pandas()
print(df)

# Or export to TSV
ts.export_tsv('results.tsv')

# PyDruid provides three main interaction patterns:
The utility modules (aggregators, filters, dimensions, having, postaggregator) provide declarative query building capabilities that generate the appropriate JSON structures for Druid's native query format.
Primary client for executing Druid queries synchronously with support for all query types, authentication, proxy configuration, and result export capabilities.
class PyDruid:
    """Synchronous Druid client (interface summary).

    Query methods take the query's fields as keyword arguments and return a
    ``Query`` wrapper around the results. Return annotations naming ``Query``
    are quoted because that class is declared further down; an unquoted name
    would raise NameError when this class is defined.

    ``cafile`` is optional (default None); presumably a CA bundle path used
    for TLS verification -- confirm upstream.
    """

    def __init__(self, url: str, endpoint: str, cafile: str = None) -> None: ...
    def set_basic_auth_credentials(self, username: str, password: str) -> None: ...
    def set_proxies(self, proxies: dict) -> None: ...
    def topn(self, **kwargs) -> 'Query': ...
    def timeseries(self, **kwargs) -> 'Query': ...
    def groupby(self, **kwargs) -> 'Query': ...
    def search(self, **kwargs) -> 'Query': ...
    # Unlike the query methods this returns a plain dict, not a Query;
    # presumably meant to be nested as another query's datasource -- confirm.
    def subquery(self, **kwargs) -> dict: ...
    # Deprecated client-level exporters: they export the client's most recent
    # query. Prefer export_tsv()/export_pandas() on the Query object returned
    # by the query methods.
    def export_tsv(self, dest_path: str) -> None: ...
    def export_pandas(self) -> 'pandas.DataFrame': ...


# Tornado-based asynchronous client for non-blocking query execution, suitable
# for high-concurrency applications and async frameworks.
class AsyncPyDruid:
    """Asynchronous Druid client (interface summary).

    Counterpart of ``PyDruid`` whose query methods are coroutines: await them
    to obtain the ``Query`` result. ``Query`` is quoted in the annotations
    because it is declared further down.

    Constructor parameters:
    - url, endpoint: Druid service URL and endpoint path.
    - defaults: optional dict (default None); presumably default request
      options for the underlying HTTP client -- confirm.
    - http_client: optional str (default None); presumably the dotted name of
      the HTTP client implementation to use -- confirm.
    """

    def __init__(
        self,
        url: str,
        endpoint: str,
        defaults: dict = None,
        http_client: str = None,
    ) -> None: ...
    async def topn(self, **kwargs) -> 'Query': ...
    async def timeseries(self, **kwargs) -> 'Query': ...
    async def groupby(self, **kwargs) -> 'Query': ...


# Comprehensive utilities for constructing Druid queries including
# aggregations, filters, dimensions, having clauses, and post-aggregations.
# Aggregations: each helper takes the name of a raw metric and yields the
# aggregator spec dict that is used as a value of ``aggregations=``.
def doublesum(raw_metric: str) -> dict:
    """Build a ``doublesum`` aggregator spec for *raw_metric* (signature only)."""


def count(raw_metric: str) -> dict:
    """Build a ``count`` aggregator spec for *raw_metric* (signature only)."""


def longsum(raw_metric: str) -> dict:
    """Build a ``longsum`` aggregator spec for *raw_metric* (signature only)."""
# Filters
class Dimension:
    """Reference to a dimension, used to build filters.

    ``==`` is overridden to return a ``Filter`` instead of a bool, e.g.
    ``Dimension('first_hashtag') == 'sochi2014'``. Because ``__eq__`` is
    overridden without ``__hash__``, instances are unhashable.
    """

    def __init__(self, dim: str) -> None: ...
    # Quoted: ``Filter`` is defined below, so an unquoted name would raise
    # NameError when this class body executes.
    def __eq__(self, other) -> 'Filter': ...


class Filter:
    """Composable filter: combine with ``&`` / ``|`` and negate with ``~``."""

    def __and__(self, other: 'Filter') -> 'Filter': ...
    def __or__(self, other: 'Filter') -> 'Filter': ...
    def __invert__(self) -> 'Filter': ...


# Python DB API 2.0 compliant interface for SQL-based interaction with Druid,
# providing standard database connectivity patterns.
def connect(
    host: str = "localhost",
    port: int = 8082,
    path: str = "/druid/v2/sql/",
    scheme: str = "http",
    user: str = None,
    password: str = None,
    context: dict = None,
    header: bool = False,
    ssl_verify_cert: bool = True,
    ssl_client_cert: str = None,
    proxies: dict = None,
    jwt: str = None,
) -> 'Connection':
    """Open a DB API 2.0 connection to Druid's SQL endpoint (signature only).

    Parameters:
    - host, port, path, scheme: location of the SQL endpoint (defaults
      ``localhost``, 8082, ``/druid/v2/sql/``, ``http``); presumably combined
      as ``scheme://host:port/path`` -- confirm.
    - user, password: optional credentials (default None).
    - jwt: optional token (default None); presumably sent as a bearer token.
    - context: optional dict (default None); presumably the query context
      sent with each query -- confirm.
    - header: bool, default False; presumably requests a header row with
      column names -- confirm.
    - ssl_verify_cert: bool, default True; ssl_client_cert: optional str.
    - proxies: optional dict (default None).

    Returns:
    A ``Connection``. The annotation is quoted because that class is declared
    further down; an unquoted name would raise NameError at definition time.
    """


# Full SQLAlchemy dialect support enabling both Core and ORM usage patterns
# with Druid as a backend database.
import types


class DruidDialect:
    """SQLAlchemy dialect for Druid (interface summary), registered as ``druid``."""

    name = "druid"

    # SQLAlchemy invokes ``dbapi()`` on the dialect *class* to obtain the DB API
    # module (presumably ``pydruid.db``), so it is a classmethod returning a
    # module rather than an instance method returning a type.
    @classmethod
    def dbapi(cls) -> types.ModuleType: ...

    # Presumably returns ``(args, kwargs)`` for ``connect()``, derived from the
    # SQLAlchemy URL -- confirm.
    def create_connect_args(self, url) -> tuple: ...


# Interactive command-line tool for executing SQL queries against Druid with
# syntax highlighting and autocompletion.
from collections.abc import MutableSequence


def main() -> None:
    """Entry point of the interactive Druid SQL command-line tool (signature only)."""


class Query(MutableSequence):
    """
    Query result wrapper with export capabilities.

    All query methods return Query objects that provide access to results
    and export functionality. Query objects act as wrappers over raw result
    lists and implement the MutableSequence interface, so the mixin methods
    (append, extend, iteration, ``in``, ...) are available as well.
    """

    result: list = None  # Query result parsed into list of dicts
    result_json: str = None  # Raw JSON response from Druid
    query_type: str = None  # Type of query executed (topN, timeseries, etc.)
    query_dict: dict = None  # JSON representation of the query

    def __init__(self, query_dict: dict, query_type: str) -> None:
        """Initialize Query object with query metadata."""

    def parse(self, data: str) -> None:
        """
        Parse raw JSON response data into result list.

        Parameters:
        - data: Raw JSON response from Druid

        Raises:
        IOError: If parsing fails
        """

    def export_tsv(self, dest_path: str) -> None:
        """
        Export query results to TSV file.

        Parameters:
        - dest_path: File path to write TSV data
        """

    def export_pandas(self) -> 'pandas.DataFrame':
        """
        Export query results to pandas DataFrame.

        Returns:
        pandas DataFrame with query results

        Requires:
        pandas package to be installed

        Note:
        This is the preferred export API. The client-level
        ``PyDruid.export_pandas()``, which exports the client's most recent
        query, is the deprecated one.
        """

    # MutableSequence interface methods: these are exactly the abstract methods
    # of collections.abc.MutableSequence; presumably they delegate to
    # ``result`` -- confirm.
    def __len__(self) -> int: ...
    def __getitem__(self, index: int): ...
    def __setitem__(self, index: int, value): ...
    def __delitem__(self, index: int): ...
    def insert(self, index: int, value): ...
class Connection:
    """Connection object as specified by the Python DB API 2.0 (PEP 249)."""

    def close(self) -> None:
        """Close the connection."""

    def commit(self) -> None:
        """Commit the current transaction (a method PEP 249 requires)."""

    def cursor(self) -> 'Cursor':
        """Return a Cursor for this connection."""

    def execute(self, operation: str, parameters: dict = None) -> 'Cursor':
        """Execute *operation* with optional *parameters*; return a Cursor."""
class Cursor:
    """Cursor object as specified by the Python DB API 2.0 (PEP 249)."""

    rowcount: int
    description: list
    # PEP 249 requires arraysize to default to 1; fetchmany() falls back to it
    # when no size is given.
    arraysize: int = 1

    def close(self) -> None: ...

    # Returns the cursor itself, matching Connection.execute (annotated to
    # return a Cursor). This allows chaining such as
    # ``cursor.execute(sql).fetchall()``.
    def execute(self, operation: str, parameters: dict = None) -> 'Cursor': ...

    # Next row, or None once the result set is exhausted (PEP 249).
    def fetchone(self) -> tuple: ...
    def fetchmany(self, size: int = None) -> list: ...
    def fetchall(self) -> list: ...