# Transport and Connection

Low-level connection management, transport configuration, and client customization. These APIs control HTTP communication, connection pooling, retry logic, node discovery (sniffing), and serialization.

## Capabilities

### Elasticsearch Client Configuration

Main client class. Transport and connection options are passed to its constructor as keyword arguments, which makes it the single entry point for production configuration.

```python { .api }
class Elasticsearch:
    def __init__(
        self,
        hosts=None,
        transport_class=Transport,
        **kwargs
    ):
        """
        Initialize the Elasticsearch client with connection and transport configuration.

        Parameters:
        - hosts: List of Elasticsearch nodes (default: a single node at localhost:9200)
        - transport_class: Transport class that handles communication (default: Transport)
        - **kwargs: Additional arguments passed to the Transport constructor
          (and from there to the connection and connection pool classes)

        Host specification formats:
        - String: 'localhost:9200' or a full URL such as 'https://user:secret@host1:9200'
        - Dict: {'host': 'localhost', 'port': 9200, 'use_ssl': True}
        - List: ['host1:9200', 'host2:9200'] or [{'host': 'host1'}, {'host': 'host2'}]

        Examples:
            # Simple connection
            es = Elasticsearch(['localhost:9200'])

            # Multiple hosts with SSL
            es = Elasticsearch([
                {'host': 'host1', 'port': 9200, 'use_ssl': True},
                {'host': 'host2', 'port': 9200, 'use_ssl': True}
            ])

            # With authentication
            es = Elasticsearch(
                ['https://host1:9200'],
                http_auth=('username', 'password'),
                use_ssl=True,
                verify_certs=True
            )
        """
```
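All of the host formats listed above end up as plain host dicts. The standalone sketch below shows one way to perform that normalization. `normalize_hosts` is a hypothetical helper, modeled on the documented formats for illustration. It is not the client's own code and uses only the standard library.

```python
from urllib.parse import unquote, urlparse


def normalize_hosts(hosts):
    """Turn the accepted `hosts` forms into a list of host dicts (illustrative sketch)."""
    if hosts is None:
        return [{}]  # empty options: fall back to the connection defaults (localhost:9200)
    if isinstance(hosts, str):
        hosts = [hosts]  # a single string is treated as a one-element list

    normalized = []
    for host in hosts:
        if not isinstance(host, str):
            normalized.append(host)  # dicts are used as-is
            continue
        if "://" not in host:
            host = "//" + host  # lets urlparse read 'name:port' as a network location
        parsed = urlparse(host)
        spec = {"host": parsed.hostname}
        if parsed.port:
            spec["port"] = parsed.port
        if parsed.scheme == "https":
            spec["port"] = parsed.port or 443  # default HTTPS port when none is given
            spec["use_ssl"] = True
        if parsed.username or parsed.password:
            spec["http_auth"] = "%s:%s" % (
                unquote(parsed.username or ""),
                unquote(parsed.password or ""),
            )
        if parsed.path and parsed.path != "/":
            spec["url_prefix"] = parsed.path
        normalized.append(spec)
    return normalized
```

Any key that is missing from a host dict is left to the connection defaults (`localhost`, port 9200). That is why `None` maps to a single empty dict.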

### Transport Layer Configuration

Core transport class that handles HTTP communication, connection management, and request processing.

```python { .api }
class Transport:
    def __init__(
        self,
        hosts,
        connection_class=Urllib3HttpConnection,
        connection_pool_class=ConnectionPool,
        host_info_callback=get_host_info,
        sniff_on_start=False,
        sniffer_timeout=None,
        sniff_on_connection_fail=False,
        sniff_timeout=0.1,
        serializer=JSONSerializer(),
        serializers=None,
        default_mimetype='application/json',
        max_retries=3,
        retry_on_status=(502, 503, 504),
        retry_on_timeout=False,
        send_get_body_as='GET',
        **kwargs
    ):
        """
        Initialize the transport layer with connection and retry configuration.

        Parameters:
        - hosts: List of host configurations
        - connection_class: HTTP connection implementation (Urllib3HttpConnection, RequestsHttpConnection)
        - connection_pool_class: Connection pool management class (ConnectionPool, DummyConnectionPool)
        - host_info_callback: Function that processes (and can filter out) discovered hosts
        - sniff_on_start: Discover cluster nodes at startup (bool)
        - sniffer_timeout: Interval between automatic node discovery runs (seconds; None disables periodic sniffing)
        - sniff_on_connection_fail: Discover nodes when a connection fails (bool)
        - sniff_timeout: Timeout for node discovery requests (seconds)
        - serializer: Default request/response serializer
        - serializers: Dict of serializers keyed by mimetype
        - default_mimetype: Content type assumed for responses that do not declare one
        - max_retries: Maximum number of retries for a failed request
        - retry_on_status: HTTP status codes that trigger a retry
        - retry_on_timeout: Retry requests that time out on another node (bool)
        - send_get_body_as: How to send GET requests that have a body: 'GET', 'POST',
          or 'source' (serialize the body into the `source` query parameter)
        - **kwargs: Additional parameters passed to the connection class and the
          connection pool class

        Connection parameters passed to the connection class:
        - timeout: Request timeout in seconds (default: 10)
        - use_ssl: Enable HTTPS connections (bool)
        - verify_certs: Verify SSL certificates (bool)
        - ca_certs: Path to CA certificate file
        - client_cert: Path to client certificate file
        - client_key: Path to client private key file
        - ssl_version: SSL version to use
        - ssl_assert_hostname: Verify SSL hostname (bool)
        - ssl_assert_fingerprint: Expected SSL certificate fingerprint
        - maxsize: Maximum connection pool size
        - http_auth: HTTP authentication tuple ('username', 'password')
        - http_compress: Enable HTTP compression (bool)
        - headers: Default HTTP headers dict
        """

    def perform_request(self, method: str, url: str, params: dict = None, body: dict = None) -> dict:
        """
        Execute an HTTP request with automatic retries and connection management.

        Parameters:
        - method: HTTP method ('GET', 'POST', 'PUT', 'DELETE', 'HEAD')
        - url: Request URL path
        - params: Query parameters dict
        - body: Request body (will be serialized)

        Returns:
        dict: Deserialized response body (for 'HEAD' requests, a bool that is True for 2xx responses)

        Raises:
        ConnectionError: Network/connection failures
        TransportError: HTTP errors with status codes
        SerializationError: Request/response serialization failures
        """

    def add_connection(self, host: dict):
        """Add a new connection to the pool."""

    def set_connections(self, hosts: list):
        """Replace all connections with a new host list."""

    def get_connection(self):
        """Get an available connection from the pool."""

    def sniff_hosts(self, initial: bool = False):
        """Discover cluster nodes and update the connection pool."""

    def mark_dead(self, connection):
        """Mark a connection as failed for the dead timeout period (triggers sniffing if sniff_on_connection_fail is set)."""

    def close(self):
        """Close all connections and clean up resources."""
```
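The retry parameters above interact roughly as follows. Each attempt takes a connection from the pool:

- Network failures move on to another node.
- Timeouts move on only when `retry_on_timeout` is set.
- HTTP errors move on only when their status is in `retry_on_status`.
- The last error is re-raised once `max_retries` retries are used up.

The sketch below models that policy with stand-in exception classes and a plain list of connections. Every name in it is hypothetical. The real `Transport.perform_request` also handles sniffing, serialization, and HEAD requests.

```python
import itertools


class SketchConnectionError(Exception):
    """Stand-in for a network-level failure (hypothetical)."""


class SketchTimeout(SketchConnectionError):
    """Stand-in for a request timeout, modeled as a special connection failure."""


class SketchHTTPError(Exception):
    """Stand-in for an HTTP error response carrying a status code (hypothetical)."""

    def __init__(self, status_code):
        super().__init__(status_code)
        self.status_code = status_code


def perform_with_retries(connections, request, max_retries=3,
                         retry_on_status=(502, 503, 504), retry_on_timeout=False):
    """Run request(conn) with retries; return (result, attempts, connections_marked_dead)."""
    pool = itertools.cycle(connections)  # simple round-robin stand-in for the pool
    marked_dead = []
    for attempt in range(max_retries + 1):  # one initial try plus max_retries retries
        conn = next(pool)
        try:
            return request(conn), attempt + 1, marked_dead
        except (SketchConnectionError, SketchHTTPError) as exc:
            if isinstance(exc, SketchTimeout):
                retry = retry_on_timeout
            elif isinstance(exc, SketchConnectionError):
                retry = True
            else:
                retry = exc.status_code in retry_on_status
            if not retry:
                raise  # e.g. a 404, or a timeout while retry_on_timeout is False
            marked_dead.append(conn)  # only connections we retry away from are marked dead
            if attempt == max_retries:
                raise  # out of retries: surface the last error
```

With the defaults (`max_retries=3`), a request is tried on at most four connections.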

### Connection Pool Management

Manage multiple connections with failure detection, load balancing, and automatic recovery.

```python { .api }
class ConnectionPool:
    def __init__(
        self,
        connections,
        dead_timeout=60,
        timeout_cutoff=5,
        selector_class=RoundRobinSelector,
        randomize_hosts=True,
        **kwargs
    ):
        """
        Initialize the connection pool with failure handling and load balancing.

        Parameters:
        - connections: List of (connection instance, host options) tuples
        - dead_timeout: Base number of seconds a failed connection is retired for;
          the period doubles with each consecutive failure
        - timeout_cutoff: Maximum number of times the retirement period is doubled
          for consecutive failures
        - selector_class: Connection selection strategy class
        - randomize_hosts: Shuffle the initial connection order so that many clients
          do not all hit the same node first (bool)
        - **kwargs: Accepted so the pool can receive the transport's keyword
          arguments; options not listed above are ignored
        """

    def get_connection(self):
        """
        Select an available connection using the configured selector.
        Eligible dead connections are resurrected first; if no live connection
        remains, one dead connection is resurrected by force and returned.

        Returns:
        Connection: Available connection instance
        """

    def mark_dead(self, connection, now: float = None):
        """
        Mark a connection as failed and retire it for a period that grows
        with consecutive failures.

        Parameters:
        - connection: Connection instance to mark as dead
        - now: Current timestamp (taken from the clock if None)
        """

    def mark_live(self, connection):
        """Mark a connection as healthy after a successful request and reset its failure count."""

    def resurrect(self, force: bool = False):
        """
        Return one dead connection whose retirement period has expired to the
        live pool. Returns the resurrected connection, or None if none is eligible.

        Parameters:
        - force: Resurrect a connection even if none is eligible (used when no
          live connections remain) (bool)
        """

    def close(self):
        """Close all connections in the pool."""

class DummyConnectionPool:
    """
    Connection pool for a single Elasticsearch node.
    Provides the same interface as ConnectionPool, but always returns its one
    connection and never marks it dead. Transport uses it automatically when
    only one host is configured.
    """
```
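The bookkeeping behind `dead_timeout` and `timeout_cutoff` can be modeled with a heap ordered by resurrection time. The sketch below assumes that the retirement period doubles with each consecutive failure, up to `timeout_cutoff` doublings: the n-th consecutive failure retires a connection for `dead_timeout * 2 ** min(n - 1, timeout_cutoff)` seconds. It also assumes that `resurrect` returns at most one expired connection per call. `retirement_period` and `MiniPool` are hypothetical names. The sketch takes `now` as an argument instead of reading the clock, so it is easy to test.

```python
import heapq
import itertools


def retirement_period(dead_count, dead_timeout=60, timeout_cutoff=5):
    """Seconds a connection stays retired after its dead_count-th consecutive failure."""
    return dead_timeout * 2 ** min(dead_count - 1, timeout_cutoff)


class MiniPool:
    """Toy live/dead bookkeeping modeled on the documented pool behavior."""

    def __init__(self, connections, dead_timeout=60, timeout_cutoff=5):
        self.connections = list(connections)  # live connections
        self.dead = []  # heap of (resurrect_at, sequence, connection)
        self.dead_count = {}  # consecutive failures per connection
        self.dead_timeout = dead_timeout
        self.timeout_cutoff = timeout_cutoff
        self._seq = itertools.count()  # tie-breaker so the heap never compares connections

    def mark_dead(self, conn, now):
        if conn not in self.connections:
            return  # already retired
        self.connections.remove(conn)
        count = self.dead_count.get(conn, 0) + 1
        self.dead_count[conn] = count
        timeout = retirement_period(count, self.dead_timeout, self.timeout_cutoff)
        heapq.heappush(self.dead, (now + timeout, next(self._seq), conn))

    def mark_live(self, conn):
        self.dead_count.pop(conn, None)  # success resets the failure streak

    def resurrect(self, now, force=False):
        """Move at most one expired (or, if forced, any) dead connection back to the live list."""
        if not self.dead:
            return None
        resurrect_at, _, conn = self.dead[0]  # earliest-expiring connection
        if not force and resurrect_at > now:
            return None
        heapq.heappop(self.dead)
        self.connections.append(conn)
        return conn
```

Under this model with the defaults (`dead_timeout=60`, `timeout_cutoff=5`), consecutive failures retire a connection for 60, 120, 240, 480, and 960 seconds, and then for at most 1920 seconds.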

### Connection Selection Strategies

Strategies for choosing which live connection from the pool serves the next request.

```python { .api }
class ConnectionSelector:
    def __init__(self, opts: dict):
        """
        Base class for connection selection strategies.

        Parameters:
        - opts: Dict mapping each connection instance to its host options
        """

    def select(self, connections: list):
        """
        Select a connection from the available connections.

        Parameters:
        - connections: List of available connections

        Returns:
        Connection: Selected connection instance
        """

class RoundRobinSelector(ConnectionSelector):
    """
    Round-robin connection selection for even load distribution.
    Cycles through the live connections in order (the position is tracked per thread).
    """

class RandomSelector(ConnectionSelector):
    """
    Random connection selection for simple load balancing.
    Picks uniformly at random from the live connections.
    """
```
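A selector only needs to implement `select(connections)`. The standalone sketch below re-implements the two built-in strategies in plain Python. It also adds a hypothetical zone-aware strategy that consults the per-connection options passed to the selector's constructor. The class names and the `zone` host option are assumptions made for illustration. With the real client you would subclass `ConnectionSelector`, override `select`, and pass the class as `selector_class`.

```python
import random
import threading


class RoundRobinSketch:
    """Cycle through connections; the position is tracked per thread."""

    def __init__(self, opts=None):
        self.opts = opts or {}
        self.data = threading.local()

    def select(self, connections):
        # Start at index 0 on a thread's first call, then advance and wrap around
        self.data.rr = (getattr(self.data, "rr", -1) + 1) % len(connections)
        return connections[self.data.rr]


class RandomSketch:
    """Pick a connection uniformly at random."""

    def __init__(self, opts=None, seed=None):
        self.opts = opts or {}
        self.rng = random.Random(seed)

    def select(self, connections):
        return self.rng.choice(connections)


class PreferZoneSketch:
    """Hypothetical strategy: prefer connections whose host options carry a matching zone."""

    def __init__(self, opts, zone="local"):
        self.opts = opts  # {connection: host options dict}
        self.zone = zone
        self.fallback = RoundRobinSketch()

    def select(self, connections):
        preferred = [c for c in connections if self.opts.get(c, {}).get("zone") == self.zone]
        # Fall back to all live connections when no preferred one is available
        return self.fallback.select(preferred or connections)
```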

### HTTP Connection Implementations

HTTP client implementations for different use cases and environments.

```python { .api }
class Connection:
    def __init__(
        self,
        host='localhost',
        port=9200,
        use_ssl=False,
        url_prefix='',
        timeout=10,
        **kwargs
    ):
        """
        Base connection class that defines the connection interface.

        Parameters:
        - host: Elasticsearch host address
        - port: Port number (default: 9200)
        - use_ssl: Enable HTTPS (bool)
        - url_prefix: Path prefix prepended to every request URL
        - timeout: Request timeout in seconds
        - **kwargs: Implementation-specific parameters
        """

    def perform_request(self, method: str, url: str, params: dict, body: bytes, timeout: float, ignore: tuple, headers: dict):
        """
        Execute an HTTP request (implemented by subclasses).

        Returns:
        tuple: (status_code, headers_dict, response_body)
        """

    def log_request_success(self, method: str, full_url: str, path: str, body: bytes, status_code: int, response: str, duration: float):
        """Log a successful request for debugging and monitoring."""

    def log_request_fail(self, method: str, full_url: str, path: str, body: bytes, duration: float, status_code: int, response: str, exception: Exception):
        """Log a failed request for debugging and error tracking."""

class Urllib3HttpConnection(Connection):
    """
    HTTP connection using the urllib3 library (default implementation).

    Additional parameters:
    - maxsize: Connection pool size (default: 10)
    - block: Block when pool is full (bool, default: False)
    - http_compress: Enable compression (bool, default: False)
    - headers: Default headers dict
    - ssl_context: Custom SSL context
    - assert_same_host: Verify host matches (bool, default: True)
    """

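class RequestsHttpConnection(Connection):
    """
    HTTP connection using the requests library (alternative implementation).
    Requires the requests package to be installed.

    Additional parameters:
    - http_auth: Authentication as a ('username', 'password') tuple or 'username:password' string
    - verify_certs: Verify SSL certificates (bool)
    - ca_certs: Path to a CA bundle (defaults to the bundle shipped with requests)
    - client_cert: Path to the client certificate (or combined certificate and key) file
    - client_key: Path to the client private key file, if separate from client_cert
    - headers: Default headers dict
    """
```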
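The base `Connection` parameters combine into the URL that each request is sent to. The scheme follows `use_ssl`, then come the host and port, then `url_prefix`, then the request path and query string. The sketch below composes such a URL with the standard library. `build_full_url` is a hypothetical helper that approximates this composition for illustration; it is not part of the client.

```python
from urllib.parse import urlencode


def build_full_url(host="localhost", port=9200, use_ssl=False, url_prefix="",
                   path="/", params=None):
    """Compose a request URL from Connection-style options (illustrative sketch)."""
    scheme = "https" if use_ssl else "http"
    if url_prefix:
        # Normalize to exactly one leading slash and no trailing slash
        url_prefix = "/" + url_prefix.strip("/")
    url = "%s://%s:%s%s%s" % (scheme, host, port, url_prefix, path)
    if params:
        url += "?" + urlencode(params)  # query parameters become the query string
    return url
```

A prefix such as `'/elasticsearch'` is what lets a client reach a cluster that is published under a sub-path, for example behind a reverse proxy.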

### Data Serialization

Handle request/response serialization with support for multiple content types.

```python { .api }
class JSONSerializer:
    """
    Default JSON serializer for request/response data.

    Attributes:
    - mimetype: 'application/json'
    """

    def loads(self, s: str) -> dict:
        """
        Deserialize a JSON string to a Python object.

        Parameters:
        - s: JSON string to deserialize

        Returns:
        dict: Deserialized Python object

        Raises:
        SerializationError: If the JSON is invalid
        """

    def dumps(self, data: dict) -> str:
        """
        Serialize a Python object to a JSON string.
        Strings are treated as already serialized and returned unchanged.

        Parameters:
        - data: Python object to serialize

        Returns:
        str: JSON string representation

        Raises:
        SerializationError: If the object cannot be serialized
        """

    def default(self, data):
        """
        Handle special data types during serialization.
        Supports datetime and date (ISO 8601 strings), Decimal (float), and UUID (str);
        raises TypeError for any other unsupported type.
        """

class TextSerializer:
    """
    Plain text serializer for string data.

    Attributes:
    - mimetype: 'text/plain'
    """

    def loads(self, s: str) -> str:
        """Return the string unchanged."""

    def dumps(self, data: str) -> str:
        """Return string data unchanged; raises SerializationError for any other type."""

class Deserializer:
    def __init__(self, serializers: dict, default_mimetype: str = 'application/json'):
        """
        Response deserializer supporting multiple content types.

        Parameters:
        - serializers: Dict mapping mimetypes to serializer instances
        - default_mimetype: Content type used when a response declares none
        """

    def loads(self, s: str, mimetype: str = None) -> dict:
        """
        Deserialize a response based on its content type.

        Parameters:
        - s: Response string to deserialize
        - mimetype: Content type (the default mimetype is used if None)

        Returns:
        dict: Deserialized response

        Raises:
        SerializationError: If no serializer is registered for the mimetype
        """
```
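The sketch below mirrors the serialization behavior described above using only the standard library. It has three parts:

- a `default` hook for dates, `Decimal`, and `UUID` values;
- a `dumps` that passes pre-serialized strings through;
- a loader that picks a parser by mimetype.

The loader also strips a `; charset=...` suffix. That is an assumption made here because servers commonly append one. The function names are hypothetical, and the sketch raises `ValueError` where the client would raise `SerializationError`.

```python
import json
import uuid
from datetime import date, datetime
from decimal import Decimal


def default(data):
    """json.dumps fallback for types the json module cannot encode on its own."""
    if isinstance(data, (date, datetime)):
        return data.isoformat()  # ISO 8601 string
    if isinstance(data, Decimal):
        return float(data)
    if isinstance(data, uuid.UUID):
        return str(data)
    raise TypeError("Unable to serialize %r (type: %s)" % (data, type(data)))


def dumps(data):
    if isinstance(data, str):
        return data  # assume the caller already serialized it
    return json.dumps(data, default=default, ensure_ascii=False)


LOADERS = {
    "application/json": json.loads,
    "text/plain": lambda text: text,  # plain text is returned unchanged
}


def loads_by_mimetype(s, mimetype=None, default_mimetype="application/json"):
    """Pick a loader by content type; unknown types are rejected."""
    mimetype = (mimetype or default_mimetype).split(";", 1)[0].strip()
    if mimetype not in LOADERS:
        raise ValueError("Unknown mimetype, unable to deserialize: %s" % mimetype)
    return LOADERS[mimetype](s)
```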

### Host Discovery and Configuration

Automatic node discovery and host configuration management.

```python { .api }
def get_host_info(node_info: dict, host: dict) -> dict:
    """
    Default callback that processes each node found during sniffing.

    Parameters:
    - node_info: Information about the node returned by the nodes info API
    - host: Connection information (host, port) already extracted from node_info

    Returns:
    dict: Host configuration to use for the node, or None to skip the node

    Default behavior: master-only nodes (roles == ['master']) are skipped because
    they should not normally serve API requests; all other nodes are returned
    unchanged. Supply a custom callback to filter nodes further or to add
    per-node connection options.
    """
```
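A custom `host_info_callback` receives the same two arguments as `get_host_info` and returns either a host dict or `None`. The sketch below restates the default rule and adds a hypothetical rack filter. It assumes the nodes were started with a custom `node.attr.rack` setting, which the nodes info API reports under `attributes`. With the real client, such a function would be passed as `host_info_callback=` when creating `Elasticsearch`; the keyword is forwarded to `Transport`. The function names here are illustrative only.

```python
from functools import partial


def default_host_info(node_info, host):
    """Restates the default rule: skip master-only nodes, keep everything else."""
    if node_info.get("roles", []) == ["master"]:
        return None
    return host


def rack_host_info(node_info, host, rack):
    """Hypothetical callback: additionally keep only nodes in the given rack."""
    if default_host_info(node_info, host) is None:
        return None
    if node_info.get("attributes", {}).get("rack") != rack:
        return None  # returning None drops the node from the connection pool
    return host


# Bind the rack so the callback has the (node_info, host) signature the transport calls
same_rack_only = partial(rack_host_info, rack="r1")
```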

## Usage Examples

### Basic Client Configuration

```python
from elasticsearch5 import Elasticsearch

# Simple connection to a local cluster (localhost:9200)
es = Elasticsearch()

# Multiple hosts for high availability
es = Elasticsearch([
    'host1:9200',
    'host2:9200',
    'host3:9200'
])

# Detailed host configuration
es = Elasticsearch([
    {'host': 'host1', 'port': 9200},
    {'host': 'host2', 'port': 9200, 'use_ssl': True},
    {'host': 'host3', 'port': 9243, 'url_prefix': '/elasticsearch'}
])
```

### SSL and Authentication

```python
import ssl

from elasticsearch5 import Elasticsearch

# SSL with basic authentication
es = Elasticsearch(
    ['https://host1:9200'],
    http_auth=('username', 'password'),
    use_ssl=True,
    verify_certs=True,
    ca_certs='/path/to/ca.crt',
    timeout=30
)

# Client certificate authentication
es = Elasticsearch(
    ['https://host1:9200'],
    use_ssl=True,
    verify_certs=True,
    ca_certs='/path/to/ca.crt',
    client_cert='/path/to/client.crt',
    client_key='/path/to/client.key'
)

# Custom SSL context
ssl_context = ssl.create_default_context(cafile='/path/to/ca.crt')
ssl_context.check_hostname = False  # Disables hostname verification; avoid unless required

es = Elasticsearch(
    ['https://host1:9200'],
    use_ssl=True,
    ssl_context=ssl_context
)
```

### Advanced Transport Configuration

```python
from elasticsearch5 import Elasticsearch, RequestsHttpConnection
from elasticsearch5.connection_pool import ConnectionPool, RandomSelector

# Custom transport using the requests library
es = Elasticsearch(
    ['host1:9200', 'host2:9200'],
    connection_class=RequestsHttpConnection,
    timeout=20,
    max_retries=5,
    retry_on_timeout=True,
    retry_on_status=(429, 502, 503, 504),
    http_compress=True
)

# Node discovery configuration
es = Elasticsearch(
    ['host1:9200'],
    sniff_on_start=True,
    sniff_on_connection_fail=True,
    sniffer_timeout=60,  # Sniff every 60 seconds
    sniff_timeout=10     # 10-second timeout for each sniff request
)

# Custom connection pool
es = Elasticsearch(
    ['host1:9200', 'host2:9200', 'host3:9200'],
    connection_pool_class=ConnectionPool,
    selector_class=RandomSelector,
    dead_timeout=30,  # Retire a failed connection for 30s, doubling on consecutive failures
    timeout_cutoff=3  # Cap the backoff at 3 doublings (at most 240s)
)
```

### Custom Serialization

```python
from elasticsearch5 import Elasticsearch, JSONSerializer
from elasticsearch5.serializer import TextSerializer


class CustomJSONSerializer(JSONSerializer):
    def default(self, obj):
        # The base class already handles date/datetime, Decimal, and UUID;
        # extend it for other types, e.g. sets become JSON arrays
        if isinstance(obj, (set, frozenset)):
            return list(obj)
        return super().default(obj)


# Use custom serializer
es = Elasticsearch(
    ['localhost:9200'],
    serializer=CustomJSONSerializer()
)

# Multiple serializers for different content types
serializers = {
    'application/json': CustomJSONSerializer(),
    'text/plain': TextSerializer(),
    'text/csv': TextSerializer()
}

es = Elasticsearch(
    ['localhost:9200'],
    serializers=serializers,
    default_mimetype='application/json'
)
```

### Connection Monitoring and Debugging

```python
import logging

from elasticsearch5 import Elasticsearch, Urllib3HttpConnection

# Enable debug logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('elasticsearch')
logger.setLevel(logging.DEBUG)


# Custom connection with monitoring
class MonitoredConnection(Urllib3HttpConnection):
    def log_request_success(self, method, full_url, path, body, status_code, response, duration):
        super().log_request_success(method, full_url, path, body, status_code, response, duration)
        print(f"SUCCESS: {method} {path} -> {status_code} ({duration:.2f}s)")

    # status_code, response and exception are optional: network failures have no status
    def log_request_fail(self, method, full_url, path, body, duration,
                         status_code=None, response=None, exception=None):
        super().log_request_fail(method, full_url, path, body, duration, status_code, response, exception)
        print(f"FAILED: {method} {path} -> {status_code} ({duration:.2f}s): {exception}")


es = Elasticsearch(
    ['localhost:9200'],
    connection_class=MonitoredConnection
)
```

### Connection Pool Management

```python
from elasticsearch5 import Elasticsearch

es = Elasticsearch(['host1:9200', 'host2:9200'])

# Access the transport layer directly
transport = es.transport

# Add a new connection at runtime
transport.add_connection({'host': 'new-host', 'port': 9200})

# Replace the entire connection list
transport.set_connections([
    {'host': 'host1', 'port': 9200},
    {'host': 'host2', 'port': 9200},
    {'host': 'host3', 'port': 9200}
])

# Manual node discovery
transport.sniff_hosts()

# Check connection pool status (multi-host ConnectionPool; a single-host
# client uses DummyConnectionPool, which has no dead-connection queue)
pool = transport.connection_pool
print(f"Live connections: {len(pool.connections)}")
print(f"Dead connections: {pool.dead.qsize()}")

# Force resurrection of one dead connection
pool.resurrect(force=True)

# Clean shutdown
transport.close()
```

### Error Handling and Retries

```python
from elasticsearch5 import Elasticsearch
from elasticsearch5.exceptions import (
    ConnectionError,  # subclass of TransportError (shadows the builtin), so catch it first
    TransportError,
    SerializationError
)

try:
    es = Elasticsearch(
        ['host1:9200', 'host2:9200'],
        max_retries=3,
        retry_on_status=(429, 502, 503, 504),
        retry_on_timeout=True,
        timeout=10
    )

    result = es.search(index='my_index', body={'query': {'match_all': {}}})

except ConnectionError as e:
    print(f"Connection failed: {e}")
except TransportError as e:
    print(f"Transport error {e.status_code}: {e.error}")
except SerializationError as e:
    print(f"Serialization error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")
```

### Production Configuration

```python
from elasticsearch5 import Elasticsearch
from elasticsearch5.connection_pool import RoundRobinSelector

# Production-ready configuration
es = Elasticsearch(
    [
        {'host': 'es-node-1', 'port': 9200},
        {'host': 'es-node-2', 'port': 9200},
        {'host': 'es-node-3', 'port': 9200}
    ],

    # Security
    http_auth=('username', 'password'),
    use_ssl=True,
    verify_certs=True,
    ca_certs='/etc/ssl/certs/elasticsearch-ca.pem',

    # Performance
    timeout=30,
    max_retries=3,
    retry_on_status=(429, 502, 503, 504),
    retry_on_timeout=True,
    http_compress=True,

    # Reliability
    sniff_on_start=True,
    sniff_on_connection_fail=True,
    sniffer_timeout=60,
    dead_timeout=30,

    # Connection pool
    maxsize=25,  # Max connections per host
    selector_class=RoundRobinSelector,

    # Headers
    headers={'User-Agent': 'MyApp/1.0'}
)
```