High-level Python library for Elasticsearch providing an idiomatic way to write and manipulate queries.

Aggregations: statistical analysis and data grouping, with comprehensive support for metric aggregations, bucket aggregations, and pipeline aggregations. Enables complex analytical queries, data summarization, and business-intelligence operations on Elasticsearch data.

A() is the main aggregation factory function for creating aggregation objects.
def A(name, **params):
    """
    Create an aggregation object.

    Args:
        name (str): Aggregation type name
        **params: Aggregation parameters

    Returns:
        Aggregation: Aggregation object

    Examples:
        A('terms', field='category')
        A('avg', field='price')
        A('date_histogram', field='timestamp', calendar_interval='1d')
    """


# Container for managing multiple aggregations and sub-aggregations.
class Aggs:
    """
    Aggregation container for organizing multiple aggregations.
    """

    def bucket(self, name, agg_type, **params):
        """
        Add bucket aggregation.

        Args:
            name (str): Aggregation name
            agg_type (str): Bucket aggregation type
            **params: Aggregation parameters

        Returns:
            Aggs: Sub-aggregation container
        """

    def metric(self, name, agg_type, **params):
        """
        Add metric aggregation.

        Args:
            name (str): Aggregation name
            agg_type (str): Metric aggregation type
            **params: Aggregation parameters

        Returns:
            Aggs: Current aggregation container
        """

    def pipeline(self, name, agg_type, **params):
        """
        Add pipeline aggregation.

        Args:
            name (str): Aggregation name
            agg_type (str): Pipeline aggregation type
            **params: Aggregation parameters

        Returns:
            Aggs: Current aggregation container
        """

    def to_dict(self):
        """
        Convert aggregations to dictionary.

        Returns:
            dict: Aggregations as dictionary
        """


# Metric aggregations: aggregations that compute metrics over a set of documents.
class Avg:
    """
    Average aggregation.
    """

    def __init__(self, field=None, script=None, **kwargs):
        """
        Args:
            field (str, optional): Field to compute average for
            script (dict, optional): Script to compute values
            **kwargs: Additional parameters

        Parameters:
            missing (float): Value for documents missing the field
            format (str): Output format for the value
        """


class Max:
    """
    Maximum value aggregation.
    """

    def __init__(self, field=None, script=None, **kwargs):
        """Args and parameters same as Avg."""


class Min:
    """
    Minimum value aggregation.
    """

    def __init__(self, field=None, script=None, **kwargs):
        """Args and parameters same as Avg."""


class Sum:
    """
    Sum aggregation.
    """

    def __init__(self, field=None, script=None, **kwargs):
        """Args and parameters same as Avg."""


class Stats:
    """
    Basic statistics aggregation (count, min, max, avg, sum).
    """

    def __init__(self, field=None, script=None, **kwargs):
        """Args and parameters same as Avg."""


class ExtendedStats:
    """
    Extended statistics aggregation (includes variance, std deviation, etc.).
    """

    def __init__(self, field=None, script=None, **kwargs):
        """
        Args and parameters same as Avg, plus:
            sigma (float): Standard deviations for bounds calculation
        """


class ValueCount:
    """
    Count of values aggregation.
    """

    def __init__(self, field=None, script=None, **kwargs):
        """Args and parameters same as Avg."""


class Cardinality:
    """
    Cardinality (unique count) approximation aggregation.
    """

    def __init__(self, field=None, script=None, **kwargs):
        """
        Args and parameters same as Avg, plus:
            precision_threshold (int): Precision threshold for accuracy/memory tradeoff
        """


class Percentiles:
    """
    Percentiles aggregation.
    """

    def __init__(self, field=None, script=None, **kwargs):
        """
        Args and parameters same as Avg, plus:
            percents (list): List of percentiles to calculate (default: [1,5,25,50,75,95,99])
            compression (int): Compression parameter for TDigest algorithm
            method (str): Algorithm to use ('tdigest' or 'hdr')
        """
class PercentileRanks:
    """
    Percentile ranks aggregation.
    """

    def __init__(self, field=None, values=None, **kwargs):
        """
        Args:
            field (str, optional): Field to compute percentile ranks for
            values (list): Values to find percentile ranks for
            **kwargs: Additional parameters same as Percentiles
        """


class GeoBounds:
    """
    Geographic bounding box aggregation.
    """

    def __init__(self, field, **kwargs):
        """
        Args:
            field (str): Geographic field
            **kwargs: Additional parameters

        Parameters:
            wrap_longitude (bool): Wrap longitude values
        """


class GeoCentroid:
    """
    Geographic centroid aggregation.
    """

    def __init__(self, field, **kwargs):
        """
        Args:
            field (str): Geographic field
            **kwargs: Additional parameters
        """


class WeightedAvg:
    """
    Weighted average aggregation.
    """

    def __init__(self, value=None, weight=None, **kwargs):
        """
        Args:
            value (dict): Value configuration (field or script)
            weight (dict): Weight configuration (field or script)
            **kwargs: Additional parameters

        Parameters:
            format (str): Output format for the value
        """


class ScriptedMetric:
    """
    Scripted metric aggregation for custom calculations.
    """

    def __init__(self, **kwargs):
        """
        Args:
            **kwargs: Scripted metric parameters

        Parameters:
            init_script (dict): Initialization script
            map_script (dict): Map script (executed per document)
            combine_script (dict): Combine script (executed per shard)
            reduce_script (dict): Reduce script (executed on coordinator)
            params (dict): Script parameters
        """
class TopHits:
    """
    Top hits aggregation for retrieving sample documents.
    """

    def __init__(self, **kwargs):
        """
        Args:
            **kwargs: Top hits parameters

        Parameters:
            from_ (int): Starting offset
            size (int): Number of hits to return
            sort (list): Sort configuration
            _source (dict): Source field filtering
            highlight (dict): Highlighting configuration
            explain (bool): Include explanation
            version (bool): Include document version
            seq_no_primary_term (bool): Include sequence number and primary term
            stored_fields (list): Stored fields to retrieve
            docvalue_fields (list): Doc value fields to retrieve
            script_fields (dict): Script fields to compute
        """


# Bucket aggregations: aggregations that group documents into buckets.
class Terms:
    """
    Terms aggregation for grouping by field values.
    """

    def __init__(self, field=None, script=None, **kwargs):
        """
        Args:
            field (str, optional): Field to group by
            script (dict, optional): Script to generate terms
            **kwargs: Terms aggregation parameters

        Parameters:
            size (int): Number of buckets to return (default: 10)
            shard_size (int): Number of buckets per shard
            show_term_doc_count_error (bool): Show document count error
            order (dict): Sort order for buckets
            min_doc_count (int): Minimum document count per bucket
            shard_min_doc_count (int): Minimum document count per shard
            include (str or list): Include terms pattern/list
            exclude (str or list): Exclude terms pattern/list
            missing (str): Value for documents missing the field
            execution_hint (str): Execution hint ('map' or 'global_ordinals')
            collect_mode (str): Collection mode ('depth_first' or 'breadth_first')
        """


class Histogram:
    """
    Histogram aggregation for numeric ranges.
    """

    def __init__(self, field=None, interval=None, **kwargs):
        """
        Args:
            field (str, optional): Numeric field to histogram
            interval (float): Histogram interval
            **kwargs: Histogram parameters

        Parameters:
            min_doc_count (int): Minimum document count per bucket
            extended_bounds (dict): Extended bounds (min, max)
            hard_bounds (dict): Hard bounds (min, max)
            order (dict): Sort order for buckets
            keyed (bool): Return buckets as hash instead of array
            missing (float): Value for documents missing the field
        """


class DateHistogram:
    """
    Date histogram aggregation for date ranges.
    """

    def __init__(self, field=None, **kwargs):
        """
        Args:
            field (str, optional): Date field to histogram
            **kwargs: Date histogram parameters

        Parameters:
            calendar_interval (str): Calendar-aware interval ('1d', '1w', '1M', etc.)
            fixed_interval (str): Fixed interval ('60s', '1h', etc.)
            interval (str): Deprecated interval parameter
            time_zone (str): Time zone for bucketing
            offset (str): Offset for bucket boundaries
            format (str): Date format for bucket keys
            min_doc_count (int): Minimum document count per bucket
            extended_bounds (dict): Extended bounds
            hard_bounds (dict): Hard bounds
            order (dict): Sort order for buckets
            keyed (bool): Return buckets as hash instead of array
            missing (str): Value for documents missing the field
        """


class AutoDateHistogram:
    """
    Auto-interval date histogram aggregation.
    """

    def __init__(self, field, buckets=None, **kwargs):
        """
        Args:
            field (str): Date field to histogram
            buckets (int, optional): Target number of buckets
            **kwargs: Parameters same as DateHistogram plus:
                minimum_interval (str): Minimum allowed interval
        """


class Range:
    """
    Range aggregation for custom numeric ranges.
    """

    def __init__(self, field=None, ranges=None, **kwargs):
        """
        Args:
            field (str, optional): Numeric field for ranges
            ranges (list): List of range definitions
            **kwargs: Range parameters

        Parameters:
            script (dict): Script to generate values
            keyed (bool): Return buckets as hash instead of array
            missing (float): Value for documents missing the field

        Range format: {'from': 0, 'to': 100, 'key': 'low'}
        """


class DateRange:
    """
    Date range aggregation.
    """

    def __init__(self, field=None, ranges=None, **kwargs):
        """
        Args and parameters same as Range, plus:
            format (str): Date format for range boundaries
            time_zone (str): Time zone for date ranges
        """


class IpRange:
    """
    IP address range aggregation.
    """

    def __init__(self, field=None, ranges=None, **kwargs):
        """
        Args:
            field (str, optional): IP field for ranges
            ranges (list): List of IP range definitions
            **kwargs: Parameters same as Range

        Range format: {'from': '192.168.1.0', 'to': '192.168.1.255'}
        or {'mask': '192.168.1.0/24'}
        """


class GeoDistance:
    """
    Geographic distance bucket aggregation.
    """

    def __init__(self, field, origin, ranges=None, **kwargs):
        """
        Args:
            field (str): Geographic field
            origin (dict): Origin point for distance calculation
            ranges (list): List of distance range definitions
            **kwargs: Geographic distance parameters

        Parameters:
            unit (str): Distance unit ('m', 'km', 'mi', etc.)
            distance_type (str): Distance calculation type
            keyed (bool): Return buckets as hash instead of array

        Range format: {'from': 0, 'to': 100, 'key': 'near'}
        """
class GeoHashGrid:
    """
    Geohash grid aggregation.
    """

    def __init__(self, field, precision=None, **kwargs):
        """
        Args:
            field (str): Geographic field
            precision (int, optional): Geohash precision level
            **kwargs: Geohash grid parameters

        Parameters:
            size (int): Maximum number of buckets
            shard_size (int): Maximum buckets per shard
            bounds (dict): Bounding box for grid
        """


class GeoTileGrid:
    """
    Geo-tile grid aggregation.
    """

    def __init__(self, field, precision=None, **kwargs):
        """
        Args:
            field (str): Geographic field
            precision (int, optional): Tile zoom level precision
            **kwargs: Parameters same as GeoHashGrid
        """


class Filter:
    """
    Filter aggregation.
    """

    def __init__(self, filter=None, **kwargs):
        """
        Args:
            filter (Query): Filter query
            **kwargs: Additional parameters
        """


class Filters:
    """
    Multiple filters aggregation.
    """

    def __init__(self, filters=None, **kwargs):
        """
        Args:
            filters (dict): Named filters mapping
            **kwargs: Filters parameters

        Parameters:
            other_bucket (bool): Include other bucket for unmatched docs
            other_bucket_key (str): Key for other bucket
        """


class Missing:
    """
    Missing values aggregation.
    """

    def __init__(self, field, **kwargs):
        """
        Args:
            field (str): Field to check for missing values
            **kwargs: Additional parameters
        """


class Nested:
    """
    Nested aggregation for nested objects.
    """

    def __init__(self, path, **kwargs):
        """
        Args:
            path (str): Path to nested object
            **kwargs: Additional parameters
        """


class ReverseNested:
    """
    Reverse nested aggregation.
    """

    def __init__(self, path=None, **kwargs):
        """
        Args:
            path (str, optional): Path to reverse to (default: root)
            **kwargs: Additional parameters
        """


class Global:
    """
    Global aggregation ignoring query context.
    """

    def __init__(self, **kwargs):
        """
        Args:
            **kwargs: Additional parameters
        """


class Sampler:
    """
    Sampler aggregation for sampling documents.
    """

    def __init__(self, shard_size=None, **kwargs):
        """
        Args:
            shard_size (int, optional): Sample size per shard
            **kwargs: Additional parameters
        """


class DiversifiedSampler:
    """
    Diversified sampler aggregation.
    """

    def __init__(self, field=None, shard_size=None, **kwargs):
        """
        Args:
            field (str, optional): Field to diversify on
            shard_size (int, optional): Sample size per shard
            **kwargs: Parameters same as Sampler plus:
                max_docs_per_value (int): Max docs per field value
                execution_hint (str): Execution hint
        """
class SignificantTerms:
    """
    Significant terms aggregation.
    """

    def __init__(self, field=None, **kwargs):
        """
        Args:
            field (str, optional): Field to find significant terms in
            **kwargs: Significant terms parameters

        Parameters:
            size (int): Number of terms to return
            shard_size (int): Number of terms per shard
            min_doc_count (int): Minimum document count
            shard_min_doc_count (int): Minimum document count per shard
            chi_square (dict): Chi square significance test
            gnd (dict): Google normalized distance test
            mutual_information (dict): Mutual information test
            percentage (dict): Percentage test
            script_heuristic (dict): Custom script test
            background_filter (Query): Background filter
            include (str or list): Include terms pattern/list
            exclude (str or list): Exclude terms pattern/list
            execution_hint (str): Execution hint
        """


class RareTerms:
    """
    Rare terms aggregation.
    """

    def __init__(self, field=None, **kwargs):
        """
        Args:
            field (str, optional): Field to find rare terms in
            **kwargs: Rare terms parameters

        Parameters:
            max_doc_count (int): Maximum document count for rare terms
            precision (float): Precision for rarity calculation
            include (str or list): Include terms pattern/list
            exclude (str or list): Exclude terms pattern/list
            missing (str): Value for documents missing the field
        """


class Composite:
    """
    Composite aggregation for pagination of bucket aggregations.
    """

    def __init__(self, sources, **kwargs):
        """
        Args:
            sources (list): List of source configurations
            **kwargs: Composite parameters

        Parameters:
            size (int): Number of buckets to return
            after (dict): After key for pagination
        """


class MultiTerms:
    """
    Multi-field terms aggregation.
    """

    def __init__(self, terms, **kwargs):
        """
        Args:
            terms (list): List of term configurations
            **kwargs: Multi-terms parameters

        Parameters:
            size (int): Number of buckets to return
            shard_size (int): Number of buckets per shard
            show_term_doc_count_error (bool): Show document count error
            order (dict): Sort order for buckets
            min_doc_count (int): Minimum document count per bucket
            shard_min_doc_count (int): Minimum document count per shard
        """


class Adjacency:
    """
    Adjacency matrix aggregation.
    """

    def __init__(self, filters=None, **kwargs):
        """
        Args:
            filters (dict): Named filters for adjacency matrix
            **kwargs: Adjacency parameters

        Parameters:
            separator (str): Separator for bucket keys
        """


class Parent:
    """
    Parent aggregation for parent-child relationships.
    """

    def __init__(self, type, **kwargs):
        """
        Args:
            type (str): Child document type
            **kwargs: Parent aggregation parameters
        """


class Children:
    """
    Children aggregation for parent-child relationships.
    """

    def __init__(self, type, **kwargs):
        """
        Args:
            type (str): Child document type
            **kwargs: Children aggregation parameters
        """


class VariableWidthHistogram:
    """
    Variable width histogram aggregation.
    """

    def __init__(self, field=None, buckets=None, **kwargs):
        """
        Args:
            field (str, optional): Field to histogram
            buckets (int, optional): Target number of buckets
            **kwargs: Variable width histogram parameters

        Parameters:
            shard_size (int): Shard size for sampling
            initial_buffer (int): Initial buffer size
        """
class CategorizeText:
    """
    Categorize text aggregation for ML-based text categorization.
    """

    def __init__(self, field, **kwargs):
        """
        Args:
            field (str): Text field to categorize
            **kwargs: Categorize text parameters

        Parameters:
            max_unique_tokens (int): Maximum unique tokens
            max_matched_tokens (int): Maximum matched tokens per category
            similarity_threshold (float): Similarity threshold for categorization
            categorization_filters (list): Filters for categorization
        """


# Pipeline aggregations: aggregations that process the output of other aggregations.
class AvgBucket:
    """
    Average bucket pipeline aggregation.
    """

    def __init__(self, buckets_path, **kwargs):
        """
        Args:
            buckets_path (str): Path to buckets to average
            **kwargs: Pipeline parameters

        Parameters:
            gap_policy (str): Policy for data gaps ('skip' or 'insert_zeros')
            format (str): Output format
        """


class MaxBucket:
    """
    Max bucket pipeline aggregation.
    """

    def __init__(self, buckets_path, **kwargs):
        """Args and parameters same as AvgBucket."""


class MinBucket:
    """
    Min bucket pipeline aggregation.
    """

    def __init__(self, buckets_path, **kwargs):
        """Args and parameters same as AvgBucket."""


class SumBucket:
    """
    Sum bucket pipeline aggregation.
    """

    def __init__(self, buckets_path, **kwargs):
        """Args and parameters same as AvgBucket."""


class StatsBucket:
    """
    Stats bucket pipeline aggregation.
    """

    def __init__(self, buckets_path, **kwargs):
        """Args and parameters same as AvgBucket."""


class ExtendedStatsBucket:
    """
    Extended stats bucket pipeline aggregation.
    """

    def __init__(self, buckets_path, **kwargs):
        """
        Args and parameters same as AvgBucket, plus:
            sigma (float): Standard deviations for bounds
        """


class PercentilesBucket:
    """
    Percentiles bucket pipeline aggregation.
    """

    def __init__(self, buckets_path, **kwargs):
        """
        Args and parameters same as AvgBucket, plus:
            percents (list): Percentiles to calculate
        """


class MovingAvg:
    """
    Moving average pipeline aggregation.
    """

    def __init__(self, buckets_path, **kwargs):
        """
        Args:
            buckets_path (str): Path to values for moving average
            **kwargs: Moving average parameters

        Parameters:
            window (int): Size of moving window
            model (str): Moving average model ('simple', 'linear', 'ewma', 'holt', 'holt_winters')
            gap_policy (str): Policy for data gaps
            predict (int): Number of predictions to make
            settings (dict): Model-specific settings
        """


class MovingFn:
    """
    Moving function pipeline aggregation.
    """

    def __init__(self, buckets_path, script, window, **kwargs):
        """
        Args:
            buckets_path (str): Path to values for moving function
            script (dict): Script to execute
            window (int): Size of moving window
            **kwargs: Parameters same as MovingAvg
        """


class Derivative:
    """
    Derivative pipeline aggregation.
    """

    def __init__(self, buckets_path, **kwargs):
        """
        Args:
            buckets_path (str): Path to values for derivative
            **kwargs: Derivative parameters

        Parameters:
            gap_policy (str): Policy for data gaps
            format (str): Output format
            unit (str): Unit for derivative calculation
        """


class SerialDiff:
    """
    Serial differencing pipeline aggregation.
    """

    def __init__(self, buckets_path, **kwargs):
        """
        Args:
            buckets_path (str): Path to values for differencing
            **kwargs: Parameters same as Derivative plus:
                lag (int): Lag for differencing calculation
        """


class CumulativeSum:
    """
    Cumulative sum pipeline aggregation.
    """

    def __init__(self, buckets_path, **kwargs):
        """
        Args:
            buckets_path (str): Path to values for cumulative sum
            **kwargs: Parameters same as Derivative
        """


class BucketScript:
    """
    Bucket script pipeline aggregation.
    """

    def __init__(self, buckets_path=None, script=None, **kwargs):
        """
        Args:
            buckets_path (dict): Named paths to bucket values
            script (dict): Script to execute
            **kwargs: Bucket script parameters

        Parameters:
            gap_policy (str): Policy for data gaps
            format (str): Output format
        """


class BucketSelector:
    """
    Bucket selector pipeline aggregation.
    """

    def __init__(self, buckets_path=None, script=None, **kwargs):
        """
        Args:
            buckets_path (dict): Named paths to bucket values
            script (dict): Script to execute (should return boolean)
            **kwargs: Parameters same as BucketScript
        """
class BucketSort:
    """
    Bucket sort pipeline aggregation.
    """

    def __init__(self, **kwargs):
        """
        Args:
            **kwargs: Bucket sort parameters

        Parameters:
            sort (list): Sort configuration
            from_ (int): Starting offset
            size (int): Number of buckets to return
            gap_policy (str): Policy for data gaps
        """


# --- Usage examples ---
# Restore the import that was garbled into the preceding docstring; Q is
# required by the significant_terms example below (background_filter=Q(...)).
from elasticsearch_dsl import Search, A, Q

# Simple metric aggregations
search = Search(index='sales')
search.aggs.metric('avg_price', 'avg', field='price')
search.aggs.metric('total_revenue', 'sum', field='revenue')
search.aggs.metric('price_stats', 'stats', field='price')

response = search.execute()
print(f"Average price: {response.aggregations.avg_price.value}")
print(f"Total revenue: {response.aggregations.total_revenue.value}")
print(f"Price stats: {response.aggregations.price_stats}")

# Terms aggregation with nested metrics
search = Search(index='sales')
category_agg = search.aggs.bucket('categories', 'terms', field='category', size=10)
category_agg.metric('avg_price', 'avg', field='price')
category_agg.metric('total_sales', 'sum', field='quantity')

response = search.execute()
for bucket in response.aggregations.categories.buckets:
    print(f"Category: {bucket.key}")
    print(f"  Average price: {bucket.avg_price.value}")
    print(f"  Total sales: {bucket.total_sales.value}")

# Date histogram with multiple metrics
search = Search(index='sales')
date_agg = search.aggs.bucket(
    'sales_over_time',
    'date_histogram',
    field='date',
    calendar_interval='1d',
    time_zone='America/New_York'
)
date_agg.metric('daily_revenue', 'sum', field='revenue')
date_agg.metric('daily_orders', 'value_count', field='order_id')
date_agg.metric('avg_order_value', 'avg', field='order_value')

response = search.execute()
for bucket in response.aggregations.sales_over_time.buckets:
    print(f"Date: {bucket.key_as_string}")
    print(f"  Revenue: ${bucket.daily_revenue.value:.2f}")
    print(f"  Orders: {bucket.daily_orders.value}")
    print(f"  AOV: ${bucket.avg_order_value.value:.2f}")

# Multi-level nested aggregations
search = Search(index='sales')
# Top level: group by category
category_agg = search.aggs.bucket('categories', 'terms', field='category')
# Second level: group by date within each category
date_agg = category_agg.bucket(
    'monthly_sales',
    'date_histogram',
    field='date',
    calendar_interval='1M'
)
# Third level: metrics within each date bucket
date_agg.metric('revenue', 'sum', field='revenue')
date_agg.metric('avg_price', 'avg', field='price')
# Also add category-level metrics
category_agg.metric('total_category_revenue', 'sum', field='revenue')

response = search.execute()
for category in response.aggregations.categories.buckets:
    print(f"Category: {category.key}")
    print(f"Total revenue: ${category.total_category_revenue.value:.2f}")
    for month in category.monthly_sales.buckets:
        print(f"  {month.key_as_string}: ${month.revenue.value:.2f}")

# Pipeline aggregations for trend analysis
search = Search(index='sales')
# Base date histogram
date_agg = search.aggs.bucket(
    'sales_over_time',
    'date_histogram',
    field='date',
    calendar_interval='1d'
)
date_agg.metric('daily_sales', 'sum', field='revenue')
# Pipeline aggregations
search.aggs.pipeline(
    'avg_daily_sales',
    'avg_bucket',
    buckets_path='sales_over_time>daily_sales'
)
search.aggs.pipeline(
    'sales_derivative',
    'derivative',
    buckets_path='sales_over_time>daily_sales'
)
search.aggs.pipeline(
    'cumulative_sales',
    'cumulative_sum',
    buckets_path='sales_over_time>daily_sales'
)

response = search.execute()
print(f"Average daily sales: ${response.aggregations.avg_daily_sales.value:.2f}")
for bucket in response.aggregations.sales_over_time.buckets:
    print(f"Date: {bucket.key_as_string}")
    print(f"  Daily sales: ${bucket.daily_sales.value:.2f}")
    if hasattr(bucket, 'sales_derivative'):
        print(f"  Change: ${bucket.sales_derivative.value:.2f}")
    if hasattr(bucket, 'cumulative_sales'):
        print(f"  Cumulative: ${bucket.cumulative_sales.value:.2f}")

# Significant terms with background filter
search = Search(index='reviews')
search = search.filter('range', rating={'lt': 3})  # Low rated reviews
search.aggs.bucket(
    'negative_review_terms',
    'significant_terms',
    field='review_text',
    size=20,
    min_doc_count=5,
    background_filter=Q('match_all')
)
# Percentiles aggregation
search.aggs.metric(
    'response_time_percentiles',
    'percentiles',
    field='response_time_ms',
    percents=[50, 90, 95, 99]
)
# Geo distance aggregation
search.aggs.bucket(
    'distance_from_center',
    'geo_distance',
    field='location',
    origin={'lat': 40.7128, 'lon': -74.0060},
    ranges=[
        {'to': 1000, 'key': 'nearby'},
        {'from': 1000, 'to': 5000, 'key': 'close'},
        {'from': 5000, 'key': 'far'}
    ],
    unit='m'
)
response = search.execute()

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-elasticsearch-dsl