PyES is a Python ElasticSearch driver providing a Pythonic interface for interacting with ElasticSearch clusters.
PyES provides comprehensive support for both legacy facets and modern aggregations for data analysis and summarization. Facets are the older ElasticSearch feature (deprecated in ES 2.x+) while aggregations are the modern replacement. Both provide powerful data analysis capabilities for building analytics dashboards, reporting, and data exploration features.
class FacetFactory:
    """
    Factory class for creating and managing multiple facets.
    Provides convenient methods to add various facet types to a search.
    """

    def __init__(self):
        """Initialize FacetFactory with an empty facet list."""
        # Facet definitions accumulate here in insertion order.
        self.facets = []

    def add_term_facet(self, name, field, size=10, **kwargs):
        """
        Add term facet for value distribution analysis.
        Args:
            name (str): Facet name for results
            field (str): Field to facet on
            size (int): Maximum number of terms to return. Default: 10
            **kwargs: Additional facet parameters
        Returns:
            FacetFactory: Self for method chaining
        """
        return self.add({"name": name, "type": "terms",
                         "terms": dict(field=field, size=size, **kwargs)})

    def add_date_facet(self, name, field, interval, **kwargs):
        """
        Add date histogram facet for time-based analysis.
        Args:
            name (str): Facet name for results
            field (str): Date field to facet on
            interval (str): Date interval (year, month, day, hour, minute)
            **kwargs: Additional facet parameters
        Returns:
            FacetFactory: Self for method chaining
        """
        return self.add({"name": name, "type": "date_histogram",
                         "date_histogram": dict(field=field, interval=interval,
                                                **kwargs)})

    def add_geo_facet(self, name, field, origin, ranges, **kwargs):
        """
        Add geo distance facet for location-based analysis.
        Args:
            name (str): Facet name for results
            field (str): Geo-point field
            origin (dict): Origin point {"lat": lat, "lon": lon}
            ranges (list): Distance ranges [{"to": "1km"}, {"from": "1km", "to": "5km"}]
            **kwargs: Additional facet parameters
        Returns:
            FacetFactory: Self for method chaining
        """
        return self.add({"name": name, "type": "geo_distance",
                         "geo_distance": dict(field=field, origin=origin,
                                              ranges=ranges, **kwargs)})

    def add(self, facet):
        """
        Add any facet object.
        Args:
            facet (Facet): Facet object to add
        Returns:
            FacetFactory: Self for method chaining
        """
        self.facets.append(facet)
        return self

    def reset(self):
        """Reset all facets."""
        # Rebind rather than clear() so previously handed-out references
        # to the old list are not mutated behind the caller's back.
        self.facets = []
# Basic facet factory usage
# NOTE(review): these examples assume `es` is an already-connected ES client.
from pyes import Search, MatchAllQuery, FacetFactory
search = Search(MatchAllQuery())
facets = FacetFactory()
# Add multiple facets
facets.add_term_facet("categories", "category.keyword", size=20)
facets.add_term_facet("authors", "author.keyword", size=10)
facets.add_date_facet("monthly_posts", "published_date", "month")
# Apply facets to search
search.facet(facets)
results = es.search(search, indices=["blog"])
# Access facet results
category_counts = results.facets.categories.terms
monthly_counts = results.facets.monthly_posts.entries


class TermFacet:
    """
    Facet for analyzing term/value distribution.
    Provides counts of different values in a field.
    """

    def __init__(self, field, size=10, order=None, exclude=None,
                 regex=None, script=None, **kwargs):
        """
        Initialize TermFacet.
        Args:
            field (str): Field to facet on
            size (int): Number of terms to return. Default: 10
            order (str): Sort order (count, term, reverse_count, reverse_term)
            exclude (list): Terms to exclude from results
            regex (str): Regular expression to filter terms
            script (str): Script for custom term calculation
            **kwargs: Additional parameters
        """
        # Store the configuration; the original stub discarded every argument.
        self.field = field
        self.size = size
        self.order = order
        self.exclude = exclude
        self.regex = regex
        self.script = script
        self.extra = kwargs
# Term facet analysis
from pyes import TermFacet
# Category distribution
category_facet = TermFacet("category.keyword", size=20, order="count")
# Author distribution excluding bots
author_facet = TermFacet("author.keyword", size=15, exclude=["bot_user", "system"])
# Tag distribution with regex filter
tag_facet = TermFacet("tags", regex="python.*", size=10)


class DateHistogramFacet:
    """
    Facet for time-based histogram analysis.
    Groups documents by date intervals (year, month, day, etc.).
    """

    def __init__(self, field, interval, time_zone=None, pre_zone=None,
                 post_zone=None, pre_offset=None, post_offset=None,
                 factor=None, value_field=None, value_script=None, **kwargs):
        """
        Initialize DateHistogramFacet.
        Args:
            field (str): Date field to histogram
            interval (str): Time interval (year, month, week, day, hour, minute)
            time_zone (str, optional): Time zone for date calculations
            pre_zone (str, optional): Time zone before calculation
            post_zone (str, optional): Time zone after calculation
            pre_offset (str, optional): Offset before calculation
            post_offset (str, optional): Offset after calculation
            factor (float, optional): Multiply timestamps by factor
            value_field (str, optional): Field to sum/count instead of doc count
            value_script (str, optional): Script for value calculation
            **kwargs: Additional parameters
        """
        # Store the configuration; the original stub discarded every argument.
        self.field = field
        self.interval = interval
        self.time_zone = time_zone
        self.pre_zone = pre_zone
        self.post_zone = post_zone
        self.pre_offset = pre_offset
        self.post_offset = post_offset
        self.factor = factor
        self.value_field = value_field
        self.value_script = value_script
        self.extra = kwargs
# Date histogram analysis
from pyes import DateHistogramFacet
# Monthly publication trend
monthly_facet = DateHistogramFacet("published_date", "month", time_zone="-05:00")
# Daily view counts with value field
daily_views_facet = DateHistogramFacet("created_date", "day",
                                       value_field="view_count")
# Hourly activity pattern
hourly_facet = DateHistogramFacet("timestamp", "hour")


class StatisticalFacet:
    """
    Facet for statistical analysis of numeric fields.
    Provides count, total, mean, min, max, variance, and std_deviation.
    """

    def __init__(self, field=None, script=None, params=None, **kwargs):
        """
        Initialize StatisticalFacet.
        Args:
            field (str, optional): Numeric field to analyze
            script (str, optional): Script for custom value calculation
            params (dict, optional): Script parameters
            **kwargs: Additional parameters
        """
        # Store the configuration; the original stub discarded every argument.
        self.field = field
        self.script = script
        self.params = params
        self.extra = kwargs
# Statistical analysis
from pyes import StatisticalFacet
# View count statistics
view_stats_facet = StatisticalFacet("view_count")
# Price statistics with currency conversion
price_stats_facet = StatisticalFacet(
    script="doc['price'].value * params.exchange_rate",
    params={"exchange_rate": 1.1}
)


class RangeFacet:
    """
    Facet for range-based distribution analysis.
    Groups documents into predefined ranges.
    """

    def __init__(self, field, ranges, **kwargs):
        """
        Initialize RangeFacet.
        Args:
            field (str): Numeric field to range
            ranges (list): List of range definitions
            **kwargs: Additional parameters
        """
        # Store the configuration; the original stub discarded every argument.
        self.field = field
        self.ranges = ranges
        self.extra = kwargs
# Range distribution analysis
from pyes import RangeFacet
# Price range distribution
price_ranges_facet = RangeFacet("price", [
    {"to": 25},
    {"from": 25, "to": 50},
    {"from": 50, "to": 100},
    {"from": 100}
])
# Age group distribution
age_facet = RangeFacet("age", [
    {"to": 18},
    {"from": 18, "to": 25},
    {"from": 25, "to": 35},
    {"from": 35, "to": 50},
    {"from": 50}
])


class AggFactory:
    """
    Factory class for creating and managing aggregations.
    Modern replacement for FacetFactory with more powerful analysis capabilities.
    """

    def __init__(self):
        """Initialize AggFactory with an empty aggregation list."""
        self.aggs = []

    def add(self, agg):
        """
        Add aggregation to factory.
        Args:
            agg (Agg): Aggregation object to add
        Returns:
            AggFactory: Self for method chaining
        """
        self.aggs.append(agg)
        return self

    def reset(self):
        """Reset all aggregations."""
        self.aggs = []
# Basic aggregation factory usage
from pyes import AggFactory, TermsAgg, DateHistogramAgg, StatsAgg
agg_factory = AggFactory()
agg_factory.add(TermsAgg("categories", field="category.keyword"))
agg_factory.add(StatsAgg("view_stats", field="view_count"))
search = Search(MatchAllQuery()).add_aggregation(agg_factory)


class Agg:
    """
    Base class for all aggregations.
    """

    def __init__(self, name, **kwargs):
        """
        Initialize base aggregation.
        Args:
            name (str): Aggregation name for results
            **kwargs: Aggregation-specific parameters
        """
        self.name = name
        self.params = kwargs
        # Nested sub-aggregations, in the order they were added.
        self.subaggs = []

    def add_aggregation(self, agg):
        """
        Add sub-aggregation.
        Args:
            agg (Agg): Sub-aggregation to nest
        Returns:
            Agg: Self for method chaining
        """
        self.subaggs.append(agg)
        return self
class BucketAgg(Agg):
    """
    Base class for bucket aggregations.
    Bucket aggregations group documents into buckets and can contain sub-aggregations.
    """
    pass


class TermsAgg(BucketAgg):
    """
    Modern replacement for TermFacet with additional capabilities.
    Groups documents by distinct values in a field.
    """

    def __init__(self, name, field=None, size=10, shard_size=None,
                 min_doc_count=1, include=None, exclude=None,
                 order=None, script=None, **kwargs):
        """
        Initialize TermsAgg.
        Args:
            name (str): Aggregation name
            field (str, optional): Field to aggregate on
            size (int): Number of buckets to return. Default: 10
            shard_size (int, optional): Number of terms each shard returns
            min_doc_count (int): Minimum document count per bucket. Default: 1
            include (str|list, optional): Terms to include (regex or list)
            exclude (str|list, optional): Terms to exclude (regex or list)
            order (dict, optional): Sort order specification
            script (str, optional): Script for term calculation
            **kwargs: Additional parameters
        """
        super().__init__(name, **kwargs)
        # Store the configuration; the original stub discarded every argument.
        self.field = field
        self.size = size
        self.shard_size = shard_size
        self.min_doc_count = min_doc_count
        self.include = include
        self.exclude = exclude
        self.order = order
        self.script = script
# Terms aggregation with sub-aggregations
from pyes import TermsAgg, AvgAgg, MaxAgg
# Category breakdown with average views per category
categories_agg = TermsAgg("categories", field="category.keyword", size=20)
categories_agg.add_aggregation(AvgAgg("avg_views", field="view_count"))
categories_agg.add_aggregation(MaxAgg("max_views", field="view_count"))
search = Search(MatchAllQuery()).add_aggregation(categories_agg)
results = es.search(search, indices=["blog"])
# Access nested results
for bucket in results.aggregations.categories.buckets:
    print(f"Category: {bucket.key}")
    print(f"Documents: {bucket.doc_count}")
    print(f"Average views: {bucket.avg_views.value}")
    print(f"Max views: {bucket.max_views.value}")


class DateHistogramAgg(BucketAgg):
    """
    Modern replacement for DateHistogramFacet with enhanced features.
    Creates time-based histograms with flexible intervals and time zones.
    """

    def __init__(self, name, field=None, interval=None, format=None,
                 time_zone=None, offset=None, min_doc_count=0,
                 extended_bounds=None, script=None, **kwargs):
        """
        Initialize DateHistogramAgg.
        Args:
            name (str): Aggregation name
            field (str, optional): Date field to histogram
            interval (str, optional): Time interval (1y, 1M, 1w, 1d, 1h, 1m, 1s)
            format (str, optional): Date format for buckets
            time_zone (str, optional): Time zone for calculations
            offset (str, optional): Time offset for bucket boundaries
            min_doc_count (int): Minimum docs per bucket. Default: 0
            extended_bounds (dict, optional): Force histogram bounds
            script (str, optional): Script for date calculation
            **kwargs: Additional parameters
        """
        super().__init__(name, **kwargs)
        # Store the configuration; the original stub discarded every argument.
        # (`format` shadows the builtin but is kept for interface compatibility.)
        self.field = field
        self.interval = interval
        self.format = format
        self.time_zone = time_zone
        self.offset = offset
        self.min_doc_count = min_doc_count
        self.extended_bounds = extended_bounds
        self.script = script
# Date histogram with time analysis
from pyes import DateHistogramAgg, SumAgg, CardinalityAgg
# Monthly trends with engagement metrics
monthly_agg = DateHistogramAgg("monthly_trends",
                               field="published_date",
                               interval="1M",
                               format="yyyy-MM",
                               time_zone="America/New_York")
# Add sub-aggregations for detailed analysis
monthly_agg.add_aggregation(SumAgg("total_views", field="view_count"))
monthly_agg.add_aggregation(CardinalityAgg("unique_authors", field="author.keyword"))
search = Search(MatchAllQuery()).add_aggregation(monthly_agg)
results = es.search(search, indices=["blog"])
# Time series analysis
for bucket in results.aggregations.monthly_trends.buckets:
    print(f"Month: {bucket.key_as_string}")
    print(f"Posts: {bucket.doc_count}")
    print(f"Total views: {bucket.total_views.value}")
    print(f"Unique authors: {bucket.unique_authors.value}")


class RangeAgg(BucketAgg):
    """
    Creates buckets for different ranges of values.
    """

    def __init__(self, name, field=None, ranges=None, script=None, **kwargs):
        """
        Initialize RangeAgg.
        Args:
            name (str): Aggregation name
            field (str, optional): Field to create ranges on
            ranges (list): List of range definitions
            script (str, optional): Script for value calculation
            **kwargs: Additional parameters
        """
        super().__init__(name, **kwargs)
        # Store the configuration; the original stub discarded every argument.
        self.field = field
        self.ranges = ranges
        self.script = script
# Range-based bucketing
from pyes import RangeAgg, AvgAgg
# Price tier analysis
price_tiers_agg = RangeAgg("price_tiers", field="price", ranges=[
    {"key": "budget", "to": 25},
    {"key": "mid-range", "from": 25, "to": 100},
    {"key": "premium", "from": 100, "to": 500},
    {"key": "luxury", "from": 500}
])
# Add average rating per price tier
price_tiers_agg.add_aggregation(AvgAgg("avg_rating", field="rating"))
search = Search(MatchAllQuery()).add_aggregation(price_tiers_agg)


class HistogramAgg(BucketAgg):
    """
    Creates fixed-interval buckets for numeric values.
    """

    def __init__(self, name, field=None, interval=None, min_doc_count=0,
                 extended_bounds=None, script=None, **kwargs):
        """
        Initialize HistogramAgg.
        Args:
            name (str): Aggregation name
            field (str, optional): Numeric field to histogram
            interval (float): Fixed interval size
            min_doc_count (int): Minimum docs per bucket. Default: 0
            extended_bounds (dict, optional): Force histogram bounds
            script (str, optional): Script for value calculation
            **kwargs: Additional parameters
        """
        super().__init__(name, **kwargs)
        # Store the configuration; the original stub discarded every argument.
        self.field = field
        self.interval = interval
        self.min_doc_count = min_doc_count
        self.extended_bounds = extended_bounds
        self.script = script
# Fixed interval histograms
from pyes import HistogramAgg
# Price distribution in $10 intervals
price_histogram_agg = HistogramAgg("price_distribution",
                                   field="price",
                                   interval=10,
                                   extended_bounds={"min": 0, "max": 200})
# Rating distribution in 0.5 intervals
rating_histogram_agg = HistogramAgg("rating_distribution",
                                    field="rating",
                                    interval=0.5,
                                    min_doc_count=1)


class StatsAgg(Agg):
    """
    Calculates statistics (count, min, max, avg, sum) for numeric field.
    """

    def __init__(self, name, field=None, script=None, **kwargs):
        """
        Initialize StatsAgg.
        Args:
            name (str): Aggregation name
            field (str, optional): Numeric field to analyze
            script (str, optional): Script for value calculation
            **kwargs: Additional parameters
        """
        super().__init__(name, **kwargs)
        # Store the configuration; the original stub discarded every argument.
        self.field = field
        self.script = script
class SumAgg(Agg):
    """Calculate sum of numeric field values."""

    def __init__(self, name, field=None, script=None, **kwargs):
        super().__init__(name, **kwargs)
        self.field = field
        self.script = script


class AvgAgg(Agg):
    """Calculate average of numeric field values."""

    def __init__(self, name, field=None, script=None, **kwargs):
        super().__init__(name, **kwargs)
        self.field = field
        self.script = script


class MinAgg(Agg):
    """Find minimum value in numeric field."""

    def __init__(self, name, field=None, script=None, **kwargs):
        super().__init__(name, **kwargs)
        self.field = field
        self.script = script


class MaxAgg(Agg):
    """Find maximum value in numeric field."""

    def __init__(self, name, field=None, script=None, **kwargs):
        super().__init__(name, **kwargs)
        self.field = field
        self.script = script


class ValueCountAgg(Agg):
    """Count non-null values in field."""

    def __init__(self, name, field=None, script=None, **kwargs):
        super().__init__(name, **kwargs)
        self.field = field
        self.script = script


class CardinalityAgg(Agg):
    """
    Approximate count of unique values in field.
    """

    def __init__(self, name, field=None, precision_threshold=3000, **kwargs):
        """
        Initialize CardinalityAgg.
        Args:
            name (str): Aggregation name
            field (str, optional): Field to count unique values
            precision_threshold (int): Precision vs memory tradeoff. Default: 3000
            **kwargs: Additional parameters
        """
        super().__init__(name, **kwargs)
        # Store the configuration; the original stub discarded every argument.
        self.field = field
        self.precision_threshold = precision_threshold
# Comprehensive metric analysis
from pyes import (StatsAgg, SumAgg, AvgAgg, MinAgg, MaxAgg,
                  ValueCountAgg, CardinalityAgg)
# Multiple metric aggregations
search = Search(MatchAllQuery())
# Statistical overview
search.add_aggregation(StatsAgg("view_stats", field="view_count"))
search.add_aggregation(StatsAgg("rating_stats", field="rating"))
# Individual metrics
search.add_aggregation(SumAgg("total_revenue", field="price"))
search.add_aggregation(AvgAgg("avg_response_time", field="response_ms"))
search.add_aggregation(CardinalityAgg("unique_visitors", field="user_id"))
search.add_aggregation(ValueCountAgg("posts_with_tags", field="tags"))
results = es.search(search, indices=["analytics"])
# Access metric results
print(f"Average views: {results.aggregations.view_stats.avg}")
print(f"Total revenue: {results.aggregations.total_revenue.value}")
print(f"Unique visitors: {results.aggregations.unique_visitors.value}")


class NestedAgg(BucketAgg):
    """
    Aggregation on nested objects.
    """

    def __init__(self, name, path, **kwargs):
        """
        Initialize NestedAgg.
        Args:
            name (str): Aggregation name
            path (str): Path to nested objects
            **kwargs: Additional parameters
        """
        super().__init__(name, **kwargs)
        # Store the configuration; the original stub discarded every argument.
        self.path = path
class ReverseNestedAgg(BucketAgg):
    """
    Reverse nested aggregation to go back to parent documents.
    """

    def __init__(self, name, path=None, **kwargs):
        """
        Initialize ReverseNestedAgg.
        Args:
            name (str): Aggregation name
            path (str, optional): Path to reverse to (root if None)
            **kwargs: Additional parameters
        """
        super().__init__(name, **kwargs)
        # Store the configuration; the original stub discarded every argument.
        self.path = path
# Nested object analysis
from pyes import NestedAgg, ReverseNestedAgg, TermsAgg
# Analyze product variants
variants_agg = NestedAgg("variants", path="variants")
# Color distribution within variants
color_agg = TermsAgg("colors", field="variants.color.keyword")
variants_agg.add_aggregation(color_agg)
# Back to parent for product categories
color_agg.add_aggregation(
    ReverseNestedAgg("products").add_aggregation(
        TermsAgg("categories", field="category.keyword")
    )
)
search = Search(MatchAllQuery()).add_aggregation(variants_agg)


class FilterAgg(BucketAgg):
    """
    Single bucket aggregation that filters documents.
    """

    def __init__(self, name, filter=None, **kwargs):
        """
        Initialize FilterAgg.
        Args:
            name (str): Aggregation name
            filter (Filter): Filter to apply
            **kwargs: Additional parameters
        """
        super().__init__(name, **kwargs)
        # Store the configuration; the original stub discarded every argument.
        # (`filter` shadows the builtin but is kept for interface compatibility.)
        self.filter = filter
class FiltersAgg(BucketAgg):
    """
    Multiple bucket aggregation with different filters per bucket.
    """

    def __init__(self, name, filters=None, **kwargs):
        """
        Initialize FiltersAgg.
        Args:
            name (str): Aggregation name
            filters (dict): Named filters for buckets
            **kwargs: Additional parameters
        """
        super().__init__(name, **kwargs)
        # Store the configuration; the original stub discarded every argument.
        self.filters = filters
# Filter-based bucketing
from pyes import FilterAgg, FiltersAgg, TermFilter, RangeFilter
# Single filter aggregation
high_rated_agg = FilterAgg("high_rated",
                           filter=RangeFilter("rating", gte=4.0))
high_rated_agg.add_aggregation(AvgAgg("avg_price", field="price"))
# Multiple filter aggregation
segments_agg = FiltersAgg("segments", filters={
    "premium": RangeFilter("price", gte=100),
    "popular": RangeFilter("view_count", gte=1000),
    "recent": RangeFilter("created_date", gte="now-30d")
})
# Add metrics to each segment
for segment in ["premium", "popular", "recent"]:
    segments_agg.add_aggregation(StatsAgg(f"{segment}_stats", field="rating"))


class MissingAgg(BucketAgg):
    """
    Single bucket for documents missing a field value.
    """

    def __init__(self, name, field, **kwargs):
        """
        Initialize MissingAgg.
        Args:
            name (str): Aggregation name
            field (str): Field to check for missing values
            **kwargs: Additional parameters
        """
        super().__init__(name, **kwargs)
        # Store the configuration; the original stub discarded every argument.
        self.field = field
# Missing value analysis
from pyes import MissingAgg
# Documents without ratings
missing_rating_agg = MissingAgg("no_rating", field="rating")
missing_rating_agg.add_aggregation(TermsAgg("categories", field="category.keyword"))
# Documents without tags
missing_tags_agg = MissingAgg("no_tags", field="tags")

# Complex e-commerce analytics aggregation
from pyes import (Search, MatchAllQuery, TermsAgg, DateHistogramAgg,
RangeAgg, StatsAgg, SumAgg, AvgAgg, CardinalityAgg)
def build_ecommerce_analytics():
    """Build comprehensive e-commerce analytics aggregation.

    Returns:
        Search: a search carrying category, daily-trend, and
        customer-segment aggregations for the "sales" index.
    """
    search = Search(MatchAllQuery())
    # Category performance analysis
    categories_agg = TermsAgg("category_performance",
                              field="category.keyword",
                              size=20)
    # Sales metrics per category
    categories_agg.add_aggregation(SumAgg("total_sales", field="sale_amount"))
    categories_agg.add_aggregation(AvgAgg("avg_price", field="price"))
    categories_agg.add_aggregation(CardinalityAgg("unique_customers", field="customer_id"))
    # Monthly trends per category
    monthly_agg = DateHistogramAgg("monthly_trends",
                                   field="sale_date",
                                   interval="1M")
    monthly_agg.add_aggregation(SumAgg("monthly_revenue", field="sale_amount"))
    categories_agg.add_aggregation(monthly_agg)
    # Price tier analysis per category
    price_tiers_agg = RangeAgg("price_tiers", field="price", ranges=[
        {"key": "budget", "to": 50},
        {"key": "mid", "from": 50, "to": 200},
        {"key": "premium", "from": 200}
    ])
    price_tiers_agg.add_aggregation(SumAgg("tier_revenue", field="sale_amount"))
    categories_agg.add_aggregation(price_tiers_agg)
    search.add_aggregation(categories_agg)
    # Overall time trends
    daily_trends_agg = DateHistogramAgg("daily_trends",
                                        field="sale_date",
                                        interval="1d",
                                        min_doc_count=1)
    daily_trends_agg.add_aggregation(SumAgg("daily_revenue", field="sale_amount"))
    daily_trends_agg.add_aggregation(CardinalityAgg("daily_customers", field="customer_id"))
    daily_trends_agg.add_aggregation(AvgAgg("avg_order_value", field="sale_amount"))
    search.add_aggregation(daily_trends_agg)
    # Customer segmentation
    customer_segments_agg = RangeAgg("customer_segments",
                                     field="total_spent", ranges=[
                                         {"key": "bronze", "to": 100},
                                         {"key": "silver", "from": 100, "to": 500},
                                         {"key": "gold", "from": 500, "to": 1000},
                                         {"key": "platinum", "from": 1000}
                                     ])
    customer_segments_agg.add_aggregation(CardinalityAgg("segment_size", field="customer_id"))
    customer_segments_agg.add_aggregation(AvgAgg("avg_order_frequency", field="order_frequency"))
    search.add_aggregation(customer_segments_agg)
    return search
# Execute comprehensive analytics
analytics_search = build_ecommerce_analytics()
results = es.search(analytics_search, indices=["sales"])
# Process multi-level results
for category in results.aggregations.category_performance.buckets:
    print(f"Category: {category.key}")
    print(f"Total Sales: ${category.total_sales.value:.2f}")
    print(f"Average Price: ${category.avg_price.value:.2f}")
    print(f"Unique Customers: {category.unique_customers.value}")
    # Monthly trends for this category
    print("Monthly trends:")
    for month in category.monthly_trends.buckets:
        print(f" {month.key_as_string}: ${month.monthly_revenue.value:.2f}")
    # Price tier breakdown
    print("Price tier performance:")
    for tier in category.price_tiers.buckets:
        print(f" {tier.key}: ${tier.tier_revenue.value:.2f}")

# Blog content analytics
def build_blog_analytics():
    """Build comprehensive blog analytics aggregation.

    Returns:
        Search: a search carrying author-performance, publish-trend, and
        tag-trend aggregations for the "blog" index.
    """
    search = Search(MatchAllQuery())
    # Author performance, ranked by total views via the sub-aggregation
    authors_agg = TermsAgg("author_performance",
                           field="author.keyword",
                           size=10,
                           order={"total_views": {"order": "desc"}})
    authors_agg.add_aggregation(SumAgg("total_views", field="view_count"))
    authors_agg.add_aggregation(AvgAgg("avg_views", field="view_count"))
    authors_agg.add_aggregation(ValueCountAgg("post_count", field="_id"))
    authors_agg.add_aggregation(AvgAgg("avg_rating", field="rating"))
    # Tag distribution per author
    tags_agg = TermsAgg("top_tags", field="tags.keyword", size=5)
    authors_agg.add_aggregation(tags_agg)
    search.add_aggregation(authors_agg)
    # Content performance by publish time
    publish_trends_agg = DateHistogramAgg("publish_trends",
                                          field="published_date",
                                          interval="1w")
    publish_trends_agg.add_aggregation(AvgAgg("weekly_avg_views", field="view_count"))
    publish_trends_agg.add_aggregation(MaxAgg("weekly_max_views", field="view_count"))
    search.add_aggregation(publish_trends_agg)
    # Tag popularity over time
    tags_over_time_agg = TermsAgg("tag_trends", field="tags.keyword", size=20)
    monthly_tag_agg = DateHistogramAgg("monthly_usage",
                                       field="published_date",
                                       interval="1M")
    monthly_tag_agg.add_aggregation(SumAgg("tag_views", field="view_count"))
    tags_over_time_agg.add_aggregation(monthly_tag_agg)
    search.add_aggregation(tags_over_time_agg)
    return search
# Process blog analytics
blog_analytics = build_blog_analytics()
results = es.search(blog_analytics, indices=["blog"])
# Top performing authors
for author in results.aggregations.author_performance.buckets:
    print(f"Author: {author.key}")
    print(f"Posts: {author.post_count.value}")
    print(f"Total Views: {author.total_views.value}")
    print(f"Avg Views per Post: {author.avg_views.value:.1f}")
    print(f"Avg Rating: {author.avg_rating.value:.1f}")
    # Top tags for this author
    print("Top tags:")
    for tag in author.top_tags.buckets:
        print(f" - {tag.key} ({tag.doc_count} posts)")

# Optimize aggregation performance
def optimize_aggregations():
    """Best practices for aggregation performance.

    Returns:
        Search: the date-filtered search with the sized terms aggregation.
    """
    # 1. Use appropriate field types
    # - Use keyword fields for term aggregations
    # - Use numeric fields for range/histogram aggregations
    # - Use date fields for date histograms
    # 2. Limit aggregation scope with filters
    filtered_search = Search(MatchAllQuery()).filter(
        RangeFilter("published_date", gte="2023-01-01")  # Reduce dataset first
    )
    # 3. Use appropriate sizes for term aggregations
    categories_agg = TermsAgg("categories",
                              field="category.keyword",
                              size=10,  # Don't over-fetch
                              shard_size=50)  # Control shard processing
    # 4. Use min_doc_count to reduce noise
    tags_agg = TermsAgg("popular_tags",
                        field="tags.keyword",
                        min_doc_count=10)  # Skip rare terms
    # 5. Order aggregations efficiently
    ordered_agg = TermsAgg("top_categories",
                           field="category.keyword",
                           order={"avg_rating": {"order": "desc"}})
    ordered_agg.add_aggregation(AvgAgg("avg_rating", field="rating"))
    # NOTE(review): tags_agg and ordered_agg above are illustrative only —
    # they are never attached to the returned search.
    return filtered_search.add_aggregation(categories_agg)
# 6. Cache aggregation results in application when possible
import time
from functools import lru_cache
@lru_cache(maxsize=128)
def get_cached_analytics(cache_key, ttl_minutes=15):
"""Cache expensive aggregation results."""
# In real implementation, check cache timestamp
analytics_search = build_ecommerce_analytics()
return es.search(analytics_search, indices=["sales"])Both facets and aggregations provide powerful data analysis capabilities in PyES, with aggregations being the modern, more feature-rich approach for building comprehensive analytics and reporting systems.
Install with Tessl CLI
npx tessl i tessl/pypi-pyes