CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-pydruid

A Python connector for Apache Druid with synchronous and asynchronous clients, database API support, and comprehensive query building utilities.

Pending
Overview
Eval results
Files

query-utilities.mddocs/

Query Building Utilities

Comprehensive utility modules for constructing Druid queries including aggregations, filters, dimensions, having clauses, and post-aggregations. These utilities provide a declarative API that generates the appropriate JSON structures for Druid's native query format.

Capabilities

Aggregations

Aggregation functions that compute metrics during query execution.

# Numeric aggregations
def doublesum(raw_metric: str) -> dict:
    """Sum aggregation for double/float values."""

def longsum(raw_metric: str) -> dict:
    """Sum aggregation for long/integer values."""

def doublemin(raw_metric: str) -> dict:
    """Minimum aggregation for double/float values."""

def doublemax(raw_metric: str) -> dict:
    """Maximum aggregation for double/float values."""

def longmin(raw_metric: str) -> dict:
    """Minimum aggregation for long/integer values."""

def longmax(raw_metric: str) -> dict:
    """Maximum aggregation for long/integer values."""

def count(raw_metric: str) -> dict:
    """Count aggregation."""

# Approximate aggregations
def hyperunique(raw_metric: str) -> dict:
    """HyperLogLog-based unique count approximation."""

def cardinality(raw_column: str, by_row: bool = False) -> dict:
    """
    Cardinality estimation using HyperLogLog.
    
    Parameters:
    - raw_column: Column to compute cardinality for
    - by_row: Whether to compute cardinality by row (default: False)
    """

def thetasketch(raw_column: str, isinputthetasketch: bool = False, size: int = 16384) -> dict:
    """
    Theta sketch aggregation for set operations.
    
    Parameters:
    - raw_column: Column containing sketch data
    - isinputthetasketch: Whether input is already a theta sketch
    - size: Sketch size parameter
    """

def quantilesDoublesSketch(raw_column: str, k: int = 128) -> dict:
    """
    Quantiles sketch aggregation for approximate quantile computation.
    
    Parameters:
    - raw_column: Column to compute quantiles for
    - k: Sketch accuracy parameter (higher = more accurate)
    """

# String aggregations
def stringfirst(raw_metric: str) -> dict:
    """First string value aggregation."""

def stringlast(raw_metric: str) -> dict:
    """Last string value aggregation."""

# Conditional aggregation
def filtered(filter: 'Filter', agg: dict) -> dict:
    """
    Apply filter condition to aggregation.
    
    Parameters:
    - filter: Filter condition to apply
    - agg: Aggregation to wrap with filter
    """

# Custom aggregation
def javascript(columns_list: list, fn_aggregate: str, fn_combine: str, fn_reset: str) -> dict:
    """
    JavaScript-based custom aggregation.
    
    Parameters:
    - columns_list: List of column names to aggregate
    - fn_aggregate: JavaScript function for aggregating individual rows
    - fn_combine: JavaScript function for combining partial aggregates
    - fn_reset: JavaScript function for resetting aggregation state
    """

Filters

Filter conditions for limiting query results.

class Filter:
    """Base filter class supporting logical operations."""
    
    def __init__(self, extraction_function=None, ordering: str = "lexicographic", **args) -> None:
        """
        Initialize filter.
        
        Parameters:
        - extraction_function: Optional extraction function to apply
        - ordering: Sort ordering for bound filters
        - **args: Filter-specific arguments
        """
    
    def __and__(self, other: 'Filter') -> 'Filter':
        """Logical AND operation between filters."""
    
    def __or__(self, other: 'Filter') -> 'Filter':
        """Logical OR operation between filters."""
    
    def __invert__(self) -> 'Filter':
        """Logical NOT operation on filter."""

class Dimension:
    """Helper for creating dimension-based filters."""
    
    def __init__(self, dim: str) -> None:
        """
        Initialize dimension filter helper.
        
        Parameters:
        - dim: Dimension name
        """
    
    def __eq__(self, other) -> Filter:
        """Create equality filter (dimension == value)."""
    
    def __ne__(self, other) -> Filter:
        """Create inequality filter (dimension != value)."""
    
    def in_(self, values: list) -> Filter:
        """Create 'in' filter (dimension in [values])."""

class JavaScript:
    """Helper for creating JavaScript-based filters."""
    
    def __init__(self, dim: str) -> None:
        """
        Initialize JavaScript filter helper.
        
        Parameters:
        - dim: Dimension name
        """
    
    def __eq__(self, js_function: str) -> Filter:
        """Create JavaScript filter with custom function."""

class Bound:
    """Range-based filter for numeric and string comparisons."""
    
    def __init__(
        self,
        dimension: str,
        lower: str = None,
        upper: str = None,
        lowerStrict: bool = False,
        upperStrict: bool = False,
        alphaNumeric: bool = False,
        ordering: str = "lexicographic",
        extraction_function = None
    ) -> None:
        """
        Initialize bound filter.
        
        Parameters:
        - dimension: Dimension to filter
        - lower: Lower bound value (optional)
        - upper: Upper bound value (optional)
        - lowerStrict: Whether lower bound is exclusive
        - upperStrict: Whether upper bound is exclusive
        - alphaNumeric: Whether to use alphanumeric ordering
        - ordering: Sort ordering type
        - extraction_function: Optional extraction function
        """

class Interval:
    """Time interval filter."""
    
    def __init__(self, dimension: str, intervals: list, extraction_function = None) -> None:
        """
        Initialize interval filter.
        
        Parameters:
        - dimension: Time dimension to filter
        - intervals: List of ISO-8601 interval strings
        - extraction_function: Optional extraction function
        """

class Spatial:
    """Geographic/spatial filter."""
    
    def __init__(self, dimension: str, bound_type: str, **args) -> None:
        """
        Initialize spatial filter.
        
        Parameters:
        - dimension: Spatial dimension to filter
        - bound_type: Type of spatial bound ('rectangular', etc.)
        - **args: Spatial bound parameters
        """

Dimensions and Extraction Functions

Dimension specifications and value transformation functions.

class DimensionSpec:
    """Dimension specification with optional extraction function."""
    
    def __init__(
        self,
        dimension: str,
        output_name: str,
        extraction_function = None,
        filter_spec = None
    ) -> None:
        """
        Initialize dimension specification.
        
        Parameters:
        - dimension: Source dimension name
        - output_name: Output dimension name
        - extraction_function: Optional value extraction function
        - filter_spec: Optional dimension filtering specification
        """

# Extraction Functions
class RegexExtraction:
    """Extract values using regular expressions."""
    
    def __init__(self, expr: str) -> None:
        """
        Parameters:
        - expr: Regular expression pattern
        """

class TimeFormatExtraction:
    """Extract and format time values."""
    
    def __init__(self, format: str, locale: str = None, time_zone: str = None) -> None:
        """
        Parameters:
        - format: Time format string
        - locale: Locale for formatting (optional)
        - time_zone: Time zone for formatting (optional)
        """

class MapLookupExtraction:
    """Transform values using key-value mapping."""
    
    def __init__(
        self,
        mapping: dict,
        retain_missing_values: bool = False,
        replace_missing_values: str = None,
        injective: bool = False
    ) -> None:
        """
        Parameters:
        - mapping: Dictionary mapping input values to output values
        - retain_missing_values: Whether to keep unmapped values
        - replace_missing_values: Default value for unmapped inputs
        - injective: Whether the mapping is one-to-one
        """

Having Clauses

Having clauses for filtering grouped query results.

class Having:
    """Having clause builder for groupBy queries."""
    
    def __init__(self, **args) -> None:
        """Initialize having clause with filter conditions."""
    
    def __and__(self, other: 'Having') -> 'Having':
        """Logical AND operation between having clauses."""
    
    def __or__(self, other: 'Having') -> 'Having':
        """Logical OR operation between having clauses."""
    
    def __invert__(self) -> 'Having':
        """Logical NOT operation on having clause."""

class Aggregation:
    """Having clause based on aggregation values."""
    
    def __init__(self, agg: str) -> None:
        """
        Parameters:
        - agg: Aggregation name to filter on
        """
    
    def __eq__(self, value) -> Having:
        """Create equality having clause (aggregation == value)."""
    
    def __lt__(self, value) -> Having:
        """Create less-than having clause (aggregation < value)."""
    
    def __gt__(self, value) -> Having:
        """Create greater-than having clause (aggregation > value)."""

Post-Aggregations

Post-aggregation operations computed after initial aggregation.

class Postaggregator:
    """Base post-aggregator with arithmetic operations."""
    
    def __init__(self, fn: str, fields: list, name: str) -> None:
        """
        Parameters:
        - fn: Post-aggregation function name
        - fields: List of field names to operate on
        - name: Output name for post-aggregation result
        """
    
    def __add__(self, other) -> 'Postaggregator':
        """Addition operation."""
    
    def __sub__(self, other) -> 'Postaggregator':
        """Subtraction operation."""
    
    def __mul__(self, other) -> 'Postaggregator':
        """Multiplication operation."""
    
    def __truediv__(self, other) -> 'Postaggregator':
        """Division operation."""

class Field:
    """Access aggregated field values."""
    
    def __init__(self, name: str) -> None:
        """
        Parameters:
        - name: Name of aggregated field to access
        """

class Const:
    """Constant value for post-aggregation operations."""
    
    def __init__(self, value: float, output_name: str = None) -> None:
        """
        Parameters:
        - value: Constant numeric value
        - output_name: Optional output name
        """

# Sketch-based post-aggregations
class ThetaSketch:
    """Access theta sketch aggregation results."""
    
    def __init__(self, name: str) -> None:
        """
        Parameters:
        - name: Name of theta sketch aggregation
        """
    
    def __and__(self, other: 'ThetaSketch') -> 'ThetaSketchOp':
        """Intersection operation between theta sketches."""
    
    def __or__(self, other: 'ThetaSketch') -> 'ThetaSketchOp':
        """Union operation between theta sketches."""

class ThetaSketchEstimate:
    """Estimate cardinality from theta sketch."""
    
    def __init__(self, fields: 'ThetaSketchOp') -> None:
        """
        Parameters:
        - fields: Theta sketch operation to estimate
        """

class QuantilesDoublesSketchToQuantile:
    """Extract quantile value from quantiles sketch."""
    
    def __init__(self, name: str, field_name: str, fraction: float) -> None:
        """
        Parameters:
        - name: Output name for the quantile value
        - field_name: Name of the quantiles sketch aggregation
        - fraction: Quantile fraction (0.0 to 1.0)
        """

class Quantile:
    """Access quantile value from quantiles aggregation."""
    
    def __init__(self, name: str, probability: float) -> None:
        """
        Parameters:
        - name: Name of quantiles aggregation field
        - probability: Quantile probability (0.0 to 1.0)
        """

class Quantiles:
    """Access multiple quantile values from quantiles aggregation."""
    
    def __init__(self, name: str, probabilities: list) -> None:
        """
        Parameters:
        - name: Name of quantiles aggregation field
        - probabilities: List of quantile probabilities (0.0 to 1.0)
        """

class HyperUniqueCardinality:
    """Extract cardinality estimate from hyperunique aggregation."""
    
    def __init__(self, name: str) -> None:
        """
        Parameters:
        - name: Name of hyperunique aggregation field
        """

class DoubleGreatest:
    """Return the greatest value among double post-aggregators."""
    
    def __init__(self, fields: list, output_name: str = None) -> None:
        """
        Parameters:
        - fields: List of post-aggregator objects to compare
        - output_name: Optional output name (defaults to 'doubleGreatest')
        """

class DoubleLeast:
    """Return the least value among double post-aggregators."""
    
    def __init__(self, fields: list, output_name: str = None) -> None:
        """
        Parameters:
        - fields: List of post-aggregator objects to compare
        - output_name: Optional output name (defaults to 'doubleLeast')
        """

class LongGreatest:
    """Return the greatest value among long post-aggregators."""
    
    def __init__(self, fields: list, output_name: str = None) -> None:
        """
        Parameters:
        - fields: List of post-aggregator objects to compare
        - output_name: Optional output name (defaults to 'longGreatest')
        """

class LongLeast:
    """Return the least value among long post-aggregators."""
    
    def __init__(self, fields: list, output_name: str = None) -> None:
        """
        Parameters:
        - fields: List of post-aggregator objects to compare
        - output_name: Optional output name (defaults to 'longLeast')
        """

class ThetaSketchOp:
    """Theta sketch set operations for combining sketches."""
    
    def __init__(self, fn: str, fields: list, name: str) -> None:
        """
        Parameters:
        - fn: Set operation function ('UNION', 'INTERSECT', 'NOT')
        - fields: List of theta sketch fields to operate on
        - name: Output name for the operation result
        """
    
    def __or__(self, other: 'ThetaSketchOp') -> 'ThetaSketchOp':
        """Union operation between theta sketches."""
    
    def __and__(self, other: 'ThetaSketchOp') -> 'ThetaSketchOp':
        """Intersection operation between theta sketches."""
    
    def __ne__(self, other: 'ThetaSketchOp') -> 'ThetaSketchOp':
        """Not operation (set difference) between theta sketches."""

Usage Examples

from pydruid.utils.aggregators import doublesum, count, filtered
from pydruid.utils.filters import Dimension, Filter
from pydruid.utils.postaggregator import Field
from pydruid.utils.having import Aggregation

# Complex filter example
filter_condition = (
    (Dimension('user_lang') == 'en') & 
    (Dimension('first_hashtag') == 'oscars') & 
    ~(Dimension('user_mention_name') == 'No Mention')
)

# Aggregations with post-aggregation
aggregations = {
    'total_tweets': count('count'),
    'total_length': doublesum('tweet_length'),
    'filtered_tweets': filtered(
        Dimension('verified') == 'true',
        count('count')
    )
}

post_aggregations = {
    'avg_length': Field('total_length') / Field('total_tweets'),
    'verified_ratio': Field('filtered_tweets') / Field('total_tweets')
}

# Having clause
having_clause = (Aggregation('total_tweets') > 100) & (Aggregation('avg_length') < 200)

Install with Tessl CLI

npx tessl i tessl/pypi-pydruid

docs

asynchronous-client.md

command-line-interface.md

database-api.md

index.md

query-utilities.md

sqlalchemy-integration.md

synchronous-client.md

tile.json