A Python connector for Apache Druid with synchronous and asynchronous clients, database API support, and comprehensive query building utilities.
—
The AsyncPyDruid client provides asynchronous query execution using Tornado's HTTP client, enabling non-blocking operations suitable for high-concurrency applications and async frameworks.
Creates a new AsyncPyDruid client instance for asynchronous Druid queries.
class AsyncPyDruid:
    """Asynchronous PyDruid client.

    Provides asynchronous query execution using Tornado's HTTP client,
    enabling non-blocking operations suitable for high-concurrency
    applications and async frameworks.
    """

    def __init__(
        self,
        url: str,
        endpoint: str,
        defaults: dict | None = None,
        http_client: str | None = None
    ) -> None:
        """
        Initialize AsyncPyDruid client.
        Parameters:
        - url: URL of Broker node in the Druid cluster
        - endpoint: Endpoint that Broker listens for queries on (typically 'druid/v2/')
        - defaults: Default parameters for the Tornado HTTP client (optional)
        - http_client: Tornado HTTP client implementation to use (optional)
        """

    # Configure client authentication and proxy settings (inherited from base client).
    def set_basic_auth_credentials(self, username: str, password: str) -> None:
        """
        Set HTTP Basic Authentication credentials.
        Parameters:
        - username: Username for authentication
        - password: Password for authentication
        """

    def set_proxies(self, proxies: dict) -> None:
        """
        Configure proxy settings for HTTP requests.
        Parameters:
        - proxies: Dictionary mapping protocol names to proxy URLs
        """

    # Execute TopN queries asynchronously.
    async def topn(
        self,
        datasource: str,
        granularity: str,
        intervals: str | list,
        aggregations: dict,
        dimension: str,
        metric: str,
        threshold: int,
        filter: 'Filter' = None,
        post_aggregations: dict | None = None,
        context: dict | None = None,
        **kwargs
    ) -> "Query":
        """
        Execute a TopN query asynchronously.
        Parameters:
        - datasource: Data source to query
        - granularity: Time granularity ('all', 'day', 'hour', 'minute', etc.)
        - intervals: ISO-8601 intervals ('2014-02-02/p4w' or list of intervals)
        - aggregations: Dict mapping aggregator names to aggregator specifications
        - dimension: Dimension to run the query against
        - metric: Metric to sort the dimension values by
        - threshold: Number of top items to return
        - filter: Filter to apply to the data (optional)
        - post_aggregations: Dict of post-aggregations to compute (optional)
        - context: Query context parameters (optional)
        Returns:
        Query object containing results and metadata
        """

    # Execute timeseries queries asynchronously.
    async def timeseries(
        self,
        datasource: str,
        granularity: str,
        intervals: str | list,
        aggregations: dict,
        filter: 'Filter' = None,
        post_aggregations: dict | None = None,
        context: dict | None = None,
        **kwargs
    ) -> "Query":
        """
        Execute a timeseries query asynchronously.
        Parameters:
        - datasource: Data source to query
        - granularity: Time granularity for aggregation
        - intervals: ISO-8601 intervals to query
        - aggregations: Dict mapping aggregator names to aggregator specifications
        - filter: Filter to apply to the data (optional)
        - post_aggregations: Dict of post-aggregations to compute (optional)
        - context: Query context parameters (optional)
        Returns:
        Query object containing time-series results
        """

    # Execute groupBy queries asynchronously.
    async def groupby(
        self,
        datasource: str,
        granularity: str,
        intervals: str | list,
        dimensions: list,
        aggregations: dict,
        filter: 'Filter' = None,
        having: 'Having' = None,
        post_aggregations: dict | None = None,
        limit_spec: dict | None = None,
        context: dict | None = None,
        **kwargs
    ) -> "Query":
        """
        Execute a groupBy query asynchronously.
        Parameters:
        - datasource: Data source to query
        - granularity: Time granularity for grouping
        - intervals: ISO-8601 intervals to query
        - dimensions: List of dimensions to group by
        - aggregations: Dict mapping aggregator names to aggregator specifications
        - filter: Filter to apply to the data (optional)
        - having: Having clause for filtering grouped results (optional)
        - post_aggregations: Dict of post-aggregations to compute (optional)
        - limit_spec: Specification for limiting and ordering results (optional)
        - context: Query context parameters (optional)
        Returns:
        Query object containing grouped results
        """

    # Query metadata about datasources and segments asynchronously.
    async def segment_metadata(
        self,
        datasource: str,
        intervals: str | list = None,
        context: dict | None = None,
        **kwargs
    ) -> "Query":
        """
        Execute a segment metadata query asynchronously.
        Parameters:
        - datasource: Data source to analyze
        - intervals: ISO-8601 intervals to analyze (optional, defaults to all)
        - context: Query context parameters (optional)
        Returns:
        Query object containing segment metadata
        """

    async def time_boundary(
        self,
        datasource: str,
        context: dict | None = None,
        **kwargs
    ) -> "Query":
        """
        Execute a time boundary query asynchronously.
        Parameters:
        - datasource: Data source to query
        - context: Query context parameters (optional)
        Returns:
        Query object containing time boundary information
        """

    # Execute select queries for raw data access asynchronously.
    # Note: The AsyncPyDruid client does not support scan queries. Use the
    # synchronous client for scan operations.
    async def select(
        self,
        datasource: str,
        granularity: str,
        intervals: str | list,
        dimensions: list | None = None,
        metrics: list | None = None,
        filter: 'Filter' = None,
        paging_spec: dict | None = None,
        context: dict | None = None,
        **kwargs
    ) -> "Query":
        """
        Execute a select query for raw data access asynchronously.
        Parameters:
        - datasource: Data source to query
        - granularity: Time granularity
        - intervals: ISO-8601 intervals to query
        - dimensions: List of dimensions to include (optional)
        - metrics: List of metrics to include (optional)
        - filter: Filter to apply (optional)
        - paging_spec: Paging specification for large result sets (optional)
        - context: Query context parameters (optional)
        Returns:
        Query object containing raw data
        """

from tornado import gen
from pydruid.async_client import AsyncPyDruid
from pydruid.utils.aggregators import doublesum
from pydruid.utils.filters import Dimension
# Create a client pointed at a local Broker's query endpoint.
client = AsyncPyDruid('http://localhost:8082', 'druid/v2/')
@gen.coroutine
def execute_async_query():
    """Run a TopN query using Tornado's legacy generator-coroutine style."""
    # Build the dimension filter once, outside the query call, for readability.
    english_oscars = (Dimension('user_lang') == 'en') & (Dimension('first_hashtag') == 'oscars')
    query = yield client.topn(
        datasource='twitterstream',
        granularity='all',
        intervals='2014-03-03/p1d',
        aggregations={'count': doublesum('count')},
        dimension='user_mention_name',
        metric='count',
        threshold=10,
        filter=english_oscars
    )
    # Inspect the raw result payload.
    print(query.result)
    # Materialize the results as a pandas DataFrame (if pandas is available).
    frame = query.export_pandas()
    # Legacy coroutines return via gen.Return; on Python 3.x a plain
    # 'return query' works as well.
    raise gen.Return(query)
# Modern async/await syntax (Python 3.5+)
async def modern_async_query():
    """Run the same TopN query using native async/await."""
    result = await client.topn(
        datasource='twitterstream',
        granularity='all',
        intervals='2014-03-03/p1d',
        aggregations={'count': doublesum('count')},
        dimension='user_mention_name',
        metric='count',
        threshold=10,
        filter=(Dimension('user_lang') == 'en') & (Dimension('first_hashtag') == 'oscars')
    )
    return result

# The asynchronous client provides significant performance benefits for applications that:
The async client uses Tornado's optimized HTTP client and allows the event loop to handle other tasks while waiting for Druid responses, resulting in much better resource utilization and throughput compared to the synchronous client in concurrent scenarios.
Install with Tessl CLI
npx tessl i tessl/pypi-pydruid