CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-pyes

Python Elastic Search driver providing a pythonic interface for interacting with ElasticSearch clusters

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

SecuritybySnyk

Pending

The risk profile of this skill

Overview
Eval results
Files

index.mddocs/

PyES - Python ElasticSearch Driver

Overview

PyES is a comprehensive Python client library for ElasticSearch that provides a pythonic interface for interacting with ElasticSearch clusters. First released in 2010, it offers extensive functionality for indexing, searching, and managing ElasticSearch infrastructure with support for both Python 2 and Python 3.

Version: 0.99.6
License: BSD
Documentation: http://pyes.rtfd.org/
PyPI: https://pypi.org/project/pyes/

Installation

pip install pyes

Core Imports

# Main client class
from pyes import ES

# Query DSL classes
from pyes import (
    Query, Search, BoolQuery, MatchAllQuery, TermQuery, TermsQuery,
    RangeQuery, FilteredQuery, QueryStringQuery, MatchQuery, 
    MultiMatchQuery, TextQuery, SimpleQueryStringQuery,
    FuzzyQuery, FuzzyLikeThisQuery, MoreLikeThisQuery,
    PrefixQuery, WildcardQuery, RegexTermQuery, IdsQuery,
    ConstantScoreQuery, DisMaxQuery, BoostingQuery,
    CustomScoreQuery, FunctionScoreQuery, HasChildQuery,
    HasParentQuery, TopChildrenQuery, NestedQuery,
    SpanTermQuery, SpanFirstQuery, SpanNearQuery,
    SpanNotQuery, SpanOrQuery, SpanMultiQuery,
    PercolatorQuery, RescoreQuery, Suggest
)

# Filter DSL classes  
from pyes import (
    Filter, FilterList, ANDFilter, ORFilter, BoolFilter, NotFilter,
    TermFilter, TermsFilter, PrefixFilter, RegexTermFilter,
    ExistsFilter, MissingFilter, RangeFilter, LimitFilter,
    GeoDistanceFilter, GeoBoundingBoxFilter, GeoPolygonFilter,
    GeoShapeFilter, GeoIndexedShapeFilter, HasChildFilter,
    HasParentFilter, NestedFilter, TypeFilter, IdsFilter,
    QueryFilter, ScriptFilter, MatchAllFilter, RawFilter
)

# Facet and Aggregation classes
from pyes import (
    FacetFactory, TermFacet, DateHistogramFacet, HistogramFacet,
    RangeFacet, GeoDistanceFacet, StatisticalFacet, TermStatsFacet,
    QueryFacet, FilterFacet, AggFactory, Agg, BucketAgg,
    TermsAgg, DateHistogramAgg, HistogramAgg, RangeAgg,
    FilterAgg, FiltersAgg, NestedAgg, ReverseNestedAgg,
    MissingAgg, StatsAgg, ValueCountAgg, SumAgg, AvgAgg,
    MinAgg, MaxAgg, CardinalityAgg, TermStatsAgg
)

# Mapping classes
from pyes import (
    Mapper, AbstractField, StringField, NumericFieldAbstract,
    IntegerField, LongField, FloatField, DoubleField,
    DateField, BooleanField, BinaryField, IpField,
    ByteField, ShortField, GeoPointField, MultiField,
    ObjectField, NestedObject, DocumentObjectField,
    AttachmentField
)

# River classes
from pyes import (
    River, RabbitMQRiver, TwitterRiver, CouchDBRiver,
    JDBCRiver, MongoDBRiver
)

# Utility functions
from pyes import (
    file_to_attachment, make_path, make_id, clean_string,
    string_b64encode, string_b64decode, quote, ESRange,
    ESRangeOp, TermsLookup
)

# Exception classes
from pyes import (
    ElasticSearchException, QueryError, InvalidQuery,
    InvalidParameterQuery, IndexAlreadyExistsException,
    IndexMissingException, InvalidIndexNameException,
    TypeMissingException, DocumentAlreadyExistsException,
    DocumentMissingException, VersionConflictEngineException,
    BulkOperationException, SearchPhaseExecutionException,
    ReduceSearchPhaseException, ReplicationShardOperationFailedException,
    ClusterBlockException, MapperParsingException, NoServerAvailable
)

Basic Usage Example

from pyes import ES, TermQuery, Search

# Create ES client connection
es = ES('localhost:9200')

# Index a document
doc = {
    "title": "Python ElasticSearch Guide", 
    "content": "Comprehensive guide to using PyES library",
    "tags": ["python", "elasticsearch", "search"],
    "published": "2023-01-15",
    "author": "John Doe"
}
es.index(doc, "blog", "post", id="1")

# Search for documents
query = Search(TermQuery("tags", "python"))
results = es.search(query, indices=["blog"])

# Process results
for hit in results:
    print(f"Title: {hit.title}")
    print(f"Score: {hit._meta.score}")

Architecture Overview

PyES provides a layered architecture for ElasticSearch interaction:

  1. Client Layer (ES class) - Connection management and high-level operations
  2. Query DSL - Pythonic query construction with full ElasticSearch query support
  3. Filter DSL - Filtering capabilities with logical and specialized filters
  4. Facets & Aggregations - Data analysis and summarization tools
  5. Mapping System - Schema definition and field type management
  6. River System - Data streaming from external sources
  7. Bulk Operations - High-performance batch processing
  8. Index Management - Index lifecycle and cluster administration

Core Capabilities

ES Client Operations

The main ES class provides comprehensive ElasticSearch client functionality:

# Initialize client with configuration
es = ES(
    server="localhost:9200",
    timeout=30.0,
    bulk_size=400,
    max_retries=3,
    basic_auth=("username", "password")
)

# Document operations
doc_id = es.index(document, "index_name", "doc_type", id="optional_id")
document = es.get("index_name", "doc_type", "doc_id")
es.update("index_name", "doc_type", "doc_id", script="ctx._source.views += 1")
es.delete("index_name", "doc_type", "doc_id")

# Bulk operations for performance
es.index(doc1, "index", "type", bulk=True)
es.index(doc2, "index", "type", bulk=True) 
es.flush_bulk()  # Execute all buffered operations

→ Full ES Client Reference

Query DSL Construction

Build complex search queries with the comprehensive query DSL:

from pyes import Search, BoolQuery, TermQuery, RangeQuery, MatchQuery

# Complex boolean query
query = Search(
    BoolQuery(
        must=[MatchQuery("title", "python")],
        should=[TermQuery("tags", "tutorial")],
        must_not=[TermQuery("status", "draft")],
        filter=RangeQuery("published", gte="2023-01-01")
    )
).size(20).sort("published", order="desc")

results = es.search(query, indices=["blog"])

→ Complete Query DSL Reference

Filter DSL for Performance

Use filters for fast, non-scored filtering:

from pyes import BoolFilter, TermFilter, RangeFilter, GeoDistanceFilter

# Geographic and term filtering
filter = BoolFilter(
    must=[
        TermFilter("category", "restaurant"),
        RangeFilter("rating", gte=4.0),
        GeoDistanceFilter(
            distance="5km",
            location={"lat": 40.7128, "lon": -74.0060}
        )
    ]
)

filtered_query = Search().filter(filter)

→ Complete Filter DSL Reference

Facets and Aggregations

Analyze and summarize data with facets and aggregations:

from pyes import Search, TermsAgg, DateHistogramAgg, StatsAgg

# Multi-level aggregations
search = Search().add_aggregation(
    TermsAgg("categories", field="category.keyword", size=10)
    .add_aggregation(
        DateHistogramAgg("monthly", field="published", interval="month")
    )
).add_aggregation(
    StatsAgg("price_stats", field="price")
)

results = es.search(search, indices=["products"])
categories = results.facets.categories
monthly_trend = results.facets.categories.monthly
price_stats = results.facets.price_stats

→ Complete Facets & Aggregations Reference

Index Mapping Management

Define and manage index schemas with typed field mappings:

from pyes import Mapper, StringField, IntegerField, DateField, GeoPointField

# Define document mapping
mapping = Mapper()
mapping.add_property("title", StringField(analyzer="standard"))
mapping.add_property("content", StringField(analyzer="english"))
mapping.add_property("views", IntegerField())
mapping.add_property("published", DateField())
mapping.add_property("location", GeoPointField())

# Apply mapping to index
es.indices.put_mapping("blog_post", mapping.as_dict(), indices=["blog"])

→ Complete Mappings Reference

Rivers for Data Streaming

Set up automated data ingestion from external sources:

from pyes import CouchDBRiver, TwitterRiver, JDBCRiver

# CouchDB replication river
couchdb_river = CouchDBRiver(
    couchdb_db="mydb",
    couchdb_host="localhost",
    couchdb_port=5984,
    es_index="replicated_data",
    es_type="document"
)
es.create_river(couchdb_river, "couchdb_sync")

# Twitter streaming river
twitter_river = TwitterRiver(
    oauth_token="token",
    oauth_secret="secret", 
    consumer_key="key",
    consumer_secret="secret",
    filter_tracks=["python", "elasticsearch"]
)
es.create_river(twitter_river, "twitter_stream")

→ Complete Rivers Reference

Bulk Operations for Performance

Handle large-scale data operations efficiently:

# Configure bulk processing
es.bulk_size = 1000  # Process in batches of 1000

# Bulk indexing with automatic flushing
documents = [{"title": f"Doc {i}", "content": f"Content {i}"} for i in range(5000)]

for doc in documents:
    es.index(doc, "bulk_index", "doc", bulk=True)
    # Automatically flushes when bulk_size reached

# Manual bulk operations
es.force_bulk()  # Force immediate processing

# Bulk deletion
es.delete("index", "type", "id1", bulk=True)
es.delete("index", "type", "id2", bulk=True)
es.flush_bulk()

→ Complete Bulk Operations Reference

Advanced Features

Percolator Queries

Store queries and match documents against them:

# Register percolator query
percolator_query = TermQuery("tags", "python")
es.create_percolator("blog", "python_posts", percolator_query)

# Test document against registered queries
doc = {"title": "Python Tutorial", "tags": ["python", "programming"]}
matches = es.percolate("blog", ["post"], doc)

More Like This

Find similar documents:

similar_docs = es.morelikethis(
    "blog", "post", "doc_id_1",
    fields=["title", "content"],
    min_term_freq=1,
    max_query_terms=12
)

Suggestions and Auto-complete

Provide search suggestions:

from pyes import Suggest

# Term suggestions
suggest = Suggest()
suggest.add_term("python programming", "title_suggest", "title")

suggestions = es.suggest_from_object(suggest, indices=["blog"])

Geospatial Search

Search by geographic location:

from pyes import GeoDistanceFilter, Search

# Find restaurants within 2km
geo_query = Search().filter(
    GeoDistanceFilter(
        distance="2km",
        location={"lat": 40.7128, "lon": -74.0060}
    )
)

nearby_restaurants = es.search(geo_query, indices=["restaurants"])

Connection and Configuration

PyES supports multiple connection protocols and extensive configuration:

# HTTP connection (default)
es = ES(
    server=["host1:9200", "host2:9200"],  # Multiple hosts for failover
    timeout=30.0,
    max_retries=3,
    retry_time=60,
    basic_auth=("username", "password"),
    cert_reqs='CERT_REQUIRED'  # SSL certificate verification
)

# Thrift connection (optional)
from pyes import ES
es = ES(server="localhost:9500", connection_type="thrift")

Error Handling

PyES provides comprehensive exception handling:

from pyes import (
    ElasticSearchException, IndexMissingException, 
    DocumentMissingException, BulkOperationException
)

try:
    result = es.get("missing_index", "doc_type", "doc_id")
except IndexMissingException:
    print("Index does not exist")
except DocumentMissingException:
    print("Document not found")
except ElasticSearchException as e:
    print(f"ElasticSearch error: {e}")

Performance Considerations

  • Use bulk operations for high-throughput indexing
  • Implement connection pooling for concurrent access
  • Use filters instead of queries when scoring is not needed
  • Configure appropriate bulk_size based on document size and memory
  • Use scan & scroll for large result sets
  • Implement proper error handling and retry logic

Migration and Compatibility

PyES maintains compatibility with ElasticSearch versions up to 2.x. For newer ElasticSearch versions (5.x+), consider migrating to the official elasticsearch-py client. PyES supports both Python 2 and Python 3.


This documentation provides comprehensive coverage of the PyES Python ElasticSearch driver. Each linked section contains detailed API references, examples, and usage patterns for building robust search-enabled applications.

docs

bulk-operations.md

client.md

facets-aggregations.md

filters.md

index.md

mappings.md

query-dsl.md

rivers.md

tile.json