Python ElasticSearch driver providing a pythonic interface for interacting with ElasticSearch clusters
npx @tessl/cli install tessl/pypi-pyes@0.99.0

PyES is a comprehensive Python client library for ElasticSearch that provides a pythonic interface for interacting with ElasticSearch clusters. First released in 2010, it offers extensive functionality for indexing, searching, and managing ElasticSearch infrastructure, with support for both Python 2 and Python 3.
Version: 0.99.6
License: BSD
Documentation: http://pyes.rtfd.org/
PyPI: https://pypi.org/project/pyes/
pip install pyes

# Main client class
from pyes import ES
# Query DSL classes
from pyes import (
Query, Search, BoolQuery, MatchAllQuery, TermQuery, TermsQuery,
RangeQuery, FilteredQuery, QueryStringQuery, MatchQuery,
MultiMatchQuery, TextQuery, SimpleQueryStringQuery,
FuzzyQuery, FuzzyLikeThisQuery, MoreLikeThisQuery,
PrefixQuery, WildcardQuery, RegexTermQuery, IdsQuery,
ConstantScoreQuery, DisMaxQuery, BoostingQuery,
CustomScoreQuery, FunctionScoreQuery, HasChildQuery,
HasParentQuery, TopChildrenQuery, NestedQuery,
SpanTermQuery, SpanFirstQuery, SpanNearQuery,
SpanNotQuery, SpanOrQuery, SpanMultiQuery,
PercolatorQuery, RescoreQuery, Suggest
)
# Filter DSL classes
from pyes import (
Filter, FilterList, ANDFilter, ORFilter, BoolFilter, NotFilter,
TermFilter, TermsFilter, PrefixFilter, RegexTermFilter,
ExistsFilter, MissingFilter, RangeFilter, LimitFilter,
GeoDistanceFilter, GeoBoundingBoxFilter, GeoPolygonFilter,
GeoShapeFilter, GeoIndexedShapeFilter, HasChildFilter,
HasParentFilter, NestedFilter, TypeFilter, IdsFilter,
QueryFilter, ScriptFilter, MatchAllFilter, RawFilter
)
# Facet and Aggregation classes
from pyes import (
FacetFactory, TermFacet, DateHistogramFacet, HistogramFacet,
RangeFacet, GeoDistanceFacet, StatisticalFacet, TermStatsFacet,
QueryFacet, FilterFacet, AggFactory, Agg, BucketAgg,
TermsAgg, DateHistogramAgg, HistogramAgg, RangeAgg,
FilterAgg, FiltersAgg, NestedAgg, ReverseNestedAgg,
MissingAgg, StatsAgg, ValueCountAgg, SumAgg, AvgAgg,
MinAgg, MaxAgg, CardinalityAgg, TermStatsAgg
)
# Mapping classes
from pyes import (
Mapper, AbstractField, StringField, NumericFieldAbstract,
IntegerField, LongField, FloatField, DoubleField,
DateField, BooleanField, BinaryField, IpField,
ByteField, ShortField, GeoPointField, MultiField,
ObjectField, NestedObject, DocumentObjectField,
AttachmentField
)
# River classes
from pyes import (
River, RabbitMQRiver, TwitterRiver, CouchDBRiver,
JDBCRiver, MongoDBRiver
)
# Utility functions
from pyes import (
file_to_attachment, make_path, make_id, clean_string,
string_b64encode, string_b64decode, quote, ESRange,
ESRangeOp, TermsLookup
)
# Exception classes
from pyes import (
ElasticSearchException, QueryError, InvalidQuery,
InvalidParameterQuery, IndexAlreadyExistsException,
IndexMissingException, InvalidIndexNameException,
TypeMissingException, DocumentAlreadyExistsException,
DocumentMissingException, VersionConflictEngineException,
BulkOperationException, SearchPhaseExecutionException,
ReduceSearchPhaseException, ReplicationShardOperationFailedException,
ClusterBlockException, MapperParsingException, NoServerAvailable
)

from pyes import ES, TermQuery, Search
# Create ES client connection
es = ES('localhost:9200')
# Index a document
doc = {
"title": "Python ElasticSearch Guide",
"content": "Comprehensive guide to using PyES library",
"tags": ["python", "elasticsearch", "search"],
"published": "2023-01-15",
"author": "John Doe"
}
es.index(doc, "blog", "post", id="1")
# Search for documents
query = Search(TermQuery("tags", "python"))
results = es.search(query, indices=["blog"])
# Process results
for hit in results:
    print(f"Title: {hit.title}")
    print(f"Score: {hit._meta.score}")

PyES provides a layered architecture for ElasticSearch interaction: the ES client class handles connection management and high-level operations, while dedicated DSL classes cover queries, filters, aggregations, and mappings.

The main ES class provides comprehensive ElasticSearch client functionality:
# Initialize client with configuration
es = ES(
server="localhost:9200",
timeout=30.0,
bulk_size=400,
max_retries=3,
basic_auth=("username", "password")
)
# Document operations
doc_id = es.index(document, "index_name", "doc_type", id="optional_id")
document = es.get("index_name", "doc_type", "doc_id")
es.update("index_name", "doc_type", "doc_id", script="ctx._source.views += 1")
es.delete("index_name", "doc_type", "doc_id")
# Bulk operations for performance
es.index(doc1, "index", "type", bulk=True)
es.index(doc2, "index", "type", bulk=True)
es.flush_bulk()  # Execute all buffered operations
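Beyond single-document calls, matching documents can be removed in one request with a query. This is a hedged sketch: delete_by_query is assumed to mirror the search call's query/indices/doc_types arguments, so verify the exact signature against your pyes release.

from pyes import TermQuery

# Remove every draft post in one call (argument names are an assumption
# based on the search API; confirm them for your pyes version)
es.delete_by_query(indices=["blog"], doc_types=["post"], query=TermQuery("status", "draft"))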
Build complex search queries with the comprehensive query DSL:

from pyes import Search, BoolQuery, TermQuery, RangeQuery, MatchQuery
# Complex boolean query
query = Search(
BoolQuery(
must=[MatchQuery("title", "python")],
should=[TermQuery("tags", "tutorial")],
must_not=[TermQuery("status", "draft")],
filter=RangeQuery("published", gte="2023-01-01")
)
).size(20).sort("published", order="desc")
results = es.search(query, indices=["blog"])→ Complete Query DSL Reference
Use filters for fast, non-scored filtering:
from pyes import BoolFilter, TermFilter, RangeFilter, GeoDistanceFilter
# Geographic and term filtering
restaurant_filter = BoolFilter(
must=[
TermFilter("category", "restaurant"),
RangeFilter("rating", gte=4.0),
GeoDistanceFilter(
distance="5km",
location={"lat": 40.7128, "lon": -74.0060}
)
]
)
filtered_query = Search().filter(restaurant_filter)

→ Complete Filter DSL Reference
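Filters can also be combined with a scoring query through FilteredQuery (listed in the imports above), so relevance ranking only applies to documents that already passed the cheap, cached filter. A brief, self-contained sketch; the field names are illustrative:

from pyes import FilteredQuery, MatchQuery, TermFilter, Search

# Score by text relevance, but only among documents accepted by the filter
query = FilteredQuery(
    MatchQuery("description", "wood-fired pizza"),
    TermFilter("category", "restaurant")
)
results = es.search(Search(query), indices=["restaurants"])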
Analyze and summarize data with facets and aggregations:
from pyes import Search, TermsAgg, DateHistogramAgg, StatsAgg
# Multi-level aggregations
search = Search().add_aggregation(
TermsAgg("categories", field="category.keyword", size=10)
.add_aggregation(
DateHistogramAgg("monthly", field="published", interval="month")
)
).add_aggregation(
StatsAgg("price_stats", field="price")
)
results = es.search(search, indices=["products"])
categories = results.facets.categories
monthly_trend = results.facets.categories.monthly
price_stats = results.facets.price_stats

→ Complete Facets & Aggregations Reference
Define and manage index schemas with typed field mappings:
from pyes import Mapper, StringField, IntegerField, DateField, GeoPointField
# Define document mapping
mapping = Mapper()
mapping.add_property("title", StringField(analyzer="standard"))
mapping.add_property("content", StringField(analyzer="english"))
mapping.add_property("views", IntegerField())
mapping.add_property("published", DateField())
mapping.add_property("location", GeoPointField())
# Apply mapping to index
es.indices.put_mapping("blog_post", mapping.as_dict(), indices=["blog"])
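Mappings are typically applied when an index is (re)created. The indices manager used above also exposes lifecycle helpers; this is a hedged sketch where the helper names (delete_index_if_exists, create_index, refresh) are assumed from the pyes indices manager and should be verified against your release:

# Recreate the index, apply the mapping, then refresh so the schema is
# immediately visible to searches (helper names assumed, see note above)
es.indices.delete_index_if_exists("blog")
es.indices.create_index("blog")
es.indices.put_mapping("blog_post", mapping.as_dict(), indices=["blog"])
es.indices.refresh(["blog"])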
Set up automated data ingestion from external sources:

from pyes import CouchDBRiver, TwitterRiver, JDBCRiver
# CouchDB replication river
couchdb_river = CouchDBRiver(
couchdb_db="mydb",
couchdb_host="localhost",
couchdb_port=5984,
es_index="replicated_data",
es_type="document"
)
es.create_river(couchdb_river, "couchdb_sync")
# Twitter streaming river
twitter_river = TwitterRiver(
oauth_token="token",
oauth_secret="secret",
consumer_key="key",
consumer_secret="secret",
filter_tracks=["python", "elasticsearch"]
)
es.create_river(twitter_river, "twitter_stream")

Handle large-scale data operations efficiently:
# Configure bulk processing
es.bulk_size = 1000 # Process in batches of 1000
# Bulk indexing with automatic flushing
documents = [{"title": f"Doc {i}", "content": f"Content {i}"} for i in range(5000)]
for doc in documents:
    es.index(doc, "bulk_index", "doc", bulk=True)
    # Automatically flushes when bulk_size is reached
# Manual bulk operations
es.force_bulk() # Force immediate processing
# Bulk deletion
es.delete("index", "type", "id1", bulk=True)
es.delete("index", "type", "id2", bulk=True)
es.flush_bulk()

→ Complete Bulk Operations Reference
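Bulk requests can fail partially, and individual item errors are easy to miss unless the client is told to raise on them. A hedged sketch: the raise_on_bulk_item_failure constructor flag and the forced flush argument are assumptions about the pyes API and may not exist in every release:

from pyes import ES, BulkOperationException

# Ask the client to raise when any item in a bulk batch fails
# (flag name assumed; check your pyes version)
es = ES("localhost:9200", bulk_size=1000, raise_on_bulk_item_failure=True)
documents = [{"title": "Doc %d" % i} for i in range(2500)]
try:
    for doc in documents:
        es.index(doc, "bulk_index", "doc", bulk=True)
    es.flush_bulk(forced=True)  # push any remaining buffered items (forced flag assumed)
except BulkOperationException as exc:
    print("Some bulk items failed:", exc)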
Store queries and match documents against them:
# Register percolator query
percolator_query = TermQuery("tags", "python")
es.create_percolator("blog", "python_posts", percolator_query)
# Test document against registered queries
doc = {"title": "Python Tutorial", "tags": ["python", "programming"]}
matches = es.percolate("blog", ["post"], doc)

Find similar documents:
similar_docs = es.morelikethis(
"blog", "post", "doc_id_1",
fields=["title", "content"],
min_term_freq=1,
max_query_terms=12
)

Provide search suggestions:
from pyes import Suggest
# Term suggestions
suggest = Suggest()
suggest.add_term("python programming", "title_suggest", "title")
suggestions = es.suggest_from_object(suggest, indices=["blog"])

Search by geographic location:
from pyes import GeoDistanceFilter, Search
# Find restaurants within 2km
geo_query = Search().filter(
GeoDistanceFilter(
distance="2km",
location={"lat": 40.7128, "lon": -74.0060}
)
)
nearby_restaurants = es.search(geo_query, indices=["restaurants"])

PyES supports multiple connection protocols and extensive configuration:
# HTTP connection (default)
es = ES(
server=["host1:9200", "host2:9200"], # Multiple hosts for failover
timeout=30.0,
max_retries=3,
retry_time=60,
basic_auth=("username", "password"),
cert_reqs='CERT_REQUIRED' # SSL certificate verification
)
# Thrift connection (optional)
from pyes import ES
es = ES(server="localhost:9500", connection_type="thrift")PyES provides comprehensive exception handling:
from pyes import (
ElasticSearchException, IndexMissingException,
DocumentMissingException, BulkOperationException
)
try:
    result = es.get("missing_index", "doc_type", "doc_id")
except IndexMissingException:
    print("Index does not exist")
except DocumentMissingException:
    print("Document not found")
except ElasticSearchException as e:
    print(f"ElasticSearch error: {e}")

PyES maintains compatibility with ElasticSearch versions up to 2.x. For newer ElasticSearch versions (5.x+), consider migrating to the official elasticsearch-py client. PyES supports both Python 2 and Python 3.
This documentation provides comprehensive coverage of the PyES Python ElasticSearch driver. Each linked section contains detailed API references, examples, and usage patterns for building robust search-enabled applications.