# Transport and Connection

Low-level connection management, transport configuration, and client customization. Provides control over HTTP communication, connection pooling, retry logic, serialization, and advanced client configuration options.

## Capabilities

### Elasticsearch Client Configuration

Main client class with comprehensive configuration options for production use.

```python { .api }
class Elasticsearch:
    def __init__(
        self,
        hosts=None,
        transport_class=Transport,
        **kwargs
    ):
        """
        Initialize Elasticsearch client with connection and transport configuration.

        Parameters:
        - hosts: List of Elasticsearch nodes (default: [{'host': 'localhost', 'port': 9200}])
        - transport_class: Transport class to handle communication (default: Transport)
        - **kwargs: Additional arguments passed to Transport constructor

        Host specification formats:
        - String: 'localhost:9200'
        - Dict: {'host': 'localhost', 'port': 9200, 'use_ssl': True}
        - List: ['host1:9200', 'host2:9200'] or [{'host': 'host1'}, {'host': 'host2'}]

        Examples:
            # Simple connection
            es = Elasticsearch(['localhost:9200'])

            # Multiple hosts with SSL
            es = Elasticsearch([
                {'host': 'host1', 'port': 9200, 'use_ssl': True},
                {'host': 'host2', 'port': 9200, 'use_ssl': True}
            ])

            # With authentication
            es = Elasticsearch(
                ['https://host1:9200'],
                http_auth=('username', 'password'),
                use_ssl=True,
                verify_certs=True
            )
        """
```

### Transport Layer Configuration

Core transport class handling HTTP communication, connection management, and request processing.

```python { .api }
class Transport:
    def __init__(
        self,
        hosts,
        connection_class=Urllib3HttpConnection,
        connection_pool_class=ConnectionPool,
        host_info_callback=get_host_info,
        sniff_on_start=False,
        sniffer_timeout=None,
        sniff_on_connection_fail=False,
        sniff_timeout=0.1,
        serializer=JSONSerializer(),
        serializers=None,
        default_mimetype='application/json',
        max_retries=3,
        retry_on_status=(502, 503, 504),
        retry_on_timeout=False,
        send_get_body_as='GET',
        **kwargs
    ):
        """
        Initialize transport layer with connection and retry configuration.

        Parameters:
        - hosts: List of host configurations
        - connection_class: HTTP connection implementation (Urllib3HttpConnection, RequestsHttpConnection)
        - connection_pool_class: Connection pool management class (ConnectionPool, DummyConnectionPool)
        - host_info_callback: Function to process discovered host information
        - sniff_on_start: Discover cluster nodes at startup (bool)
        - sniffer_timeout: Interval between automatic node discovery (seconds)
        - sniff_on_connection_fail: Discover nodes when connection fails (bool)
        - sniff_timeout: Timeout for node discovery requests (seconds)
        - serializer: Default request/response serializer
        - serializers: Dict of serializers by mimetype
        - default_mimetype: Default response content type
        - max_retries: Maximum retry attempts for failed requests
        - retry_on_status: HTTP status codes that trigger retries
        - retry_on_timeout: Retry requests that timeout (bool)
        - send_get_body_as: Method for GET requests with body ('GET' or 'POST')
        - **kwargs: Additional connection parameters

        Connection parameters passed to connection class:
        - timeout: Request timeout in seconds (default: 10)
        - use_ssl: Enable HTTPS connections (bool)
        - verify_certs: Verify SSL certificates (bool)
        - ca_certs: Path to CA certificate file
        - client_cert: Path to client certificate file
        - client_key: Path to client private key file
        - ssl_version: SSL version to use
        - ssl_assert_hostname: Verify SSL hostname (bool)
        - ssl_assert_fingerprint: Expected SSL certificate fingerprint
        - maxsize: Maximum connection pool size
        - http_auth: HTTP authentication tuple ('username', 'password')
        - http_compress: Enable HTTP compression (bool)
        - headers: Default HTTP headers dict
        """

    def perform_request(self, method: str, url: str, params: dict = None, body: dict = None) -> dict:
        """
        Execute HTTP request with automatic retry and connection management.

        Parameters:
        - method: HTTP method ('GET', 'POST', 'PUT', 'DELETE', 'HEAD')
        - url: Request URL path
        - params: Query parameters dict
        - body: Request body (will be serialized)

        Returns:
        dict: Deserialized response body

        Raises:
        ConnectionError: Network/connection failures
        TransportError: HTTP errors with status codes
        SerializationError: Request/response serialization failures
        """

    def add_connection(self, host: dict):
        """Add a new connection to the pool."""

    def set_connections(self, hosts: list):
        """Replace all connections with new host list."""

    def get_connection(self):
        """Get an available connection from the pool."""

    def sniff_hosts(self, initial: bool = False):
        """Discover cluster nodes and update connection pool."""

    def mark_dead(self, connection):
        """Mark a connection as failed for the dead timeout period."""

    def close(self):
        """Close all connections and clean up resources."""
```
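
The transport can also be driven directly for endpoints that have no dedicated client helper. A minimal sketch, assuming a locally reachable cluster and using the `perform_request` signature above (the `/_cluster/health` path and its query parameters come from the Elasticsearch REST API, not from this client library):

```python
from elasticsearch5 import Elasticsearch

es = Elasticsearch(['localhost:9200'])

# perform_request applies the transport's retry and connection-selection
# logic and returns the deserialized response body.
health = es.transport.perform_request(
    'GET',
    '/_cluster/health',
    params={'wait_for_status': 'yellow', 'timeout': '5s'}
)
print(health['status'])
```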

### Connection Pool Management

Manage multiple connections with failure detection, load balancing, and automatic recovery.

```python { .api }
class ConnectionPool:
    def __init__(
        self,
        connections,
        dead_timeout=60,
        timeout_cutoff=5,
        selector_class=RoundRobinSelector,
        randomize_hosts=True,
        **kwargs
    ):
        """
        Initialize connection pool with failure handling and load balancing.

        Parameters:
        - connections: List of connection instances
        - dead_timeout: Seconds to wait before retrying failed connections
        - timeout_cutoff: Number of consecutive failures after which the dead timeout stops increasing
        - selector_class: Connection selection strategy class
        - randomize_hosts: Randomize initial connection order (bool)
        - **kwargs: Additional configuration passed to selector
        """

    def get_connection(self):
        """
        Select an available connection using the configured selector.

        Returns:
        Connection: Available connection instance

        Raises:
        ConnectionError: If no connections are available
        """

    def mark_dead(self, connection, now: float = None):
        """
        Mark connection as failed and start dead timeout.

        Parameters:
        - connection: Connection instance to mark as dead
        - now: Current timestamp (auto-generated if None)
        """

    def mark_live(self, connection):
        """Reset connection failure count and mark as available."""

    def resurrect(self, force: bool = False):
        """
        Attempt to revive dead connections after timeout period.

        Parameters:
        - force: Revive regardless of timeout (bool)
        """

    def close(self):
        """Close all connections in the pool."""

class DummyConnectionPool:
    """
    Single connection pool for cases with only one Elasticsearch node.
    Provides the same interface as ConnectionPool but manages a single connection.
    """
```
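
The interplay of `dead_timeout` and `timeout_cutoff` is easiest to see with a small calculation. The sketch below assumes the exponential backoff used by elasticsearch-py, `dead_timeout * 2 ** min(dead_count - 1, timeout_cutoff)`; treat it as an illustration of the intent rather than a guaranteed implementation detail:

```python
# Illustration only: assumed formula for how long a connection stays dead
# after each consecutive failure.
dead_timeout = 60   # base dead time in seconds
timeout_cutoff = 5  # failures after which the timeout stops growing

for dead_count in range(1, 9):
    timeout = dead_timeout * 2 ** min(dead_count - 1, timeout_cutoff)
    print("failure #%d -> connection retried after %ds" % (dead_count, timeout))
```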

### Connection Selection Strategies

Different strategies for selecting connections from the pool.

```python { .api }
class ConnectionSelector:
    def __init__(self, opts: dict):
        """
        Base class for connection selection strategies.

        Parameters:
        - opts: Selector configuration options
        """

    def select(self, connections: list):
        """
        Select connection from available connections.

        Parameters:
        - connections: List of available connections

        Returns:
        Connection: Selected connection instance
        """

class RoundRobinSelector(ConnectionSelector):
    """
    Round-robin connection selection for even load distribution.
    Cycles through available connections in order.
    """

class RandomSelector(ConnectionSelector):
    """
    Random connection selection for simple load balancing.
    Randomly selects from available connections.
    """
```
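
A custom strategy subclasses `ConnectionSelector` and implements `select()`. The sketch below is illustrative only: the `zone` host attribute and the assumption that `opts` maps each connection to the host dictionary it was created from are not part of the documented interface:

```python
import random

from elasticsearch5 import Elasticsearch, ConnectionSelector


class ZoneAwareSelector(ConnectionSelector):
    """Prefer connections whose host dict carries a matching 'zone' tag."""

    def __init__(self, opts):
        super(ZoneAwareSelector, self).__init__(opts)
        # Keep our own reference; 'opts' is assumed to map connection
        # instances to the host dictionaries they were built from.
        self.opts = opts

    def select(self, connections):
        preferred = [
            c for c in connections
            if self.opts.get(c, {}).get('zone') == 'us-east-1a'
        ]
        return random.choice(preferred or connections)


# Extra host keys such as 'zone' are assumed to be ignored by the connection
# class and only consulted by the selector.
es = Elasticsearch(
    [
        {'host': 'host1', 'port': 9200, 'zone': 'us-east-1a'},
        {'host': 'host2', 'port': 9200, 'zone': 'us-east-1b'}
    ],
    selector_class=ZoneAwareSelector
)
```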

### HTTP Connection Implementations

Different HTTP client implementations for various use cases and environments.

```python { .api }
class Connection:
    def __init__(
        self,
        host='localhost',
        port=9200,
        use_ssl=False,
        url_prefix='',
        timeout=10,
        **kwargs
    ):
        """
        Base connection class defining the connection interface.

        Parameters:
        - host: Elasticsearch host address
        - port: Port number (default: 9200)
        - use_ssl: Enable HTTPS (bool)
        - url_prefix: URL prefix for requests
        - timeout: Request timeout in seconds
        - **kwargs: Implementation-specific parameters
        """

    def perform_request(self, method: str, url: str, params: dict, body: bytes, timeout: float, ignore: tuple, headers: dict):
        """
        Execute HTTP request (implemented by subclasses).

        Returns:
        tuple: (status_code, headers_dict, response_body)
        """

    def log_request_success(self, method: str, full_url: str, path: str, body: bytes, status_code: int, response: str, duration: float):
        """Log successful request for debugging and monitoring."""

    def log_request_fail(self, method: str, full_url: str, path: str, body: bytes, duration: float, status_code: int, response: str, exception: Exception):
        """Log failed request for debugging and error tracking."""

class Urllib3HttpConnection(Connection):
    """
    HTTP connection using urllib3 library (default implementation).

    Additional parameters:
    - maxsize: Connection pool size (default: 10)
    - block: Block when pool is full (bool, default: False)
    - http_compress: Enable compression (bool, default: False)
    - headers: Default headers dict
    - ssl_context: Custom SSL context
    - assert_same_host: Verify host matches (bool, default: True)
    """

class RequestsHttpConnection(Connection):
    """
    HTTP connection using requests library (alternative implementation).

    Additional parameters:
    - pool_connections: Number of connection pools
    - pool_maxsize: Maximum connections per pool
    - max_retries: urllib3 retry configuration
    - pool_block: Block when pool exhausted (bool)
    """
```
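
Choosing between the two implementations is a single constructor argument. A minimal sketch using the parameters listed above (host names are placeholders):

```python
from elasticsearch5 import Elasticsearch, RequestsHttpConnection, Urllib3HttpConnection

# Default urllib3-based connection with a larger per-host pool and compression.
es_urllib3 = Elasticsearch(
    ['host1:9200'],
    connection_class=Urllib3HttpConnection,
    maxsize=25,
    http_compress=True
)

# requests-based connection, convenient when the application already
# standardises on the requests library (e.g. for custom auth classes).
es_requests = Elasticsearch(
    ['host1:9200'],
    connection_class=RequestsHttpConnection
)
```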

### Data Serialization

Handle request/response serialization with support for multiple formats.

```python { .api }
class JSONSerializer:
    """
    Default JSON serializer for request/response data.

    Attributes:
    - mimetype: 'application/json'
    """

    def loads(self, s: str) -> dict:
        """
        Deserialize JSON string to Python object.

        Parameters:
        - s: JSON string to deserialize

        Returns:
        dict: Deserialized Python object

        Raises:
        SerializationError: If JSON is invalid
        """

    def dumps(self, data: dict) -> str:
        """
        Serialize Python object to JSON string.

        Parameters:
        - data: Python object to serialize

        Returns:
        str: JSON string representation

        Raises:
        SerializationError: If object cannot be serialized
        """

    def default(self, data):
        """
        Handle special data types during serialization.
        Supports: datetime, date, Decimal, UUID objects
        """

class TextSerializer:
    """
    Plain text serializer for string data.

    Attributes:
    - mimetype: 'text/plain'
    """

    def loads(self, s: str) -> str:
        """Return string unchanged."""

    def dumps(self, data: str) -> str:
        """Serialize string data only (raises error for other types)."""

class Deserializer:
    def __init__(self, serializers: dict, default_mimetype: str = 'application/json'):
        """
        Response deserializer supporting multiple content types.

        Parameters:
        - serializers: Dict mapping mimetypes to serializer instances
        - default_mimetype: Fallback content type
        """

    def loads(self, s: str, mimetype: str = None) -> dict:
        """
        Deserialize response based on content type.

        Parameters:
        - s: Response string to deserialize
        - mimetype: Content type (uses default if None)

        Returns:
        dict: Deserialized response
        """
```

### Host Discovery and Configuration

Automatic node discovery and host configuration management.

```python { .api }
def get_host_info(node_info: dict, host: dict) -> dict:
    """
    Default callback to process discovered node information.

    Parameters:
    - node_info: Node information from cluster state
    - host: Current host configuration

    Returns:
    dict: Updated host configuration with discovered information

    Extracts:
    - host: IP address or hostname
    - port: HTTP port number
    - use_ssl: Whether node uses HTTPS
    - url_prefix: URL prefix if configured
    """
```
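
Sniffing results can be filtered or rewritten by passing a custom callback in place of `get_host_info`. A minimal sketch, assuming the sniffed node info exposes a `roles` list and that hosts for which the callback returns `None` are dropped from the connection pool:

```python
from elasticsearch5 import Elasticsearch


def data_nodes_only(node_info, host):
    # Assumption: returning None excludes the node, and 'roles' is present
    # in the node info returned by this cluster version.
    if node_info.get('roles') == ['master']:
        return None
    return host


es = Elasticsearch(
    ['host1:9200'],
    sniff_on_start=True,
    sniff_on_connection_fail=True,
    host_info_callback=data_nodes_only
)
```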

## Usage Examples

### Basic Client Configuration

```python
from elasticsearch5 import Elasticsearch

# Simple connection to local cluster
es = Elasticsearch()

# Multiple hosts for high availability
es = Elasticsearch([
    'host1:9200',
    'host2:9200',
    'host3:9200'
])

# Detailed host configuration
es = Elasticsearch([
    {'host': 'host1', 'port': 9200},
    {'host': 'host2', 'port': 9200, 'use_ssl': True},
    {'host': 'host3', 'port': 9243, 'url_prefix': '/elasticsearch'}
])
```

### SSL and Authentication

```python
from elasticsearch5 import Elasticsearch

# SSL with authentication
es = Elasticsearch(
    ['https://host1:9200'],
    http_auth=('username', 'password'),
    use_ssl=True,
    verify_certs=True,
    ca_certs='/path/to/ca.crt',
    timeout=30
)

# Client certificate authentication
es = Elasticsearch(
    ['https://host1:9200'],
    use_ssl=True,
    verify_certs=True,
    ca_certs='/path/to/ca.crt',
    client_cert='/path/to/client.crt',
    client_key='/path/to/client.key'
)

# Custom SSL context
import ssl
ssl_context = ssl.create_default_context(cafile='/path/to/ca.crt')
ssl_context.check_hostname = False

es = Elasticsearch(
    ['https://host1:9200'],
    use_ssl=True,
    ssl_context=ssl_context
)
```

### Advanced Transport Configuration

```python
from elasticsearch5 import (
    Elasticsearch,
    Transport,
    RequestsHttpConnection,
    RoundRobinSelector
)

# Custom transport with requests
es = Elasticsearch(
    ['host1:9200', 'host2:9200'],
    connection_class=RequestsHttpConnection,
    timeout=20,
    max_retries=5,
    retry_on_timeout=True,
    retry_on_status=(429, 502, 503, 504),
    http_compress=True
)

# Node discovery configuration
es = Elasticsearch(
    ['host1:9200'],
    sniff_on_start=True,
    sniff_on_connection_fail=True,
    sniffer_timeout=60,  # Sniff every 60 seconds
    sniff_timeout=10     # 10 second sniff timeout
)

# Custom connection pool
from elasticsearch5 import ConnectionPool, RandomSelector

es = Elasticsearch(
    ['host1:9200', 'host2:9200', 'host3:9200'],
    connection_pool_class=ConnectionPool,
    selector_class=RandomSelector,
    dead_timeout=30,   # Retry dead connections after 30s
    timeout_cutoff=3   # Stop increasing the dead timeout after 3 consecutive failures
)
```

### Custom Serialization

```python
from elasticsearch5 import Elasticsearch, JSONSerializer
from datetime import datetime

class CustomJSONSerializer(JSONSerializer):
    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        return super().default(obj)

# Use custom serializer
es = Elasticsearch(
    ['localhost:9200'],
    serializer=CustomJSONSerializer()
)

# Multiple serializers for different content types
from elasticsearch5.serializer import TextSerializer

serializers = {
    'application/json': CustomJSONSerializer(),
    'text/plain': TextSerializer(),
    'text/csv': TextSerializer()
}

es = Elasticsearch(
    ['localhost:9200'],
    serializers=serializers,
    default_mimetype='application/json'
)
```

### Connection Monitoring and Debugging

```python
import logging

from elasticsearch5 import Elasticsearch, Urllib3HttpConnection

# Enable debug logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('elasticsearch')
logger.setLevel(logging.DEBUG)

# Custom connection with monitoring
class MonitoredConnection(Urllib3HttpConnection):
    def log_request_success(self, method, full_url, path, body, status_code, response, duration):
        super().log_request_success(method, full_url, path, body, status_code, response, duration)
        print(f"SUCCESS: {method} {path} -> {status_code} ({duration:.2f}s)")

    def log_request_fail(self, method, full_url, path, body, duration, status_code, response, exception):
        super().log_request_fail(method, full_url, path, body, duration, status_code, response, exception)
        print(f"FAILED: {method} {path} -> {status_code} ({duration:.2f}s): {exception}")

es = Elasticsearch(
    ['localhost:9200'],
    connection_class=MonitoredConnection
)
```

### Connection Pool Management

```python
# Access transport layer directly
transport = es.transport

# Add new connection at runtime
transport.add_connection({'host': 'new-host', 'port': 9200})

# Update entire connection list
transport.set_connections([
    {'host': 'host1', 'port': 9200},
    {'host': 'host2', 'port': 9200},
    {'host': 'host3', 'port': 9200}
])

# Manual node discovery
transport.sniff_hosts()

# Check connection pool status
pool = transport.connection_pool
print(f"Total connections: {len(pool.connections)}")
print(f"Dead connections: {len(pool.dead_count)}")

# Force resurrection of dead connections
pool.resurrect(force=True)

# Clean shutdown
transport.close()
```

### Error Handling and Retries

```python
from elasticsearch5 import Elasticsearch
from elasticsearch5.exceptions import (
    ConnectionError,
    TransportError,
    SerializationError
)

try:
    es = Elasticsearch(
        ['host1:9200', 'host2:9200'],
        max_retries=3,
        retry_on_status=(429, 502, 503, 504),
        retry_on_timeout=True,
        timeout=10
    )

    result = es.search(index='my_index', body={'query': {'match_all': {}}})

except ConnectionError as e:
    print(f"Connection failed: {e}")
except TransportError as e:
    print(f"Transport error {e.status_code}: {e.error}")
except SerializationError as e:
    print(f"Serialization error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")
```

### Production Configuration

```python
from elasticsearch5 import Elasticsearch, RoundRobinSelector

# Production-ready configuration
es = Elasticsearch(
    [
        {'host': 'es-node-1', 'port': 9200},
        {'host': 'es-node-2', 'port': 9200},
        {'host': 'es-node-3', 'port': 9200}
    ],

    # Security
    http_auth=('username', 'password'),
    use_ssl=True,
    verify_certs=True,
    ca_certs='/etc/ssl/certs/elasticsearch-ca.pem',

    # Performance
    timeout=30,
    max_retries=3,
    retry_on_status=(429, 502, 503, 504),
    retry_on_timeout=True,
    http_compress=True,

    # Reliability
    sniff_on_start=True,
    sniff_on_connection_fail=True,
    sniffer_timeout=60,
    dead_timeout=30,

    # Connection pool
    maxsize=25,  # Max connections per host
    selector_class=RoundRobinSelector,

    # Headers
    headers={'User-Agent': 'MyApp/1.0'}
)
```