CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-pyes

Python Elastic Search driver providing a pythonic interface for interacting with ElasticSearch clusters

Pending
Overview
Eval results
Files

docs/rivers.md

PyES Rivers for Data Streaming

Overview

Rivers in PyES provide automated data ingestion from external sources into ElasticSearch. Rivers are long-running processes that continuously pull data from external systems and index it into ElasticSearch, enabling real-time or near-real-time data synchronization. While rivers are deprecated in newer ElasticSearch versions in favor of Beats and Logstash, PyES still provides comprehensive river support for legacy systems.

Note: Rivers were deprecated in ElasticSearch 2.0+. For modern ElasticSearch versions, consider using Beats, Logstash, or custom indexing solutions.

Base River Class

River

class River:
    """
    Base class for all ElasticSearch rivers.

    Rivers provide continuous data ingestion from external sources
    into ElasticSearch indices.  The base class records the target
    index/type plus any extra settings; subclasses add their
    source-specific attributes and extend ``serialize()``.
    """

    def __init__(self, index_name=None, type_name=None, **kwargs):
        """
        Initialize base river.

        Args:
            index_name (str, optional): Target index name
            type_name (str, optional): Target document type
            **kwargs: River-specific configuration parameters
        """
        # Stored so serialize() implementations can build the "index"
        # section; the custom river examples below read these attributes.
        self.index_name = index_name
        self.type_name = type_name
        # Keep unrecognized settings available to subclasses.
        self.settings = dict(kwargs)

    def serialize(self):
        """
        Serialize river configuration to ElasticSearch format.

        Returns:
            dict: River configuration dictionary.  At this level only the
            "index" section is produced (empty dict when no target is
            set); subclasses add their source-specific sections.
        """
        index_section = {}
        if self.index_name is not None:
            index_section["index"] = self.index_name
        if self.type_name is not None:
            index_section["type"] = self.type_name
        return {"index": index_section} if index_section else {}

# Basic river lifecycle: connect, create, inspect, tear down.
from pyes import ES

es = ES('localhost:9200')

# Build the river definition and register it under a name.
config = create_river_configuration()
es.create_river(config, "my_data_river")

# Inspect cluster state to check on the river.
river_status = es.cluster.state(filter_nodes=True)

# Remove the river once ingestion is finished.
es.delete_river(config, "my_data_river")

Database Rivers

JDBC River

class JDBCRiver(River):
    """
    JDBC river for importing data from relational databases.

    Connects to SQL databases and continuously imports data based
    on SQL queries and update detection strategies.
    """

    def __init__(self, driver=None, url=None, user=None, password=None,
                 sql=None, index=None, type=None, bulk_size=100, 
                 bulk_timeout="60s", max_bulk_requests=30, poll="5s",
                 strategy="simple", **kwargs):
        """
        Initialize JDBCRiver.

        Args:
            driver (str): JDBC driver class name
            url (str): Database connection URL
            user (str): Database username
            password (str): Database password
            sql (str): SQL query to fetch data
            index (str): Target ElasticSearch index
            type (str): Target document type
            bulk_size (int): Number of documents per bulk request. Default: 100
            bulk_timeout (str): Bulk request timeout. Default: "60s"
            max_bulk_requests (int): Maximum concurrent bulk requests. Default: 30
            poll (str): Polling interval for new data. Default: "5s"
            strategy (str): Update detection strategy. Default: "simple"
            **kwargs: Additional JDBC river parameters
                (e.g. column_name when strategy="column")
        """
        super().__init__(index_name=index, type_name=type)
        self.driver = driver
        self.url = url
        self.user = user
        self.password = password
        self.sql = sql
        self.index = index
        self.type = type
        self.bulk_size = bulk_size
        self.bulk_timeout = bulk_timeout
        self.max_bulk_requests = max_bulk_requests
        self.poll = poll
        self.strategy = strategy
        # Extra plugin options are passed through into the jdbc section.
        self.extra = dict(kwargs)

    def serialize(self):
        """
        Serialize to the ``_river`` JDBC configuration document.

        NOTE(review): layout follows the common river convention used by
        the custom rivers below ({"type": ..., "<type>": {...},
        "index": {...}}); verify exact field names against the JDBC
        river plugin version in use.

        Returns:
            dict: River configuration dictionary
        """
        jdbc = {
            "driver": self.driver,
            "url": self.url,
            "user": self.user,
            "password": self.password,
            "sql": self.sql,
            "bulk_size": self.bulk_size,
            "bulk_timeout": self.bulk_timeout,
            "max_bulk_requests": self.max_bulk_requests,
            "poll": self.poll,
            "strategy": self.strategy,
        }
        jdbc.update(self.extra)
        return {
            "type": "jdbc",
            "jdbc": jdbc,
            "index": {"index": self.index, "type": self.type},
        }

# JDBC river for MySQL data import
from pyes import JDBCRiver

# MySQL source: incremental sync driven by the updated_at column.
users_river = JDBCRiver(
    driver="com.mysql.jdbc.Driver",
    url="jdbc:mysql://localhost:3306/mydb",
    user="db_user",
    password="db_password",
    sql="SELECT id, name, email, created_at FROM users WHERE updated_at > ?",
    index="users",
    type="user",
    bulk_size=1000,
    poll="30s",
    strategy="column",        # column-based change detection
    column_name="updated_at", # track updates via this column
)

es.create_river(users_river, "mysql_users_river")

# PostgreSQL source: full-table polling once a minute.
products_river = JDBCRiver(
    driver="org.postgresql.Driver",
    url="jdbc:postgresql://localhost:5432/mydb",
    user="postgres_user",
    password="postgres_password",
    sql="SELECT product_id, name, description, price FROM products",
    index="catalog",
    type="product",
    bulk_size=500,
    poll="60s",
)

es.create_river(products_river, "postgres_products_river")

MongoDB River

class MongoDBRiver(River):
    """
    MongoDB river for importing data from MongoDB collections.

    Provides real-time synchronization with MongoDB using oplog tailing
    or periodic collection scanning.
    """

    def __init__(self, host="localhost", port=27017, db=None, collection=None,
                 gridfs=False, filter=None, index=None, type=None,
                 bulk_size=100, bulk_timeout="10s", throttle_size=-1,
                 initial_timestamp=None, **kwargs):
        """
        Initialize MongoDBRiver.

        Args:
            host (str): MongoDB host. Default: "localhost"
            port (int): MongoDB port. Default: 27017
            db (str): MongoDB database name
            collection (str): MongoDB collection name
            gridfs (bool): Import GridFS files. Default: False
            filter (dict, optional): MongoDB query filter
            index (str): Target ElasticSearch index
            type (str): Target document type
            bulk_size (int): Documents per bulk request. Default: 100
            bulk_timeout (str): Bulk timeout. Default: "10s"
            throttle_size (int): Throttle size (-1 for no throttling). Default: -1
            initial_timestamp (dict, optional): Starting oplog timestamp
            **kwargs: Additional MongoDB river parameters
                (e.g. credentials, socket timeouts)
        """
        super().__init__(index_name=index, type_name=type)
        self.host = host
        self.port = port
        self.db = db
        self.collection = collection
        self.gridfs = gridfs
        self.filter = filter
        self.index = index
        self.type = type
        self.bulk_size = bulk_size
        self.bulk_timeout = bulk_timeout
        self.throttle_size = throttle_size
        self.initial_timestamp = initial_timestamp
        # Extra plugin options are passed through into the mongodb section.
        self.extra = dict(kwargs)

    def serialize(self):
        """
        Serialize to the ``_river`` MongoDB configuration document.

        NOTE(review): layout follows the common river convention; verify
        exact field names against the MongoDB river plugin in use.

        Returns:
            dict: River configuration dictionary
        """
        mongodb = {
            "host": self.host,
            "port": self.port,
            "db": self.db,
            "collection": self.collection,
            "gridfs": self.gridfs,
            "throttle_size": self.throttle_size,
        }
        # Optional settings are emitted only when supplied.
        if self.filter is not None:
            mongodb["filter"] = self.filter
        if self.initial_timestamp is not None:
            mongodb["initial_timestamp"] = self.initial_timestamp
        mongodb.update(self.extra)
        return {
            "type": "mongodb",
            "mongodb": mongodb,
            "index": {
                "index": self.index,
                "type": self.type,
                "bulk_size": self.bulk_size,
                "bulk_timeout": self.bulk_timeout,
            },
        }

# MongoDB river for user collection
from pyes import MongoDBRiver

# Straightforward collection sync.
users_sync_river = MongoDBRiver(
    host="mongodb-server",
    port=27017,
    db="application_db",
    collection="users",
    index="users",
    type="user_profile",
    bulk_size=1000,
)

# Authenticated river that mirrors only a filtered subset of documents.
active_products_river = MongoDBRiver(
    host="secure-mongo.example.com",
    port=27017,
    db="ecommerce",
    collection="products",
    filter={"status": "active", "price": {"$gt": 0}},  # active products with a positive price
    index="catalog",
    type="product",
    bulk_size=500,
    credentials={
        "user": "mongo_user",
        "password": "mongo_password",
    },
)

# GridFS river that indexes stored file content.
stored_files_river = MongoDBRiver(
    host="localhost",
    db="files_db",
    gridfs=True,  # import GridFS files
    index="documents",
    type="file",
    bulk_size=100,
)

es.create_river(users_sync_river, "mongo_users_river")
es.create_river(stored_files_river, "gridfs_files_river")

NoSQL and Document Rivers

CouchDB River

class CouchDBRiver(River):
    """
    CouchDB river for replicating CouchDB databases to ElasticSearch.

    Provides continuous replication using CouchDB's change feed mechanism.
    """

    def __init__(self, couchdb_host="localhost", couchdb_port=5984,
                 couchdb_db=None, couchdb_user=None, couchdb_password=None,
                 couchdb_filter=None, es_index=None, es_type=None,
                 bulk_size=100, bulk_timeout="10s", **kwargs):
        """
        Initialize CouchDBRiver.

        Args:
            couchdb_host (str): CouchDB host. Default: "localhost"
            couchdb_port (int): CouchDB port. Default: 5984
            couchdb_db (str): CouchDB database name
            couchdb_user (str, optional): CouchDB username
            couchdb_password (str, optional): CouchDB password
            couchdb_filter (str, optional): CouchDB filter function
            es_index (str): Target ElasticSearch index
            es_type (str): Target document type
            bulk_size (int): Documents per bulk request. Default: 100
            bulk_timeout (str): Bulk timeout. Default: "10s"
            **kwargs: Additional CouchDB river parameters
        """
        super().__init__(index_name=es_index, type_name=es_type)
        self.couchdb_host = couchdb_host
        self.couchdb_port = couchdb_port
        self.couchdb_db = couchdb_db
        self.couchdb_user = couchdb_user
        self.couchdb_password = couchdb_password
        self.couchdb_filter = couchdb_filter
        self.es_index = es_index
        self.es_type = es_type
        self.bulk_size = bulk_size
        self.bulk_timeout = bulk_timeout
        # Extra plugin options are passed through into the couchdb section.
        self.extra = dict(kwargs)

    def serialize(self):
        """
        Serialize to the ``_river`` CouchDB configuration document.

        NOTE(review): layout follows the common river convention; verify
        exact field names against the CouchDB river plugin in use.

        Returns:
            dict: River configuration dictionary
        """
        couchdb = {
            "host": self.couchdb_host,
            "port": self.couchdb_port,
            "db": self.couchdb_db,
        }
        # Credentials and filter are optional and emitted only when set.
        if self.couchdb_user is not None:
            couchdb["user"] = self.couchdb_user
        if self.couchdb_password is not None:
            couchdb["password"] = self.couchdb_password
        if self.couchdb_filter is not None:
            couchdb["filter"] = self.couchdb_filter
        couchdb.update(self.extra)
        return {
            "type": "couchdb",
            "couchdb": couchdb,
            "index": {
                "index": self.es_index,
                "type": self.es_type,
                "bulk_size": self.bulk_size,
                "bulk_timeout": self.bulk_timeout,
            },
        }

# CouchDB replication river
from pyes import CouchDBRiver

# Replicate a whole CouchDB database into one index.
blog_river = CouchDBRiver(
    couchdb_host="couchdb.example.com",
    couchdb_port=5984,
    couchdb_db="blog_posts",
    es_index="blog",
    es_type="post",
    bulk_size=200,
)

# Authenticated replication restricted by a CouchDB filter function.
filtered_docs_river = CouchDBRiver(
    couchdb_host="secure-couch.example.com",
    couchdb_port=6984,
    couchdb_db="documents",
    couchdb_user="couch_user",
    couchdb_password="couch_password",
    couchdb_filter="published_docs/by_status",  # custom filter function
    es_index="public_docs",
    es_type="document",
    bulk_size=500,
    bulk_timeout="30s",
)

es.create_river(blog_river, "couchdb_blog_river")
es.create_river(filtered_docs_river, "secure_couchdb_river")

Message Queue Rivers

RabbitMQ River

class RabbitMQRiver(River):
    """
    RabbitMQ river for consuming messages from RabbitMQ queues.

    Consumes messages from RabbitMQ and indexes them as documents
    in ElasticSearch, enabling real-time message indexing.
    """

    def __init__(self, host="localhost", port=5672, user="guest", 
                 password="guest", vhost="/", queue=None, exchange=None,
                 routing_key=None, exchange_type="direct", durable=True,
                 index=None, type=None, bulk_size=100, bulk_timeout="5s",
                 ordered=False, **kwargs):
        """
        Initialize RabbitMQRiver.

        Args:
            host (str): RabbitMQ host. Default: "localhost"
            port (int): RabbitMQ port. Default: 5672
            user (str): RabbitMQ username. Default: "guest"
            password (str): RabbitMQ password. Default: "guest"
            vhost (str): RabbitMQ virtual host. Default: "/"
            queue (str): Queue name to consume from
            exchange (str, optional): Exchange name
            routing_key (str, optional): Routing key pattern
            exchange_type (str): Exchange type. Default: "direct"
            durable (bool): Durable queue. Default: True
            index (str): Target ElasticSearch index
            type (str): Target document type
            bulk_size (int): Messages per bulk request. Default: 100
            bulk_timeout (str): Bulk timeout. Default: "5s"
            ordered (bool): Maintain message order. Default: False
            **kwargs: Additional RabbitMQ river parameters
        """
        super().__init__(index_name=index, type_name=type)
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.vhost = vhost
        self.queue = queue
        self.exchange = exchange
        self.routing_key = routing_key
        self.exchange_type = exchange_type
        self.durable = durable
        self.index = index
        self.type = type
        self.bulk_size = bulk_size
        self.bulk_timeout = bulk_timeout
        self.ordered = ordered
        # Extra plugin options are passed through into the rabbitmq section.
        self.extra = dict(kwargs)

    def serialize(self):
        """
        Serialize to the ``_river`` RabbitMQ configuration document.

        NOTE(review): layout follows the common river convention; verify
        exact field names against the RabbitMQ river plugin in use.

        Returns:
            dict: River configuration dictionary
        """
        rabbitmq = {
            "host": self.host,
            "port": self.port,
            "user": self.user,
            "password": self.password,
            "vhost": self.vhost,
            "queue": self.queue,
            "exchange_type": self.exchange_type,
            "durable": self.durable,
        }
        # Exchange binding is optional for plain queue consumption.
        if self.exchange is not None:
            rabbitmq["exchange"] = self.exchange
        if self.routing_key is not None:
            rabbitmq["routing_key"] = self.routing_key
        rabbitmq.update(self.extra)
        return {
            "type": "rabbitmq",
            "rabbitmq": rabbitmq,
            "index": {
                "index": self.index,
                "type": self.type,
                "bulk_size": self.bulk_size,
                "bulk_timeout": self.bulk_timeout,
                "ordered": self.ordered,
            },
        }

# RabbitMQ river for log processing
from pyes import RabbitMQRiver

# Consume application logs straight from a queue.
logs_river = RabbitMQRiver(
    host="rabbitmq.example.com",
    port=5672,
    user="log_consumer",
    password="consumer_password",
    queue="application_logs",
    index="logs",
    type="log_entry",
    bulk_size=500,
    bulk_timeout="10s",
)

# Bind a queue to a topic exchange and index matching events in order.
user_events_river = RabbitMQRiver(
    host="localhost",
    user="event_consumer",
    password="event_password",
    exchange="events",
    exchange_type="topic",
    routing_key="user.*.created",  # route user creation events
    queue="user_events_queue",
    index="user_events",
    type="user_event",
    durable=True,
    ordered=True,  # preserve event ordering
)

# Small, fast bulks for near-real-time notification delivery.
notifications_river = RabbitMQRiver(
    host="message-broker.example.com",
    vhost="/notifications",
    queue="notification_queue",
    index="notifications",
    type="notification",
    bulk_size=100,
    bulk_timeout="2s",  # fast processing for real-time notifications
)

es.create_river(logs_river, "rabbitmq_logs_river")
es.create_river(user_events_river, "rabbitmq_events_river")

Social Media Rivers

Twitter River

class TwitterRiver(River):
    """
    Twitter river for streaming Twitter data into ElasticSearch.

    Connects to Twitter Streaming API to index tweets in real-time
    based on search terms, users, or locations.
    """

    def __init__(self, oauth_consumer_key=None, oauth_consumer_secret=None,
                 oauth_access_token=None, oauth_access_token_secret=None,
                 filter_tracks=None, filter_follow=None, filter_locations=None,
                 index="twitter", type="tweet", bulk_size=100, 
                 drop_threshold=10, **kwargs):
        """
        Initialize TwitterRiver.

        Args:
            oauth_consumer_key (str): Twitter OAuth consumer key
            oauth_consumer_secret (str): Twitter OAuth consumer secret
            oauth_access_token (str): Twitter OAuth access token
            oauth_access_token_secret (str): Twitter OAuth access token secret
            filter_tracks (list, optional): Keywords/hashtags to track
            filter_follow (list, optional): User IDs to follow
            filter_locations (list, optional): Geographic bounding boxes
            index (str): Target ElasticSearch index. Default: "twitter"
            type (str): Target document type. Default: "tweet"
            bulk_size (int): Tweets per bulk request. Default: 100
            drop_threshold (int): Drop tweets if queue exceeds threshold. Default: 10
            **kwargs: Additional Twitter river parameters
        """
        super().__init__(index_name=index, type_name=type)
        self.oauth_consumer_key = oauth_consumer_key
        self.oauth_consumer_secret = oauth_consumer_secret
        self.oauth_access_token = oauth_access_token
        self.oauth_access_token_secret = oauth_access_token_secret
        self.filter_tracks = filter_tracks
        self.filter_follow = filter_follow
        self.filter_locations = filter_locations
        self.index = index
        self.type = type
        self.bulk_size = bulk_size
        self.drop_threshold = drop_threshold
        # Extra plugin options are passed through into the twitter section.
        self.extra = dict(kwargs)

    def serialize(self):
        """
        Serialize to the ``_river`` Twitter configuration document.

        NOTE(review): layout follows the common river convention; verify
        exact field names against the Twitter river plugin in use.

        Returns:
            dict: River configuration dictionary
        """
        twitter = {
            "oauth": {
                "consumer_key": self.oauth_consumer_key,
                "consumer_secret": self.oauth_consumer_secret,
                "access_token": self.oauth_access_token,
                "access_token_secret": self.oauth_access_token_secret,
            },
            "drop_threshold": self.drop_threshold,
        }
        # Stream filters are optional; emit only the ones supplied.
        stream_filter = {}
        if self.filter_tracks:
            stream_filter["tracks"] = self.filter_tracks
        if self.filter_follow:
            stream_filter["follow"] = self.filter_follow
        if self.filter_locations:
            stream_filter["locations"] = self.filter_locations
        if stream_filter:
            twitter["filter"] = stream_filter
        twitter.update(self.extra)
        return {
            "type": "twitter",
            "twitter": twitter,
            "index": {
                "index": self.index,
                "type": self.type,
                "bulk_size": self.bulk_size,
            },
        }

# Twitter river for brand monitoring
from pyes import TwitterRiver

# Stream tweets mentioning tracked keywords and hashtags.
keyword_stream = TwitterRiver(
    oauth_consumer_key="your_consumer_key",
    oauth_consumer_secret="your_consumer_secret",
    oauth_access_token="your_access_token",
    oauth_access_token_secret="your_access_token_secret",
    filter_tracks=["elasticsearch", "python", "bigdata", "#elasticsearch"],
    index="social_media",
    type="tweet",
    bulk_size=200,
    drop_threshold=50,
)

# Stream everything posted by a fixed set of accounts.
followed_users_stream = TwitterRiver(
    oauth_consumer_key="your_consumer_key",
    oauth_consumer_secret="your_consumer_secret",
    oauth_access_token="your_access_token",
    oauth_access_token_secret="your_access_token_secret",
    filter_follow=["783214", "6253282", "16121831"],  # Twitter user IDs
    index="user_tweets",
    type="tweet",
)

# Stream geo-tagged tweets inside a bounding box.
nyc_stream = TwitterRiver(
    oauth_consumer_key="your_consumer_key",
    oauth_consumer_secret="your_consumer_secret",
    oauth_access_token="your_access_token",
    oauth_access_token_secret="your_access_token_secret",
    filter_locations=[
        [-74.0059, 40.7128, -73.9352, 40.7589]  # NYC bounding box
    ],
    index="geo_tweets",
    type="geo_tweet",
)

es.create_river(keyword_stream, "twitter_monitoring_river")
es.create_river(nyc_stream, "twitter_geo_river")

Custom River Implementation

Creating Custom Rivers

# Create custom river for specific data sources
class CustomAPIRiver(River):
    """
    Custom river for importing data from REST APIs.

    Example implementation for a custom data source.
    """
    
    def __init__(self, api_url=None, api_key=None, endpoint=None,
                 poll_interval="60s", index=None, type=None,
                 bulk_size=100, **kwargs):
        """
        Initialize CustomAPIRiver.
        
        Args:
            api_url (str): Base API URL
            api_key (str): API authentication key
            endpoint (str): API endpoint to poll
            poll_interval (str): Polling interval. Default: "60s"
            index (str): Target ElasticSearch index
            type (str): Target document type
            bulk_size (int): Documents per bulk request. Default: 100
            **kwargs: Additional custom river parameters
        """
        super().__init__(index, type, **kwargs)
        # Fix: set the target explicitly -- the base River shown above is
        # a stub whose __init__ stores nothing, and serialize() below
        # reads self.index_name / self.type_name.
        self.index_name = index
        self.type_name = type
        self.api_url = api_url
        self.api_key = api_key
        self.endpoint = endpoint
        self.poll_interval = poll_interval
        self.bulk_size = bulk_size
    
    def serialize(self):
        """Serialize custom river configuration.

        Returns:
            dict: River configuration with the "custom_api" source
            section and the "index" target section.
        """
        return {
            "type": "custom_api",
            "custom_api": {
                "api_url": self.api_url,
                "api_key": self.api_key,
                "endpoint": self.endpoint,
                "poll_interval": self.poll_interval,
                "bulk_size": self.bulk_size
            },
            "index": {
                "index": self.index_name,
                "type": self.type_name,
                "bulk_size": self.bulk_size
            }
        }

# RSS/Atom feed river
class RSSRiver(River):
    """Custom river for RSS/Atom feeds."""
    
    def __init__(self, feed_url=None, poll_interval="300s", 
                 index="rss", type="article", **kwargs):
        """
        Initialize RSSRiver.

        Args:
            feed_url (str): URL of the RSS/Atom feed to poll
            poll_interval (str): Polling interval. Default: "300s"
            index (str): Target ElasticSearch index. Default: "rss"
            type (str): Target document type. Default: "article"
            **kwargs: Additional river parameters
        """
        super().__init__(index, type, **kwargs)
        # Fix: the base River shown above is a stub, so store the target
        # attributes that serialize() reads.
        self.index_name = index
        self.type_name = type
        self.feed_url = feed_url
        self.poll_interval = poll_interval
    
    def serialize(self):
        """Serialize RSS river configuration to a dict."""
        return {
            "type": "rss",
            "rss": {
                "url": self.feed_url,
                "poll_interval": self.poll_interval
            },
            "index": {
                "index": self.index_name,
                "type": self.type_name
            }
        }

# File system river
class FileSystemRiver(River):
    """Custom river for monitoring file system changes."""
    
    def __init__(self, directory=None, pattern="*", recursive=True,
                 index="files", type="file", **kwargs):
        """
        Initialize FileSystemRiver.

        Args:
            directory (str): Directory to watch
            pattern (str): Glob pattern for matching files. Default: "*"
            recursive (bool): Watch subdirectories too. Default: True
            index (str): Target ElasticSearch index. Default: "files"
            type (str): Target document type. Default: "file"
            **kwargs: Additional river parameters
        """
        super().__init__(index, type, **kwargs)
        # Fix: the base River shown above is a stub, so store the target
        # attributes that serialize() reads.
        self.index_name = index
        self.type_name = type
        self.directory = directory
        self.pattern = pattern
        self.recursive = recursive
    
    def serialize(self):
        """Serialize file-system river configuration to a dict."""
        return {
            "type": "fs",
            "fs": {
                "directory": self.directory,
                "pattern": self.pattern,
                "recursive": self.recursive
            },
            "index": {
                "index": self.index_name,
                "type": self.type_name
            }
        }

# Usage of custom rivers
api_source = CustomAPIRiver(
    api_url="https://api.example.com",
    api_key="your_api_key",
    endpoint="/v1/data",
    poll_interval="120s",
    index="api_data",
    type="api_record",
)

feed_source = RSSRiver(
    feed_url="https://feeds.example.com/news.xml",
    poll_interval="600s",  # check every 10 minutes
    index="news",
    type="article",
)

log_dir_source = FileSystemRiver(
    directory="/var/log/application",
    pattern="*.log",
    recursive=True,
    index="log_files",
    type="log_file",
)

River Management Operations

River Lifecycle Management

# River management functions
def manage_rivers(es):
    """Comprehensive river management operations.

    Creates a set of example rivers, monitors them, and runs a health
    check.  ``cleanup_rivers`` is defined for completeness but is not
    invoked by this function.

    Args:
        es: Connected pyes ES client used for all river operations.

    Returns:
        tuple: (dict of river name -> river config created,
        dict of river name -> health info from the health check).
    """
    
    # Create rivers
    def create_data_rivers():
        """Create multiple rivers for different data sources.

        Returns:
            dict: river name -> river configuration object.
        """
        
        # Database river: column-based incremental sync from MySQL.
        db_river = JDBCRiver(
            driver="com.mysql.jdbc.Driver",
            url="jdbc:mysql://db.example.com/app_db",
            user="river_user", 
            password="river_password",
            sql="SELECT * FROM products WHERE updated_at > ?",
            index="products",
            type="product",
            strategy="column",
            column_name="updated_at"
        )
        
        # Message queue river
        mq_river = RabbitMQRiver(
            host="mq.example.com",
            queue="events_queue",
            index="events",
            type="event"
        )
        
        # Social media river
        social_river = TwitterRiver(
            oauth_consumer_key="key",
            oauth_consumer_secret="secret",
            oauth_access_token="token",
            oauth_access_token_secret="token_secret",
            filter_tracks=["#myapp", "mycompany"],
            index="social",
            type="tweet"
        )
        
        # Create all rivers
        rivers = {
            "product_sync_river": db_river,
            "events_river": mq_river,
            "social_monitoring_river": social_river
        }
        
        # Best-effort creation: a failure on one river does not stop the rest.
        for river_name, river_config in rivers.items():
            try:
                es.create_river(river_config, river_name)
                print(f"Created river: {river_name}")
            except Exception as e:
                print(f"Failed to create river {river_name}: {e}")
        
        return rivers
    
    # Monitor river status
    def monitor_river_status():
        """Monitor the status of all rivers by inspecting cluster state."""
        
        try:
            # Get cluster state to see rivers
            cluster_state = es.cluster.state()
            
            # Check river nodes
            if 'nodes' in cluster_state:
                for node_id, node_info in cluster_state['nodes'].items():
                    if 'rivers' in node_info:
                        print(f"Node {node_id} rivers:")
                        for river_name, river_info in node_info['rivers'].items():
                            print(f"  - {river_name}: {river_info.get('status', 'unknown')}")
            
            # Get river statistics (if available) from the _river meta index
            river_stats = es.indices.stats(indices=["_river"])
            if river_stats:
                print("River statistics:")
                for stat_name, stat_value in river_stats.items():
                    print(f"  {stat_name}: {stat_value}")
                    
        except Exception as e:
            print(f"Error monitoring rivers: {e}")
    
    # Clean up rivers
    def cleanup_rivers(river_names):
        """Clean up specified rivers (defined but not called below).

        Args:
            river_names (iterable of str): Names of rivers to delete.
        """
        
        for river_name in river_names:
            try:
                # Delete the river
                # NOTE(review): None is passed where create_river receives a
                # river object -- confirm pyes' delete_river signature
                # accepts a bare name like this.
                es.delete_river(None, river_name)
                print(f"Deleted river: {river_name}")
                
                # Clean up river metadata index
                es.indices.delete_index(f"_river_{river_name}")
                print(f"Cleaned up river metadata: {river_name}")
                
            except Exception as e:
                print(f"Error cleaning up river {river_name}: {e}")
    
    # River health check
    def river_health_check():
        """Perform health check on rivers.

        Returns:
            dict: river name -> {'health', 'document_count', 'last_check'}.
        """
        
        health_status = {}
        
        try:
            # Check if river indices exist
            river_indices = es.indices.status(indices=["_river"])
            
            for index_name, index_info in river_indices.get('indices', {}).items():
                river_name = index_name.replace('_river_', '')
                
                # Check index health
                health = index_info.get('health', 'unknown')
                doc_count = index_info.get('docs', {}).get('num_docs', 0)
                
                health_status[river_name] = {
                    'health': health,
                    'document_count': doc_count,
                    # NOTE: hardcoded placeholder timestamp in this example;
                    # a real implementation would record the current time.
                    'last_check': '2023-12-01T10:30:00Z'
                }
                
        except Exception as e:
            print(f"Error during river health check: {e}")
        
        return health_status
    
    # Execute management operations
    rivers = create_data_rivers()
    monitor_river_status()
    health_status = river_health_check()
    
    return rivers, health_status

# Run the full river management workflow and keep the results for inspection.
rivers, health = manage_rivers(es)

River Configuration Patterns

# Common river configuration patterns
def river_configuration_patterns():
    """Common patterns for river configurations.

    Returns:
        dict: pattern name -> pre-configured river object
        ("high_throughput", "low_latency", "fault_tolerant", "filtered").
    """
    
    # 1. High-throughput river configuration
    def high_throughput_config():
        """Configuration for high-volume data streams."""
        
        return JDBCRiver(
            driver="com.mysql.jdbc.Driver",
            url="jdbc:mysql://db.example.com/large_db",
            user="bulk_user",
            password="bulk_password", 
            sql="SELECT * FROM transactions WHERE processed_at > ?",
            index="transactions",
            type="transaction",
            bulk_size=5000,        # Large bulk size
            bulk_timeout="30s",    # Longer timeout
            max_bulk_requests=10,  # Limit concurrent requests
            poll="10s",           # Frequent polling
            strategy="column",
            column_name="processed_at"
        )
    
    # 2. Low-latency river configuration
    def low_latency_config():
        """Configuration for real-time data requirements."""
        
        return RabbitMQRiver(
            host="realtime-mq.example.com",
            queue="realtime_events",
            index="realtime",
            type="event",
            bulk_size=10,        # Small bulk size
            bulk_timeout="1s",   # Fast timeout
            ordered=True         # Maintain order
        )
    
    # 3. Fault-tolerant river configuration
    def fault_tolerant_config():
        """Configuration with enhanced error handling."""
        
        return MongoDBRiver(
            host="mongo-cluster.example.com",
            db="production_db", 
            collection="critical_data",
            index="critical",
            type="data",
            bulk_size=1000,
            bulk_timeout="60s",
            throttle_size=10000,  # Throttle under high load
            # Enhanced retry configuration
            # NOTE(review): retry_count/retry_delay are passed through as
            # extra kwargs -- confirm the target river plugin supports them.
            retry_count=5,
            retry_delay="30s"
        )
    
    # 4. Filtered river configuration  
    def filtered_config():
        """Configuration with content filtering."""
        
        return CouchDBRiver(
            couchdb_host="filtered-couch.example.com",
            couchdb_db="content_db",
            couchdb_filter="content/published_only", # Custom filter
            es_index="published_content",
            es_type="content",
            bulk_size=500
        )
    
    return {
        "high_throughput": high_throughput_config(),
        "low_latency": low_latency_config(),
        "fault_tolerant": fault_tolerant_config(),
        "filtered": filtered_config()
    }

# Register every configuration pattern as a live river.
configs = river_configuration_patterns()

for pattern, river in configs.items():
    name = f"{pattern}_river"
    es.create_river(river, name)
    print(f"Created {pattern} river: {name}")

Performance and Monitoring

River Optimization

# River performance optimization strategies
def optimize_river_performance():
    """Best practices for river performance optimization.

    Returns:
        dict: {"memory_optimized": JDBCRiver, "network_optimized":
        MongoDBRiver}.  The two calculation helpers below are defined as
        examples and are not used in the returned value.
    """
    
    # 1. Bulk size optimization
    def calculate_optimal_bulk_size(doc_size_kb, network_latency_ms, target_throughput):
        """Calculate optimal bulk size based on document characteristics.

        Args:
            doc_size_kb (float): Average document size in kilobytes.
            network_latency_ms (float): Round-trip latency to the cluster.
            target_throughput: NOTE(review): currently unused by this
                calculation -- either wire it in or drop the parameter.

        Returns:
            int: Suggested documents-per-bulk, clamped to [100, 10000].
        """
        
        # Rule of thumb: aim for 5-15MB bulk requests
        target_bulk_mb = 10
        # 10 MB expressed in KB, divided by per-document KB.
        optimal_bulk_size = int((target_bulk_mb * 1024) / doc_size_kb)
        
        # Adjust for network latency: cap on slow links, raise the floor
        # on fast ones.
        if network_latency_ms > 100:
            optimal_bulk_size = min(optimal_bulk_size, 1000)
        elif network_latency_ms < 20:
            optimal_bulk_size = max(optimal_bulk_size, 5000)
        
        return max(100, min(optimal_bulk_size, 10000))  # Reasonable bounds
    
    # 2. Polling optimization
    def optimize_polling_interval(change_frequency, data_importance):
        """Determine optimal polling interval.

        Args:
            change_frequency (str): "high" or anything else (treated as low).
            data_importance (str): "critical", "normal", or anything else
                (treated as low importance).

        Returns:
            str: Polling interval such as "5s" or "3600s".
        """
        
        if data_importance == "critical":
            return "5s" if change_frequency == "high" else "30s"
        elif data_importance == "normal":
            return "30s" if change_frequency == "high" else "300s"
        else:  # low importance
            return "300s" if change_frequency == "high" else "3600s"
    
    # 3. Memory optimization
    def memory_optimized_river():
        """River configuration optimized for memory usage."""
        
        return JDBCRiver(
            driver="com.mysql.jdbc.Driver",
            url="jdbc:mysql://db.example.com/app_db",
            user="river_user",
            password="river_password",
            sql="SELECT id, title, content FROM articles WHERE updated_at > ?",
            index="articles",
            type="article",
            bulk_size=2000,        # Balanced bulk size
            bulk_timeout="45s",    # Reasonable timeout
            max_bulk_requests=5,   # Limit memory usage
            fetch_size=1000,      # JDBC fetch size
            strategy="column"
        )
    
    # 4. Network optimization
    def network_optimized_river():
        """River configuration optimized for network efficiency."""
        
        return MongoDBRiver(
            host="remote-mongo.example.com",
            db="remote_db",
            collection="data",
            index="remote_data", 
            type="record",
            bulk_size=5000,       # Larger bulks for network efficiency
            bulk_timeout="120s",  # Longer timeout for network delays
            throttle_size=50000,  # Higher throttle for batch processing
            # Connection optimization (passed through as extra kwargs;
            # NOTE(review): confirm the plugin accepts these names)
            socket_timeout=60000,
            connection_timeout=30000
        )
    
    return {
        "memory_optimized": memory_optimized_river(),
        "network_optimized": network_optimized_river()
    }

# River monitoring and alerting
def setup_river_monitoring():
    """Set up monitoring and alerting for rivers.

    Returns a pair of callables: a per-river metrics monitor and a
    factory for the alerting thresholds.
    """

    import logging
    import time

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("river_monitor")

    def monitor_river_metrics(river_name, es_client):
        """Monitor key river metrics."""

        meta_index = f"_river_{river_name}"
        try:
            index_stats = es_client.indices.stats(indices=[meta_index])

            # Baseline snapshot; only document_count is filled in from stats.
            snapshot = {
                "river_name": river_name,
                "timestamp": int(time.time()),
                "document_count": 0,
                "indexing_rate": 0,
                "error_count": 0,
                "status": "unknown"
            }

            if index_stats and 'indices' in index_stats:
                per_index = index_stats['indices'].get(meta_index, {})
                docs = per_index.get('total', {}).get('docs', {})
                snapshot["document_count"] = docs.get('count', 0)

            logger.info(f"River metrics: {snapshot}")

            # Alert checks against the collected snapshot.
            if snapshot["error_count"] > 10:
                logger.error(f"High error count for river {river_name}: {snapshot['error_count']}")

            if snapshot["indexing_rate"] < 1:  # Less than 1 doc/second
                logger.warning(f"Low indexing rate for river {river_name}: {snapshot['indexing_rate']}")

            return snapshot

        except Exception as e:
            logger.error(f"Error monitoring river {river_name}: {e}")
            return None

    def setup_alerting():
        """Set up alerting thresholds."""

        return {
            "error_threshold": 50,
            "performance_threshold": 100,  # docs per minute
            "downtime_threshold": 300,     # seconds
            "disk_usage_threshold": 85     # percent
        }

    return monitor_river_metrics, setup_alerting

Migration from Rivers

Modern Alternatives

# Migration patterns from rivers to modern alternatives
def river_migration_patterns():
    """Patterns for migrating from rivers to modern solutions.

    Returns:
        dict: Three migration artifacts:
            - ``jdbc_replacement``: a ``DatabaseIndexer`` class replacing a JDBC river
            - ``logstash_config``: Logstash pipeline text replacing a RabbitMQ river
            - ``beats_config``: Filebeat config text replacing a Twitter river
    """

    # 1. Replace JDBC River with custom Python script
    def jdbc_river_replacement():
        """Replace JDBC river with custom Python indexing."""

        import mysql.connector
        from pyes import ES
        import time
        import logging

        class DatabaseIndexer:
            """Polls a MySQL table and bulk-indexes rows into ElasticSearch."""

            def __init__(self, db_config, es_config):
                self.db_config = db_config
                self.es = ES(**es_config)
                self.logger = logging.getLogger("db_indexer")

            def run_indexing_loop(self):
                """Run continuous indexing loop, polling the database every minute.

                Performs an incremental sync: after the first full pass, only
                rows with ``updated_at`` newer than the last checkpoint are
                fetched.
                """

                last_updated = None

                while True:
                    try:
                        # Connect to database; always release it in `finally`
                        # so an indexing error cannot leak the connection.
                        conn = mysql.connector.connect(**self.db_config)
                        try:
                            cursor = conn.cursor(dictionary=True)

                            # Query for new/updated records
                            if last_updated:
                                query = "SELECT * FROM products WHERE updated_at > %s"
                                cursor.execute(query, (last_updated,))
                            else:
                                query = "SELECT * FROM products"
                                cursor.execute(query)

                            # Bulk index documents, flushing in batches of 1000.
                            # Track the processed count ourselves: cursor.rowcount
                            # is clobbered by the MAX(updated_at) query below.
                            bulk_docs = []
                            processed = 0
                            for row in cursor:
                                doc = {
                                    "product_id": row["id"],
                                    "name": row["name"],
                                    "description": row["description"],
                                    "price": float(row["price"]),
                                    "updated_at": row["updated_at"].isoformat()
                                }
                                bulk_docs.append(doc)
                                processed += 1

                                if len(bulk_docs) >= 1000:
                                    self.bulk_index(bulk_docs)
                                    bulk_docs = []

                            # Index remaining documents
                            if bulk_docs:
                                self.bulk_index(bulk_docs)

                            # Advance the checkpoint only if rows were processed
                            if processed > 0:
                                cursor.execute("SELECT MAX(updated_at) as max_updated FROM products")
                                result = cursor.fetchone()
                                last_updated = result["max_updated"]

                            cursor.close()
                        finally:
                            conn.close()

                        self.logger.info(f"Processed {processed} records")

                    except Exception as e:
                        self.logger.error(f"Indexing error: {e}")

                    # Wait before next iteration
                    time.sleep(60)  # 1 minute polling

            def bulk_index(self, docs):
                """Bulk index documents keyed by product_id."""
                for doc in docs:
                    self.es.index(doc, "products", "product",
                                 id=doc["product_id"], bulk=True)
                self.es.flush_bulk()

        return DatabaseIndexer

    # 2. Replace RabbitMQ River with Logstash configuration
    def rabbitmq_logstash_config():
        """Logstash configuration to replace RabbitMQ river."""

        logstash_config = """
        input {
          rabbitmq {
            host => "rabbitmq.example.com"
            port => 5672
            user => "logstash_user"
            password => "logstash_password"
            queue => "events_queue"
            durable => true
          }
        }
        
        filter {
          json {
            source => "message"
          }
          
          date {
            match => [ "timestamp", "ISO8601" ]
          }
          
          mutate {
            add_field => { "[@metadata][index]" => "events" }
            add_field => { "[@metadata][type]" => "event" }
          }
        }
        
        output {
          elasticsearch {
            hosts => ["elasticsearch.example.com:9200"]
            index => "%{[@metadata][index]}"
            document_type => "%{[@metadata][type]}"
          }
        }
        """

        return logstash_config

    # 3. Replace Twitter River with Beats
    def twitter_beats_config():
        """Filebeat configuration for Twitter data."""

        filebeat_config = """
        filebeat.inputs:
        - type: log
          enabled: true
          paths:
            - /var/log/twitter/*.json
          json.keys_under_root: true
          json.add_error_key: true
        
        output.elasticsearch:
          hosts: ["elasticsearch.example.com:9200"]
          index: "social-media-%{+yyyy.MM.dd}"
          template.name: "social-media"
          template.pattern: "social-media-*"
        """

        return filebeat_config

    return {
        "jdbc_replacement": jdbc_river_replacement(),
        "logstash_config": rabbitmq_logstash_config(),
        "beats_config": twitter_beats_config()
    }

# Execute migration — NOTE: this calls jdbc_river_replacement(), which
# imports mysql.connector and pyes at call time; both must be installed.
migration_patterns = river_migration_patterns()

Rivers in PyES provide powerful data ingestion capabilities for legacy ElasticSearch deployments, enabling real-time synchronization with databases, message queues, and external APIs. While deprecated in newer ElasticSearch versions, they remain useful for older systems and can be migrated to modern alternatives like Beats, Logstash, or custom indexing solutions.

Install with Tessl CLI

npx tessl i tessl/pypi-pyes

docs

bulk-operations.md

client.md

facets-aggregations.md

filters.md

index.md

mappings.md

query-dsl.md

rivers.md

tile.json