CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-pyes

Python Elastic Search driver providing a pythonic interface for interacting with ElasticSearch clusters

Pending
Overview
Eval results
Files

docs/facets-aggregations.md

PyES Facets and Aggregations

Overview

PyES provides comprehensive support for both legacy facets and modern aggregations for data analysis and summarization. Facets are the older ElasticSearch feature (deprecated in ES 1.x and removed in ES 2.0), while aggregations are their modern replacement. Both provide powerful data analysis capabilities for building analytics dashboards, reporting, and data exploration features.

Facets (Legacy)

FacetFactory

class FacetFactory:
    """
    Builder that collects the facets to attach to a search.

    Offers shortcut methods for the common facet types plus a generic
    ``add`` for facet objects built by the caller.
    """

    def __init__(self):
        """Initialize FacetFactory."""

    def add_term_facet(self, name, field, size=10, **kwargs):
        """
        Add a term facet for value distribution analysis.

        Args:
            name (str): Name under which the facet appears in the results
            field (str): Field to facet on
            size (int): Maximum number of terms to return. Default: 10
            **kwargs: Additional facet parameters

        Returns:
            FacetFactory: Self for method chaining
        """

    def add_date_facet(self, name, field, interval, **kwargs):
        """
        Add a date histogram facet for time-based analysis.

        Args:
            name (str): Name under which the facet appears in the results
            field (str): Date field to facet on
            interval (str): Date interval (year, month, week, day, hour,
                minute) -- the same set DateHistogramFacet documents
            **kwargs: Additional facet parameters

        Returns:
            FacetFactory: Self for method chaining
        """

    def add_geo_facet(self, name, field, origin, ranges, **kwargs):
        """
        Add a geo distance facet for location-based analysis.

        Args:
            name (str): Name under which the facet appears in the results
            field (str): Geo-point field
            origin (dict): Origin point {"lat": lat, "lon": lon}
            ranges (list): Distance ranges, e.g.
                [{"to": "1km"}, {"from": "1km", "to": "5km"}]
            **kwargs: Additional facet parameters

        Returns:
            FacetFactory: Self for method chaining
        """

    def add(self, facet):
        """
        Add any facet object.

        Args:
            facet (Facet): Facet object to add

        Returns:
            FacetFactory: Self for method chaining
        """

    def reset(self):
        """Reset all facets."""

# Basic facet factory usage
from pyes import ES, Search, MatchAllQuery, FacetFactory

# Client connection. The examples in this document call `es.search(...)`
# without ever creating `es`; define it once here. Adjust the address to
# point at your own cluster.
es = ES("127.0.0.1:9200")

search = Search(MatchAllQuery())
facets = FacetFactory()

# Add multiple facets
facets.add_term_facet("categories", "category.keyword", size=20)
facets.add_term_facet("authors", "author.keyword", size=10)
facets.add_date_facet("monthly_posts", "published_date", "month")

# Apply facets to search
search.facet(facets)
results = es.search(search, indices=["blog"])

# Access facet results (attribute names match the facet names given above)
category_counts = results.facets.categories.terms
monthly_counts = results.facets.monthly_posts.entries

Term Facet

class TermFacet:
    """
    Facet that counts the distinct values found in a field.

    Use it to analyze the term/value distribution of that field.
    """

    def __init__(
        self,
        field,
        size=10,
        order=None,
        exclude=None,
        regex=None,
        script=None,
        **kwargs
    ):
        """
        Set up the term facet.

        Args:
            field (str): Field whose values are counted
            size (int): How many terms to return. Default: 10
            order (str): Result ordering (count, term, reverse_count, reverse_term)
            exclude (list): Terms left out of the results
            regex (str): Regular expression used to filter terms
            script (str): Script for custom term calculation
            **kwargs: Any further parameters
        """

# Term facet analysis
# The facets below are only constructed here; attaching them to a search is
# not shown in this snippet.
from pyes import TermFacet

# Category distribution
category_facet = TermFacet("category.keyword", size=20, order="count")

# Author distribution excluding bots  
# (the two listed user names are dropped from the returned terms)
author_facet = TermFacet("author.keyword", size=15, exclude=["bot_user", "system"])

# Tag distribution with regex filter
tag_facet = TermFacet("tags", regex="python.*", size=10)

Date Histogram Facet

class DateHistogramFacet:
    """
    Histogram facet over a date field.

    Documents are grouped by date interval (year, month, day, etc.).
    """

    def __init__(
        self,
        field,
        interval,
        time_zone=None,
        pre_zone=None,
        post_zone=None,
        pre_offset=None,
        post_offset=None,
        factor=None,
        value_field=None,
        value_script=None,
        **kwargs
    ):
        """
        Set up the date histogram facet.

        Args:
            field (str): Date field the histogram is built on
            interval (str): Time interval (year, month, week, day, hour, minute)
            time_zone (str, optional): Time zone used for date calculations
            pre_zone (str, optional): Time zone applied before calculation
            post_zone (str, optional): Time zone applied after calculation
            pre_offset (str, optional): Offset applied before calculation
            post_offset (str, optional): Offset applied after calculation
            factor (float, optional): Factor timestamps are multiplied by
            value_field (str, optional): Field to sum/count instead of doc count
            value_script (str, optional): Script used to compute the value
            **kwargs: Any further parameters
        """

# Date histogram analysis
# Three independent facet definitions; none is attached to a search here.
from pyes import DateHistogramFacet

# Monthly publication trend
# (time_zone is given as a UTC offset string)
monthly_facet = DateHistogramFacet("published_date", "month", time_zone="-05:00")

# Daily view counts with value field
# (value_field makes the facet use "view_count" instead of the doc count)
daily_views_facet = DateHistogramFacet("created_date", "day", 
                                       value_field="view_count")

# Hourly activity pattern
hourly_facet = DateHistogramFacet("timestamp", "hour")

Statistical Facet

class StatisticalFacet:
    """
    Facet that computes statistics over a numeric field.

    Reports count, total, mean, min, max, variance, and std_deviation.
    """

    def __init__(self, field=None, script=None, params=None, **kwargs):
        """
        Set up the statistical facet.

        Args:
            field (str, optional): Numeric field to analyze
            script (str, optional): Script that computes a custom value
            params (dict, optional): Parameters handed to the script
            **kwargs: Any further parameters
        """

# Statistical analysis
from pyes import StatisticalFacet

# View count statistics
view_stats_facet = StatisticalFacet("view_count")

# Price statistics with currency conversion.
# Facets only exist on pre-2.0 ElasticSearch, whose script languages expose
# each entry of `params` as a bare variable. The `params.<name>` form is
# Painless syntax from later releases and is not available alongside facets.
price_stats_facet = StatisticalFacet(
    script="doc['price'].value * exchange_rate",
    params={"exchange_rate": 1.1}
)

Range Facet

class RangeFacet:
    """
    Facet that distributes documents over predefined ranges.
    """

    def __init__(self, field, ranges, **kwargs):
        """
        Set up the range facet.

        Args:
            field (str): Numeric field the ranges apply to
            ranges (list): Range definitions, one entry per range
            **kwargs: Any further parameters
        """

# Range distribution analysis
# In both examples the first range has no "from" and the last has no "to",
# and each boundary value is reused as the next range's "from".
from pyes import RangeFacet

# Price range distribution
price_ranges_facet = RangeFacet("price", [
    {"to": 25},
    {"from": 25, "to": 50},
    {"from": 50, "to": 100},
    {"from": 100}
])

# Age group distribution
age_facet = RangeFacet("age", [
    {"to": 18},
    {"from": 18, "to": 25},
    {"from": 25, "to": 35},
    {"from": 35, "to": 50},
    {"from": 50}
])

Aggregations (Modern)

AggFactory

class AggFactory:
    """
    Builder that collects the aggregations for a search.

    The modern counterpart of FacetFactory, with more powerful analysis
    capabilities.
    """

    def __init__(self):
        """Initialize AggFactory."""

    def add(self, agg):
        """
        Register an aggregation with the factory.

        Args:
            agg (Agg): Aggregation object to register

        Returns:
            AggFactory: Self for method chaining
        """

    def reset(self):
        """Reset all aggregations."""

# Basic aggregation factory usage
# NOTE: DateHistogramAgg is imported but not used in this snippet; Search and
# MatchAllQuery are not imported here and are expected to be in scope already.
from pyes import AggFactory, TermsAgg, DateHistogramAgg, StatsAgg

agg_factory = AggFactory()
agg_factory.add(TermsAgg("categories", field="category.keyword"))
agg_factory.add(StatsAgg("view_stats", field="view_count"))

# The whole factory is handed to the search in one call.
search = Search(MatchAllQuery()).add_aggregation(agg_factory)

Base Aggregation Classes

class Agg:
    """
    Common parent of every aggregation.
    """

    def __init__(self, name, **kwargs):
        """
        Set up the base aggregation.

        Args:
            name (str): Name under which the aggregation appears in results
            **kwargs: Parameters specific to the concrete aggregation
        """

    def add_aggregation(self, agg):
        """
        Nest a sub-aggregation under this one.

        Args:
            agg (Agg): Sub-aggregation to nest

        Returns:
            Agg: Self for method chaining
        """

class BucketAgg(Agg):
    """
    Common parent of the bucket aggregations.

    A bucket aggregation groups documents into buckets and may carry
    sub-aggregations.
    """

Terms Aggregation

class TermsAgg(BucketAgg):
    """
    Bucket aggregation keyed by the distinct values of a field.

    The modern replacement for TermFacet, with additional capabilities.
    """

    def __init__(
        self,
        name,
        field=None,
        size=10,
        shard_size=None,
        min_doc_count=1,
        include=None,
        exclude=None,
        order=None,
        script=None,
        **kwargs
    ):
        """
        Set up the terms aggregation.

        Args:
            name (str): Aggregation name
            field (str, optional): Field to aggregate on
            size (int): How many buckets to return. Default: 10
            shard_size (int, optional): How many terms each shard returns
            min_doc_count (int): Minimum document count per bucket. Default: 1
            include (str|list, optional): Terms to include (regex or list)
            exclude (str|list, optional): Terms to exclude (regex or list)
            order (dict, optional): Sort order specification
            script (str, optional): Script for term calculation
            **kwargs: Any further parameters
        """

# Terms aggregation with sub-aggregations
# NOTE: `es`, Search and MatchAllQuery are not created/imported in this
# snippet; they must already be in scope.
from pyes import TermsAgg, AvgAgg, MaxAgg

# Category breakdown with average views per category
categories_agg = TermsAgg("categories", field="category.keyword", size=20)
# The sub-aggregation names ("avg_views", "max_views") are the attribute
# names read back from each bucket in the loop below.
categories_agg.add_aggregation(AvgAgg("avg_views", field="view_count"))
categories_agg.add_aggregation(MaxAgg("max_views", field="view_count"))

search = Search(MatchAllQuery()).add_aggregation(categories_agg)
results = es.search(search, indices=["blog"])

# Access nested results
for bucket in results.aggregations.categories.buckets:
    print(f"Category: {bucket.key}")
    print(f"Documents: {bucket.doc_count}")
    print(f"Average views: {bucket.avg_views.value}")
    print(f"Max views: {bucket.max_views.value}")

Date Histogram Aggregation

class DateHistogramAgg(BucketAgg):
    """
    Time-based histogram with flexible intervals and time zones.

    The modern replacement for DateHistogramFacet, with enhanced features.
    """

    def __init__(
        self,
        name,
        field=None,
        interval=None,
        format=None,
        time_zone=None,
        offset=None,
        min_doc_count=0,
        extended_bounds=None,
        script=None,
        **kwargs
    ):
        """
        Set up the date histogram aggregation.

        Args:
            name (str): Aggregation name
            field (str, optional): Date field the histogram is built on
            interval (str, optional): Time interval (1y, 1M, 1w, 1d, 1h, 1m, 1s)
            format (str, optional): Date format used for buckets
            time_zone (str, optional): Time zone used for calculations
            offset (str, optional): Time offset for bucket boundaries
            min_doc_count (int): Minimum docs per bucket. Default: 0
            extended_bounds (dict, optional): Forced histogram bounds
            script (str, optional): Script for date calculation
            **kwargs: Any further parameters
        """

# Date histogram with time analysis
# NOTE: `es`, Search and MatchAllQuery must already be in scope; this snippet
# imports only the aggregation classes.
from pyes import DateHistogramAgg, SumAgg, CardinalityAgg

# Monthly trends with engagement metrics
monthly_agg = DateHistogramAgg("monthly_trends", 
                              field="published_date", 
                              interval="1M",
                              format="yyyy-MM",
                              time_zone="America/New_York")

# Add sub-aggregations for detailed analysis
monthly_agg.add_aggregation(SumAgg("total_views", field="view_count"))
monthly_agg.add_aggregation(CardinalityAgg("unique_authors", field="author.keyword"))

search = Search(MatchAllQuery()).add_aggregation(monthly_agg)
results = es.search(search, indices=["blog"])

# Time series analysis
# Each bucket exposes the sub-aggregations under the names given above.
for bucket in results.aggregations.monthly_trends.buckets:
    print(f"Month: {bucket.key_as_string}")
    print(f"Posts: {bucket.doc_count}")  
    print(f"Total views: {bucket.total_views.value}")
    print(f"Unique authors: {bucket.unique_authors.value}")

Range Aggregation

class RangeAgg(BucketAgg):
    """
    Bucket aggregation with one bucket per range of values.
    """

    def __init__(self, name, field=None, ranges=None, script=None, **kwargs):
        """
        Set up the range aggregation.

        Args:
            name (str): Aggregation name
            field (str, optional): Field the ranges are created on
            ranges (list): Range definitions, one entry per bucket
            script (str, optional): Script for value calculation
            **kwargs: Any further parameters
        """

# Range-based bucketing
# NOTE: Search and MatchAllQuery must already be in scope.
from pyes import RangeAgg, AvgAgg

# Price tier analysis
# Every range carries an explicit "key" alongside its bounds.
price_tiers_agg = RangeAgg("price_tiers", field="price", ranges=[
    {"key": "budget", "to": 25},
    {"key": "mid-range", "from": 25, "to": 100},
    {"key": "premium", "from": 100, "to": 500},
    {"key": "luxury", "from": 500}
])

# Add average rating per price tier
price_tiers_agg.add_aggregation(AvgAgg("avg_rating", field="rating"))

search = Search(MatchAllQuery()).add_aggregation(price_tiers_agg)

Histogram Aggregation

class HistogramAgg(BucketAgg):
    """
    Bucket aggregation with fixed-interval buckets over numeric values.
    """

    def __init__(
        self,
        name,
        field=None,
        interval=None,
        min_doc_count=0,
        extended_bounds=None,
        script=None,
        **kwargs
    ):
        """
        Set up the histogram aggregation.

        Args:
            name (str): Aggregation name
            field (str, optional): Numeric field the histogram is built on
            interval (float): Fixed interval size
            min_doc_count (int): Minimum docs per bucket. Default: 0
            extended_bounds (dict, optional): Forced histogram bounds
            script (str, optional): Script for value calculation
            **kwargs: Any further parameters
        """

# Fixed interval histograms
# Both aggregations are only constructed here; no search is built or run.
from pyes import HistogramAgg

# Price distribution in $10 intervals
# (extended_bounds is given as a {"min": ..., "max": ...} dict)
price_histogram_agg = HistogramAgg("price_distribution", 
                                   field="price", 
                                   interval=10,
                                   extended_bounds={"min": 0, "max": 200})

# Rating distribution in 0.5 intervals
# (min_doc_count=1 overrides the documented default of 0)
rating_histogram_agg = HistogramAgg("rating_distribution",
                                    field="rating",
                                    interval=0.5,
                                    min_doc_count=1)

Metric Aggregations

Statistical Aggregations

class StatsAgg(Agg):
    """
    Metric aggregation reporting count, min, max, avg and sum of a numeric field.
    """

    def __init__(self, name, field=None, script=None, **kwargs):
        """
        Set up the stats aggregation.

        Args:
            name (str): Aggregation name
            field (str, optional): Numeric field to analyze
            script (str, optional): Script for value calculation
            **kwargs: Any further parameters
        """

class SumAgg(Agg):
    """Metric aggregation: sum of the values of a numeric field."""

    def __init__(self, name, field=None, script=None, **kwargs):
        """Set up the sum aggregation."""

class AvgAgg(Agg):
    """Metric aggregation: average of the values of a numeric field."""

    def __init__(self, name, field=None, script=None, **kwargs):
        """Set up the average aggregation."""

class MinAgg(Agg):
    """Metric aggregation: minimum value of a numeric field."""

    def __init__(self, name, field=None, script=None, **kwargs):
        """Set up the minimum aggregation."""

class MaxAgg(Agg):
    """Metric aggregation: maximum value of a numeric field."""

    def __init__(self, name, field=None, script=None, **kwargs):
        """Set up the maximum aggregation."""

class ValueCountAgg(Agg):
    """Metric aggregation: number of non-null values in a field."""

    def __init__(self, name, field=None, script=None, **kwargs):
        """Set up the value-count aggregation."""

class CardinalityAgg(Agg):
    """
    Metric aggregation: approximate number of unique values in a field.
    """

    def __init__(self, name, field=None, precision_threshold=3000, **kwargs):
        """
        Set up the cardinality aggregation.

        Args:
            name (str): Aggregation name
            field (str, optional): Field whose unique values are counted
            precision_threshold (int): Precision vs memory tradeoff. Default: 3000
            **kwargs: Any further parameters
        """

# Comprehensive metric analysis
# NOTE: MinAgg and MaxAgg are imported but not used below; `es`, Search and
# MatchAllQuery must already be in scope.
from pyes import (StatsAgg, SumAgg, AvgAgg, MinAgg, MaxAgg, 
                  ValueCountAgg, CardinalityAgg)

# Multiple metric aggregations
search = Search(MatchAllQuery())

# Statistical overview
search.add_aggregation(StatsAgg("view_stats", field="view_count"))
search.add_aggregation(StatsAgg("rating_stats", field="rating"))

# Individual metrics  
search.add_aggregation(SumAgg("total_revenue", field="price"))
search.add_aggregation(AvgAgg("avg_response_time", field="response_ms"))
search.add_aggregation(CardinalityAgg("unique_visitors", field="user_id"))
search.add_aggregation(ValueCountAgg("posts_with_tags", field="tags"))

results = es.search(search, indices=["analytics"])

# Access metric results
# The stats result is read through `.avg`; the single-value metrics are read
# through `.value`.
print(f"Average views: {results.aggregations.view_stats.avg}")
print(f"Total revenue: {results.aggregations.total_revenue.value}")
print(f"Unique visitors: {results.aggregations.unique_visitors.value}")

Advanced Aggregation Patterns

Nested Aggregations

class NestedAgg(BucketAgg):
    """
    Bucket aggregation that operates on nested objects.
    """

    def __init__(self, name, path, **kwargs):
        """
        Set up the nested aggregation.

        Args:
            name (str): Aggregation name
            path (str): Path to the nested objects
            **kwargs: Any further parameters
        """

class ReverseNestedAgg(BucketAgg):
    """
    Bucket aggregation that steps back from nested objects to parent documents.
    """

    def __init__(self, name, path=None, **kwargs):
        """
        Set up the reverse nested aggregation.

        Args:
            name (str): Aggregation name
            path (str, optional): Path to reverse to (root if None)
            **kwargs: Any further parameters
        """

# Nested object analysis
# Resulting nesting: variants -> colors -> products -> categories.
# NOTE: Search and MatchAllQuery must already be in scope.
from pyes import NestedAgg, ReverseNestedAgg, TermsAgg

# Analyze product variants
variants_agg = NestedAgg("variants", path="variants")

# Color distribution within variants
color_agg = TermsAgg("colors", field="variants.color.keyword")
variants_agg.add_aggregation(color_agg)

# Back to parent for product categories
# The inner call chains on add_aggregation, relying on its documented
# "returns self" contract so that the ReverseNestedAgg is what gets added.
color_agg.add_aggregation(
    ReverseNestedAgg("products").add_aggregation(
        TermsAgg("categories", field="category.keyword")
    )
)

search = Search(MatchAllQuery()).add_aggregation(variants_agg)

Filter Aggregations

class FilterAgg(BucketAgg):
    """
    Single-bucket aggregation holding the documents that pass a filter.
    """

    def __init__(self, name, filter=None, **kwargs):
        """
        Set up the filter aggregation.

        Args:
            name (str): Aggregation name
            filter (Filter): Filter to apply
            **kwargs: Any further parameters
        """

class FiltersAgg(BucketAgg):
    """
    Multi-bucket aggregation with a separate filter for each bucket.
    """

    def __init__(self, name, filters=None, **kwargs):
        """
        Set up the filters aggregation.

        Args:
            name (str): Aggregation name
            filters (dict): Named filters, one per bucket
            **kwargs: Any further parameters
        """

# Filter-based bucketing
# NOTE: AvgAgg and StatsAgg are expected to be in scope from earlier imports.
from pyes import FilterAgg, FiltersAgg, TermFilter, RangeFilter

# Single filter aggregation
high_rated_agg = FilterAgg("high_rated", 
                          filter=RangeFilter("rating", gte=4.0))
high_rated_agg.add_aggregation(AvgAgg("avg_price", field="price"))

# Multiple filter aggregation
segments_agg = FiltersAgg("segments", filters={
    "premium": RangeFilter("price", gte=100),
    "popular": RangeFilter("view_count", gte=1000),
    "recent": RangeFilter("created_date", gte="now-30d")
})

# Add rating metrics to each segment.
# A sub-aggregation is evaluated inside every bucket of its parent, so a
# single StatsAgg already yields separate statistics for "premium",
# "popular" and "recent". Adding one identically-defined copy per segment
# name would compute the same numbers three times in every bucket.
segments_agg.add_aggregation(StatsAgg("rating_stats", field="rating"))

Missing Values Aggregation

class MissingAgg(BucketAgg):
    """
    Single-bucket aggregation holding the documents that lack a field value.
    """

    def __init__(self, name, field, **kwargs):
        """
        Set up the missing aggregation.

        Args:
            name (str): Aggregation name
            field (str): Field checked for missing values
            **kwargs: Any further parameters
        """

# Missing value analysis
# NOTE: TermsAgg is used below but not imported in this snippet; it must
# already be in scope.
from pyes import MissingAgg

# Documents without ratings
# (broken down further by category through a nested terms aggregation)
missing_rating_agg = MissingAgg("no_rating", field="rating")
missing_rating_agg.add_aggregation(TermsAgg("categories", field="category.keyword"))

# Documents without tags
missing_tags_agg = MissingAgg("no_tags", field="tags")

Complex Multi-Level Aggregations

E-commerce Analytics Example

# Complex e-commerce analytics aggregation
from pyes import (Search, MatchAllQuery, TermsAgg, DateHistogramAgg, 
                  RangeAgg, StatsAgg, SumAgg, AvgAgg, CardinalityAgg)

def build_ecommerce_analytics():
    """
    Assemble the search that carries all e-commerce analytics aggregations.

    Three top-level aggregations are attached: per-category performance
    (with monthly and price-tier breakdowns nested inside), overall daily
    trends, and customer segments by total spend.

    Returns:
        The Search object with the aggregations attached.
    """
    analytics = Search(MatchAllQuery())

    # ---- 1. Category performance -------------------------------------
    by_category = TermsAgg(
        "category_performance",
        field="category.keyword",
        size=20,
    )
    for metric in (
        SumAgg("total_sales", field="sale_amount"),
        AvgAgg("avg_price", field="price"),
        CardinalityAgg("unique_customers", field="customer_id"),
    ):
        by_category.add_aggregation(metric)

    # Revenue per month, nested inside each category
    per_month = DateHistogramAgg(
        "monthly_trends",
        field="sale_date",
        interval="1M",
    )
    per_month.add_aggregation(SumAgg("monthly_revenue", field="sale_amount"))
    by_category.add_aggregation(per_month)

    # Revenue per price tier, nested inside each category
    tier_ranges = [
        {"key": "budget", "to": 50},
        {"key": "mid", "from": 50, "to": 200},
        {"key": "premium", "from": 200},
    ]
    by_tier = RangeAgg("price_tiers", field="price", ranges=tier_ranges)
    by_tier.add_aggregation(SumAgg("tier_revenue", field="sale_amount"))
    by_category.add_aggregation(by_tier)

    analytics.add_aggregation(by_category)

    # ---- 2. Overall daily trends -------------------------------------
    per_day = DateHistogramAgg(
        "daily_trends",
        field="sale_date",
        interval="1d",
        min_doc_count=1,
    )
    for metric in (
        SumAgg("daily_revenue", field="sale_amount"),
        CardinalityAgg("daily_customers", field="customer_id"),
        AvgAgg("avg_order_value", field="sale_amount"),
    ):
        per_day.add_aggregation(metric)

    analytics.add_aggregation(per_day)

    # ---- 3. Customer segmentation ------------------------------------
    segment_ranges = [
        {"key": "bronze", "to": 100},
        {"key": "silver", "from": 100, "to": 500},
        {"key": "gold", "from": 500, "to": 1000},
        {"key": "platinum", "from": 1000},
    ]
    by_segment = RangeAgg(
        "customer_segments",
        field="total_spent",
        ranges=segment_ranges,
    )
    by_segment.add_aggregation(CardinalityAgg("segment_size", field="customer_id"))
    by_segment.add_aggregation(AvgAgg("avg_order_frequency", field="order_frequency"))

    analytics.add_aggregation(by_segment)

    return analytics

# Execute comprehensive analytics
# NOTE: `es` must already be in scope; it is not created in this snippet.
analytics_search = build_ecommerce_analytics()
results = es.search(analytics_search, indices=["sales"])

# Process multi-level results  
# Attribute names below mirror the aggregation names chosen in
# build_ecommerce_analytics().
for category in results.aggregations.category_performance.buckets:
    print(f"Category: {category.key}")
    print(f"Total Sales: ${category.total_sales.value:.2f}")
    print(f"Average Price: ${category.avg_price.value:.2f}")
    print(f"Unique Customers: {category.unique_customers.value}")
    
    # Monthly trends for this category
    print("Monthly trends:")
    for month in category.monthly_trends.buckets:
        print(f"  {month.key_as_string}: ${month.monthly_revenue.value:.2f}")
    
    # Price tier breakdown
    print("Price tier performance:")
    for tier in category.price_tiers.buckets:
        print(f"  {tier.key}: ${tier.tier_revenue.value:.2f}")

Blog Analytics Example

# Blog content analytics
def build_blog_analytics():
    """
    Build comprehensive blog analytics aggregation.

    Attaches three top-level aggregations to a match-all search: author
    performance (top 10 authors by total views, with per-author metrics and
    top tags), weekly publish trends, and tag usage over time.

    Returns:
        The Search object with the aggregations attached.
    """

    search = Search(MatchAllQuery())

    # Author performance.
    # A terms aggregation is ordered by a sub-aggregation with
    # {"<sub_agg_name>": "asc" | "desc"}. The nested {"order": "desc"} shape
    # belongs to sort clauses and is not a valid terms order.
    authors_agg = TermsAgg("author_performance",
                          field="author.keyword",
                          size=10,
                          order={"total_views": "desc"})

    # "total_views" must exist as a sub-aggregation because the order above
    # refers to it by name.
    authors_agg.add_aggregation(SumAgg("total_views", field="view_count"))
    authors_agg.add_aggregation(AvgAgg("avg_views", field="view_count"))
    authors_agg.add_aggregation(ValueCountAgg("post_count", field="_id"))
    authors_agg.add_aggregation(AvgAgg("avg_rating", field="rating"))

    # Tag distribution per author
    tags_agg = TermsAgg("top_tags", field="tags.keyword", size=5)
    authors_agg.add_aggregation(tags_agg)

    search.add_aggregation(authors_agg)

    # Content performance by publish time
    publish_trends_agg = DateHistogramAgg("publish_trends",
                                         field="published_date",
                                         interval="1w")
    publish_trends_agg.add_aggregation(AvgAgg("weekly_avg_views", field="view_count"))
    publish_trends_agg.add_aggregation(MaxAgg("weekly_max_views", field="view_count"))

    search.add_aggregation(publish_trends_agg)

    # Tag popularity over time
    tags_over_time_agg = TermsAgg("tag_trends", field="tags.keyword", size=20)
    monthly_tag_agg = DateHistogramAgg("monthly_usage",
                                      field="published_date",
                                      interval="1M")
    monthly_tag_agg.add_aggregation(SumAgg("tag_views", field="view_count"))
    tags_over_time_agg.add_aggregation(monthly_tag_agg)

    search.add_aggregation(tags_over_time_agg)

    return search

# Process blog analytics
# NOTE: `es` must already be in scope; it is not created in this snippet.
blog_analytics = build_blog_analytics()
results = es.search(blog_analytics, indices=["blog"])

# Top performing authors
# Attribute names below mirror the aggregation names chosen in
# build_blog_analytics().
for author in results.aggregations.author_performance.buckets:
    print(f"Author: {author.key}")
    print(f"Posts: {author.post_count.value}")
    print(f"Total Views: {author.total_views.value}")
    print(f"Avg Views per Post: {author.avg_views.value:.1f}")
    print(f"Avg Rating: {author.avg_rating.value:.1f}")
    
    # Top tags for this author
    print("Top tags:")
    for tag in author.top_tags.buckets:
        print(f"  - {tag.key} ({tag.doc_count} posts)")

Performance Optimization

Aggregation Performance Tips

# Optimize aggregation performance
def optimize_aggregations():
    """
    Best practices for aggregation performance.

    Builds a date-filtered search and attaches one example aggregation for
    each tip, so every aggregation constructed here ends up in the request.

    Returns:
        The filtered search with the example aggregations attached.
    """

    # 1. Use appropriate field types
    # - Use keyword fields for term aggregations
    # - Use numeric fields for range/histogram aggregations
    # - Use date fields for date histograms

    # 2. Limit aggregation scope with filters
    filtered_search = Search(MatchAllQuery()).filter(
        RangeFilter("published_date", gte="2023-01-01")  # Reduce dataset first
    )

    # 3. Use appropriate sizes for term aggregations
    categories_agg = TermsAgg("categories",
                             field="category.keyword",
                             size=10,           # Don't over-fetch
                             shard_size=50)     # Control shard processing

    # 4. Use min_doc_count to reduce noise
    tags_agg = TermsAgg("popular_tags",
                       field="tags.keyword",
                       min_doc_count=10)     # Skip rare terms

    # 5. Order aggregations efficiently.
    # A terms order that refers to a sub-aggregation is written as
    # {"<sub_agg_name>": "desc"}, not as a nested {"order": "desc"} dict.
    ordered_agg = TermsAgg("top_categories",
                          field="category.keyword",
                          order={"avg_rating": "desc"})
    ordered_agg.add_aggregation(AvgAgg("avg_rating", field="rating"))

    # Attach the aggregations for tips 4 and 5 as well; otherwise they would
    # be built and silently discarded.
    filtered_search.add_aggregation(tags_agg)
    filtered_search.add_aggregation(ordered_agg)

    return filtered_search.add_aggregation(categories_agg)

# 6. Cache aggregation results in application when possible
import time
from functools import lru_cache

@lru_cache(maxsize=128)
def _run_cached_analytics(cache_key, ttl_minutes, ttl_bucket):
    """Run the analytics search once per (cache_key, ttl_minutes, ttl_bucket)."""
    analytics_search = build_ecommerce_analytics()
    return es.search(analytics_search, indices=["sales"])


def get_cached_analytics(cache_key, ttl_minutes=15):
    """
    Cache expensive aggregation results for at most ``ttl_minutes``.

    Wall-clock time is divided into windows of ``ttl_minutes`` and the index
    of the current window is part of the cache key. Once the window rolls
    over the old entry no longer matches and the search runs again, so a
    cached result is never older than ``ttl_minutes``. It may be refreshed
    sooner if the window boundary falls shortly after the entry was stored.

    Args:
        cache_key: Hashable value identifying the cached result
        ttl_minutes: Maximum age of a cached result, in minutes. A value of
            zero or less disables caching. Default: 15

    Returns:
        The result of running the e-commerce analytics search.
    """
    if ttl_minutes <= 0:
        # No valid window size: bypass the cache and always query.
        return es.search(build_ecommerce_analytics(), indices=["sales"])

    ttl_bucket = int(time.time() // (ttl_minutes * 60))
    return _run_cached_analytics(cache_key, ttl_minutes, ttl_bucket)


# Keep the lru_cache management helpers reachable on the public function.
get_cached_analytics.cache_clear = _run_cached_analytics.cache_clear
get_cached_analytics.cache_info = _run_cached_analytics.cache_info

Both facets and aggregations provide powerful data analysis capabilities in PyES, with aggregations being the modern, more feature-rich approach for building comprehensive analytics and reporting systems.

Install with Tessl CLI

npx tessl i tessl/pypi-pyes

docs

bulk-operations.md

client.md

facets-aggregations.md

filters.md

index.md

mappings.md

query-dsl.md

rivers.md

tile.json