
# PyES Rivers for Data Streaming

## Overview

Rivers in PyES provide automated data ingestion from external sources into ElasticSearch. A river is a long-running process that continuously pulls data from an external system and indexes it into ElasticSearch, enabling real-time or near-real-time data synchronization. Although rivers are deprecated in newer ElasticSearch versions in favor of Beats and Logstash, PyES still provides comprehensive river support for legacy systems.

**Note**: Rivers were deprecated in ElasticSearch 1.5 and removed in 2.0. For modern ElasticSearch versions, consider using Beats, Logstash, or custom indexing solutions.
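
Because river support disappears at the 2.0 boundary, it can be worth guarding river setup behind a version check. A minimal sketch, assuming the cluster answers on localhost:9200 (the root endpoint and its `version.number` field are standard ElasticSearch; the guard itself is illustrative):

```python
import requests

# the root endpoint reports the server version in version.number
info = requests.get("http://localhost:9200/").json()
major = int(info["version"]["number"].split(".")[0])

if major >= 2:
    raise RuntimeError("This cluster has no river support; "
                       "use Beats, Logstash, or a custom indexer instead.")
```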

## Base River Class

### River

```python { .api }
class River:
    """
    Base class for all ElasticSearch rivers.

    Rivers provide continuous data ingestion from external sources
    into ElasticSearch indices.
    """

    def __init__(self, index_name=None, type_name=None, **kwargs):
        """
        Initialize base river.

        Args:
            index_name (str, optional): Target index name
            type_name (str, optional): Target document type
            **kwargs: River-specific configuration parameters
        """
        pass

    def serialize(self):
        """
        Serialize river configuration to ElasticSearch format.

        Returns:
            dict: River configuration dictionary
        """
        pass

# Basic river usage
from pyes import ES

es = ES('localhost:9200')

# Create and start a river; river_config is an instance of any River
# subclass, such as the rivers shown in the sections below
river_config = create_river_configuration()
es.create_river(river_config, "my_data_river")

# Monitor river status via the cluster state
river_status = es.cluster.state(filter_nodes=True)

# Delete river when no longer needed
es.delete_river(river_config, "my_data_river")
```
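
Under the hood, river registration in pre-2.0 ElasticSearch works through the special `_river` index: creating a river means indexing its configuration as a `_meta` document, and deleting it means removing those documents. A sketch of the equivalent raw REST calls (the `_river/{name}/_meta` convention is standard river-era ElasticSearch; that `create_river` and `delete_river` reduce to exactly these calls is an assumption about the pyes internals):

```python
import json
import requests

base = "http://localhost:9200"
meta = river_config.serialize()  # the dict a River subclass produces

# registering a river == indexing its _meta document
requests.put(f"{base}/_river/my_data_river/_meta", data=json.dumps(meta))

# inspecting it == reading the _meta document back
print(requests.get(f"{base}/_river/my_data_river/_meta").json())

# removing it == deleting the river's entry from _river
requests.delete(f"{base}/_river/my_data_river")
```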

## Database Rivers

### JDBC River

```python { .api }
class JDBCRiver(River):
    """
    JDBC river for importing data from relational databases.

    Connects to SQL databases and continuously imports data based
    on SQL queries and update detection strategies.
    """

    def __init__(self, driver=None, url=None, user=None, password=None,
                 sql=None, index=None, type=None, bulk_size=100,
                 bulk_timeout="60s", max_bulk_requests=30, poll="5s",
                 strategy="simple", **kwargs):
        """
        Initialize JDBCRiver.

        Args:
            driver (str): JDBC driver class name
            url (str): Database connection URL
            user (str): Database username
            password (str): Database password
            sql (str): SQL query to fetch data
            index (str): Target ElasticSearch index
            type (str): Target document type
            bulk_size (int): Number of documents per bulk request. Default: 100
            bulk_timeout (str): Bulk request timeout. Default: "60s"
            max_bulk_requests (int): Maximum concurrent bulk requests. Default: 30
            poll (str): Polling interval for new data. Default: "5s"
            strategy (str): Update detection strategy. Default: "simple"
            **kwargs: Additional JDBC river parameters
        """
        pass

# JDBC river for MySQL data import
from pyes import JDBCRiver

# Configure MySQL river
mysql_river = JDBCRiver(
    driver="com.mysql.jdbc.Driver",
    url="jdbc:mysql://localhost:3306/mydb",
    user="db_user",
    password="db_password",
    sql="SELECT id, name, email, created_at FROM users WHERE updated_at > ?",
    index="users",
    type="user",
    bulk_size=1000,
    poll="30s",
    strategy="column",        # Use column-based update detection
    column_name="updated_at"  # Track updates by this column
)

# Create and start the river
es.create_river(mysql_river, "mysql_users_river")

# PostgreSQL river example
postgres_river = JDBCRiver(
    driver="org.postgresql.Driver",
    url="jdbc:postgresql://localhost:5432/mydb",
    user="postgres_user",
    password="postgres_password",
    sql="SELECT product_id, name, description, price FROM products",
    index="catalog",
    type="product",
    bulk_size=500,
    poll="60s"
)

es.create_river(postgres_river, "postgres_products_river")
```
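
For reference, a configuration like `mysql_river` above should serialize to a `_meta` document in roughly the shape the elasticsearch-river-jdbc plugin documents; the exact output of PyES's `serialize()` is assumed here:

```python
# approximate _meta document for mysql_river (field layout per the
# elasticsearch-river-jdbc plugin; PyES's exact serialization is assumed)
jdbc_meta = {
    "type": "jdbc",
    "jdbc": {
        "driver": "com.mysql.jdbc.Driver",
        "url": "jdbc:mysql://localhost:3306/mydb",
        "user": "db_user",
        "password": "db_password",
        "sql": "SELECT id, name, email, created_at FROM users WHERE updated_at > ?",
        "poll": "30s",
        "strategy": "column",
    },
    "index": {
        "index": "users",
        "type": "user",
        "bulk_size": 1000,
    },
}
```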

### MongoDB River

```python { .api }
class MongoDBRiver(River):
    """
    MongoDB river for importing data from MongoDB collections.

    Provides real-time synchronization with MongoDB using oplog tailing
    or periodic collection scanning.
    """

    def __init__(self, host="localhost", port=27017, db=None, collection=None,
                 gridfs=False, filter=None, index=None, type=None,
                 bulk_size=100, bulk_timeout="10s", throttle_size=-1,
                 initial_timestamp=None, **kwargs):
        """
        Initialize MongoDBRiver.

        Args:
            host (str): MongoDB host. Default: "localhost"
            port (int): MongoDB port. Default: 27017
            db (str): MongoDB database name
            collection (str): MongoDB collection name
            gridfs (bool): Import GridFS files. Default: False
            filter (dict, optional): MongoDB query filter
            index (str): Target ElasticSearch index
            type (str): Target document type
            bulk_size (int): Documents per bulk request. Default: 100
            bulk_timeout (str): Bulk timeout. Default: "10s"
            throttle_size (int): Throttle size (-1 for no throttling). Default: -1
            initial_timestamp (dict, optional): Starting oplog timestamp
            **kwargs: Additional MongoDB river parameters
        """
        pass

# MongoDB river for user collection
from pyes import MongoDBRiver

# Basic MongoDB river
mongo_river = MongoDBRiver(
    host="mongodb-server",
    port=27017,
    db="application_db",
    collection="users",
    index="users",
    type="user_profile",
    bulk_size=1000
)

# MongoDB river with filtering and authentication
filtered_mongo_river = MongoDBRiver(
    host="secure-mongo.example.com",
    port=27017,
    db="ecommerce",
    collection="products",
    filter={"status": "active", "price": {"$gt": 0}},  # Only active products with price > 0
    index="catalog",
    type="product",
    bulk_size=500,
    credentials={
        "user": "mongo_user",
        "password": "mongo_password"
    }
)

# GridFS river for file content
gridfs_river = MongoDBRiver(
    host="localhost",
    db="files_db",
    gridfs=True,  # Import GridFS files
    index="documents",
    type="file",
    bulk_size=100
)

es.create_river(mongo_river, "mongo_users_river")
es.create_river(gridfs_river, "gridfs_files_river")
```
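
Note that the oplog tailing mentioned in the docstring only works against a replica set; a standalone mongod has no oplog, leaving the river with periodic scanning at best. A quick pre-flight check, sketched with pymongo (pymongo is not a PyES dependency, it is just a convenient way to probe the server):

```python
from pymongo import MongoClient

client = MongoClient("mongodb-server", 27017)

# the oplog lives in the "local" database and only exists on replica-set members
if client.local["oplog.rs"].find_one() is None:
    print("No oplog found: start mongod as a replica set before relying on tailing")
```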

## NoSQL and Document Rivers

### CouchDB River

```python { .api }
class CouchDBRiver(River):
    """
    CouchDB river for replicating CouchDB databases to ElasticSearch.

    Provides continuous replication using CouchDB's change feed mechanism.
    """

    def __init__(self, couchdb_host="localhost", couchdb_port=5984,
                 couchdb_db=None, couchdb_user=None, couchdb_password=None,
                 couchdb_filter=None, es_index=None, es_type=None,
                 bulk_size=100, bulk_timeout="10s", **kwargs):
        """
        Initialize CouchDBRiver.

        Args:
            couchdb_host (str): CouchDB host. Default: "localhost"
            couchdb_port (int): CouchDB port. Default: 5984
            couchdb_db (str): CouchDB database name
            couchdb_user (str, optional): CouchDB username
            couchdb_password (str, optional): CouchDB password
            couchdb_filter (str, optional): CouchDB filter function
            es_index (str): Target ElasticSearch index
            es_type (str): Target document type
            bulk_size (int): Documents per bulk request. Default: 100
            bulk_timeout (str): Bulk timeout. Default: "10s"
            **kwargs: Additional CouchDB river parameters
        """
        pass

# CouchDB replication river
from pyes import CouchDBRiver

# Basic CouchDB river
couchdb_river = CouchDBRiver(
    couchdb_host="couchdb.example.com",
    couchdb_port=5984,
    couchdb_db="blog_posts",
    es_index="blog",
    es_type="post",
    bulk_size=200
)

# CouchDB river with authentication and filtering
secure_couchdb_river = CouchDBRiver(
    couchdb_host="secure-couch.example.com",
    couchdb_port=6984,
    couchdb_db="documents",
    couchdb_user="couch_user",
    couchdb_password="couch_password",
    couchdb_filter="published_docs/by_status",  # Custom filter function
    es_index="public_docs",
    es_type="document",
    bulk_size=500,
    bulk_timeout="30s"
)

es.create_river(couchdb_river, "couchdb_blog_river")
es.create_river(secure_couchdb_river, "secure_couchdb_river")
```
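
The continuous replication described above rides on CouchDB's standard `_changes` feed; the river is essentially a long-lived consumer of that endpoint. A sketch of what it consumes, using plain `requests` against the stock CouchDB API (host and database mirror the first example):

```python
import requests

resp = requests.get(
    "http://couchdb.example.com:5984/blog_posts/_changes",
    params={"feed": "longpoll", "since": 0, "include_docs": "true"},
    timeout=60,
)
for change in resp.json()["results"]:
    # each entry carries the doc id, its revisions, and (with include_docs) the body
    print(change["id"], change.get("doc"))
```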

## Message Queue Rivers

### RabbitMQ River

```python { .api }
class RabbitMQRiver(River):
    """
    RabbitMQ river for consuming messages from RabbitMQ queues.

    Consumes messages from RabbitMQ and indexes them as documents
    in ElasticSearch, enabling real-time message indexing.
    """

    def __init__(self, host="localhost", port=5672, user="guest",
                 password="guest", vhost="/", queue=None, exchange=None,
                 routing_key=None, exchange_type="direct", durable=True,
                 index=None, type=None, bulk_size=100, bulk_timeout="5s",
                 ordered=False, **kwargs):
        """
        Initialize RabbitMQRiver.

        Args:
            host (str): RabbitMQ host. Default: "localhost"
            port (int): RabbitMQ port. Default: 5672
            user (str): RabbitMQ username. Default: "guest"
            password (str): RabbitMQ password. Default: "guest"
            vhost (str): RabbitMQ virtual host. Default: "/"
            queue (str): Queue name to consume from
            exchange (str, optional): Exchange name
            routing_key (str, optional): Routing key pattern
            exchange_type (str): Exchange type. Default: "direct"
            durable (bool): Durable queue. Default: True
            index (str): Target ElasticSearch index
            type (str): Target document type
            bulk_size (int): Messages per bulk request. Default: 100
            bulk_timeout (str): Bulk timeout. Default: "5s"
            ordered (bool): Maintain message order. Default: False
            **kwargs: Additional RabbitMQ river parameters
        """
        pass

# RabbitMQ river for log processing
from pyes import RabbitMQRiver

# Basic RabbitMQ river
rabbitmq_river = RabbitMQRiver(
    host="rabbitmq.example.com",
    port=5672,
    user="log_consumer",
    password="consumer_password",
    queue="application_logs",
    index="logs",
    type="log_entry",
    bulk_size=500,
    bulk_timeout="10s"
)

# RabbitMQ river with exchange and routing
exchange_river = RabbitMQRiver(
    host="localhost",
    user="event_consumer",
    password="event_password",
    exchange="events",
    exchange_type="topic",
    routing_key="user.*.created",  # Route user creation events
    queue="user_events_queue",
    index="user_events",
    type="user_event",
    durable=True,
    ordered=True  # Maintain event order
)

# RabbitMQ river for real-time notifications
notification_river = RabbitMQRiver(
    host="message-broker.example.com",
    vhost="/notifications",
    queue="notification_queue",
    index="notifications",
    type="notification",
    bulk_size=100,
    bulk_timeout="2s"  # Fast processing for real-time notifications
)

es.create_river(rabbitmq_river, "rabbitmq_logs_river")
es.create_river(exchange_river, "rabbitmq_events_river")
```
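
The river-era RabbitMQ plugin generally expected message bodies already formatted as ElasticSearch bulk-API payloads: an action line followed by a source line, newline-terminated. A producer-side sketch using pika (pika is an assumption; any AMQP client that can publish the same bytes works):

```python
import json
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host="rabbitmq.example.com"))
channel = connection.channel()
channel.queue_declare(queue="application_logs", durable=True)

# bulk-API format: one action line, then the document source, newline-terminated
action = json.dumps({"index": {"_index": "logs", "_type": "log_entry"}})
source = json.dumps({"level": "INFO", "message": "user logged in"})
channel.basic_publish(exchange="",
                      routing_key="application_logs",
                      body=action + "\n" + source + "\n")
connection.close()
```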

## Social Media Rivers

### Twitter River

```python { .api }
class TwitterRiver(River):
    """
    Twitter river for streaming Twitter data into ElasticSearch.

    Connects to the Twitter Streaming API to index tweets in real-time
    based on search terms, users, or locations.
    """

    def __init__(self, oauth_consumer_key=None, oauth_consumer_secret=None,
                 oauth_access_token=None, oauth_access_token_secret=None,
                 filter_tracks=None, filter_follow=None, filter_locations=None,
                 index="twitter", type="tweet", bulk_size=100,
                 drop_threshold=10, **kwargs):
        """
        Initialize TwitterRiver.

        Args:
            oauth_consumer_key (str): Twitter OAuth consumer key
            oauth_consumer_secret (str): Twitter OAuth consumer secret
            oauth_access_token (str): Twitter OAuth access token
            oauth_access_token_secret (str): Twitter OAuth access token secret
            filter_tracks (list, optional): Keywords/hashtags to track
            filter_follow (list, optional): User IDs to follow
            filter_locations (list, optional): Geographic bounding boxes
            index (str): Target ElasticSearch index. Default: "twitter"
            type (str): Target document type. Default: "tweet"
            bulk_size (int): Tweets per bulk request. Default: 100
            drop_threshold (int): Drop tweets if queue exceeds threshold. Default: 10
            **kwargs: Additional Twitter river parameters
        """
        pass

# Twitter river for brand monitoring
from pyes import TwitterRiver

# Track specific keywords and hashtags
twitter_river = TwitterRiver(
    oauth_consumer_key="your_consumer_key",
    oauth_consumer_secret="your_consumer_secret",
    oauth_access_token="your_access_token",
    oauth_access_token_secret="your_access_token_secret",
    filter_tracks=["elasticsearch", "python", "bigdata", "#elasticsearch"],
    index="social_media",
    type="tweet",
    bulk_size=200,
    drop_threshold=50
)

# Twitter river following specific users
user_twitter_river = TwitterRiver(
    oauth_consumer_key="your_consumer_key",
    oauth_consumer_secret="your_consumer_secret",
    oauth_access_token="your_access_token",
    oauth_access_token_secret="your_access_token_secret",
    filter_follow=["783214", "6253282", "16121831"],  # Twitter user IDs
    index="user_tweets",
    type="tweet"
)

# Geographic Twitter river for location-based analysis
geo_twitter_river = TwitterRiver(
    oauth_consumer_key="your_consumer_key",
    oauth_consumer_secret="your_consumer_secret",
    oauth_access_token="your_access_token",
    oauth_access_token_secret="your_access_token_secret",
    filter_locations=[
        [-74.0059, 40.7128, -73.9352, 40.7589]  # NYC bounding box
    ],
    index="geo_tweets",
    type="geo_tweet"
)

es.create_river(twitter_river, "twitter_monitoring_river")
es.create_river(geo_twitter_river, "twitter_geo_river")
```
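
As with the JDBC example, it helps to know roughly what such a river serializes to. The shape below follows the elasticsearch-river-twitter plugin's documented layout; the exact PyES output is assumed:

```python
# approximate _meta document for twitter_river (layout per the
# elasticsearch-river-twitter plugin; PyES's exact serialization is assumed)
twitter_meta = {
    "type": "twitter",
    "twitter": {
        "oauth": {
            "consumer_key": "your_consumer_key",
            "consumer_secret": "your_consumer_secret",
            "access_token": "your_access_token",
            "access_token_secret": "your_access_token_secret",
        },
        "filter": {
            "tracks": "elasticsearch,python,bigdata,#elasticsearch",
        },
    },
    "index": {
        "index": "social_media",
        "type": "tweet",
        "bulk_size": 200,
    },
}
```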

## Custom River Implementation

### Creating Custom Rivers

```python { .api }
# Create custom river for specific data sources
class CustomAPIRiver(River):
    """
    Custom river for importing data from REST APIs.

    Example implementation for a custom data source.
    """

    def __init__(self, api_url=None, api_key=None, endpoint=None,
                 poll_interval="60s", index=None, type=None,
                 bulk_size=100, **kwargs):
        """
        Initialize CustomAPIRiver.

        Args:
            api_url (str): Base API URL
            api_key (str): API authentication key
            endpoint (str): API endpoint to poll
            poll_interval (str): Polling interval. Default: "60s"
            index (str): Target ElasticSearch index
            type (str): Target document type
            bulk_size (int): Documents per bulk request. Default: 100
            **kwargs: Additional custom river parameters
        """
        super().__init__(index, type, **kwargs)
        self.api_url = api_url
        self.api_key = api_key
        self.endpoint = endpoint
        self.poll_interval = poll_interval
        self.bulk_size = bulk_size

    def serialize(self):
        """Serialize custom river configuration."""
        # relies on River.__init__ storing index_name/type_name
        # (shown only as a stub above)
        return {
            "type": "custom_api",
            "custom_api": {
                "api_url": self.api_url,
                "api_key": self.api_key,
                "endpoint": self.endpoint,
                "poll_interval": self.poll_interval,
                "bulk_size": self.bulk_size
            },
            "index": {
                "index": self.index_name,
                "type": self.type_name,
                "bulk_size": self.bulk_size
            }
        }

# RSS/Atom feed river
class RSSRiver(River):
    """Custom river for RSS/Atom feeds."""

    def __init__(self, feed_url=None, poll_interval="300s",
                 index="rss", type="article", **kwargs):
        super().__init__(index, type, **kwargs)
        self.feed_url = feed_url
        self.poll_interval = poll_interval

    def serialize(self):
        return {
            "type": "rss",
            "rss": {
                "url": self.feed_url,
                "poll_interval": self.poll_interval
            },
            "index": {
                "index": self.index_name,
                "type": self.type_name
            }
        }

# File system river
class FileSystemRiver(River):
    """Custom river for monitoring file system changes."""

    def __init__(self, directory=None, pattern="*", recursive=True,
                 index="files", type="file", **kwargs):
        super().__init__(index, type, **kwargs)
        self.directory = directory
        self.pattern = pattern
        self.recursive = recursive

    def serialize(self):
        return {
            "type": "fs",
            "fs": {
                "directory": self.directory,
                "pattern": self.pattern,
                "recursive": self.recursive
            },
            "index": {
                "index": self.index_name,
                "type": self.type_name
            }
        }

# Usage of custom rivers
api_river = CustomAPIRiver(
    api_url="https://api.example.com",
    api_key="your_api_key",
    endpoint="/v1/data",
    poll_interval="120s",
    index="api_data",
    type="api_record"
)

rss_river = RSSRiver(
    feed_url="https://feeds.example.com/news.xml",
    poll_interval="600s",  # Check every 10 minutes
    index="news",
    type="article"
)

fs_river = FileSystemRiver(
    directory="/var/log/application",
    pattern="*.log",
    recursive=True,
    index="log_files",
    type="log_file"
)
```
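
One caveat: `serialize()` only produces the configuration document. A custom type such as `"custom_api"`, `"rss"`, or `"fs"` still needs a matching river plugin installed on every ElasticSearch node, otherwise the stored `_meta` document sits there with nothing consuming it. A hedged verification sketch (raw REST, paths per the river-era convention):

```python
import json
import requests

base = "http://localhost:9200"

# register the custom river and read its _meta back
requests.put(f"{base}/_river/api_data_river/_meta",
             data=json.dumps(api_river.serialize()))
status = requests.get(f"{base}/_river/api_data_river/_meta").json()

# a successful read only confirms the config is stored, not that a plugin
# implementing type "custom_api" actually runs on the cluster
print(status)
```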

## River Management Operations

### River Lifecycle Management

```python { .api }
# River management functions
from datetime import datetime

def manage_rivers(es):
    """Comprehensive river management operations."""

    # Create rivers
    def create_data_rivers():
        """Create multiple rivers for different data sources."""

        # Database river
        db_river = JDBCRiver(
            driver="com.mysql.jdbc.Driver",
            url="jdbc:mysql://db.example.com/app_db",
            user="river_user",
            password="river_password",
            sql="SELECT * FROM products WHERE updated_at > ?",
            index="products",
            type="product",
            strategy="column",
            column_name="updated_at"
        )

        # Message queue river
        mq_river = RabbitMQRiver(
            host="mq.example.com",
            queue="events_queue",
            index="events",
            type="event"
        )

        # Social media river
        social_river = TwitterRiver(
            oauth_consumer_key="key",
            oauth_consumer_secret="secret",
            oauth_access_token="token",
            oauth_access_token_secret="token_secret",
            filter_tracks=["#myapp", "mycompany"],
            index="social",
            type="tweet"
        )

        # Create all rivers
        rivers = {
            "product_sync_river": db_river,
            "events_river": mq_river,
            "social_monitoring_river": social_river
        }

        for river_name, river_config in rivers.items():
            try:
                es.create_river(river_config, river_name)
                print(f"Created river: {river_name}")
            except Exception as e:
                print(f"Failed to create river {river_name}: {e}")

        return rivers

    # Monitor river status
    def monitor_river_status():
        """Monitor the status of all rivers."""

        try:
            # Get cluster state to see rivers
            cluster_state = es.cluster.state()

            # Check river nodes
            if 'nodes' in cluster_state:
                for node_id, node_info in cluster_state['nodes'].items():
                    if 'rivers' in node_info:
                        print(f"Node {node_id} rivers:")
                        for river_name, river_info in node_info['rivers'].items():
                            print(f"  - {river_name}: {river_info.get('status', 'unknown')}")

            # Get river statistics (if available)
            river_stats = es.indices.stats(indices=["_river"])
            if river_stats:
                print("River statistics:")
                for stat_name, stat_value in river_stats.items():
                    print(f"  {stat_name}: {stat_value}")

        except Exception as e:
            print(f"Error monitoring rivers: {e}")

    # Clean up rivers
    def cleanup_rivers(river_names):
        """Clean up specified rivers."""

        for river_name in river_names:
            try:
                # Delete the river
                es.delete_river(None, river_name)
                print(f"Deleted river: {river_name}")

                # Clean up river metadata index
                es.indices.delete_index(f"_river_{river_name}")
                print(f"Cleaned up river metadata: {river_name}")

            except Exception as e:
                print(f"Error cleaning up river {river_name}: {e}")

    # River health check
    def river_health_check():
        """Perform health check on rivers."""

        health_status = {}

        try:
            # Check if river indices exist
            river_indices = es.indices.status(indices=["_river"])

            for index_name, index_info in river_indices.get('indices', {}).items():
                river_name = index_name.replace('_river_', '')

                # Check index health
                health = index_info.get('health', 'unknown')
                doc_count = index_info.get('docs', {}).get('num_docs', 0)

                health_status[river_name] = {
                    'health': health,
                    'document_count': doc_count,
                    'last_check': datetime.utcnow().isoformat() + 'Z'
                }

        except Exception as e:
            print(f"Error during river health check: {e}")

        return health_status

    # Execute management operations
    rivers = create_data_rivers()
    monitor_river_status()
    health_status = river_health_check()

    return rivers, health_status

# Execute river management
rivers, health = manage_rivers(es)
```

### River Configuration Patterns

```python { .api }
# Common river configuration patterns
def river_configuration_patterns():
    """Common patterns for river configurations."""

    # 1. High-throughput river configuration
    def high_throughput_config():
        """Configuration for high-volume data streams."""

        return JDBCRiver(
            driver="com.mysql.jdbc.Driver",
            url="jdbc:mysql://db.example.com/large_db",
            user="bulk_user",
            password="bulk_password",
            sql="SELECT * FROM transactions WHERE processed_at > ?",
            index="transactions",
            type="transaction",
            bulk_size=5000,        # Large bulk size
            bulk_timeout="30s",    # Longer timeout
            max_bulk_requests=10,  # Limit concurrent requests
            poll="10s",            # Frequent polling
            strategy="column",
            column_name="processed_at"
        )

    # 2. Low-latency river configuration
    def low_latency_config():
        """Configuration for real-time data requirements."""

        return RabbitMQRiver(
            host="realtime-mq.example.com",
            queue="realtime_events",
            index="realtime",
            type="event",
            bulk_size=10,       # Small bulk size
            bulk_timeout="1s",  # Fast timeout
            ordered=True        # Maintain order
        )

    # 3. Fault-tolerant river configuration
    def fault_tolerant_config():
        """Configuration with enhanced error handling."""

        return MongoDBRiver(
            host="mongo-cluster.example.com",
            db="production_db",
            collection="critical_data",
            index="critical",
            type="data",
            bulk_size=1000,
            bulk_timeout="60s",
            throttle_size=10000,  # Throttle under high load
            # Enhanced retry configuration
            retry_count=5,
            retry_delay="30s"
        )

    # 4. Filtered river configuration
    def filtered_config():
        """Configuration with content filtering."""

        return CouchDBRiver(
            couchdb_host="filtered-couch.example.com",
            couchdb_db="content_db",
            couchdb_filter="content/published_only",  # Custom filter
            es_index="published_content",
            es_type="content",
            bulk_size=500
        )

    return {
        "high_throughput": high_throughput_config(),
        "low_latency": low_latency_config(),
        "fault_tolerant": fault_tolerant_config(),
        "filtered": filtered_config()
    }

# Apply configuration patterns
configs = river_configuration_patterns()

for config_name, river_config in configs.items():
    river_name = f"{config_name}_river"
    es.create_river(river_config, river_name)
    print(f"Created {config_name} river: {river_name}")
```

## Performance and Monitoring

### River Optimization

```python { .api }
# River performance optimization strategies
def optimize_river_performance():
    """Best practices for river performance optimization."""

    # 1. Bulk size optimization
    def calculate_optimal_bulk_size(doc_size_kb, network_latency_ms, target_throughput):
        """Calculate optimal bulk size based on document characteristics."""

        # Rule of thumb: aim for 5-15MB bulk requests
        target_bulk_mb = 10
        optimal_bulk_size = int((target_bulk_mb * 1024) / doc_size_kb)

        # Adjust for network latency
        if network_latency_ms > 100:
            optimal_bulk_size = min(optimal_bulk_size, 1000)
        elif network_latency_ms < 20:
            optimal_bulk_size = max(optimal_bulk_size, 5000)

        return max(100, min(optimal_bulk_size, 10000))  # Reasonable bounds

    # 2. Polling optimization
    def optimize_polling_interval(change_frequency, data_importance):
        """Determine optimal polling interval."""

        if data_importance == "critical":
            return "5s" if change_frequency == "high" else "30s"
        elif data_importance == "normal":
            return "30s" if change_frequency == "high" else "300s"
        else:  # low importance
            return "300s" if change_frequency == "high" else "3600s"

    # 3. Memory optimization
    def memory_optimized_river():
        """River configuration optimized for memory usage."""

        return JDBCRiver(
            driver="com.mysql.jdbc.Driver",
            url="jdbc:mysql://db.example.com/app_db",
            user="river_user",
            password="river_password",
            sql="SELECT id, title, content FROM articles WHERE updated_at > ?",
            index="articles",
            type="article",
            bulk_size=2000,       # Balanced bulk size
            bulk_timeout="45s",   # Reasonable timeout
            max_bulk_requests=5,  # Limit memory usage
            fetch_size=1000,      # JDBC fetch size
            strategy="column"
        )

    # 4. Network optimization
    def network_optimized_river():
        """River configuration optimized for network efficiency."""

        return MongoDBRiver(
            host="remote-mongo.example.com",
            db="remote_db",
            collection="data",
            index="remote_data",
            type="record",
            bulk_size=5000,       # Larger bulks for network efficiency
            bulk_timeout="120s",  # Longer timeout for network delays
            throttle_size=50000,  # Higher throttle for batch processing
            # Connection optimization
            socket_timeout=60000,
            connection_timeout=30000
        )

    return {
        "memory_optimized": memory_optimized_river(),
        "network_optimized": network_optimized_river()
    }

# River monitoring and alerting
def setup_river_monitoring():
    """Set up monitoring and alerting for rivers."""

    import time
    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("river_monitor")

    def monitor_river_metrics(river_name, es_client):
        """Monitor key river metrics."""

        try:
            # Check river index document count
            stats = es_client.indices.stats(indices=[f"_river_{river_name}"])

            metrics = {
                "river_name": river_name,
                "timestamp": int(time.time()),
                "document_count": 0,
                "indexing_rate": 0,
                "error_count": 0,
                "status": "unknown"
            }

            if stats and 'indices' in stats:
                river_stats = stats['indices'].get(f"_river_{river_name}", {})
                metrics["document_count"] = river_stats.get('total', {}).get('docs', {}).get('count', 0)

            # Log metrics
            logger.info(f"River metrics: {metrics}")

            # Check for alerts
            if metrics["error_count"] > 10:
                logger.error(f"High error count for river {river_name}: {metrics['error_count']}")

            if metrics["indexing_rate"] < 1:  # Less than 1 doc/second
                logger.warning(f"Low indexing rate for river {river_name}: {metrics['indexing_rate']}")

            return metrics

        except Exception as e:
            logger.error(f"Error monitoring river {river_name}: {e}")
            return None

    def setup_alerting():
        """Set up alerting thresholds."""

        alert_config = {
            "error_threshold": 50,
            "performance_threshold": 100,  # docs per minute
            "downtime_threshold": 300,     # seconds
            "disk_usage_threshold": 85     # percent
        }

        return alert_config

    return monitor_river_metrics, setup_alerting
```
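
A possible way to wire these helpers into a periodic check (purely illustrative scheduling; the thresholds come from `setup_alerting`, and the river names reuse earlier examples):

```python
import time

monitor_river_metrics, setup_alerting = setup_river_monitoring()
alerts = setup_alerting()

while True:
    for name in ("rabbitmq_logs_river", "mongo_users_river"):
        metrics = monitor_river_metrics(name, es)
        if metrics and metrics["error_count"] > alerts["error_threshold"]:
            print(f"ALERT: {name} exceeded {alerts['error_threshold']} errors")
    time.sleep(60)  # re-check every minute
```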

## Migration from Rivers

### Modern Alternatives

```python { .api }
# Migration patterns from rivers to modern alternatives
def river_migration_patterns():
    """Patterns for migrating from rivers to modern solutions."""

    # 1. Replace JDBC River with custom Python script
    def jdbc_river_replacement():
        """Replace JDBC river with custom Python indexing."""

        import mysql.connector
        from pyes import ES
        import time
        import logging

        class DatabaseIndexer:
            def __init__(self, db_config, es_config):
                self.db_config = db_config
                self.es = ES(**es_config)
                self.logger = logging.getLogger("db_indexer")

            def run_indexing_loop(self):
                """Run continuous indexing loop."""

                last_updated = None

                while True:
                    try:
                        # Connect to database
                        conn = mysql.connector.connect(**self.db_config)
                        cursor = conn.cursor(dictionary=True)

                        # Query for new/updated records
                        if last_updated:
                            query = "SELECT * FROM products WHERE updated_at > %s"
                            cursor.execute(query, (last_updated,))
                        else:
                            query = "SELECT * FROM products"
                            cursor.execute(query)

                        # Bulk index documents
                        processed = 0
                        bulk_docs = []
                        for row in cursor:
                            doc = {
                                "product_id": row["id"],
                                "name": row["name"],
                                "description": row["description"],
                                "price": float(row["price"]),
                                "updated_at": row["updated_at"].isoformat()
                            }
                            bulk_docs.append(doc)
                            processed += 1

                            if len(bulk_docs) >= 1000:
                                self.bulk_index(bulk_docs)
                                bulk_docs = []

                        # Index remaining documents
                        if bulk_docs:
                            self.bulk_index(bulk_docs)

                        # Update last processed timestamp
                        if processed > 0:
                            cursor.execute("SELECT MAX(updated_at) as max_updated FROM products")
                            result = cursor.fetchone()
                            last_updated = result["max_updated"]

                        self.logger.info(f"Processed {processed} records")

                        cursor.close()
                        conn.close()

                    except Exception as e:
                        self.logger.error(f"Indexing error: {e}")

                    # Wait before next iteration
                    time.sleep(60)  # 1 minute polling

            def bulk_index(self, docs):
                """Bulk index documents."""
                for doc in docs:
                    self.es.index(doc, "products", "product",
                                  id=doc["product_id"], bulk=True)
                self.es.flush_bulk()

        return DatabaseIndexer

    # 2. Replace RabbitMQ River with Logstash configuration
    def rabbitmq_logstash_config():
        """Logstash configuration to replace RabbitMQ river."""

        logstash_config = """
input {
  rabbitmq {
    host => "rabbitmq.example.com"
    port => 5672
    user => "logstash_user"
    password => "logstash_password"
    queue => "events_queue"
    durable => true
  }
}

filter {
  json {
    source => "message"
  }

  date {
    match => [ "timestamp", "ISO8601" ]
  }

  mutate {
    add_field => { "[@metadata][index]" => "events" }
    add_field => { "[@metadata][type]" => "event" }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch.example.com:9200"]
    index => "%{[@metadata][index]}"
    document_type => "%{[@metadata][type]}"
  }
}
"""

        return logstash_config

    # 3. Replace Twitter River with Beats
    def twitter_beats_config():
        """Filebeat configuration for Twitter data."""

        filebeat_config = """
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/twitter/*.json
  json.keys_under_root: true
  json.add_error_key: true

output.elasticsearch:
  hosts: ["elasticsearch.example.com:9200"]
  index: "social-media-%{+yyyy.MM.dd}"
  template.name: "social-media"
  template.pattern: "social-media-*"
"""

        return filebeat_config

    return {
        "jdbc_replacement": jdbc_river_replacement(),
        "logstash_config": rabbitmq_logstash_config(),
        "beats_config": twitter_beats_config()
    }

# Execute migration
migration_patterns = river_migration_patterns()
```

Rivers in PyES provide powerful data ingestion capabilities for legacy ElasticSearch deployments, enabling real-time synchronization with databases, message queues, and external APIs. While deprecated and removed in newer ElasticSearch versions, they remain useful for older systems and can be migrated to modern alternatives like Beats, Logstash, or custom indexing solutions.