# Connections

Connection configuration and management for one or more Elasticsearch clusters, with support for authentication, SSL/TLS, connection pooling, retries, and both synchronous and asynchronous clients. Each client is registered under an alias, so an application can route different operations to different clusters.

## Capabilities

### Connection Management Functions

Module-level functions in `elasticsearch_dsl.connections` for creating, registering, and retrieving clients by alias. Connection parameters are passed through to the elasticsearch-py 8.x `Elasticsearch` client.

```python { .api }
def create_connection(alias='default', **kwargs):
    """
    Create a new Elasticsearch client and register it under an alias.

    Args:
        alias (str): Connection alias name (default: 'default')
        **kwargs: Connection parameters passed to the Elasticsearch client

    Returns:
        Elasticsearch: Elasticsearch client instance

    Connection Parameters:
        hosts (list or str): Node URL(s) including scheme and port,
            e.g. 'https://localhost:9200'; an https:// URL enables TLS
            (there is no use_ssl parameter)
        basic_auth (tuple): HTTP basic authentication (username, password)
        api_key (tuple or str): API key as (id, key) or as the encoded key
        bearer_auth (str): Bearer token authentication
        cloud_id (str): Elastic Cloud deployment ID, used instead of hosts
        request_timeout (float): Request timeout in seconds
        max_retries (int): Maximum number of retries per request
        retry_on_timeout (bool): Retry requests that time out
        retry_on_status (list): HTTP status codes to retry on
        sniff_on_start (bool): Discover cluster nodes on startup
        sniff_on_node_failure (bool): Re-discover nodes after a node fails
        sniff_timeout (float): Timeout for the node discovery request
        min_delay_between_sniffing (float): Minimum seconds between sniffs
        randomize_nodes_in_pool (bool): Randomize node selection order
        verify_certs (bool): Verify TLS certificates
        ssl_show_warn (bool): Show warnings about insecure TLS settings
        ca_certs (str): Path to CA certificates
        client_cert (str): Path to client certificate
        client_key (str): Path to client private key
        ssl_version (int): Minimum TLS version to allow
        ssl_assert_hostname (str): Hostname to verify the certificate against
        ssl_assert_fingerprint (str): Expected certificate fingerprint
        headers (dict): Default HTTP headers
        connections_per_node (int): HTTP connections kept per node
        http_compress (bool): Enable gzip compression of request bodies
        opaque_id (str): X-Opaque-Id value for request identification

    The 7.x names http_auth, timeout, randomize_hosts and
    sniff_on_connection_fail are still accepted as deprecated aliases.
    """

def add_connection(alias, connection):
    """
    Register an existing client under an alias.

    Args:
        alias (str): Connection alias name
        connection: Elasticsearch client instance
    """

def remove_connection(alias):
    """
    Remove a connection (and any configuration) registered under an alias.

    Args:
        alias (str): Connection alias name

    Raises:
        KeyError: If no connection or configuration exists for the alias
    """

def get_connection(alias='default'):
    """
    Retrieve a connection by alias, creating it lazily from configure()
    settings if necessary. A client instance passed as alias is returned as-is.

    Args:
        alias (str or Elasticsearch): Connection alias name (default: 'default')

    Returns:
        Elasticsearch: Elasticsearch client instance

    Raises:
        KeyError: If the alias is neither registered nor configured
    """

def configure(**kwargs):
    """
    Store connection settings for several aliases at once. Clients are
    created lazily on the first get_connection() call for each alias.
    Calling configure() again replaces earlier settings; existing clients
    are kept only if their settings are unchanged.

    Args:
        **kwargs: Mapping of alias name to a dict of connection parameters
            (same parameters as create_connection), e.g.
            configure(default={'hosts': 'http://localhost:9200'},
                      dev={'hosts': ['http://esdev1.example.com:9200']})

    Returns:
        None
    """
```
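
The registry resolves an alias in a fixed order: a client already registered under that alias is returned; otherwise, if `configure()` stored settings for the alias, a client is built from them and cached; otherwise `KeyError` is raised. The sketch below is a hypothetical pure-Python model of that lookup (`MiniRegistry` and `fake_client` are illustrative names, not part of `elasticsearch_dsl`), using a fake factory so it runs without a cluster.

```python
from typing import Any, Callable, Dict


class MiniRegistry:
    """Hypothetical model of alias-based connection lookup (not library code)."""

    def __init__(self, factory: Callable[..., Any]) -> None:
        self._factory = factory              # builds a client from keyword arguments
        self._conns: Dict[str, Any] = {}     # alias -> live client
        self._kwargs: Dict[str, dict] = {}   # alias -> settings stored by configure()

    def configure(self, **aliases: dict) -> None:
        # Keep cached clients only if their settings are unchanged; build nothing new.
        for alias in list(self._conns):
            if alias in self._kwargs and aliases.get(alias) == self._kwargs[alias]:
                continue
            del self._conns[alias]
        self._kwargs = aliases

    def create_connection(self, alias: str = "default", **kwargs: Any) -> Any:
        conn = self._conns[alias] = self._factory(**kwargs)
        return conn

    def get_connection(self, alias: str = "default") -> Any:
        if alias in self._conns:
            return self._conns[alias]
        if alias in self._kwargs:
            # Lazily build the client on first use; later calls hit the cache.
            return self.create_connection(alias, **self._kwargs[alias])
        raise KeyError(f"There is no connection with alias {alias!r}.")


built = []  # records every client the factory builds


def fake_client(**kwargs):
    built.append(kwargs)
    return {"client_for": kwargs["hosts"]}


registry = MiniRegistry(fake_client)
registry.configure(default={"hosts": "http://localhost:9200"})
print(len(built))  # 0: configure() only stores settings
first = registry.get_connection()
second = registry.get_connection()
print(len(built), first is second)  # 1 True: built once, then reused
```

The practical consequence is that `configure()` is cheap to call at import time, and connection errors surface only when an alias is first used.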

### Connection Registry

The module-level functions are bound methods of a global `Connections` instance, so the registry can also be used directly.

```python { .api }
class Connections:
    """
    Registry of Elasticsearch clients keyed by alias.
    """

    def create_connection(self, alias='default', **kwargs):
        """Create and register connection (same as module function)."""

    def add_connection(self, alias, connection):
        """Add existing connection (same as module function)."""

    def remove_connection(self, alias):
        """Remove connection (same as module function)."""

    def get_connection(self, alias='default'):
        """Get connection (same as module function)."""

    def configure(self, **kwargs):
        """Configure connections by alias (same as module function)."""

# Global registry instance (elasticsearch_dsl.connections.connections)
connections: Connections
```

### Async Connection Management

The `elasticsearch_dsl.async_connections` module provides the same functions as the synchronous API for use with `async`/`await`; they build and return `AsyncElasticsearch` clients. Creating or retrieving a connection is an ordinary synchronous call; only requests made through the client are awaited.

```python { .api }
# Module: elasticsearch_dsl.async_connections

def create_connection(alias='default', **kwargs):
    """
    Create a new async Elasticsearch client and register it under an alias.

    Args:
        alias (str): Connection alias name (default: 'default')
        **kwargs: Connection parameters (same as the synchronous create_connection)

    Returns:
        AsyncElasticsearch: Async Elasticsearch client instance
    """

def add_connection(alias, connection):
    """
    Register an existing async client under an alias.

    Args:
        alias (str): Connection alias name
        connection: AsyncElasticsearch client instance
    """

def remove_connection(alias):
    """
    Remove an async connection (and any configuration) from the registry.

    Args:
        alias (str): Connection alias name

    Raises:
        KeyError: If no connection or configuration exists for the alias
    """

def get_connection(alias='default'):
    """
    Retrieve an async connection by alias, creating it lazily from
    configure() settings if necessary.

    Args:
        alias (str): Connection alias name (default: 'default')

    Returns:
        AsyncElasticsearch: Async Elasticsearch client instance

    Raises:
        KeyError: If the alias is neither registered nor configured
    """

def configure(**kwargs):
    """
    Store async connection settings for several aliases at once; clients
    are created lazily on first use.

    Args:
        **kwargs: Mapping of alias name to a dict of connection parameters

    Returns:
        None
    """
```

### Async Connection Registry

The async functions are bound methods of a global registry that subclasses `Connections` and creates `AsyncElasticsearch` clients.

```python { .api }
class AsyncElasticsearchConnections(Connections):
    """
    Registry of AsyncElasticsearch clients keyed by alias.
    """

    def create_connection(self, alias='default', **kwargs):
        """Create and register async connection."""

    def add_connection(self, alias, connection):
        """Add existing async connection."""

    def remove_connection(self, alias):
        """Remove async connection."""

    def get_connection(self, alias='default'):
        """Get async connection."""

    def configure(self, **kwargs):
        """Configure async connections by alias."""

# Global async registry instance (elasticsearch_dsl.async_connections.connections)
connections: AsyncElasticsearchConnections
```

## Usage Examples

### Basic Connection Setup

```python
from elasticsearch_dsl import connections, Document, Text

# Simple connection to a local node (elasticsearch-py 8.x requires a scheme)
connections.create_connection(hosts=['http://localhost:9200'])

# Connection with authentication; TLS is enabled by the https:// scheme
connections.create_connection(
    alias='secure',
    hosts=['https://elasticsearch.example.com:9200'],
    basic_auth=('username', 'password'),
    verify_certs=True,
)

# Use connection in document operations
class Article(Document):
    title = Text()
    content = Text()

    class Index:
        name = 'articles'

# Document operations use the 'default' connection unless told otherwise
article = Article(title='Test', content='Content')
article.save()

# Use a specific connection
article.save(using='secure')
```
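
Because elasticsearch-py 8.x rejects host strings without a scheme, values read from older configuration files (such as bare `localhost:9200`) may need normalizing first. A minimal sketch, assuming plain HTTP and port 9200 are acceptable defaults for scheme-less entries; `normalize_host` is a hypothetical helper, not part of either library.

```python
from urllib.parse import urlsplit


def normalize_host(host: str, default_scheme: str = "http", default_port: int = 9200) -> str:
    """Hypothetical helper: turn 'host' or 'host:port' into 'scheme://host:port'.

    IPv6 literals are not handled.
    """
    if "://" not in host:
        host = f"{default_scheme}://{host}"
    parts = urlsplit(host)
    port = parts.port or default_port
    path = parts.path.rstrip("/")  # keep any path prefix, minus a trailing slash
    return f"{parts.scheme}://{parts.hostname}:{port}{path}"


raw_hosts = ["localhost:9200", "es.example.com", "https://secure-es.example.com:9243"]
print([normalize_host(h) for h in raw_hosts])
```

The normalized list can then be passed as `hosts=[normalize_host(h) for h in raw_hosts]`.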

### Multiple Cluster Configuration

```python
from elasticsearch_dsl import connections, Search

# Production cluster
connections.create_connection(
    alias='production',
    hosts=['https://prod-es-1.example.com:9200', 'https://prod-es-2.example.com:9200'],
    basic_auth=('prod_user', 'prod_password'),
    verify_certs=True,
    ca_certs='/path/to/ca.pem',
    request_timeout=30,
    max_retries=3,
    retry_on_timeout=True,
    sniff_on_start=True,
    sniff_on_node_failure=True,
    sniff_timeout=10,
    randomize_nodes_in_pool=True,
)

# Development cluster
connections.create_connection(
    alias='development',
    hosts=['http://dev-es.example.com:9200'],
    basic_auth=('dev_user', 'dev_password'),
    request_timeout=10,
    max_retries=1,
)

# Analytics cluster (read-only)
connections.create_connection(
    alias='analytics',
    hosts=['https://analytics-es.example.com:9200'],
    basic_auth=('analytics_user', 'analytics_password'),
    request_timeout=60,  # Longer timeout for analytics queries
)

# Search production data
search = Search(using='production', index='logs')
response = search.execute()

# Run analytics on the dedicated cluster
analytics_search = Search(using='analytics', index='metrics')
analytics_response = analytics_search.execute()
```

### SSL/TLS and Authentication

```python
from elasticsearch_dsl import connections

# TLS with client certificates (mutual TLS); hostname verification is
# on by default when verify_certs=True
connections.create_connection(
    alias='ssl_client_cert',
    hosts=['https://secure-es.example.com:9200'],
    verify_certs=True,
    ca_certs='/path/to/ca-certificates.crt',
    client_cert='/path/to/client.crt',
    client_key='/path/to/client.key',
)

# API key authentication
connections.create_connection(
    alias='api_key',
    hosts=['https://es.example.com:9200'],
    api_key=('api_key_id', 'api_key_secret'),
    verify_certs=True,
)

# Bearer token authentication
connections.create_connection(
    alias='bearer_token',
    hosts=['https://es.example.com:9200'],
    bearer_auth='your_bearer_token_here',
)

# Elastic Cloud connection: cloud_id replaces hosts
# (replace the placeholder with your deployment's Cloud ID)
connections.create_connection(
    alias='elastic_cloud',
    cloud_id='cluster_name:base64_encoded_endpoint',
    basic_auth=('elastic_username', 'elastic_password'),
)

# Custom headers and opaque ID
connections.create_connection(
    alias='custom_headers',
    hosts=['https://es.example.com:9200'],
    headers={'Custom-Header': 'value'},
    opaque_id='my-application-v1.0',
    basic_auth=('username', 'password'),
)
```
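
The `cloud_id` value bundles the cluster endpoint: after the `name:` prefix it is base64-encoded text of the form `domain$es_uuid$kibana_uuid`, where the domain may carry a `:port` suffix. The sketch below decodes such an ID to show which HTTPS URL it points to. `cloud_id_to_url` is a hypothetical helper and the sample ID is made up; the client parses Cloud IDs itself, so this is only an illustration of the format.

```python
import base64


def cloud_id_to_url(cloud_id: str) -> str:
    """Hypothetical helper: derive the Elasticsearch URL encoded in a Cloud ID."""
    _name, _, encoded = cloud_id.rpartition(":")  # base64 never contains ':'
    decoded = base64.b64decode(encoded).decode("utf-8")
    domain, es_uuid = decoded.split("$")[:2]
    port = 443  # Elastic Cloud endpoints default to HTTPS on 443
    if ":" in domain:
        domain, _, port_str = domain.rpartition(":")
        port = int(port_str)
    return f"https://{es_uuid}.{domain}:{port}"


# Build a made-up Cloud ID so the example is self-contained.
payload = "us-east-1.aws.example.io$abc123$def456"
sample_id = "my-deployment:" + base64.b64encode(payload.encode()).decode()
print(cloud_id_to_url(sample_id))  # https://abc123.us-east-1.aws.example.io:443
```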

### Connection Pooling and Performance

```python
from elasticsearch_dsl import connections

# High-throughput configuration
connections.create_connection(
    alias='high_performance',
    hosts=[
        'https://es-node-01.example.com:9200',
        'https://es-node-02.example.com:9200',
        'https://es-node-03.example.com:9200',
    ],
    # Connection pooling
    connections_per_node=10,

    # Retry configuration
    max_retries=5,
    retry_on_timeout=True,
    retry_on_status=[429, 502, 503, 504],

    # Sniffing for node discovery
    sniff_on_start=True,
    sniff_on_node_failure=True,
    sniff_timeout=5,
    min_delay_between_sniffing=60,

    # Compression and load distribution
    http_compress=True,
    randomize_nodes_in_pool=True,

    # Timeouts
    request_timeout=20,

    # Authentication
    basic_auth=('username', 'password'),
    verify_certs=True,
)

# Bulk operations configuration
connections.create_connection(
    alias='bulk_operations',
    hosts=['http://bulk-es.example.com:9200'],
    request_timeout=300,  # 5 minute timeout for bulk operations
    max_retries=1,  # Fewer retries, since bulk requests are large
    http_compress=True,  # Compressing bulk bodies saves bandwidth
    connections_per_node=20,  # More connections for concurrent bulk requests
    basic_auth=('bulk_user', 'bulk_password'),
)
```
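
`http_compress=True` gzips request bodies, which pays off most for large, repetitive payloads such as bulk requests. The standard-library sketch below builds a sample NDJSON bulk body (the index name and documents are made up) and compares its raw and gzip-compressed sizes; actual savings depend on your data.

```python
import gzip
import json

# Build a made-up bulk body: one action line plus one source line per document.
lines = []
for i in range(1000):
    lines.append(json.dumps({"index": {"_index": "articles", "_id": str(i)}}))
    lines.append(json.dumps({"title": f"Article {i}", "content": "Elasticsearch bulk indexing example"}))
body = ("\n".join(lines) + "\n").encode("utf-8")  # bulk bodies must end with a newline

compressed = gzip.compress(body)
ratio = len(compressed) / len(body)
print(f"raw={len(body)} bytes, gzip={len(compressed)} bytes, ratio={ratio:.2f}")
```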

### Async Connection Setup

```python
import asyncio

from elasticsearch_dsl import AsyncDocument, AsyncSearch, Text, async_connections

# Creating connections is synchronous: no await needed
async_connections.create_connection(
    alias='async_default',
    hosts=['http://localhost:9200'],
    request_timeout=30,
    max_retries=3,
)

# Async connection with authentication
async_connections.create_connection(
    alias='async_secure',
    hosts=['https://es.example.com:9200'],
    basic_auth=('username', 'password'),
    verify_certs=True,
    request_timeout=60,
)

class AsyncArticle(AsyncDocument):
    title = Text()
    content = Text()

    class Index:
        name = 'async_articles'

async def async_operations():
    try:
        # Create and save a document
        article = AsyncArticle(title='Async Test', content='Async content')
        await article.save(using='async_default')

        # Async search
        search = AsyncSearch(using='async_secure', index='logs')
        search = search.query('match', message='error')
        response = await search.execute()

        for hit in response:
            print(f"Log: {hit.message}")
    finally:
        # Close the clients' HTTP sessions before the event loop shuts down
        for alias in ('async_default', 'async_secure'):
            await async_connections.get_connection(alias).close()

# Run async operations
asyncio.run(async_operations())
```
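
When many async searches run at once, it can help to cap the number of in-flight requests, for example at a value in line with `connections_per_node` times the number of nodes. A minimal sketch using `asyncio.gather` with an `asyncio.Semaphore`; `fake_search` is a hypothetical stand-in for `await AsyncSearch(...).execute()` so the example runs without a cluster, and `MAX_IN_FLIGHT` is an assumed limit.

```python
import asyncio

MAX_IN_FLIGHT = 4  # assumed limit; tune to your pool size


async def fake_search(query: str) -> str:
    """Hypothetical stand-in for `await AsyncSearch(...).query(...).execute()`."""
    await asyncio.sleep(0.01)
    return f"results for {query!r}"


async def bounded_search(sem: asyncio.Semaphore, query: str, stats: dict) -> str:
    async with sem:  # at most MAX_IN_FLIGHT coroutines get past this point
        stats["current"] += 1
        stats["peak"] = max(stats["peak"], stats["current"])
        try:
            return await fake_search(query)
        finally:
            stats["current"] -= 1


async def main() -> tuple:
    sem = asyncio.Semaphore(MAX_IN_FLIGHT)  # created inside the running loop
    stats = {"current": 0, "peak": 0}
    queries = [f"error {i}" for i in range(20)]
    # gather() returns results in the same order as the queries
    results = await asyncio.gather(*(bounded_search(sem, q, stats) for q in queries))
    return results, stats["peak"]


results, peak = asyncio.run(main())
print(len(results), peak)
```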

### Connection Health and Monitoring

```python
from elasticsearch.exceptions import ApiError, ConnectionError, TransportError
from elasticsearch_dsl import connections

def check_connection_health(alias='default'):
    """Check the health of an Elasticsearch connection."""
    try:
        client = connections.get_connection(alias)
    except KeyError:
        print(f"No connection registered or configured as '{alias}'")
        return False

    try:
        # Check cluster health
        health = client.cluster.health()
        print(f"Cluster status: {health['status']}")
        print(f"Number of nodes: {health['number_of_nodes']}")
        print(f"Active shards: {health['active_shards']}")

        # Check node info
        nodes = client.nodes.info()
        for node_id, node_info in nodes['nodes'].items():
            print(f"Node {node_id}: {node_info['name']} ({node_info['version']})")

        # Test with a simple search
        client.search(index='_all', query={'match_all': {}}, size=1)
        print("Connection test successful!")
        return True

    except ConnectionError as e:
        print(f"Connection error: {e}")
    except ApiError as e:
        print(f"API error (HTTP {e.meta.status}): {e}")
    except TransportError as e:
        print(f"Transport error: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")
    return False

# Check each alias this application registers
for alias in ['default', 'production', 'analytics']:
    print(f"Checking connection '{alias}':")
    check_connection_health(alias)
    print("-" * 40)
```
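
For automated monitoring it is often more useful to turn the `cluster.health()` response into a pass/fail decision than to print it. A minimal sketch that works on the response body as a plain dict (for example `client.cluster.health().body`); the thresholds and the `summarize_health` name are assumptions to adapt, and the sample dict holds only a subset of the fields Elasticsearch returns.

```python
def summarize_health(health: dict, min_nodes: int = 1, allow_yellow: bool = True) -> tuple:
    """Hypothetical helper: return (healthy, problems) for a cluster health response."""
    problems = []
    status = health.get("status")
    if status == "red" or (status == "yellow" and not allow_yellow):
        problems.append(f"status is {status}")
    nodes = health.get("number_of_nodes", 0)
    if nodes < min_nodes:
        problems.append(f"only {nodes} node(s), expected {min_nodes}")
    return (not problems, problems)


sample = {"status": "yellow", "number_of_nodes": 1, "active_shards": 5}
print(summarize_health(sample))  # (True, [])
print(summarize_health(sample, allow_yellow=False))  # (False, ['status is yellow'])
```

Allowing yellow by default suits single-node development clusters, where replica shards can never be assigned.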

### Dynamic Connection Management

```python
import os

from elasticsearch_dsl import connections, Document, Text

class ConfigurableDocument(Document):
    """Document that selects its connection from the ENVIRONMENT variable."""

    title = Text()
    content = Text()

    class Index:
        name = 'configurable_docs'

    def save(self, **kwargs):
        # Use an environment-specific connection unless the caller passes `using`
        if 'using' not in kwargs:
            env = os.getenv('ENVIRONMENT', 'development')
            connection_alias = f'es_{env}'
            try:
                connections.get_connection(connection_alias)
            except KeyError:
                self._setup_connection(env, connection_alias)
            kwargs['using'] = connection_alias
        return super().save(**kwargs)

    @staticmethod
    def _setup_connection(env, alias):
        """Register a connection for the given environment under `alias`."""
        if env == 'production':
            connections.create_connection(
                alias=alias,
                hosts=os.getenv('ES_PROD_HOSTS', 'https://localhost:9200').split(','),
                basic_auth=(os.getenv('ES_PROD_USER'), os.getenv('ES_PROD_PASSWORD')),
                verify_certs=True,
                request_timeout=30,
            )
        elif env == 'staging':
            connections.create_connection(
                alias=alias,
                hosts=os.getenv('ES_STAGING_HOSTS', 'http://localhost:9200').split(','),
                basic_auth=(os.getenv('ES_STAGING_USER'), os.getenv('ES_STAGING_PASSWORD')),
                request_timeout=20,
            )
        else:  # development or any other value
            connections.create_connection(
                alias=alias,
                hosts=['http://localhost:9200'],
                request_timeout=10,
            )

# Usage
doc = ConfigurableDocument(title='Test', content='Content')
doc.save()  # Uses the connection selected by the ENVIRONMENT variable
```
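
The environment lookup in the example above can be pulled out into a pure function that returns an alias and connection settings, which makes it testable without a cluster. A sketch using elasticsearch-py 8.x parameter names and the same `ENVIRONMENT`/`ES_*` variable names as the example; `resolve_connection` is a hypothetical helper, and its result could be used as `connections.create_connection(alias=alias, **settings)`.

```python
from typing import Mapping, Tuple


def resolve_connection(env_vars: Mapping[str, str]) -> Tuple[str, dict]:
    """Hypothetical helper: map environment variables to (alias, connection kwargs)."""
    env = env_vars.get("ENVIRONMENT", "development")
    alias = f"es_{env}"
    if env == "production":
        settings = {
            "hosts": env_vars.get("ES_PROD_HOSTS", "https://localhost:9200").split(","),
            "basic_auth": (env_vars.get("ES_PROD_USER"), env_vars.get("ES_PROD_PASSWORD")),
            "verify_certs": True,
            "request_timeout": 30,
        }
    elif env == "staging":
        settings = {
            "hosts": env_vars.get("ES_STAGING_HOSTS", "http://localhost:9200").split(","),
            "basic_auth": (env_vars.get("ES_STAGING_USER"), env_vars.get("ES_STAGING_PASSWORD")),
            "request_timeout": 20,
        }
    else:  # development or any unrecognized value
        settings = {"hosts": ["http://localhost:9200"], "request_timeout": 10}
    return alias, settings


alias, settings = resolve_connection({"ENVIRONMENT": "production",
                                      "ES_PROD_HOSTS": "https://es1:9200,https://es2:9200"})
print(alias, settings["hosts"])  # es_production ['https://es1:9200', 'https://es2:9200']
```

In application code the mapping would be `os.environ`; in tests a plain dict is enough.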

### Connection Error Handling

```python
import time

from elasticsearch.exceptions import (
    ApiError, BadRequestError, ConnectionError, ConnectionTimeout, NotFoundError,
)
from elasticsearch_dsl import connections, Search

RETRYABLE_STATUSES = {429, 502, 503, 504}

def robust_search_with_retry(index, query, max_retries=3, delay=1):
    """Perform a search, retrying transient failures with exponential backoff."""

    for attempt in range(max_retries):
        try:
            search = Search(index=index).query('match', content=query)
            return search.execute()

        except ConnectionTimeout:
            print(f"Attempt {attempt + 1}: Connection timeout")
            if attempt < max_retries - 1:
                time.sleep(delay * (2 ** attempt))  # Exponential backoff
                continue
            raise

        except ConnectionError as e:
            print(f"Attempt {attempt + 1}: Connection error - {e}")
            if attempt < max_retries - 1:
                # Recreate the default connection before retrying
                try:
                    connections.remove_connection('default')
                except KeyError:
                    pass
                connections.create_connection(hosts=['http://localhost:9200'])
                time.sleep(delay)
                continue
            raise

        except (NotFoundError, BadRequestError):
            # Don't retry client errors
            raise

        except ApiError as e:
            # HTTP errors such as 429/503 are raised as ApiError in elasticsearch-py 8.x
            status = e.meta.status
            if status in RETRYABLE_STATUSES and attempt < max_retries - 1:
                print(f"Attempt {attempt + 1}: HTTP {status}, retrying")
                time.sleep(delay * (2 ** attempt))
                continue
            raise

# Usage
try:
    results = robust_search_with_retry('articles', 'elasticsearch')
    print(f"Got {len(results)} hits")
except Exception as e:
    print(f"Search failed after retries: {e}")
```
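
The retry loop above sleeps `delay * 2 ** attempt` seconds. Two common refinements are a cap on the delay and "full jitter" (sleeping a uniformly random time between zero and the capped delay) so that many clients do not retry in lockstep. A minimal sketch of that schedule; `backoff_delay` and its defaults are assumptions, not part of elasticsearch-py, which has its own transport-level retries configured through `max_retries` and `retry_on_status`.

```python
import random
from typing import Optional


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0,
                  rng: Optional[random.Random] = None) -> float:
    """Hypothetical helper: capped exponential backoff, optionally with full jitter."""
    ceiling = min(cap, base * (2 ** attempt))
    if rng is None:
        return ceiling  # deterministic schedule: base, 2*base, 4*base, ... up to cap
    return rng.uniform(0, ceiling)  # full jitter: anywhere in [0, ceiling]


print([backoff_delay(a) for a in range(7)])  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
rng = random.Random(42)
jittered = [backoff_delay(a, rng=rng) for a in range(7)]
```

Inside `robust_search_with_retry`, `time.sleep(backoff_delay(attempt, base=delay, rng=random.Random()))` would replace the fixed exponential sleeps.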