PyES is a Python ElasticSearch driver that provides a Pythonic interface for interacting with ElasticSearch clusters.
—
Rivers in PyES provide automated data ingestion from external sources into ElasticSearch. Rivers are long-running processes that continuously pull data from external systems and index it into ElasticSearch, enabling real-time or near-real-time data synchronization. While rivers are deprecated in newer ElasticSearch versions in favor of Beats and Logstash, PyES still provides comprehensive river support for legacy systems.
Note: Rivers were deprecated in ElasticSearch 2.0+. For modern ElasticSearch versions, consider using Beats, Logstash, or custom indexing solutions.
class River:
    """
    Base class for all ElasticSearch rivers.

    Rivers provide continuous data ingestion from external sources
    into ElasticSearch indices. Subclasses are expected to override
    :meth:`serialize` with their river-specific configuration.
    """

    def __init__(self, index_name=None, type_name=None, **kwargs):
        """
        Initialize base river.

        Args:
            index_name (str, optional): Target index name
            type_name (str, optional): Target document type
            **kwargs: River-specific configuration parameters
        """
        # Store the targeting info so subclasses (and their serialize()
        # implementations) can rely on these attributes existing.
        # Previously this was a no-op, which left subclasses that call
        # super().__init__() with no index_name/type_name attributes.
        self.index_name = index_name
        self.type_name = type_name
        # Keep any extra river-specific settings for subclasses to use.
        self.settings = dict(kwargs)

    def serialize(self):
        """
        Serialize river configuration to ElasticSearch format.

        Returns:
            dict: River configuration dictionary (subclasses only).

        Raises:
            NotImplementedError: always, on the base class; subclasses
                must override. Returning None (previous behavior) would
                silently produce an invalid river configuration.
        """
        raise NotImplementedError("Subclasses must implement serialize()")
# Basic river usage
from pyes import ES

# Connect to a local ElasticSearch node.
es = ES('localhost:9200')

# Create and start a river.
# NOTE(review): create_river_configuration() is not defined in this module;
# presumably it builds and returns a River instance — confirm with callers.
river_config = create_river_configuration()
es.create_river(river_config, "my_data_river")

# Monitor river status via the cluster state API (river info lives in
# cluster state on river-era ElasticSearch versions).
river_status = es.cluster.state(filter_nodes=True)

# Delete river when no longer needed
es.delete_river(river_config, "my_data_river")


class JDBCRiver(River):
    """
    JDBC river for importing data from relational databases.

    Connects to SQL databases and continuously imports data based
    on SQL queries and update detection strategies.
    """

    def __init__(self, driver=None, url=None, user=None, password=None,
                 sql=None, index=None, type=None, bulk_size=100,
                 bulk_timeout="60s", max_bulk_requests=30, poll="5s",
                 strategy="simple", **kwargs):
        """
        Initialize JDBCRiver.

        Args:
            driver (str): JDBC driver class name
            url (str): Database connection URL
            user (str): Database username
            password (str): Database password
            sql (str): SQL query to fetch data
            index (str): Target ElasticSearch index
            type (str): Target document type
            bulk_size (int): Number of documents per bulk request. Default: 100
            bulk_timeout (str): Bulk request timeout. Default: "60s"
            max_bulk_requests (int): Maximum concurrent bulk requests. Default: 30
            poll (str): Polling interval for new data. Default: "5s"
            strategy (str): Update detection strategy. Default: "simple"
            **kwargs: Additional JDBC river parameters (e.g. column_name)
        """
        super().__init__(index_name=index, type_name=type)
        # Assign directly as well so serialize() works even when the base
        # __init__ is a no-op stub.
        self.index_name = index
        self.type_name = type
        # Connection/query settings, serialized under the "jdbc" key.
        self.jdbc_settings = {
            "driver": driver,
            "url": url,
            "user": user,
            "password": password,
            "sql": sql,
            "poll": poll,
            "strategy": strategy,
        }
        self.jdbc_settings.update(kwargs)
        # Indexing settings, serialized under the "index" key.
        self.index_settings = {
            "index": index,
            "type": type,
            "bulk_size": bulk_size,
            "bulk_timeout": bulk_timeout,
            "max_bulk_requests": max_bulk_requests,
        }

    def serialize(self):
        """
        Serialize river configuration to ElasticSearch format.

        NOTE(review): the key layout mirrors the custom-river examples in
        this module; verify against the JDBC river plugin's expected schema.

        Returns:
            dict: River configuration dictionary
        """
        return {
            "type": "jdbc",
            "jdbc": {k: v for k, v in self.jdbc_settings.items()
                     if v is not None},
            "index": dict(self.index_settings),
        }
# JDBC river for MySQL data import
from pyes import JDBCRiver

# Configure MySQL river
mysql_river = JDBCRiver(
    driver="com.mysql.jdbc.Driver",
    url="jdbc:mysql://localhost:3306/mydb",
    user="db_user",
    password="db_password",
    sql="SELECT id, name, email, created_at FROM users WHERE updated_at > ?",
    index="users",
    type="user",
    bulk_size=1000,
    poll="30s",
    strategy="column",  # Use column-based update detection
    column_name="updated_at"  # Track updates by this column
)

# Create and start the river
es.create_river(mysql_river, "mysql_users_river")

# PostgreSQL river example (full-table query, no update-detection column)
postgres_river = JDBCRiver(
    driver="org.postgresql.Driver",
    url="jdbc:postgresql://localhost:5432/mydb",
    user="postgres_user",
    password="postgres_password",
    sql="SELECT product_id, name, description, price FROM products",
    index="catalog",
    type="product",
    bulk_size=500,
    poll="60s"
)
es.create_river(postgres_river, "postgres_products_river")


class MongoDBRiver(River):
    """
    MongoDB river for importing data from MongoDB collections.

    Provides real-time synchronization with MongoDB using oplog tailing
    or periodic collection scanning.
    """

    def __init__(self, host="localhost", port=27017, db=None, collection=None,
                 gridfs=False, filter=None, index=None, type=None,
                 bulk_size=100, bulk_timeout="10s", throttle_size=-1,
                 initial_timestamp=None, **kwargs):
        """
        Initialize MongoDBRiver.

        Args:
            host (str): MongoDB host. Default: "localhost"
            port (int): MongoDB port. Default: 27017
            db (str): MongoDB database name
            collection (str): MongoDB collection name
            gridfs (bool): Import GridFS files. Default: False
            filter (dict, optional): MongoDB query filter
            index (str): Target ElasticSearch index
            type (str): Target document type
            bulk_size (int): Documents per bulk request. Default: 100
            bulk_timeout (str): Bulk timeout. Default: "10s"
            throttle_size (int): Throttle size (-1 for no throttling). Default: -1
            initial_timestamp (dict, optional): Starting oplog timestamp
            **kwargs: Additional MongoDB river parameters
        """
        super().__init__(index_name=index, type_name=type)
        # Assign directly as well so serialize() works even when the base
        # __init__ is a no-op stub.
        self.index_name = index
        self.type_name = type
        # Source settings, serialized under the "mongodb" key.
        self.mongodb_settings = {
            "host": host,
            "port": port,
            "db": db,
            "collection": collection,
            "gridfs": gridfs,
            "filter": filter,
            "initial_timestamp": initial_timestamp,
        }
        self.mongodb_settings.update(kwargs)
        # Indexing settings, serialized under the "index" key.
        self.index_settings = {
            "index": index,
            "type": type,
            "bulk_size": bulk_size,
            "bulk_timeout": bulk_timeout,
            "throttle_size": throttle_size,
        }

    def serialize(self):
        """
        Serialize river configuration to ElasticSearch format.

        NOTE(review): the key layout mirrors the custom-river examples in
        this module; verify against the MongoDB river plugin schema.

        Returns:
            dict: River configuration dictionary
        """
        return {
            "type": "mongodb",
            "mongodb": {k: v for k, v in self.mongodb_settings.items()
                        if v is not None},
            "index": dict(self.index_settings),
        }
# MongoDB river for user collection
from pyes import MongoDBRiver

# Basic MongoDB river
mongo_river = MongoDBRiver(
    host="mongodb-server",
    port=27017,
    db="application_db",
    collection="users",
    index="users",
    type="user_profile",
    bulk_size=1000
)

# MongoDB river with filtering and authentication
filtered_mongo_river = MongoDBRiver(
    host="secure-mongo.example.com",
    port=27017,
    db="ecommerce",
    collection="products",
    filter={"status": "active", "price": {"$gt": 0}},  # Only active products with price > 0
    index="catalog",
    type="product",
    bulk_size=500,
    # NOTE(review): "credentials" is passed through **kwargs; confirm the
    # MongoDB river implementation accepts a nested credentials dict.
    credentials={
        "user": "mongo_user",
        "password": "mongo_password"
    }
)

# GridFS river for file content
gridfs_river = MongoDBRiver(
    host="localhost",
    db="files_db",
    gridfs=True,  # Import GridFS files
    index="documents",
    type="file",
    bulk_size=100
)

# Register the user-collection river with the cluster.
es.create_river(mongo_river, "mongo_users_river")
es.create_river(gridfs_river, "gridfs_files_river")


class CouchDBRiver(River):
    """
    CouchDB river for replicating CouchDB databases to ElasticSearch.

    Provides continuous replication using CouchDB's change feed mechanism.
    """

    def __init__(self, couchdb_host="localhost", couchdb_port=5984,
                 couchdb_db=None, couchdb_user=None, couchdb_password=None,
                 couchdb_filter=None, es_index=None, es_type=None,
                 bulk_size=100, bulk_timeout="10s", **kwargs):
        """
        Initialize CouchDBRiver.

        Args:
            couchdb_host (str): CouchDB host. Default: "localhost"
            couchdb_port (int): CouchDB port. Default: 5984
            couchdb_db (str): CouchDB database name
            couchdb_user (str, optional): CouchDB username
            couchdb_password (str, optional): CouchDB password
            couchdb_filter (str, optional): CouchDB filter function
            es_index (str): Target ElasticSearch index
            es_type (str): Target document type
            bulk_size (int): Documents per bulk request. Default: 100
            bulk_timeout (str): Bulk timeout. Default: "10s"
            **kwargs: Additional CouchDB river parameters
        """
        super().__init__(index_name=es_index, type_name=es_type)
        # Assign directly as well so serialize() works even when the base
        # __init__ is a no-op stub.
        self.index_name = es_index
        self.type_name = es_type
        # Source settings, serialized under the "couchdb" key.
        self.couchdb_settings = {
            "host": couchdb_host,
            "port": couchdb_port,
            "db": couchdb_db,
            "user": couchdb_user,
            "password": couchdb_password,
            "filter": couchdb_filter,
        }
        self.couchdb_settings.update(kwargs)
        # Indexing settings, serialized under the "index" key.
        self.index_settings = {
            "index": es_index,
            "type": es_type,
            "bulk_size": bulk_size,
            "bulk_timeout": bulk_timeout,
        }

    def serialize(self):
        """
        Serialize river configuration to ElasticSearch format.

        NOTE(review): the key layout mirrors the custom-river examples in
        this module; verify against the CouchDB river plugin schema.

        Returns:
            dict: River configuration dictionary
        """
        return {
            "type": "couchdb",
            "couchdb": {k: v for k, v in self.couchdb_settings.items()
                        if v is not None},
            "index": dict(self.index_settings),
        }
# CouchDB replication river
from pyes import CouchDBRiver

# Basic CouchDB river
couchdb_river = CouchDBRiver(
    couchdb_host="couchdb.example.com",
    couchdb_port=5984,
    couchdb_db="blog_posts",
    es_index="blog",
    es_type="post",
    bulk_size=200
)

# CouchDB river with authentication and filtering
secure_couchdb_river = CouchDBRiver(
    couchdb_host="secure-couch.example.com",
    couchdb_port=6984,
    couchdb_db="documents",
    couchdb_user="couch_user",
    couchdb_password="couch_password",
    couchdb_filter="published_docs/by_status",  # Custom filter function
    es_index="public_docs",
    es_type="document",
    bulk_size=500,
    bulk_timeout="30s"
)

# Register the basic blog river with the cluster.
es.create_river(couchdb_river, "couchdb_blog_river")
es.create_river(secure_couchdb_river, "secure_couchdb_river")


class RabbitMQRiver(River):
    """
    RabbitMQ river for consuming messages from RabbitMQ queues.

    Consumes messages from RabbitMQ and indexes them as documents
    in ElasticSearch, enabling real-time message indexing.
    """

    def __init__(self, host="localhost", port=5672, user="guest",
                 password="guest", vhost="/", queue=None, exchange=None,
                 routing_key=None, exchange_type="direct", durable=True,
                 index=None, type=None, bulk_size=100, bulk_timeout="5s",
                 ordered=False, **kwargs):
        """
        Initialize RabbitMQRiver.

        Args:
            host (str): RabbitMQ host. Default: "localhost"
            port (int): RabbitMQ port. Default: 5672
            user (str): RabbitMQ username. Default: "guest"
            password (str): RabbitMQ password. Default: "guest"
            vhost (str): RabbitMQ virtual host. Default: "/"
            queue (str): Queue name to consume from
            exchange (str, optional): Exchange name
            routing_key (str, optional): Routing key pattern
            exchange_type (str): Exchange type. Default: "direct"
            durable (bool): Durable queue. Default: True
            index (str): Target ElasticSearch index
            type (str): Target document type
            bulk_size (int): Messages per bulk request. Default: 100
            bulk_timeout (str): Bulk timeout. Default: "5s"
            ordered (bool): Maintain message order. Default: False
            **kwargs: Additional RabbitMQ river parameters
        """
        super().__init__(index_name=index, type_name=type)
        # Assign directly as well so serialize() works even when the base
        # __init__ is a no-op stub.
        self.index_name = index
        self.type_name = type
        # Broker settings, serialized under the "rabbitmq" key.
        self.rabbitmq_settings = {
            "host": host,
            "port": port,
            "user": user,
            "password": password,
            "vhost": vhost,
            "queue": queue,
            "exchange": exchange,
            "routing_key": routing_key,
            "exchange_type": exchange_type,
            "durable": durable,
        }
        self.rabbitmq_settings.update(kwargs)
        # Indexing settings, serialized under the "index" key.
        self.index_settings = {
            "index": index,
            "type": type,
            "bulk_size": bulk_size,
            "bulk_timeout": bulk_timeout,
            "ordered": ordered,
        }

    def serialize(self):
        """
        Serialize river configuration to ElasticSearch format.

        NOTE(review): the key layout mirrors the custom-river examples in
        this module; verify against the RabbitMQ river plugin schema.

        Returns:
            dict: River configuration dictionary
        """
        return {
            "type": "rabbitmq",
            "rabbitmq": {k: v for k, v in self.rabbitmq_settings.items()
                         if v is not None},
            "index": dict(self.index_settings),
        }
# RabbitMQ river for log processing
from pyes import RabbitMQRiver

# Basic RabbitMQ river
rabbitmq_river = RabbitMQRiver(
    host="rabbitmq.example.com",
    port=5672,
    user="log_consumer",
    password="consumer_password",
    queue="application_logs",
    index="logs",
    type="log_entry",
    bulk_size=500,
    bulk_timeout="10s"
)

# RabbitMQ river with exchange and routing
exchange_river = RabbitMQRiver(
    host="localhost",
    user="event_consumer",
    password="event_password",
    exchange="events",
    exchange_type="topic",
    routing_key="user.*.created",  # Route user creation events
    queue="user_events_queue",
    index="user_events",
    type="user_event",
    durable=True,
    ordered=True  # Maintain event order
)

# RabbitMQ river for real-time notifications
# NOTE(review): notification_river is built but never registered with
# es.create_river below — confirm whether that is intentional.
notification_river = RabbitMQRiver(
    host="message-broker.example.com",
    vhost="/notifications",
    queue="notification_queue",
    index="notifications",
    type="notification",
    bulk_size=100,
    bulk_timeout="2s"  # Fast processing for real-time notifications
)

es.create_river(rabbitmq_river, "rabbitmq_logs_river")
es.create_river(exchange_river, "rabbitmq_events_river")


class TwitterRiver(River):
    """
    Twitter river for streaming Twitter data into ElasticSearch.

    Connects to Twitter Streaming API to index tweets in real-time
    based on search terms, users, or locations.
    """

    def __init__(self, oauth_consumer_key=None, oauth_consumer_secret=None,
                 oauth_access_token=None, oauth_access_token_secret=None,
                 filter_tracks=None, filter_follow=None, filter_locations=None,
                 index="twitter", type="tweet", bulk_size=100,
                 drop_threshold=10, **kwargs):
        """
        Initialize TwitterRiver.

        Args:
            oauth_consumer_key (str): Twitter OAuth consumer key
            oauth_consumer_secret (str): Twitter OAuth consumer secret
            oauth_access_token (str): Twitter OAuth access token
            oauth_access_token_secret (str): Twitter OAuth access token secret
            filter_tracks (list, optional): Keywords/hashtags to track
            filter_follow (list, optional): User IDs to follow
            filter_locations (list, optional): Geographic bounding boxes
            index (str): Target ElasticSearch index. Default: "twitter"
            type (str): Target document type. Default: "tweet"
            bulk_size (int): Tweets per bulk request. Default: 100
            drop_threshold (int): Drop tweets if queue exceeds threshold. Default: 10
            **kwargs: Additional Twitter river parameters
        """
        super().__init__(index_name=index, type_name=type)
        # Assign directly as well so serialize() works even when the base
        # __init__ is a no-op stub.
        self.index_name = index
        self.type_name = type
        self.oauth_settings = {
            "consumer_key": oauth_consumer_key,
            "consumer_secret": oauth_consumer_secret,
            "access_token": oauth_access_token,
            "access_token_secret": oauth_access_token_secret,
        }
        # Only keep the filters the caller actually provided.
        filters = {
            "tracks": filter_tracks,
            "follow": filter_follow,
            "locations": filter_locations,
        }
        self.filter_settings = {k: v for k, v in filters.items()
                                if v is not None}
        self.drop_threshold = drop_threshold
        self.extra_settings = dict(kwargs)
        self.index_settings = {
            "index": index,
            "type": type,
            "bulk_size": bulk_size,
        }

    def serialize(self):
        """
        Serialize river configuration to ElasticSearch format.

        NOTE(review): the key layout mirrors the custom-river examples in
        this module; verify against the Twitter river plugin schema.

        Returns:
            dict: River configuration dictionary
        """
        config = {
            "oauth": dict(self.oauth_settings),
            "drop_threshold": self.drop_threshold,
        }
        if self.filter_settings:
            config["filter"] = dict(self.filter_settings)
        config.update(self.extra_settings)
        return {
            "type": "twitter",
            "twitter": config,
            "index": dict(self.index_settings),
        }
# Twitter river for brand monitoring
from pyes import TwitterRiver

# Track specific keywords and hashtags
twitter_river = TwitterRiver(
    oauth_consumer_key="your_consumer_key",
    oauth_consumer_secret="your_consumer_secret",
    oauth_access_token="your_access_token",
    oauth_access_token_secret="your_access_token_secret",
    filter_tracks=["elasticsearch", "python", "bigdata", "#elasticsearch"],
    index="social_media",
    type="tweet",
    bulk_size=200,
    drop_threshold=50
)

# Twitter river following specific users
# NOTE(review): user_twitter_river is never registered with es.create_river
# below — confirm whether that is intentional.
user_twitter_river = TwitterRiver(
    oauth_consumer_key="your_consumer_key",
    oauth_consumer_secret="your_consumer_secret",
    oauth_access_token="your_access_token",
    oauth_access_token_secret="your_access_token_secret",
    filter_follow=["783214", "6253282", "16121831"],  # Twitter user IDs
    index="user_tweets",
    type="tweet"
)

# Geographic Twitter river for location-based analysis
geo_twitter_river = TwitterRiver(
    oauth_consumer_key="your_consumer_key",
    oauth_consumer_secret="your_consumer_secret",
    oauth_access_token="your_access_token",
    oauth_access_token_secret="your_access_token_secret",
    filter_locations=[
        [-74.0059, 40.7128, -73.9352, 40.7589]  # NYC bounding box
    ],
    index="geo_tweets",
    type="geo_tweet"
)

es.create_river(twitter_river, "twitter_monitoring_river")
es.create_river(geo_twitter_river, "twitter_geo_river")
# Create custom river for specific data sources
class CustomAPIRiver(River):
    """
    Custom river for importing data from REST APIs.

    Example implementation for a custom data source.
    """

    def __init__(self, api_url=None, api_key=None, endpoint=None,
                 poll_interval="60s", index=None, type=None,
                 bulk_size=100, **kwargs):
        """
        Initialize CustomAPIRiver.

        Args:
            api_url (str): Base API URL
            api_key (str): API authentication key
            endpoint (str): API endpoint to poll
            poll_interval (str): Polling interval. Default: "60s"
            index (str): Target ElasticSearch index
            type (str): Target document type
            bulk_size (int): Documents per bulk request. Default: 100
            **kwargs: Additional custom river parameters
        """
        super().__init__(index, type, **kwargs)
        # Assign directly as well: serialize() below reads index_name and
        # type_name, and the base-class __init__ stub may not store them.
        self.index_name = index
        self.type_name = type
        self.api_url = api_url
        self.api_key = api_key
        self.endpoint = endpoint
        self.poll_interval = poll_interval
        self.bulk_size = bulk_size

    def serialize(self):
        """Serialize custom river configuration."""
        return {
            "type": "custom_api",
            "custom_api": {
                "api_url": self.api_url,
                "api_key": self.api_key,
                "endpoint": self.endpoint,
                "poll_interval": self.poll_interval,
                "bulk_size": self.bulk_size
            },
            "index": {
                "index": self.index_name,
                "type": self.type_name,
                "bulk_size": self.bulk_size
            }
        }
# RSS/Atom feed river
class RSSRiver(River):
    """Custom river for RSS/Atom feeds."""

    def __init__(self, feed_url=None, poll_interval="300s",
                 index="rss", type="article", **kwargs):
        """
        Initialize RSSRiver.

        Args:
            feed_url (str): RSS/Atom feed URL to poll
            poll_interval (str): Polling interval. Default: "300s"
            index (str): Target ElasticSearch index. Default: "rss"
            type (str): Target document type. Default: "article"
            **kwargs: Additional river parameters
        """
        super().__init__(index, type, **kwargs)
        # Assign directly as well: serialize() below reads index_name and
        # type_name, and the base-class __init__ stub may not store them.
        self.index_name = index
        self.type_name = type
        self.feed_url = feed_url
        self.poll_interval = poll_interval

    def serialize(self):
        """Serialize RSS river configuration."""
        return {
            "type": "rss",
            "rss": {
                "url": self.feed_url,
                "poll_interval": self.poll_interval
            },
            "index": {
                "index": self.index_name,
                "type": self.type_name
            }
        }
# File system river
class FileSystemRiver(River):
    """Custom river for monitoring file system changes."""

    def __init__(self, directory=None, pattern="*", recursive=True,
                 index="files", type="file", **kwargs):
        """
        Initialize FileSystemRiver.

        Args:
            directory (str): Directory to monitor
            pattern (str): Glob pattern for matching files. Default: "*"
            recursive (bool): Recurse into subdirectories. Default: True
            index (str): Target ElasticSearch index. Default: "files"
            type (str): Target document type. Default: "file"
            **kwargs: Additional river parameters
        """
        super().__init__(index, type, **kwargs)
        # Assign directly as well: serialize() below reads index_name and
        # type_name, and the base-class __init__ stub may not store them.
        self.index_name = index
        self.type_name = type
        self.directory = directory
        self.pattern = pattern
        self.recursive = recursive

    def serialize(self):
        """Serialize file-system river configuration."""
        return {
            "type": "fs",
            "fs": {
                "directory": self.directory,
                "pattern": self.pattern,
                "recursive": self.recursive
            },
            "index": {
                "index": self.index_name,
                "type": self.type_name
            }
        }
# Usage of custom rivers
api_river = CustomAPIRiver(
    api_url="https://api.example.com",
    api_key="your_api_key",
    endpoint="/v1/data",
    poll_interval="120s",
    index="api_data",
    type="api_record"
)

rss_river = RSSRiver(
    feed_url="https://feeds.example.com/news.xml",
    poll_interval="600s",  # Check every 10 minutes
    index="news",
    type="article"
)

fs_river = FileSystemRiver(
    directory="/var/log/application",
    pattern="*.log",
    recursive=True,
    index="log_files",
    type="log_file"
)
# River management functions
def manage_rivers(es):
    """Comprehensive river management operations.

    Creates a set of example rivers, monitors their status, and runs a
    health check. The nested helpers close over the ``es`` client.

    Args:
        es: pyes ES client instance.

    Returns:
        tuple: (dict of created river configs keyed by river name,
                dict of per-river health-status info)
    """

    # Create rivers
    def create_data_rivers():
        """Create multiple rivers for different data sources."""
        # Database river
        db_river = JDBCRiver(
            driver="com.mysql.jdbc.Driver",
            url="jdbc:mysql://db.example.com/app_db",
            user="river_user",
            password="river_password",
            sql="SELECT * FROM products WHERE updated_at > ?",
            index="products",
            type="product",
            strategy="column",
            column_name="updated_at"
        )
        # Message queue river
        mq_river = RabbitMQRiver(
            host="mq.example.com",
            queue="events_queue",
            index="events",
            type="event"
        )
        # Social media river
        social_river = TwitterRiver(
            oauth_consumer_key="key",
            oauth_consumer_secret="secret",
            oauth_access_token="token",
            oauth_access_token_secret="token_secret",
            filter_tracks=["#myapp", "mycompany"],
            index="social",
            type="tweet"
        )
        # Create all rivers
        rivers = {
            "product_sync_river": db_river,
            "events_river": mq_river,
            "social_monitoring_river": social_river
        }
        for river_name, river_config in rivers.items():
            try:
                es.create_river(river_config, river_name)
                print(f"Created river: {river_name}")
            except Exception as e:
                # Best-effort: report and continue with the other rivers.
                print(f"Failed to create river {river_name}: {e}")
        return rivers

    # Monitor river status
    def monitor_river_status():
        """Monitor the status of all rivers."""
        try:
            # Get cluster state to see rivers
            cluster_state = es.cluster.state()
            # Check river nodes
            if 'nodes' in cluster_state:
                for node_id, node_info in cluster_state['nodes'].items():
                    if 'rivers' in node_info:
                        print(f"Node {node_id} rivers:")
                        for river_name, river_info in node_info['rivers'].items():
                            print(f" - {river_name}: {river_info.get('status', 'unknown')}")
            # Get river statistics (if available)
            river_stats = es.indices.stats(indices=["_river"])
            if river_stats:
                print("River statistics:")
                for stat_name, stat_value in river_stats.items():
                    print(f" {stat_name}: {stat_value}")
        except Exception as e:
            print(f"Error monitoring rivers: {e}")

    # Clean up rivers
    # NOTE(review): defined but never invoked in this function — confirm
    # whether cleanup should be part of the management flow below.
    def cleanup_rivers(river_names):
        """Clean up specified rivers."""
        for river_name in river_names:
            try:
                # Delete the river (config argument unused for deletion)
                es.delete_river(None, river_name)
                print(f"Deleted river: {river_name}")
                # Clean up river metadata index
                es.indices.delete_index(f"_river_{river_name}")
                print(f"Cleaned up river metadata: {river_name}")
            except Exception as e:
                print(f"Error cleaning up river {river_name}: {e}")

    # River health check
    def river_health_check():
        """Perform health check on rivers."""
        from datetime import datetime, timezone
        health_status = {}
        try:
            # Check if river indices exist
            river_indices = es.indices.status(indices=["_river"])
            for index_name, index_info in river_indices.get('indices', {}).items():
                river_name = index_name.replace('_river_', '')
                # Check index health
                health = index_info.get('health', 'unknown')
                doc_count = index_info.get('docs', {}).get('num_docs', 0)
                health_status[river_name] = {
                    'health': health,
                    'document_count': doc_count,
                    # Bug fix: record the actual check time instead of the
                    # hard-coded '2023-12-01T10:30:00Z' placeholder.
                    'last_check': datetime.now(timezone.utc).isoformat()
                }
        except Exception as e:
            print(f"Error during river health check: {e}")
        return health_status

    # Execute management operations
    rivers = create_data_rivers()
    monitor_river_status()
    health_status = river_health_check()
    return rivers, health_status
# Execute river management (creates, monitors, and health-checks rivers)
rivers, health = manage_rivers(es)
# Common river configuration patterns
def river_configuration_patterns():
    """Common patterns for river configurations.

    Returns:
        dict: Example river instances keyed by pattern name
            ("high_throughput", "low_latency", "fault_tolerant", "filtered").
    """

    # 1. High-throughput river configuration
    def high_throughput_config():
        """Configuration for high-volume data streams."""
        return JDBCRiver(
            driver="com.mysql.jdbc.Driver",
            url="jdbc:mysql://db.example.com/large_db",
            user="bulk_user",
            password="bulk_password",
            sql="SELECT * FROM transactions WHERE processed_at > ?",
            index="transactions",
            type="transaction",
            bulk_size=5000,  # Large bulk size
            bulk_timeout="30s",  # Longer timeout
            max_bulk_requests=10,  # Limit concurrent requests
            poll="10s",  # Frequent polling
            strategy="column",
            column_name="processed_at"
        )

    # 2. Low-latency river configuration
    def low_latency_config():
        """Configuration for real-time data requirements."""
        return RabbitMQRiver(
            host="realtime-mq.example.com",
            queue="realtime_events",
            index="realtime",
            type="event",
            bulk_size=10,  # Small bulk size
            bulk_timeout="1s",  # Fast timeout
            ordered=True  # Maintain order
        )

    # 3. Fault-tolerant river configuration
    def fault_tolerant_config():
        """Configuration with enhanced error handling."""
        return MongoDBRiver(
            host="mongo-cluster.example.com",
            db="production_db",
            collection="critical_data",
            index="critical",
            type="data",
            bulk_size=1000,
            bulk_timeout="60s",
            throttle_size=10000,  # Throttle under high load
            # Enhanced retry configuration (passed through **kwargs;
            # NOTE(review): confirm the river implementation honors these)
            retry_count=5,
            retry_delay="30s"
        )

    # 4. Filtered river configuration
    def filtered_config():
        """Configuration with content filtering."""
        return CouchDBRiver(
            couchdb_host="filtered-couch.example.com",
            couchdb_db="content_db",
            couchdb_filter="content/published_only",  # Custom filter
            es_index="published_content",
            es_type="content",
            bulk_size=500
        )

    return {
        "high_throughput": high_throughput_config(),
        "low_latency": low_latency_config(),
        "fault_tolerant": fault_tolerant_config(),
        "filtered": filtered_config()
    }
# Apply configuration patterns: register each example river with the cluster.
configs = river_configuration_patterns()
for config_name, river_config in configs.items():
    river_name = f"{config_name}_river"
    es.create_river(river_config, river_name)
    print(f"Created {config_name} river: {river_name}")

# River performance optimization strategies
def optimize_river_performance():
    """Best practices for river performance optimization.

    Returns:
        dict: Example optimized river instances keyed by strategy
            ("memory_optimized", "network_optimized").
    """

    # 1. Bulk size optimization
    def calculate_optimal_bulk_size(doc_size_kb, network_latency_ms, target_throughput):
        """Calculate optimal bulk size based on document characteristics.

        Args:
            doc_size_kb: Average document size in KB (must be > 0).
            network_latency_ms: Round-trip latency to the cluster in ms.
            target_throughput: Currently unused — kept for interface
                compatibility. NOTE(review): wire this in or drop it.

        Returns:
            int: Suggested bulk size, clamped to [100, 10000].
        """
        # Rule of thumb: aim for 5-15MB bulk requests
        target_bulk_mb = 10
        optimal_bulk_size = int((target_bulk_mb * 1024) / doc_size_kb)
        # Adjust for network latency
        if network_latency_ms > 100:
            optimal_bulk_size = min(optimal_bulk_size, 1000)
        elif network_latency_ms < 20:
            optimal_bulk_size = max(optimal_bulk_size, 5000)
        return max(100, min(optimal_bulk_size, 10000))  # Reasonable bounds

    # 2. Polling optimization
    def optimize_polling_interval(change_frequency, data_importance):
        """Determine optimal polling interval."""
        if data_importance == "critical":
            return "5s" if change_frequency == "high" else "30s"
        elif data_importance == "normal":
            return "30s" if change_frequency == "high" else "300s"
        else:  # low importance
            return "300s" if change_frequency == "high" else "3600s"

    # 3. Memory optimization
    def memory_optimized_river():
        """River configuration optimized for memory usage."""
        return JDBCRiver(
            driver="com.mysql.jdbc.Driver",
            url="jdbc:mysql://db.example.com/app_db",
            user="river_user",
            password="river_password",
            sql="SELECT id, title, content FROM articles WHERE updated_at > ?",
            index="articles",
            type="article",
            bulk_size=2000,  # Balanced bulk size
            bulk_timeout="45s",  # Reasonable timeout
            max_bulk_requests=5,  # Limit memory usage
            fetch_size=1000,  # JDBC fetch size
            strategy="column"
        )

    # 4. Network optimization
    def network_optimized_river():
        """River configuration optimized for network efficiency."""
        return MongoDBRiver(
            host="remote-mongo.example.com",
            db="remote_db",
            collection="data",
            index="remote_data",
            type="record",
            bulk_size=5000,  # Larger bulks for network efficiency
            bulk_timeout="120s",  # Longer timeout for network delays
            throttle_size=50000,  # Higher throttle for batch processing
            # Connection optimization
            socket_timeout=60000,
            connection_timeout=30000
        )

    # The two calculator helpers above are illustrative only; they are not
    # part of the returned dict (unchanged behavior).
    return {
        "memory_optimized": memory_optimized_river(),
        "network_optimized": network_optimized_river()
    }
# River monitoring and alerting
def setup_river_monitoring():
    """Set up monitoring and alerting for rivers.

    Returns:
        tuple: (monitor_river_metrics, setup_alerting) callables.
    """
    import time
    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("river_monitor")

    def monitor_river_metrics(river_name, es_client):
        """Monitor key river metrics; returns the metrics dict or None on error."""
        try:
            # Check river index document count
            stats = es_client.indices.stats(indices=[f"_river_{river_name}"])
            metrics = {
                "river_name": river_name,
                "timestamp": int(time.time()),
                "document_count": 0,
                "indexing_rate": 0,
                "error_count": 0,
                "status": "unknown"
            }
            if stats and 'indices' in stats:
                river_stats = stats['indices'].get(f"_river_{river_name}", {})
                metrics["document_count"] = river_stats.get('total', {}).get('docs', {}).get('count', 0)
            # Log metrics
            logger.info(f"River metrics: {metrics}")
            # Check for alerts.
            # NOTE(review): indexing_rate and error_count are placeholders
            # (always 0), so the rate warning below always fires and the
            # error alert never does — confirm this is intended.
            if metrics["error_count"] > 10:
                logger.error(f"High error count for river {river_name}: {metrics['error_count']}")
            if metrics["indexing_rate"] < 1:  # Less than 1 doc/second
                logger.warning(f"Low indexing rate for river {river_name}: {metrics['indexing_rate']}")
            return metrics
        except Exception as e:
            logger.error(f"Error monitoring river {river_name}: {e}")
            return None

    def setup_alerting():
        """Set up alerting thresholds."""
        alert_config = {
            "error_threshold": 50,
            "performance_threshold": 100,  # docs per minute
            "downtime_threshold": 300,  # seconds
            "disk_usage_threshold": 85  # percent
        }
        return alert_config

    return monitor_river_metrics, setup_alerting
# Migration patterns from rivers to modern alternatives
def river_migration_patterns():
    """Patterns for migrating from rivers to modern solutions.

    Returns:
        dict: {"jdbc_replacement": indexer class,
               "logstash_config": Logstash pipeline string,
               "beats_config": Filebeat config string}
    """

    # 1. Replace JDBC River with custom Python script
    def jdbc_river_replacement():
        """Replace JDBC river with custom Python indexing."""
        import mysql.connector  # NOTE: third-party dependency, not stdlib
        from pyes import ES
        import time
        import logging

        class DatabaseIndexer:
            def __init__(self, db_config, es_config):
                self.db_config = db_config
                self.es = ES(**es_config)
                self.logger = logging.getLogger("db_indexer")

            def run_indexing_loop(self):
                """Run continuous indexing loop (blocks forever)."""
                last_updated = None
                while True:
                    try:
                        # Connect to database
                        conn = mysql.connector.connect(**self.db_config)
                        cursor = conn.cursor(dictionary=True)
                        # Query for new/updated records
                        if last_updated:
                            query = "SELECT * FROM products WHERE updated_at > %s"
                            cursor.execute(query, (last_updated,))
                        else:
                            query = "SELECT * FROM products"
                            cursor.execute(query)
                        # Bulk index documents
                        bulk_docs = []
                        for row in cursor:
                            doc = {
                                "product_id": row["id"],
                                "name": row["name"],
                                "description": row["description"],
                                "price": float(row["price"]),
                                "updated_at": row["updated_at"].isoformat()
                            }
                            bulk_docs.append(doc)
                            if len(bulk_docs) >= 1000:
                                self.bulk_index(bulk_docs)
                                bulk_docs = []
                        # Index remaining documents
                        if bulk_docs:
                            self.bulk_index(bulk_docs)
                        # Bug fix: capture the row count BEFORE issuing the
                        # MAX() query (which resets rowcount) and before
                        # closing the cursor (rowcount is unreliable on a
                        # closed cursor).
                        processed = cursor.rowcount
                        # Update last processed timestamp
                        if processed > 0:
                            cursor.execute("SELECT MAX(updated_at) as max_updated FROM products")
                            result = cursor.fetchone()
                            last_updated = result["max_updated"]
                        cursor.close()
                        conn.close()
                        self.logger.info(f"Processed {processed} records")
                    except Exception as e:
                        self.logger.error(f"Indexing error: {e}")
                    # Wait before next iteration
                    time.sleep(60)  # 1 minute polling

            def bulk_index(self, docs):
                """Bulk index documents."""
                for doc in docs:
                    self.es.index(doc, "products", "product",
                                  id=doc["product_id"], bulk=True)
                self.es.flush_bulk()

        return DatabaseIndexer

    # 2. Replace RabbitMQ River with Logstash configuration
    def rabbitmq_logstash_config():
        """Logstash configuration to replace RabbitMQ river."""
        logstash_config = """
        input {
          rabbitmq {
            host => "rabbitmq.example.com"
            port => 5672
            user => "logstash_user"
            password => "logstash_password"
            queue => "events_queue"
            durable => true
          }
        }
        filter {
          json {
            source => "message"
          }
          date {
            match => [ "timestamp", "ISO8601" ]
          }
          mutate {
            add_field => { "[@metadata][index]" => "events" }
            add_field => { "[@metadata][type]" => "event" }
          }
        }
        output {
          elasticsearch {
            hosts => ["elasticsearch.example.com:9200"]
            index => "%{[@metadata][index]}"
            document_type => "%{[@metadata][type]}"
          }
        }
        """
        return logstash_config

    # 3. Replace Twitter River with Beats
    def twitter_beats_config():
        """Filebeat configuration for Twitter data."""
        filebeat_config = """
        filebeat.inputs:
        - type: log
          enabled: true
          paths:
            - /var/log/twitter/*.json
          json.keys_under_root: true
          json.add_error_key: true
        output.elasticsearch:
          hosts: ["elasticsearch.example.com:9200"]
          index: "social-media-%{+yyyy.MM.dd}"
          template.name: "social-media"
          template.pattern: "social-media-*"
        """
        return filebeat_config

    return {
        "jdbc_replacement": jdbc_river_replacement(),
        "logstash_config": rabbitmq_logstash_config(),
        "beats_config": twitter_beats_config()
    }
# Execute migration
migration_patterns = river_migration_patterns()

# Rivers in PyES provide powerful data ingestion capabilities for legacy
# ElasticSearch deployments, enabling real-time synchronization with
# databases, message queues, and external APIs. While deprecated in newer
# ElasticSearch versions, they remain useful for older systems and can be
# migrated to modern alternatives like Beats, Logstash, or custom indexing
# solutions.
Install with Tessl CLI
npx tessl i tessl/pypi-pyes