A Python connector for Apache Druid with synchronous and asynchronous clients, database API support, and comprehensive query building utilities.
—
The PyDruid synchronous client provides a comprehensive interface for executing Druid queries with support for all query types, authentication, proxy configuration, and flexible result export capabilities.
Creates a new PyDruid client instance for connecting to a Druid broker.
class PyDruid:
def __init__(self, url: str, endpoint: str, cafile: str = None) -> None:
"""
Initialize PyDruid client.
Parameters:
- url: URL of Broker node in the Druid cluster
- endpoint: Endpoint that Broker listens for queries on (typically 'druid/v2/')
- cafile: Path to CA certificate file for SSL verification (optional)
"""Configure client authentication and proxy settings.
def set_basic_auth_credentials(self, username: str, password: str) -> None:
    """
    Configure HTTP Basic Authentication for subsequent requests.

    Parameters:
    - username: account name to authenticate as
    - password: password for that account
    """
def set_proxies(self, proxies: dict) -> None:
    """
    Configure proxy settings for HTTP requests.

    Parameters:
    - proxies: dictionary mapping protocol names to proxy URLs
      Example: {'http': 'http://proxy.example.com:8080'}
    """

# Execute TopN queries to retrieve the top values for a dimension sorted by a metric.
def topn(
self,
datasource: str,
granularity: str,
intervals: str | list,
aggregations: dict,
dimension: str,
metric: str,
threshold: int,
filter: 'Filter' = None,
post_aggregations: dict = None,
context: dict = None,
**kwargs
) -> Query:
"""
Execute a TopN query.
Parameters:
- datasource: Data source to query
- granularity: Time granularity ('all', 'day', 'hour', 'minute', etc.)
- intervals: ISO-8601 intervals ('2014-02-02/p4w' or list of intervals)
- aggregations: Dict mapping aggregator names to aggregator specifications
- dimension: Dimension to run the query against
- metric: Metric to sort the dimension values by
- threshold: Number of top items to return
- filter: Filter to apply to the data (optional)
- post_aggregations: Dict of post-aggregations to compute (optional)
- context: Query context parameters (optional)
Returns:
Query object containing results and metadata
"""Example usage:
from pydruid.client import PyDruid
from pydruid.utils.aggregators import doublesum
from pydruid.utils.filters import Dimension
client = PyDruid('http://localhost:8082', 'druid/v2/')
top = client.topn(
datasource='twitterstream',
granularity='all',
intervals='2014-03-03/p1d',
aggregations={'count': doublesum('count')},
dimension='user_mention_name',
filter=(Dimension('user_lang') == 'en') & (Dimension('first_hashtag') == 'oscars'),
metric='count',
threshold=10
)
df = client.export_pandas()

Execute timeseries queries to retrieve aggregated data over time intervals.
def timeseries(
self,
datasource: str,
granularity: str,
intervals: str | list,
aggregations: dict,
filter: 'Filter' = None,
post_aggregations: dict = None,
context: dict = None,
**kwargs
) -> Query:
"""
Execute a timeseries query.
Parameters:
- datasource: Data source to query
- granularity: Time granularity for aggregation
- intervals: ISO-8601 intervals to query
- aggregations: Dict mapping aggregator names to aggregator specifications
- filter: Filter to apply to the data (optional)
- post_aggregations: Dict of post-aggregations to compute (optional)
- context: Query context parameters (optional)
Returns:
Query object containing time-series results
"""Execute groupBy queries to group data by one or more dimensions with aggregations.
def groupby(
self,
datasource: str,
granularity: str,
intervals: str | list,
dimensions: list,
aggregations: dict,
filter: 'Filter' = None,
having: 'Having' = None,
post_aggregations: dict = None,
limit_spec: dict = None,
context: dict = None,
**kwargs
) -> Query:
"""
Execute a groupBy query.
Parameters:
- datasource: Data source to query
- granularity: Time granularity for grouping
- intervals: ISO-8601 intervals to query
- dimensions: List of dimensions to group by
- aggregations: Dict mapping aggregator names to aggregator specifications
- filter: Filter to apply to the data (optional)
- having: Having clause for filtering grouped results (optional)
- post_aggregations: Dict of post-aggregations to compute (optional)
- limit_spec: Specification for limiting and ordering results (optional)
- context: Query context parameters (optional)
Returns:
Query object containing grouped results
"""Query metadata about datasources and segments.
def segment_metadata(
self,
datasource: str,
intervals: str | list = None,
context: dict = None,
**kwargs
) -> Query:
"""
Execute a segment metadata query.
Parameters:
- datasource: Data source to analyze
- intervals: ISO-8601 intervals to analyze (optional, defaults to all)
- context: Query context parameters (optional)
Returns:
Query object containing segment metadata
"""
def time_boundary(
self,
datasource: str,
context: dict = None,
**kwargs
) -> Query:
"""
Execute a time boundary query.
Parameters:
- datasource: Data source to query
- context: Query context parameters (optional)
Returns:
Query object containing time boundary information
"""Execute select, scan, and sub-query operations for raw data access and query composition.
def subquery(
    self,
    **kwargs
) -> dict:
    """
    Build a sub-query for use in nested queries.

    Parameters:
    - **kwargs: query parameters (datasource, granularity, intervals, etc.)

    Returns:
        Dictionary representation of the query (not executed)

    Note:
        The query dictionary is returned without being executed, so it
        can be used as the datasource of another query.
    """

# Execute select and scan queries for raw data access.
def select(
self,
datasource: str,
granularity: str,
intervals: str | list,
dimensions: list = None,
metrics: list = None,
filter: 'Filter' = None,
paging_spec: dict = None,
context: dict = None,
**kwargs
) -> Query:
"""
Execute a select query for raw data access.
Parameters:
- datasource: Data source to query
- granularity: Time granularity
- intervals: ISO-8601 intervals to query
- dimensions: List of dimensions to include (optional)
- metrics: List of metrics to include (optional)
- filter: Filter to apply (optional)
- paging_spec: Paging specification for large result sets (optional)
- context: Query context parameters (optional)
Returns:
Query object containing raw data
"""
def scan(
self,
datasource: str,
granularity: str,
intervals: str | list,
limit: int,
columns: list = None,
metrics: list = None,
filter: 'Filter' = None,
context: dict = None,
**kwargs
) -> Query:
"""
Execute a scan query for raw data access.
Parameters:
- datasource: Data source to query
- granularity: Time granularity
- intervals: ISO-8601 intervals to query
- limit: Maximum number of rows to return
- columns: List of columns to select (optional, all columns if empty)
- metrics: List of metrics to select (optional, all metrics if empty)
- filter: Filter to apply (optional)
- context: Query context parameters (optional)
Returns:
Query object containing scan results
"""
def search(
self,
datasource: str,
granularity: str,
intervals: str | list,
searchDimensions: list,
query: dict,
limit: int = None,
filter: 'Filter' = None,
sort: dict = None,
context: dict = None,
**kwargs
) -> Query:
"""
Execute a search query to find dimension values matching search specifications.
Parameters:
- datasource: Data source to query
- granularity: Time granularity
- intervals: ISO-8601 intervals to query
- searchDimensions: List of dimensions to search within
- query: Search query specification (e.g., {"type": "insensitive_contains", "value": "text"})
- limit: Maximum number of results to return (optional)
- filter: Filter to apply (optional)
- sort: Sort specification (optional)
- context: Query context parameters (optional)
Returns:
Query object containing search results
"""All query methods return Query objects that provide export capabilities:
# Export to pandas DataFrame (requires pandas)
df = client.export_pandas()
# Export to TSV file
client.export_tsv('results.tsv')
# Access raw results
results = client.result
query_dict = client.query_dict

Install with Tessl CLI:

npx tessl i tessl/pypi-pydruid