A Python connector for Apache Druid with synchronous and asynchronous clients, database API support, and comprehensive query building utilities.
—
Comprehensive utility modules for constructing Druid queries including aggregations, filters, dimensions, having clauses, and post-aggregations. These utilities provide a declarative API that generates the appropriate JSON structures for Druid's native query format.
Aggregation functions that compute metrics during query execution.
# Numeric aggregations
def doublesum(raw_metric: str) -> dict:
"""Sum aggregation for double/float values."""
def longsum(raw_metric: str) -> dict:
"""Sum aggregation for long/integer values."""
def doublemin(raw_metric: str) -> dict:
"""Minimum aggregation for double/float values."""
def doublemax(raw_metric: str) -> dict:
"""Maximum aggregation for double/float values."""
def longmin(raw_metric: str) -> dict:
"""Minimum aggregation for long/integer values."""
def longmax(raw_metric: str) -> dict:
"""Maximum aggregation for long/integer values."""
def count(raw_metric: str) -> dict:
"""Count aggregation."""
# Approximate aggregations
def hyperunique(raw_metric: str) -> dict:
"""HyperLogLog-based unique count approximation."""
def cardinality(raw_column: str, by_row: bool = False) -> dict:
"""
Cardinality estimation using HyperLogLog.
Parameters:
- raw_column: Column to compute cardinality for
- by_row: Whether to compute cardinality by row (default: False)
"""
def thetasketch(raw_column: str, isinputthetasketch: bool = False, size: int = 16384) -> dict:
"""
Theta sketch aggregation for set operations.
Parameters:
- raw_column: Column containing sketch data
- isinputthetasketch: Whether input is already a theta sketch
- size: Sketch size parameter
"""
def quantilesDoublesSketch(raw_column: str, k: int = 128) -> dict:
"""
Quantiles sketch aggregation for approximate quantile computation.
Parameters:
- raw_column: Column to compute quantiles for
- k: Sketch accuracy parameter (higher = more accurate)
"""
# String aggregations
def stringfirst(raw_metric: str) -> dict:
"""First string value aggregation."""
def stringlast(raw_metric: str) -> dict:
"""Last string value aggregation."""
# Conditional aggregation
def filtered(filter: 'Filter', agg: dict) -> dict:
"""
Apply filter condition to aggregation.
Parameters:
- filter: Filter condition to apply
- agg: Aggregation to wrap with filter
"""
# Custom aggregation
def javascript(columns_list: list, fn_aggregate: str, fn_combine: str, fn_reset: str) -> dict:
"""
JavaScript-based custom aggregation.
Parameters:
- columns_list: List of column names to aggregate
- fn_aggregate: JavaScript function for aggregating individual rows
- fn_combine: JavaScript function for combining partial aggregates
- fn_reset: JavaScript function for resetting aggregation state
"""
Filter conditions for limiting query results.
class Filter:
"""Base filter class supporting logical operations."""
def __init__(self, extraction_function=None, ordering: str = "lexicographic", **args) -> None:
"""
Initialize filter.
Parameters:
- extraction_function: Optional extraction function to apply
- ordering: Sort ordering for bound filters
- **args: Filter-specific arguments
"""
def __and__(self, other: 'Filter') -> 'Filter':
"""Logical AND operation between filters."""
def __or__(self, other: 'Filter') -> 'Filter':
"""Logical OR operation between filters."""
def __invert__(self) -> 'Filter':
"""Logical NOT operation on filter."""
class Dimension:
"""Helper for creating dimension-based filters."""
def __init__(self, dim: str) -> None:
"""
Initialize dimension filter helper.
Parameters:
- dim: Dimension name
"""
def __eq__(self, other) -> Filter:
"""Create equality filter (dimension == value)."""
def __ne__(self, other) -> Filter:
"""Create inequality filter (dimension != value)."""
def in_(self, values: list) -> Filter:
"""Create 'in' filter (dimension in [values])."""
class JavaScript:
"""Helper for creating JavaScript-based filters."""
def __init__(self, dim: str) -> None:
"""
Initialize JavaScript filter helper.
Parameters:
- dim: Dimension name
"""
def __eq__(self, js_function: str) -> Filter:
"""Create JavaScript filter with custom function."""
class Bound:
"""Range-based filter for numeric and string comparisons."""
def __init__(
self,
dimension: str,
lower: str = None,
upper: str = None,
lowerStrict: bool = False,
upperStrict: bool = False,
alphaNumeric: bool = False,
ordering: str = "lexicographic",
extraction_function = None
) -> None:
"""
Initialize bound filter.
Parameters:
- dimension: Dimension to filter
- lower: Lower bound value (optional)
- upper: Upper bound value (optional)
- lowerStrict: Whether lower bound is exclusive
- upperStrict: Whether upper bound is exclusive
- alphaNumeric: Whether to use alphanumeric ordering
- ordering: Sort ordering type
- extraction_function: Optional extraction function
"""
class Interval:
"""Time interval filter."""
def __init__(self, dimension: str, intervals: list, extraction_function = None) -> None:
"""
Initialize interval filter.
Parameters:
- dimension: Time dimension to filter
- intervals: List of ISO-8601 interval strings
- extraction_function: Optional extraction function
"""
class Spatial:
"""Geographic/spatial filter."""
def __init__(self, dimension: str, bound_type: str, **args) -> None:
"""
Initialize spatial filter.
Parameters:
- dimension: Spatial dimension to filter
- bound_type: Type of spatial bound ('rectangular', etc.)
- **args: Spatial bound parameters
"""
Dimension specifications and value transformation functions.
class DimensionSpec:
"""Dimension specification with optional extraction function."""
def __init__(
self,
dimension: str,
output_name: str,
extraction_function = None,
filter_spec = None
) -> None:
"""
Initialize dimension specification.
Parameters:
- dimension: Source dimension name
- output_name: Output dimension name
- extraction_function: Optional value extraction function
- filter_spec: Optional dimension filtering specification
"""
# Extraction Functions
class RegexExtraction:
"""Extract values using regular expressions."""
def __init__(self, expr: str) -> None:
"""
Parameters:
- expr: Regular expression pattern
"""
class TimeFormatExtraction:
"""Extract and format time values."""
def __init__(self, format: str, locale: str = None, time_zone: str = None) -> None:
"""
Parameters:
- format: Time format string
- locale: Locale for formatting (optional)
- time_zone: Time zone for formatting (optional)
"""
class MapLookupExtraction:
"""Transform values using key-value mapping."""
def __init__(
self,
mapping: dict,
retain_missing_values: bool = False,
replace_missing_values: str = None,
injective: bool = False
) -> None:
"""
Parameters:
- mapping: Dictionary mapping input values to output values
- retain_missing_values: Whether to keep unmapped values
- replace_missing_values: Default value for unmapped inputs
- injective: Whether the mapping is one-to-one
"""
Having clauses for filtering grouped query results.
class Having:
"""Having clause builder for groupBy queries."""
def __init__(self, **args) -> None:
"""Initialize having clause with filter conditions."""
def __and__(self, other: 'Having') -> 'Having':
"""Logical AND operation between having clauses."""
def __or__(self, other: 'Having') -> 'Having':
"""Logical OR operation between having clauses."""
def __invert__(self) -> 'Having':
"""Logical NOT operation on having clause."""
class Aggregation:
"""Having clause based on aggregation values."""
def __init__(self, agg: str) -> None:
"""
Parameters:
- agg: Aggregation name to filter on
"""
def __eq__(self, value) -> Having:
"""Create equality having clause (aggregation == value)."""
def __lt__(self, value) -> Having:
"""Create less-than having clause (aggregation < value)."""
def __gt__(self, value) -> Having:
"""Create greater-than having clause (aggregation > value)."""
Post-aggregation operations computed after initial aggregation.
class Postaggregator:
"""Base post-aggregator with arithmetic operations."""
def __init__(self, fn: str, fields: list, name: str) -> None:
"""
Parameters:
- fn: Post-aggregation function name
- fields: List of field names to operate on
- name: Output name for post-aggregation result
"""
def __add__(self, other) -> 'Postaggregator':
"""Addition operation."""
def __sub__(self, other) -> 'Postaggregator':
"""Subtraction operation."""
def __mul__(self, other) -> 'Postaggregator':
"""Multiplication operation."""
def __truediv__(self, other) -> 'Postaggregator':
"""Division operation."""
class Field:
"""Access aggregated field values."""
def __init__(self, name: str) -> None:
"""
Parameters:
- name: Name of aggregated field to access
"""
class Const:
"""Constant value for post-aggregation operations."""
def __init__(self, value: float, output_name: str = None) -> None:
"""
Parameters:
- value: Constant numeric value
- output_name: Optional output name
"""
# Sketch-based post-aggregations
class ThetaSketch:
"""Access theta sketch aggregation results."""
def __init__(self, name: str) -> None:
"""
Parameters:
- name: Name of theta sketch aggregation
"""
def __and__(self, other: 'ThetaSketch') -> 'ThetaSketchOp':
"""Intersection operation between theta sketches."""
def __or__(self, other: 'ThetaSketch') -> 'ThetaSketchOp':
"""Union operation between theta sketches."""
class ThetaSketchEstimate:
"""Estimate cardinality from theta sketch."""
def __init__(self, fields: 'ThetaSketchOp') -> None:
"""
Parameters:
- fields: Theta sketch operation to estimate
"""
class QuantilesDoublesSketchToQuantile:
"""Extract quantile value from quantiles sketch."""
def __init__(self, name: str, field_name: str, fraction: float) -> None:
"""
Parameters:
- name: Output name for the quantile value
- field_name: Name of the quantiles sketch aggregation
- fraction: Quantile fraction (0.0 to 1.0)
"""
class Quantile:
"""Access quantile value from quantiles aggregation."""
def __init__(self, name: str, probability: float) -> None:
"""
Parameters:
- name: Name of quantiles aggregation field
- probability: Quantile probability (0.0 to 1.0)
"""
class Quantiles:
"""Access multiple quantile values from quantiles aggregation."""
def __init__(self, name: str, probabilities: list) -> None:
"""
Parameters:
- name: Name of quantiles aggregation field
- probabilities: List of quantile probabilities (0.0 to 1.0)
"""
class HyperUniqueCardinality:
"""Extract cardinality estimate from hyperunique aggregation."""
def __init__(self, name: str) -> None:
"""
Parameters:
- name: Name of hyperunique aggregation field
"""
class DoubleGreatest:
"""Return the greatest value among double post-aggregators."""
def __init__(self, fields: list, output_name: str = None) -> None:
"""
Parameters:
- fields: List of post-aggregator objects to compare
- output_name: Optional output name (defaults to 'doubleGreatest')
"""
class DoubleLeast:
"""Return the least value among double post-aggregators."""
def __init__(self, fields: list, output_name: str = None) -> None:
"""
Parameters:
- fields: List of post-aggregator objects to compare
- output_name: Optional output name (defaults to 'doubleLeast')
"""
class LongGreatest:
"""Return the greatest value among long post-aggregators."""
def __init__(self, fields: list, output_name: str = None) -> None:
"""
Parameters:
- fields: List of post-aggregator objects to compare
- output_name: Optional output name (defaults to 'longGreatest')
"""
class LongLeast:
"""Return the least value among long post-aggregators."""
def __init__(self, fields: list, output_name: str = None) -> None:
"""
Parameters:
- fields: List of post-aggregator objects to compare
- output_name: Optional output name (defaults to 'longLeast')
"""
class ThetaSketchOp:
"""Theta sketch set operations for combining sketches."""
def __init__(self, fn: str, fields: list, name: str) -> None:
"""
Parameters:
- fn: Set operation function ('UNION', 'INTERSECT', 'NOT')
- fields: List of theta sketch fields to operate on
- name: Output name for the operation result
"""
def __or__(self, other: 'ThetaSketchOp') -> 'ThetaSketchOp':
"""Union operation between theta sketches."""
def __and__(self, other: 'ThetaSketchOp') -> 'ThetaSketchOp':
"""Intersection operation between theta sketches."""
def __ne__(self, other: 'ThetaSketchOp') -> 'ThetaSketchOp':
"""Not operation (set difference) between theta sketches."""
Usage example combining filters, aggregations, post-aggregations, and a having clause:
from pydruid.utils.aggregators import doublesum, count, filtered
from pydruid.utils.filters import Dimension, Filter
from pydruid.utils.postaggregator import Field
from pydruid.utils.having import Aggregation
# Complex filter example
filter_condition = (
(Dimension('user_lang') == 'en') &
(Dimension('first_hashtag') == 'oscars') &
~(Dimension('user_mention_name') == 'No Mention')
)
# Aggregations with post-aggregation
aggregations = {
'total_tweets': count('count'),
'total_length': doublesum('tweet_length'),
'filtered_tweets': filtered(
Dimension('verified') == 'true',
count('count')
)
}
post_aggregations = {
'avg_length': Field('total_length') / Field('total_tweets'),
'verified_ratio': Field('filtered_tweets') / Field('total_tweets')
}
# Having clause
having_clause = (Aggregation('total_tweets') > 100) & (Aggregation('avg_length') < 200)
Install with Tessl CLI:
npx tessl i tessl/pypi-pydruid