0
# PyES Facets and Aggregations
1
2
## Overview
3
4
PyES provides comprehensive support for both legacy facets and modern aggregations for data analysis and summarization. Facets are the older Elasticsearch feature (deprecated in Elasticsearch 1.0 and removed in 2.0), while aggregations are their modern replacement. Both provide powerful data analysis capabilities for building analytics dashboards, reporting, and data exploration features, but facets can only be used against legacy clusters that still support them.
5
6
## Facets (Legacy)
7
8
### FacetFactory
9
10
```python { .api }
11
class FacetFactory:
12
"""
13
Factory class for creating and managing multiple facets.
14
15
Provides convenient methods to add various facet types to a search.
16
"""
17
18
def __init__(self):
19
"""Initialize FacetFactory."""
20
pass
21
22
def add_term_facet(self, name, field, size=10, **kwargs):
23
"""
24
Add term facet for value distribution analysis.
25
26
Args:
27
name (str): Facet name for results
28
field (str): Field to facet on
29
size (int): Maximum number of terms to return. Default: 10
30
**kwargs: Additional facet parameters
31
32
Returns:
33
FacetFactory: Self for method chaining
34
"""
35
pass
36
37
def add_date_facet(self, name, field, interval, **kwargs):
38
"""
39
Add date histogram facet for time-based analysis.
40
41
Args:
42
name (str): Facet name for results
43
field (str): Date field to facet on
44
interval (str): Date interval (year, month, week, day, hour, minute)
45
**kwargs: Additional facet parameters
46
47
Returns:
48
FacetFactory: Self for method chaining
49
"""
50
pass
51
52
def add_geo_facet(self, name, field, origin, ranges, **kwargs):
53
"""
54
Add geo distance facet for location-based analysis.
55
56
Args:
57
name (str): Facet name for results
58
field (str): Geo-point field
59
origin (dict): Origin point {"lat": lat, "lon": lon}
60
ranges (list): Distance ranges [{"to": "1km"}, {"from": "1km", "to": "5km"}]
61
**kwargs: Additional facet parameters
62
63
Returns:
64
FacetFactory: Self for method chaining
65
"""
66
pass
67
68
def add(self, facet):
69
"""
70
Add any facet object.
71
72
Args:
73
facet (Facet): Facet object to add
74
75
Returns:
76
FacetFactory: Self for method chaining
77
"""
78
pass
79
80
def reset(self):
81
"""Reset all facets."""
82
pass
83
84
# Basic facet factory usage
85
from pyes import Search, MatchAllQuery, FacetFactory
86
87
search = Search(MatchAllQuery())
88
facets = FacetFactory()
89
90
# Add multiple facets
91
facets.add_term_facet("categories", "category.keyword", size=20)
92
facets.add_term_facet("authors", "author.keyword", size=10)
93
facets.add_date_facet("monthly_posts", "published_date", "month")
94
95
# Apply facets to search
96
search.facet(facets)
97
results = es.search(search, indices=["blog"])
98
99
# Access facet results
100
category_counts = results.facets.categories.terms
101
monthly_counts = results.facets.monthly_posts.entries
102
```
103
104
### Term Facet
105
106
```python { .api }
107
class TermFacet:
108
"""
109
Facet for analyzing term/value distribution.
110
111
Provides counts of different values in a field.
112
"""
113
114
def __init__(self, field, size=10, order=None, exclude=None,
115
regex=None, script=None, **kwargs):
116
"""
117
Initialize TermFacet.
118
119
Args:
120
field (str): Field to facet on
121
size (int): Number of terms to return. Default: 10
122
order (str): Sort order (count, term, reverse_count, reverse_term)
123
exclude (list): Terms to exclude from results
124
regex (str): Regular expression to filter terms
125
script (str): Script for custom term calculation
126
**kwargs: Additional parameters
127
"""
128
pass
129
130
# Term facet analysis
131
from pyes import TermFacet
132
133
# Category distribution
134
category_facet = TermFacet("category.keyword", size=20, order="count")
135
136
# Author distribution excluding bots
137
author_facet = TermFacet("author.keyword", size=15, exclude=["bot_user", "system"])
138
139
# Tag distribution with regex filter
140
tag_facet = TermFacet("tags", regex="python.*", size=10)
141
```
142
143
### Date Histogram Facet
144
145
```python { .api }
146
class DateHistogramFacet:
147
"""
148
Facet for time-based histogram analysis.
149
150
Groups documents by date intervals (year, month, day, etc.).
151
"""
152
153
def __init__(self, field, interval, time_zone=None, pre_zone=None,
154
post_zone=None, pre_offset=None, post_offset=None,
155
factor=None, value_field=None, value_script=None, **kwargs):
156
"""
157
Initialize DateHistogramFacet.
158
159
Args:
160
field (str): Date field to histogram
161
interval (str): Time interval (year, month, week, day, hour, minute)
162
time_zone (str, optional): Time zone for date calculations
163
pre_zone (str, optional): Time zone before calculation
164
post_zone (str, optional): Time zone after calculation
165
pre_offset (str, optional): Offset before calculation
166
post_offset (str, optional): Offset after calculation
167
factor (float, optional): Multiply timestamps by factor
168
value_field (str, optional): Field to sum/count instead of doc count
169
value_script (str, optional): Script for value calculation
170
**kwargs: Additional parameters
171
"""
172
pass
173
174
# Date histogram analysis
175
from pyes import DateHistogramFacet
176
177
# Monthly publication trend
178
monthly_facet = DateHistogramFacet("published_date", "month", time_zone="-05:00")
179
180
# Daily view counts with value field
181
daily_views_facet = DateHistogramFacet("created_date", "day",
182
value_field="view_count")
183
184
# Hourly activity pattern
185
hourly_facet = DateHistogramFacet("timestamp", "hour")
186
```
187
188
### Statistical Facet
189
190
```python { .api }
191
class StatisticalFacet:
192
"""
193
Facet for statistical analysis of numeric fields.
194
195
Provides count, total, sum_of_squares, mean, min, max, variance, and std_deviation.
196
"""
197
198
def __init__(self, field=None, script=None, params=None, **kwargs):
199
"""
200
Initialize StatisticalFacet.
201
202
Args:
203
field (str, optional): Numeric field to analyze
204
script (str, optional): Script for custom value calculation
205
params (dict, optional): Script parameters
206
**kwargs: Additional parameters
207
"""
208
pass
209
210
# Statistical analysis
211
from pyes import StatisticalFacet
212
213
# View count statistics
214
view_stats_facet = StatisticalFacet("view_count")
215
216
# Price statistics with currency conversion
217
price_stats_facet = StatisticalFacet(
218
script="doc['price'].value * params.exchange_rate",
219
params={"exchange_rate": 1.1}
220
)
221
```
222
223
### Range Facet
224
225
```python { .api }
226
class RangeFacet:
227
"""
228
Facet for range-based distribution analysis.
229
230
Groups documents into predefined ranges.
231
"""
232
233
def __init__(self, field, ranges, **kwargs):
234
"""
235
Initialize RangeFacet.
236
237
Args:
238
field (str): Numeric field to range
239
ranges (list): List of range definitions
240
**kwargs: Additional parameters
241
"""
242
pass
243
244
# Range distribution analysis
245
from pyes import RangeFacet
246
247
# Price range distribution
248
price_ranges_facet = RangeFacet("price", [
249
{"to": 25},
250
{"from": 25, "to": 50},
251
{"from": 50, "to": 100},
252
{"from": 100}
253
])
254
255
# Age group distribution
256
age_facet = RangeFacet("age", [
257
{"to": 18},
258
{"from": 18, "to": 25},
259
{"from": 25, "to": 35},
260
{"from": 35, "to": 50},
261
{"from": 50}
262
])
263
```
264
265
## Aggregations (Modern)
266
267
### AggFactory
268
269
```python { .api }
270
class AggFactory:
271
"""
272
Factory class for creating and managing aggregations.
273
274
Modern replacement for FacetFactory with more powerful analysis capabilities.
275
"""
276
277
def __init__(self):
278
"""Initialize AggFactory."""
279
pass
280
281
def add(self, agg):
282
"""
283
Add aggregation to factory.
284
285
Args:
286
agg (Agg): Aggregation object to add
287
288
Returns:
289
AggFactory: Self for method chaining
290
"""
291
pass
292
293
def reset(self):
294
"""Reset all aggregations."""
295
pass
296
297
# Basic aggregation factory usage
298
from pyes import AggFactory, TermsAgg, DateHistogramAgg, StatsAgg
299
300
agg_factory = AggFactory()
301
agg_factory.add(TermsAgg("categories", field="category.keyword"))
302
agg_factory.add(StatsAgg("view_stats", field="view_count"))
303
304
search = Search(MatchAllQuery()).add_aggregation(agg_factory)
305
```
306
307
### Base Aggregation Classes
308
309
```python { .api }
310
class Agg:
311
"""
312
Base class for all aggregations.
313
"""
314
315
def __init__(self, name, **kwargs):
316
"""
317
Initialize base aggregation.
318
319
Args:
320
name (str): Aggregation name for results
321
**kwargs: Aggregation-specific parameters
322
"""
323
pass
324
325
def add_aggregation(self, agg):
326
"""
327
Add sub-aggregation.
328
329
Args:
330
agg (Agg): Sub-aggregation to nest
331
332
Returns:
333
Agg: Self for method chaining
334
"""
335
pass
336
337
class BucketAgg(Agg):
338
"""
339
Base class for bucket aggregations.
340
341
Bucket aggregations group documents into buckets and can contain sub-aggregations.
342
"""
343
pass
344
```
345
346
### Terms Aggregation
347
348
```python { .api }
349
class TermsAgg(BucketAgg):
350
"""
351
Modern replacement for TermFacet with additional capabilities.
352
353
Groups documents by distinct values in a field.
354
"""
355
356
def __init__(self, name, field=None, size=10, shard_size=None,
357
min_doc_count=1, include=None, exclude=None,
358
order=None, script=None, **kwargs):
359
"""
360
Initialize TermsAgg.
361
362
Args:
363
name (str): Aggregation name
364
field (str, optional): Field to aggregate on
365
size (int): Number of buckets to return. Default: 10
366
shard_size (int, optional): Number of terms each shard returns
367
min_doc_count (int): Minimum document count per bucket. Default: 1
368
include (str|list, optional): Terms to include (regex or list)
369
exclude (str|list, optional): Terms to exclude (regex or list)
370
order (dict, optional): Sort order specification
371
script (str, optional): Script for term calculation
372
**kwargs: Additional parameters
373
"""
374
pass
375
376
# Terms aggregation with sub-aggregations
377
from pyes import TermsAgg, AvgAgg, MaxAgg
378
379
# Category breakdown with average views per category
380
categories_agg = TermsAgg("categories", field="category.keyword", size=20)
381
categories_agg.add_aggregation(AvgAgg("avg_views", field="view_count"))
382
categories_agg.add_aggregation(MaxAgg("max_views", field="view_count"))
383
384
search = Search(MatchAllQuery()).add_aggregation(categories_agg)
385
results = es.search(search, indices=["blog"])
386
387
# Access nested results
388
for bucket in results.aggregations.categories.buckets:
389
print(f"Category: {bucket.key}")
390
print(f"Documents: {bucket.doc_count}")
391
print(f"Average views: {bucket.avg_views.value}")
392
print(f"Max views: {bucket.max_views.value}")
393
```
394
395
### Date Histogram Aggregation
396
397
```python { .api }
398
class DateHistogramAgg(BucketAgg):
399
"""
400
Modern replacement for DateHistogramFacet with enhanced features.
401
402
Creates time-based histograms with flexible intervals and time zones.
403
"""
404
405
def __init__(self, name, field=None, interval=None, format=None,
406
time_zone=None, offset=None, min_doc_count=0,
407
extended_bounds=None, script=None, **kwargs):
408
"""
409
Initialize DateHistogramAgg.
410
411
Args:
412
name (str): Aggregation name
413
field (str, optional): Date field to histogram
414
interval (str, optional): Time interval (1y, 1M, 1w, 1d, 1h, 1m, 1s)
415
format (str, optional): Date format for buckets
416
time_zone (str, optional): Time zone for calculations
417
offset (str, optional): Time offset for bucket boundaries
418
min_doc_count (int): Minimum docs per bucket. Default: 0
419
extended_bounds (dict, optional): Force histogram bounds
420
script (str, optional): Script for date calculation
421
**kwargs: Additional parameters
422
"""
423
pass
424
425
# Date histogram with time analysis
426
from pyes import DateHistogramAgg, SumAgg, CardinalityAgg
427
428
# Monthly trends with engagement metrics
429
monthly_agg = DateHistogramAgg("monthly_trends",
430
field="published_date",
431
interval="1M",
432
format="yyyy-MM",
433
time_zone="America/New_York")
434
435
# Add sub-aggregations for detailed analysis
436
monthly_agg.add_aggregation(SumAgg("total_views", field="view_count"))
437
monthly_agg.add_aggregation(CardinalityAgg("unique_authors", field="author.keyword"))
438
439
search = Search(MatchAllQuery()).add_aggregation(monthly_agg)
440
results = es.search(search, indices=["blog"])
441
442
# Time series analysis
443
for bucket in results.aggregations.monthly_trends.buckets:
444
print(f"Month: {bucket.key_as_string}")
445
print(f"Posts: {bucket.doc_count}")
446
print(f"Total views: {bucket.total_views.value}")
447
print(f"Unique authors: {bucket.unique_authors.value}")
448
```
449
450
### Range Aggregation
451
452
```python { .api }
453
class RangeAgg(BucketAgg):
454
"""
455
Creates buckets for different ranges of values.
456
"""
457
458
def __init__(self, name, field=None, ranges=None, script=None, **kwargs):
459
"""
460
Initialize RangeAgg.
461
462
Args:
463
name (str): Aggregation name
464
field (str, optional): Field to create ranges on
465
ranges (list): List of range definitions
466
script (str, optional): Script for value calculation
467
**kwargs: Additional parameters
468
"""
469
pass
470
471
# Range-based bucketing
472
from pyes import RangeAgg, AvgAgg
473
474
# Price tier analysis
475
price_tiers_agg = RangeAgg("price_tiers", field="price", ranges=[
476
{"key": "budget", "to": 25},
477
{"key": "mid-range", "from": 25, "to": 100},
478
{"key": "premium", "from": 100, "to": 500},
479
{"key": "luxury", "from": 500}
480
])
481
482
# Add average rating per price tier
483
price_tiers_agg.add_aggregation(AvgAgg("avg_rating", field="rating"))
484
485
search = Search(MatchAllQuery()).add_aggregation(price_tiers_agg)
486
```
487
488
### Histogram Aggregation
489
490
```python { .api }
491
class HistogramAgg(BucketAgg):
492
"""
493
Creates fixed-interval buckets for numeric values.
494
"""
495
496
def __init__(self, name, field=None, interval=None, min_doc_count=0,
497
extended_bounds=None, script=None, **kwargs):
498
"""
499
Initialize HistogramAgg.
500
501
Args:
502
name (str): Aggregation name
503
field (str, optional): Numeric field to histogram
504
interval (float): Fixed interval size
505
min_doc_count (int): Minimum docs per bucket. Default: 0
506
extended_bounds (dict, optional): Force histogram bounds
507
script (str, optional): Script for value calculation
508
**kwargs: Additional parameters
509
"""
510
pass
511
512
# Fixed interval histograms
513
from pyes import HistogramAgg
514
515
# Price distribution in $10 intervals
516
price_histogram_agg = HistogramAgg("price_distribution",
517
field="price",
518
interval=10,
519
extended_bounds={"min": 0, "max": 200})
520
521
# Rating distribution in 0.5 intervals
522
rating_histogram_agg = HistogramAgg("rating_distribution",
523
field="rating",
524
interval=0.5,
525
min_doc_count=1)
526
```
527
528
## Metric Aggregations
529
530
### Statistical Aggregations
531
532
```python { .api }
533
class StatsAgg(Agg):
534
"""
535
Calculates statistics (count, min, max, avg, sum) for numeric field.
536
"""
537
538
def __init__(self, name, field=None, script=None, **kwargs):
539
"""
540
Initialize StatsAgg.
541
542
Args:
543
name (str): Aggregation name
544
field (str, optional): Numeric field to analyze
545
script (str, optional): Script for value calculation
546
**kwargs: Additional parameters
547
"""
548
pass
549
550
class SumAgg(Agg):
551
"""Calculate sum of numeric field values."""
552
553
def __init__(self, name, field=None, script=None, **kwargs):
554
pass
555
556
class AvgAgg(Agg):
557
"""Calculate average of numeric field values."""
558
559
def __init__(self, name, field=None, script=None, **kwargs):
560
pass
561
562
class MinAgg(Agg):
563
"""Find minimum value in numeric field."""
564
565
def __init__(self, name, field=None, script=None, **kwargs):
566
pass
567
568
class MaxAgg(Agg):
569
"""Find maximum value in numeric field."""
570
571
def __init__(self, name, field=None, script=None, **kwargs):
572
pass
573
574
class ValueCountAgg(Agg):
575
"""Count non-null values in field."""
576
577
def __init__(self, name, field=None, script=None, **kwargs):
578
pass
579
580
class CardinalityAgg(Agg):
581
"""
582
Approximate count of unique values in field.
583
"""
584
585
def __init__(self, name, field=None, precision_threshold=3000, **kwargs):
586
"""
587
Initialize CardinalityAgg.
588
589
Args:
590
name (str): Aggregation name
591
field (str, optional): Field to count unique values
592
precision_threshold (int): Precision vs memory tradeoff. Default: 3000
593
**kwargs: Additional parameters
594
"""
595
pass
596
597
# Comprehensive metric analysis
598
from pyes import (StatsAgg, SumAgg, AvgAgg, MinAgg, MaxAgg,
599
ValueCountAgg, CardinalityAgg)
600
601
# Multiple metric aggregations
602
search = Search(MatchAllQuery())
603
604
# Statistical overview
605
search.add_aggregation(StatsAgg("view_stats", field="view_count"))
606
search.add_aggregation(StatsAgg("rating_stats", field="rating"))
607
608
# Individual metrics
609
search.add_aggregation(SumAgg("total_revenue", field="price"))
610
search.add_aggregation(AvgAgg("avg_response_time", field="response_ms"))
611
search.add_aggregation(CardinalityAgg("unique_visitors", field="user_id"))
612
search.add_aggregation(ValueCountAgg("posts_with_tags", field="tags"))
613
614
results = es.search(search, indices=["analytics"])
615
616
# Access metric results
617
print(f"Average views: {results.aggregations.view_stats.avg}")
618
print(f"Total revenue: {results.aggregations.total_revenue.value}")
619
print(f"Unique visitors: {results.aggregations.unique_visitors.value}")
620
```
621
622
## Advanced Aggregation Patterns
623
624
### Nested Aggregations
625
626
```python { .api }
627
class NestedAgg(BucketAgg):
628
"""
629
Aggregation on nested objects.
630
"""
631
632
def __init__(self, name, path, **kwargs):
633
"""
634
Initialize NestedAgg.
635
636
Args:
637
name (str): Aggregation name
638
path (str): Path to nested objects
639
**kwargs: Additional parameters
640
"""
641
pass
642
643
class ReverseNestedAgg(BucketAgg):
644
"""
645
Reverse nested aggregation to go back to parent documents.
646
"""
647
648
def __init__(self, name, path=None, **kwargs):
649
"""
650
Initialize ReverseNestedAgg.
651
652
Args:
653
name (str): Aggregation name
654
path (str, optional): Path to reverse to (root if None)
655
**kwargs: Additional parameters
656
"""
657
pass
658
659
# Nested object analysis
660
from pyes import NestedAgg, ReverseNestedAgg, TermsAgg
661
662
# Analyze product variants
663
variants_agg = NestedAgg("variants", path="variants")
664
665
# Color distribution within variants
666
color_agg = TermsAgg("colors", field="variants.color.keyword")
667
variants_agg.add_aggregation(color_agg)
668
669
# Back to parent for product categories
670
color_agg.add_aggregation(
671
ReverseNestedAgg("products").add_aggregation(
672
TermsAgg("categories", field="category.keyword")
673
)
674
)
675
676
search = Search(MatchAllQuery()).add_aggregation(variants_agg)
677
```
678
679
### Filter Aggregations
680
681
```python { .api }
682
class FilterAgg(BucketAgg):
683
"""
684
Single bucket aggregation that filters documents.
685
"""
686
687
def __init__(self, name, filter=None, **kwargs):
688
"""
689
Initialize FilterAgg.
690
691
Args:
692
name (str): Aggregation name
693
filter (Filter): Filter to apply
694
**kwargs: Additional parameters
695
"""
696
pass
697
698
class FiltersAgg(BucketAgg):
699
"""
700
Multiple bucket aggregation with different filters per bucket.
701
"""
702
703
def __init__(self, name, filters=None, **kwargs):
704
"""
705
Initialize FiltersAgg.
706
707
Args:
708
name (str): Aggregation name
709
filters (dict): Named filters for buckets
710
**kwargs: Additional parameters
711
"""
712
pass
713
714
# Filter-based bucketing
715
from pyes import FilterAgg, FiltersAgg, TermFilter, RangeFilter
716
717
# Single filter aggregation
718
high_rated_agg = FilterAgg("high_rated",
719
filter=RangeFilter("rating", gte=4.0))
720
high_rated_agg.add_aggregation(AvgAgg("avg_price", field="price"))
721
722
# Multiple filter aggregation
723
segments_agg = FiltersAgg("segments", filters={
724
"premium": RangeFilter("price", gte=100),
725
"popular": RangeFilter("view_count", gte=1000),
726
"recent": RangeFilter("created_date", gte="now-30d")
727
})
728
729
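# Add metrics to the segments. Note: a sub-aggregation is computed inside every
# filter bucket, so each "<segment>_stats" below appears under all three segments.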
730
for segment in ["premium", "popular", "recent"]:
731
segments_agg.add_aggregation(StatsAgg(f"{segment}_stats", field="rating"))
732
```
733
734
### Missing Values Aggregation
735
736
```python { .api }
737
class MissingAgg(BucketAgg):
738
"""
739
Single bucket for documents missing a field value.
740
"""
741
742
def __init__(self, name, field, **kwargs):
743
"""
744
Initialize MissingAgg.
745
746
Args:
747
name (str): Aggregation name
748
field (str): Field to check for missing values
749
**kwargs: Additional parameters
750
"""
751
pass
752
753
# Missing value analysis
754
from pyes import MissingAgg
755
756
# Documents without ratings
757
missing_rating_agg = MissingAgg("no_rating", field="rating")
758
missing_rating_agg.add_aggregation(TermsAgg("categories", field="category.keyword"))
759
760
# Documents without tags
761
missing_tags_agg = MissingAgg("no_tags", field="tags")
762
```
763
764
## Complex Multi-Level Aggregations
765
766
### E-commerce Analytics Example
767
768
```python { .api }
769
# Complex e-commerce analytics aggregation
770
from pyes import (Search, MatchAllQuery, TermsAgg, DateHistogramAgg,
771
RangeAgg, StatsAgg, SumAgg, AvgAgg, CardinalityAgg)
772
773
def build_ecommerce_analytics():
774
"""Build comprehensive e-commerce analytics aggregation."""
775
776
search = Search(MatchAllQuery())
777
778
# Category performance analysis
779
categories_agg = TermsAgg("category_performance",
780
field="category.keyword",
781
size=20)
782
783
# Sales metrics per category
784
categories_agg.add_aggregation(SumAgg("total_sales", field="sale_amount"))
785
categories_agg.add_aggregation(AvgAgg("avg_price", field="price"))
786
categories_agg.add_aggregation(CardinalityAgg("unique_customers", field="customer_id"))
787
788
# Monthly trends per category
789
monthly_agg = DateHistogramAgg("monthly_trends",
790
field="sale_date",
791
interval="1M")
792
monthly_agg.add_aggregation(SumAgg("monthly_revenue", field="sale_amount"))
793
categories_agg.add_aggregation(monthly_agg)
794
795
# Price tier analysis per category
796
price_tiers_agg = RangeAgg("price_tiers", field="price", ranges=[
797
{"key": "budget", "to": 50},
798
{"key": "mid", "from": 50, "to": 200},
799
{"key": "premium", "from": 200}
800
])
801
price_tiers_agg.add_aggregation(SumAgg("tier_revenue", field="sale_amount"))
802
categories_agg.add_aggregation(price_tiers_agg)
803
804
search.add_aggregation(categories_agg)
805
806
# Overall time trends
807
daily_trends_agg = DateHistogramAgg("daily_trends",
808
field="sale_date",
809
interval="1d",
810
min_doc_count=1)
811
daily_trends_agg.add_aggregation(SumAgg("daily_revenue", field="sale_amount"))
812
daily_trends_agg.add_aggregation(CardinalityAgg("daily_customers", field="customer_id"))
813
daily_trends_agg.add_aggregation(AvgAgg("avg_order_value", field="sale_amount"))
814
815
search.add_aggregation(daily_trends_agg)
816
817
# Customer segmentation
818
customer_segments_agg = RangeAgg("customer_segments",
819
field="total_spent", ranges=[
820
{"key": "bronze", "to": 100},
821
{"key": "silver", "from": 100, "to": 500},
822
{"key": "gold", "from": 500, "to": 1000},
823
{"key": "platinum", "from": 1000}
824
])
825
customer_segments_agg.add_aggregation(CardinalityAgg("segment_size", field="customer_id"))
826
customer_segments_agg.add_aggregation(AvgAgg("avg_order_frequency", field="order_frequency"))
827
828
search.add_aggregation(customer_segments_agg)
829
830
return search
831
832
# Execute comprehensive analytics
833
analytics_search = build_ecommerce_analytics()
834
results = es.search(analytics_search, indices=["sales"])
835
836
# Process multi-level results
837
for category in results.aggregations.category_performance.buckets:
838
print(f"Category: {category.key}")
839
print(f"Total Sales: ${category.total_sales.value:.2f}")
840
print(f"Average Price: ${category.avg_price.value:.2f}")
841
print(f"Unique Customers: {category.unique_customers.value}")
842
843
# Monthly trends for this category
844
print("Monthly trends:")
845
for month in category.monthly_trends.buckets:
846
print(f" {month.key_as_string}: ${month.monthly_revenue.value:.2f}")
847
848
# Price tier breakdown
849
print("Price tier performance:")
850
for tier in category.price_tiers.buckets:
851
print(f" {tier.key}: ${tier.tier_revenue.value:.2f}")
852
```
853
854
### Blog Analytics Example
855
856
```python { .api }
857
# Blog content analytics
858
def build_blog_analytics():
859
"""Build comprehensive blog analytics aggregation."""
860
861
search = Search(MatchAllQuery())
862
863
# Author performance
864
authors_agg = TermsAgg("author_performance",
865
field="author.keyword",
866
size=10,
867
order={"total_views": {"order": "desc"}})
868
869
authors_agg.add_aggregation(SumAgg("total_views", field="view_count"))
870
authors_agg.add_aggregation(AvgAgg("avg_views", field="view_count"))
871
authors_agg.add_aggregation(ValueCountAgg("post_count", field="_id"))
872
authors_agg.add_aggregation(AvgAgg("avg_rating", field="rating"))
873
874
# Tag distribution per author
875
tags_agg = TermsAgg("top_tags", field="tags.keyword", size=5)
876
authors_agg.add_aggregation(tags_agg)
877
878
search.add_aggregation(authors_agg)
879
880
# Content performance by publish time
881
publish_trends_agg = DateHistogramAgg("publish_trends",
882
field="published_date",
883
interval="1w")
884
publish_trends_agg.add_aggregation(AvgAgg("weekly_avg_views", field="view_count"))
885
publish_trends_agg.add_aggregation(MaxAgg("weekly_max_views", field="view_count"))
886
887
search.add_aggregation(publish_trends_agg)
888
889
# Tag popularity over time
890
tags_over_time_agg = TermsAgg("tag_trends", field="tags.keyword", size=20)
891
monthly_tag_agg = DateHistogramAgg("monthly_usage",
892
field="published_date",
893
interval="1M")
894
monthly_tag_agg.add_aggregation(SumAgg("tag_views", field="view_count"))
895
tags_over_time_agg.add_aggregation(monthly_tag_agg)
896
897
search.add_aggregation(tags_over_time_agg)
898
899
return search
900
901
# Process blog analytics
902
blog_analytics = build_blog_analytics()
903
results = es.search(blog_analytics, indices=["blog"])
904
905
# Top performing authors
906
for author in results.aggregations.author_performance.buckets:
907
print(f"Author: {author.key}")
908
print(f"Posts: {author.post_count.value}")
909
print(f"Total Views: {author.total_views.value}")
910
print(f"Avg Views per Post: {author.avg_views.value:.1f}")
911
print(f"Avg Rating: {author.avg_rating.value:.1f}")
912
913
# Top tags for this author
914
print("Top tags:")
915
for tag in author.top_tags.buckets:
916
print(f" - {tag.key} ({tag.doc_count} posts)")
917
```
918
919
## Performance Optimization
920
921
### Aggregation Performance Tips
922
923
```python { .api }
924
# Optimize aggregation performance
925
def optimize_aggregations():
926
"""Best practices for aggregation performance."""
927
928
# 1. Use appropriate field types
929
# - Use keyword fields for term aggregations
930
# - Use numeric fields for range/histogram aggregations
931
# - Use date fields for date histograms
932
933
# 2. Limit aggregation scope with filters
934
filtered_search = Search(MatchAllQuery()).filter(
935
RangeFilter("published_date", gte="2023-01-01") # Reduce dataset first
936
)
937
938
# 3. Use appropriate sizes for term aggregations
939
categories_agg = TermsAgg("categories",
940
field="category.keyword",
941
size=10, # Don't over-fetch
942
shard_size=50) # Control shard processing
943
944
# 4. Use min_doc_count to reduce noise
945
tags_agg = TermsAgg("popular_tags",
946
field="tags.keyword",
947
min_doc_count=10) # Skip rare terms
948
949
# 5. Order aggregations efficiently
950
ordered_agg = TermsAgg("top_categories",
951
field="category.keyword",
952
order={"avg_rating": {"order": "desc"}})
953
ordered_agg.add_aggregation(AvgAgg("avg_rating", field="rating"))
954
955
return filtered_search.add_aggregation(categories_agg)
956
957
# 6. Cache aggregation results in application when possible
958
import time
959
from functools import lru_cache
960
961
@lru_cache(maxsize=128)
962
def get_cached_analytics(cache_key, ttl_minutes=15):
963
"""Cache expensive aggregation results."""
964
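# NOTE: lru_cache never expires entries; ttl_minutes is only part of the cache key here.
# In a real implementation, store a timestamp with the result and refresh it once stale.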
965
analytics_search = build_ecommerce_analytics()
966
return es.search(analytics_search, indices=["sales"])
967
```
968
969
Both facets and aggregations provide powerful data analysis capabilities in PyES. Facets are supported only by legacy Elasticsearch clusters, so new code should use aggregations, the modern, more feature-rich approach for building comprehensive analytics and reporting systems.