# PyES Rivers for Data Streaming

## Overview

Rivers in PyES provide automated data ingestion from external sources into ElasticSearch. Rivers are long-running processes that continuously pull data from external systems and index it into ElasticSearch, enabling real-time or near-real-time data synchronization. While rivers are deprecated in newer ElasticSearch versions in favor of Beats and Logstash, PyES still provides comprehensive river support for legacy systems.

**Note**: Rivers were deprecated in ElasticSearch 2.0+. For modern ElasticSearch versions, consider using Beats, Logstash, or custom indexing solutions.

## Base River Class

### River

```python { .api }
class River:
    """
    Base class for all ElasticSearch rivers.

    Rivers provide continuous data ingestion from external sources
    into ElasticSearch indices.
    """

    def __init__(self, index_name=None, type_name=None, **kwargs):
        """
        Initialize base river.

        Args:
            index_name (str, optional): Target index name
            type_name (str, optional): Target document type
            **kwargs: River-specific configuration parameters
        """
        pass

    def serialize(self):
        """
        Serialize river configuration to ElasticSearch format.

        Returns:
            dict: River configuration dictionary
        """
        pass

# Basic river usage
from pyes import ES

es = ES('localhost:9200')

# Create and start a river
river_config = create_river_configuration()
es.create_river(river_config, "my_data_river")

# Monitor river status
river_status = es.cluster.state(filter_nodes=True)

# Delete river when no longer needed
es.delete_river(river_config, "my_data_river")
```

## Database Rivers

### JDBC River

```python { .api }
class JDBCRiver(River):
    """
    JDBC river for importing data from relational databases.

    Connects to SQL databases and continuously imports data based
    on SQL queries and update detection strategies.
    """

    def __init__(self, driver=None, url=None, user=None, password=None,
                 sql=None, index=None, type=None, bulk_size=100,
                 bulk_timeout="60s", max_bulk_requests=30, poll="5s",
                 strategy="simple", **kwargs):
        """
        Initialize JDBCRiver.

        Args:
            driver (str): JDBC driver class name
            url (str): Database connection URL
            user (str): Database username
            password (str): Database password
            sql (str): SQL query to fetch data
            index (str): Target ElasticSearch index
            type (str): Target document type
            bulk_size (int): Number of documents per bulk request. Default: 100
            bulk_timeout (str): Bulk request timeout. Default: "60s"
            max_bulk_requests (int): Maximum concurrent bulk requests. Default: 30
            poll (str): Polling interval for new data. Default: "5s"
            strategy (str): Update detection strategy. Default: "simple"
            **kwargs: Additional JDBC river parameters
        """
        pass

# JDBC river for MySQL data import
from pyes import JDBCRiver

# Configure MySQL river
mysql_river = JDBCRiver(
    driver="com.mysql.jdbc.Driver",
    url="jdbc:mysql://localhost:3306/mydb",
    user="db_user",
    password="db_password",
    sql="SELECT id, name, email, created_at FROM users WHERE updated_at > ?",
    index="users",
    type="user",
    bulk_size=1000,
    poll="30s",
    strategy="column",  # Use column-based update detection
    column_name="updated_at"  # Track updates by this column
)

# Create and start the river
es.create_river(mysql_river, "mysql_users_river")

# PostgreSQL river example
postgres_river = JDBCRiver(
    driver="org.postgresql.Driver",
    url="jdbc:postgresql://localhost:5432/mydb",
    user="postgres_user",
    password="postgres_password",
    sql="SELECT product_id, name, description, price FROM products",
    index="catalog",
    type="product",
    bulk_size=500,
    poll="60s"
)

es.create_river(postgres_river, "postgres_products_river")
```

### MongoDB River

```python { .api }
class MongoDBRiver(River):
    """
    MongoDB river for importing data from MongoDB collections.

    Provides real-time synchronization with MongoDB using oplog tailing
    or periodic collection scanning.
    """

    def __init__(self, host="localhost", port=27017, db=None, collection=None,
                 gridfs=False, filter=None, index=None, type=None,
                 bulk_size=100, bulk_timeout="10s", throttle_size=-1,
                 initial_timestamp=None, **kwargs):
        """
        Initialize MongoDBRiver.

        Args:
            host (str): MongoDB host. Default: "localhost"
            port (int): MongoDB port. Default: 27017
            db (str): MongoDB database name
            collection (str): MongoDB collection name
            gridfs (bool): Import GridFS files. Default: False
            filter (dict, optional): MongoDB query filter
            index (str): Target ElasticSearch index
            type (str): Target document type
            bulk_size (int): Documents per bulk request. Default: 100
            bulk_timeout (str): Bulk timeout. Default: "10s"
            throttle_size (int): Throttle size (-1 for no throttling). Default: -1
            initial_timestamp (dict, optional): Starting oplog timestamp
            **kwargs: Additional MongoDB river parameters
        """
        pass

# MongoDB river for user collection
from pyes import MongoDBRiver

# Basic MongoDB river
mongo_river = MongoDBRiver(
    host="mongodb-server",
    port=27017,
    db="application_db",
    collection="users",
    index="users",
    type="user_profile",
    bulk_size=1000
)

# MongoDB river with filtering and authentication
filtered_mongo_river = MongoDBRiver(
    host="secure-mongo.example.com",
    port=27017,
    db="ecommerce",
    collection="products",
    filter={"status": "active", "price": {"$gt": 0}},  # Only active products with price > 0
    index="catalog",
    type="product",
    bulk_size=500,
    credentials={
        "user": "mongo_user",
        "password": "mongo_password"
    }
)

# GridFS river for file content
gridfs_river = MongoDBRiver(
    host="localhost",
    db="files_db",
    gridfs=True,  # Import GridFS files
    index="documents",
    type="file",
    bulk_size=100
)

es.create_river(mongo_river, "mongo_users_river")
es.create_river(gridfs_river, "gridfs_files_river")
```

## NoSQL and Document Rivers

### CouchDB River

```python { .api }
class CouchDBRiver(River):
    """
    CouchDB river for replicating CouchDB databases to ElasticSearch.

    Provides continuous replication using CouchDB's change feed mechanism.
    """

    def __init__(self, couchdb_host="localhost", couchdb_port=5984,
                 couchdb_db=None, couchdb_user=None, couchdb_password=None,
                 couchdb_filter=None, es_index=None, es_type=None,
                 bulk_size=100, bulk_timeout="10s", **kwargs):
        """
        Initialize CouchDBRiver.

        Args:
            couchdb_host (str): CouchDB host. Default: "localhost"
            couchdb_port (int): CouchDB port. Default: 5984
            couchdb_db (str): CouchDB database name
            couchdb_user (str, optional): CouchDB username
            couchdb_password (str, optional): CouchDB password
            couchdb_filter (str, optional): CouchDB filter function
            es_index (str): Target ElasticSearch index
            es_type (str): Target document type
            bulk_size (int): Documents per bulk request. Default: 100
            bulk_timeout (str): Bulk timeout. Default: "10s"
            **kwargs: Additional CouchDB river parameters
        """
        pass

# CouchDB replication river
from pyes import CouchDBRiver

# Basic CouchDB river
couchdb_river = CouchDBRiver(
    couchdb_host="couchdb.example.com",
    couchdb_port=5984,
    couchdb_db="blog_posts",
    es_index="blog",
    es_type="post",
    bulk_size=200
)

# CouchDB river with authentication and filtering
secure_couchdb_river = CouchDBRiver(
    couchdb_host="secure-couch.example.com",
    couchdb_port=6984,
    couchdb_db="documents",
    couchdb_user="couch_user",
    couchdb_password="couch_password",
    couchdb_filter="published_docs/by_status",  # Custom filter function
    es_index="public_docs",
    es_type="document",
    bulk_size=500,
    bulk_timeout="30s"
)

es.create_river(couchdb_river, "couchdb_blog_river")
es.create_river(secure_couchdb_river, "secure_couchdb_river")
```

## Message Queue Rivers

### RabbitMQ River

```python { .api }
class RabbitMQRiver(River):
    """
    RabbitMQ river for consuming messages from RabbitMQ queues.

    Consumes messages from RabbitMQ and indexes them as documents
    in ElasticSearch, enabling real-time message indexing.
    """

    def __init__(self, host="localhost", port=5672, user="guest",
                 password="guest", vhost="/", queue=None, exchange=None,
                 routing_key=None, exchange_type="direct", durable=True,
                 index=None, type=None, bulk_size=100, bulk_timeout="5s",
                 ordered=False, **kwargs):
        """
        Initialize RabbitMQRiver.

        Args:
            host (str): RabbitMQ host. Default: "localhost"
            port (int): RabbitMQ port. Default: 5672
            user (str): RabbitMQ username. Default: "guest"
            password (str): RabbitMQ password. Default: "guest"
            vhost (str): RabbitMQ virtual host. Default: "/"
            queue (str): Queue name to consume from
            exchange (str, optional): Exchange name
            routing_key (str, optional): Routing key pattern
            exchange_type (str): Exchange type. Default: "direct"
            durable (bool): Durable queue. Default: True
            index (str): Target ElasticSearch index
            type (str): Target document type
            bulk_size (int): Messages per bulk request. Default: 100
            bulk_timeout (str): Bulk timeout. Default: "5s"
            ordered (bool): Maintain message order. Default: False
            **kwargs: Additional RabbitMQ river parameters
        """
        pass

# RabbitMQ river for log processing
from pyes import RabbitMQRiver

# Basic RabbitMQ river
rabbitmq_river = RabbitMQRiver(
    host="rabbitmq.example.com",
    port=5672,
    user="log_consumer",
    password="consumer_password",
    queue="application_logs",
    index="logs",
    type="log_entry",
    bulk_size=500,
    bulk_timeout="10s"
)

# RabbitMQ river with exchange and routing
exchange_river = RabbitMQRiver(
    host="localhost",
    user="event_consumer",
    password="event_password",
    exchange="events",
    exchange_type="topic",
    routing_key="user.*.created",  # Route user creation events
    queue="user_events_queue",
    index="user_events",
    type="user_event",
    durable=True,
    ordered=True  # Maintain event order
)

# RabbitMQ river for real-time notifications
notification_river = RabbitMQRiver(
    host="message-broker.example.com",
    vhost="/notifications",
    queue="notification_queue",
    index="notifications",
    type="notification",
    bulk_size=100,
    bulk_timeout="2s"  # Fast processing for real-time notifications
)

es.create_river(rabbitmq_river, "rabbitmq_logs_river")
es.create_river(exchange_river, "rabbitmq_events_river")
```

## Social Media Rivers

### Twitter River

```python { .api }
class TwitterRiver(River):
    """
    Twitter river for streaming Twitter data into ElasticSearch.

    Connects to Twitter Streaming API to index tweets in real-time
    based on search terms, users, or locations.
    """

    def __init__(self, oauth_consumer_key=None, oauth_consumer_secret=None,
                 oauth_access_token=None, oauth_access_token_secret=None,
                 filter_tracks=None, filter_follow=None, filter_locations=None,
                 index="twitter", type="tweet", bulk_size=100,
                 drop_threshold=10, **kwargs):
        """
        Initialize TwitterRiver.

        Args:
            oauth_consumer_key (str): Twitter OAuth consumer key
            oauth_consumer_secret (str): Twitter OAuth consumer secret
            oauth_access_token (str): Twitter OAuth access token
            oauth_access_token_secret (str): Twitter OAuth access token secret
            filter_tracks (list, optional): Keywords/hashtags to track
            filter_follow (list, optional): User IDs to follow
            filter_locations (list, optional): Geographic bounding boxes
            index (str): Target ElasticSearch index. Default: "twitter"
            type (str): Target document type. Default: "tweet"
            bulk_size (int): Tweets per bulk request. Default: 100
            drop_threshold (int): Drop tweets if queue exceeds threshold. Default: 10
            **kwargs: Additional Twitter river parameters
        """
        pass

# Twitter river for brand monitoring
from pyes import TwitterRiver

# Track specific keywords and hashtags
twitter_river = TwitterRiver(
    oauth_consumer_key="your_consumer_key",
    oauth_consumer_secret="your_consumer_secret",
    oauth_access_token="your_access_token",
    oauth_access_token_secret="your_access_token_secret",
    filter_tracks=["elasticsearch", "python", "bigdata", "#elasticsearch"],
    index="social_media",
    type="tweet",
    bulk_size=200,
    drop_threshold=50
)

# Twitter river following specific users
user_twitter_river = TwitterRiver(
    oauth_consumer_key="your_consumer_key",
    oauth_consumer_secret="your_consumer_secret",
    oauth_access_token="your_access_token",
    oauth_access_token_secret="your_access_token_secret",
    filter_follow=["783214", "6253282", "16121831"],  # Twitter user IDs
    index="user_tweets",
    type="tweet"
)

# Geographic Twitter river for location-based analysis
geo_twitter_river = TwitterRiver(
    oauth_consumer_key="your_consumer_key",
    oauth_consumer_secret="your_consumer_secret",
    oauth_access_token="your_access_token",
    oauth_access_token_secret="your_access_token_secret",
    filter_locations=[
        [-74.0059, 40.7128, -73.9352, 40.7589]  # NYC bounding box
    ],
    index="geo_tweets",
    type="geo_tweet"
)

es.create_river(twitter_river, "twitter_monitoring_river")
es.create_river(geo_twitter_river, "twitter_geo_river")
```

## Custom River Implementation

### Creating Custom Rivers

```python { .api }
# Create custom river for specific data sources
class CustomAPIRiver(River):
    """
    Custom river for importing data from REST APIs.

    Example implementation for a custom data source.
    """

    def __init__(self, api_url=None, api_key=None, endpoint=None,
                 poll_interval="60s", index=None, type=None,
                 bulk_size=100, **kwargs):
        """
        Initialize CustomAPIRiver.

        Args:
            api_url (str): Base API URL
            api_key (str): API authentication key
            endpoint (str): API endpoint to poll
            poll_interval (str): Polling interval. Default: "60s"
            index (str): Target ElasticSearch index
            type (str): Target document type
            bulk_size (int): Documents per bulk request. Default: 100
            **kwargs: Additional custom river parameters
        """
        super().__init__(index, type, **kwargs)
        self.api_url = api_url
        self.api_key = api_key
        self.endpoint = endpoint
        self.poll_interval = poll_interval
        self.bulk_size = bulk_size

    def serialize(self):
        """Serialize custom river configuration."""
        return {
            "type": "custom_api",
            "custom_api": {
                "api_url": self.api_url,
                "api_key": self.api_key,
                "endpoint": self.endpoint,
                "poll_interval": self.poll_interval,
                "bulk_size": self.bulk_size
            },
            "index": {
                "index": self.index_name,
                "type": self.type_name,
                "bulk_size": self.bulk_size
            }
        }

# RSS/Atom feed river
class RSSRiver(River):
    """Custom river for RSS/Atom feeds."""

    def __init__(self, feed_url=None, poll_interval="300s",
                 index="rss", type="article", **kwargs):
        super().__init__(index, type, **kwargs)
        self.feed_url = feed_url
        self.poll_interval = poll_interval

    def serialize(self):
        return {
            "type": "rss",
            "rss": {
                "url": self.feed_url,
                "poll_interval": self.poll_interval
            },
            "index": {
                "index": self.index_name,
                "type": self.type_name
            }
        }

# File system river
class FileSystemRiver(River):
    """Custom river for monitoring file system changes."""

    def __init__(self, directory=None, pattern="*", recursive=True,
                 index="files", type="file", **kwargs):
        super().__init__(index, type, **kwargs)
        self.directory = directory
        self.pattern = pattern
        self.recursive = recursive

    def serialize(self):
        return {
            "type": "fs",
            "fs": {
                "directory": self.directory,
                "pattern": self.pattern,
                "recursive": self.recursive
            },
            "index": {
                "index": self.index_name,
                "type": self.type_name
            }
        }

# Usage of custom rivers
api_river = CustomAPIRiver(
    api_url="https://api.example.com",
    api_key="your_api_key",
    endpoint="/v1/data",
    poll_interval="120s",
    index="api_data",
    type="api_record"
)

rss_river = RSSRiver(
    feed_url="https://feeds.example.com/news.xml",
    poll_interval="600s",  # Check every 10 minutes
    index="news",
    type="article"
)

fs_river = FileSystemRiver(
    directory="/var/log/application",
    pattern="*.log",
    recursive=True,
    index="log_files",
    type="log_file"
)
```

## River Management Operations

### River Lifecycle Management

```python { .api }
# River management functions
def manage_rivers(es):
    """Comprehensive river management operations."""

    # Create rivers
    def create_data_rivers():
        """Create multiple rivers for different data sources."""

        # Database river
        db_river = JDBCRiver(
            driver="com.mysql.jdbc.Driver",
            url="jdbc:mysql://db.example.com/app_db",
            user="river_user",
            password="river_password",
            sql="SELECT * FROM products WHERE updated_at > ?",
            index="products",
            type="product",
            strategy="column",
            column_name="updated_at"
        )

        # Message queue river
        mq_river = RabbitMQRiver(
            host="mq.example.com",
            queue="events_queue",
            index="events",
            type="event"
        )

        # Social media river
        social_river = TwitterRiver(
            oauth_consumer_key="key",
            oauth_consumer_secret="secret",
            oauth_access_token="token",
            oauth_access_token_secret="token_secret",
            filter_tracks=["#myapp", "mycompany"],
            index="social",
            type="tweet"
        )

        # Create all rivers
        rivers = {
            "product_sync_river": db_river,
            "events_river": mq_river,
            "social_monitoring_river": social_river
        }

        for river_name, river_config in rivers.items():
            try:
                es.create_river(river_config, river_name)
                print(f"Created river: {river_name}")
            except Exception as e:
                print(f"Failed to create river {river_name}: {e}")

        return rivers

    # Monitor river status
    def monitor_river_status():
        """Monitor the status of all rivers."""

        try:
            # Get cluster state to see rivers
            cluster_state = es.cluster.state()

            # Check river nodes
            if 'nodes' in cluster_state:
                for node_id, node_info in cluster_state['nodes'].items():
                    if 'rivers' in node_info:
                        print(f"Node {node_id} rivers:")
                        for river_name, river_info in node_info['rivers'].items():
                            print(f" - {river_name}: {river_info.get('status', 'unknown')}")

            # Get river statistics (if available)
            river_stats = es.indices.stats(indices=["_river"])
            if river_stats:
                print("River statistics:")
                for stat_name, stat_value in river_stats.items():
                    print(f" {stat_name}: {stat_value}")

        except Exception as e:
            print(f"Error monitoring rivers: {e}")

    # Clean up rivers
    def cleanup_rivers(river_names):
        """Clean up specified rivers."""

        for river_name in river_names:
            try:
                # Delete the river
                es.delete_river(None, river_name)
                print(f"Deleted river: {river_name}")

                # Clean up river metadata index
                es.indices.delete_index(f"_river_{river_name}")
                print(f"Cleaned up river metadata: {river_name}")

            except Exception as e:
                print(f"Error cleaning up river {river_name}: {e}")

    # River health check
    def river_health_check():
        """Perform health check on rivers."""

        health_status = {}

        try:
            # Check if river indices exist
            river_indices = es.indices.status(indices=["_river"])

            for index_name, index_info in river_indices.get('indices', {}).items():
                river_name = index_name.replace('_river_', '')

                # Check index health
                health = index_info.get('health', 'unknown')
                doc_count = index_info.get('docs', {}).get('num_docs', 0)

                health_status[river_name] = {
                    'health': health,
                    'document_count': doc_count,
                    'last_check': '2023-12-01T10:30:00Z'
                }

        except Exception as e:
            print(f"Error during river health check: {e}")

        return health_status

    # Execute management operations
    rivers = create_data_rivers()
    monitor_river_status()
    health_status = river_health_check()

    return rivers, health_status

# Execute river management
rivers, health = manage_rivers(es)
```

### River Configuration Patterns

```python { .api }
# Common river configuration patterns
def river_configuration_patterns():
    """Common patterns for river configurations."""

    # 1. High-throughput river configuration
    def high_throughput_config():
        """Configuration for high-volume data streams."""

        return JDBCRiver(
            driver="com.mysql.jdbc.Driver",
            url="jdbc:mysql://db.example.com/large_db",
            user="bulk_user",
            password="bulk_password",
            sql="SELECT * FROM transactions WHERE processed_at > ?",
            index="transactions",
            type="transaction",
            bulk_size=5000,  # Large bulk size
            bulk_timeout="30s",  # Longer timeout
            max_bulk_requests=10,  # Limit concurrent requests
            poll="10s",  # Frequent polling
            strategy="column",
            column_name="processed_at"
        )

    # 2. Low-latency river configuration
    def low_latency_config():
        """Configuration for real-time data requirements."""

        return RabbitMQRiver(
            host="realtime-mq.example.com",
            queue="realtime_events",
            index="realtime",
            type="event",
            bulk_size=10,  # Small bulk size
            bulk_timeout="1s",  # Fast timeout
            ordered=True  # Maintain order
        )

    # 3. Fault-tolerant river configuration
    def fault_tolerant_config():
        """Configuration with enhanced error handling."""

        return MongoDBRiver(
            host="mongo-cluster.example.com",
            db="production_db",
            collection="critical_data",
            index="critical",
            type="data",
            bulk_size=1000,
            bulk_timeout="60s",
            throttle_size=10000,  # Throttle under high load
            # Enhanced retry configuration
            retry_count=5,
            retry_delay="30s"
        )

    # 4. Filtered river configuration
    def filtered_config():
        """Configuration with content filtering."""

        return CouchDBRiver(
            couchdb_host="filtered-couch.example.com",
            couchdb_db="content_db",
            couchdb_filter="content/published_only",  # Custom filter
            es_index="published_content",
            es_type="content",
            bulk_size=500
        )

    return {
        "high_throughput": high_throughput_config(),
        "low_latency": low_latency_config(),
        "fault_tolerant": fault_tolerant_config(),
        "filtered": filtered_config()
    }

# Apply configuration patterns
configs = river_configuration_patterns()

for config_name, river_config in configs.items():
    river_name = f"{config_name}_river"
    es.create_river(river_config, river_name)
    print(f"Created {config_name} river: {river_name}")
```

## Performance and Monitoring

### River Optimization

```python { .api }
# River performance optimization strategies
def optimize_river_performance():
    """Best practices for river performance optimization."""

    # 1. Bulk size optimization
    def calculate_optimal_bulk_size(doc_size_kb, network_latency_ms, target_throughput):
        """Calculate optimal bulk size based on document characteristics."""

        # Rule of thumb: aim for 5-15MB bulk requests
        target_bulk_mb = 10
        optimal_bulk_size = int((target_bulk_mb * 1024) / doc_size_kb)

        # Adjust for network latency
        if network_latency_ms > 100:
            optimal_bulk_size = min(optimal_bulk_size, 1000)
        elif network_latency_ms < 20:
            optimal_bulk_size = max(optimal_bulk_size, 5000)

        return max(100, min(optimal_bulk_size, 10000))  # Reasonable bounds

    # 2. Polling optimization
    def optimize_polling_interval(change_frequency, data_importance):
        """Determine optimal polling interval."""

        if data_importance == "critical":
            return "5s" if change_frequency == "high" else "30s"
        elif data_importance == "normal":
            return "30s" if change_frequency == "high" else "300s"
        else:  # low importance
            return "300s" if change_frequency == "high" else "3600s"

    # 3. Memory optimization
    def memory_optimized_river():
        """River configuration optimized for memory usage."""

        return JDBCRiver(
            driver="com.mysql.jdbc.Driver",
            url="jdbc:mysql://db.example.com/app_db",
            user="river_user",
            password="river_password",
            sql="SELECT id, title, content FROM articles WHERE updated_at > ?",
            index="articles",
            type="article",
            bulk_size=2000,  # Balanced bulk size
            bulk_timeout="45s",  # Reasonable timeout
            max_bulk_requests=5,  # Limit memory usage
            fetch_size=1000,  # JDBC fetch size
            strategy="column"
        )

    # 4. Network optimization
    def network_optimized_river():
        """River configuration optimized for network efficiency."""

        return MongoDBRiver(
            host="remote-mongo.example.com",
            db="remote_db",
            collection="data",
            index="remote_data",
            type="record",
            bulk_size=5000,  # Larger bulks for network efficiency
            bulk_timeout="120s",  # Longer timeout for network delays
            throttle_size=50000,  # Higher throttle for batch processing
            # Connection optimization
            socket_timeout=60000,
            connection_timeout=30000
        )

    return {
        "memory_optimized": memory_optimized_river(),
        "network_optimized": network_optimized_river()
    }

# River monitoring and alerting
def setup_river_monitoring():
    """Set up monitoring and alerting for rivers."""

    import time
    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("river_monitor")

    def monitor_river_metrics(river_name, es_client):
        """Monitor key river metrics."""

        try:
            # Check river index document count
            stats = es_client.indices.stats(indices=[f"_river_{river_name}"])

            metrics = {
                "river_name": river_name,
                "timestamp": int(time.time()),
                "document_count": 0,
                "indexing_rate": 0,
                "error_count": 0,
                "status": "unknown"
            }

            if stats and 'indices' in stats:
                river_stats = stats['indices'].get(f"_river_{river_name}", {})
                metrics["document_count"] = river_stats.get('total', {}).get('docs', {}).get('count', 0)

            # Log metrics
            logger.info(f"River metrics: {metrics}")

            # Check for alerts
            if metrics["error_count"] > 10:
                logger.error(f"High error count for river {river_name}: {metrics['error_count']}")

            if metrics["indexing_rate"] < 1:  # Less than 1 doc/second
                logger.warning(f"Low indexing rate for river {river_name}: {metrics['indexing_rate']}")

            return metrics

        except Exception as e:
            logger.error(f"Error monitoring river {river_name}: {e}")
            return None

    def setup_alerting():
        """Set up alerting thresholds."""

        alert_config = {
            "error_threshold": 50,
            "performance_threshold": 100,  # docs per minute
            "downtime_threshold": 300,  # seconds
            "disk_usage_threshold": 85  # percent
        }

        return alert_config

    return monitor_river_metrics, setup_alerting
```

## Migration from Rivers

### Modern Alternatives

```python { .api }
# Migration patterns from rivers to modern alternatives
def river_migration_patterns():
    """Patterns for migrating from rivers to modern solutions."""

    # 1. Replace JDBC River with custom Python script
    def jdbc_river_replacement():
        """Replace JDBC river with custom Python indexing."""

        import mysql.connector
        from pyes import ES
        import time
        import logging

        class DatabaseIndexer:
            def __init__(self, db_config, es_config):
                self.db_config = db_config
                self.es = ES(**es_config)
                self.logger = logging.getLogger("db_indexer")

            def run_indexing_loop(self):
                """Run continuous indexing loop."""

                last_updated = None

                while True:
                    try:
                        # Connect to database
                        conn = mysql.connector.connect(**self.db_config)
                        cursor = conn.cursor(dictionary=True)

                        # Query for new/updated records
                        if last_updated:
                            query = "SELECT * FROM products WHERE updated_at > %s"
                            cursor.execute(query, (last_updated,))
                        else:
                            query = "SELECT * FROM products"
                            cursor.execute(query)

                        # Bulk index documents
                        bulk_docs = []
                        for row in cursor:
                            doc = {
                                "product_id": row["id"],
                                "name": row["name"],
                                "description": row["description"],
                                "price": float(row["price"]),
                                "updated_at": row["updated_at"].isoformat()
                            }
                            bulk_docs.append(doc)

                            if len(bulk_docs) >= 1000:
                                self.bulk_index(bulk_docs)
                                bulk_docs = []

                        # Index remaining documents
                        if bulk_docs:
                            self.bulk_index(bulk_docs)

                        # Update last processed timestamp
                        if cursor.rowcount > 0:
                            cursor.execute("SELECT MAX(updated_at) as max_updated FROM products")
                            result = cursor.fetchone()
                            last_updated = result["max_updated"]

                        cursor.close()
                        conn.close()

                        self.logger.info(f"Processed {cursor.rowcount} records")

                    except Exception as e:
                        self.logger.error(f"Indexing error: {e}")

                    # Wait before next iteration
                    time.sleep(60)  # 1 minute polling

            def bulk_index(self, docs):
                """Bulk index documents."""
                for doc in docs:
                    self.es.index(doc, "products", "product",
                                  id=doc["product_id"], bulk=True)
                self.es.flush_bulk()

        return DatabaseIndexer

    # 2. Replace RabbitMQ River with Logstash configuration
    def rabbitmq_logstash_config():
        """Logstash configuration to replace RabbitMQ river."""

        logstash_config = """
        input {
          rabbitmq {
            host => "rabbitmq.example.com"
            port => 5672
            user => "logstash_user"
            password => "logstash_password"
            queue => "events_queue"
            durable => true
          }
        }

        filter {
          json {
            source => "message"
          }

          date {
            match => [ "timestamp", "ISO8601" ]
          }

          mutate {
            add_field => { "[@metadata][index]" => "events" }
            add_field => { "[@metadata][type]" => "event" }
          }
        }

        output {
          elasticsearch {
            hosts => ["elasticsearch.example.com:9200"]
            index => "%{[@metadata][index]}"
            document_type => "%{[@metadata][type]}"
          }
        }
        """

        return logstash_config

    # 3. Replace Twitter River with Beats
    def twitter_beats_config():
        """Filebeat configuration for Twitter data."""

        filebeat_config = """
        filebeat.inputs:
        - type: log
          enabled: true
          paths:
            - /var/log/twitter/*.json
          json.keys_under_root: true
          json.add_error_key: true

        output.elasticsearch:
          hosts: ["elasticsearch.example.com:9200"]
          index: "social-media-%{+yyyy.MM.dd}"
          template.name: "social-media"
          template.pattern: "social-media-*"
        """

        return filebeat_config

    return {
        "jdbc_replacement": jdbc_river_replacement(),
        "logstash_config": rabbitmq_logstash_config(),
        "beats_config": twitter_beats_config()
    }

# Execute migration
migration_patterns = river_migration_patterns()
```

Rivers in PyES provide powerful data ingestion capabilities for legacy ElasticSearch deployments, enabling real-time synchronization with databases, message queues, and external APIs. While deprecated in newer ElasticSearch versions, they remain useful for older systems and can be migrated to modern alternatives like Beats, Logstash, or custom indexing solutions.