or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

aggregations.md analysis.md connections.md document-operations.md field-types.md index-management.md index.md search-queries.md

docs/connections.md

0

# Connections

1

2

Connection configuration and management for single and multiple Elasticsearch clusters with comprehensive support for authentication, SSL/TLS, connection pooling, retry logic, and both synchronous and asynchronous clients. Enables flexible deployment architectures and secure cluster communication.

3

4

## Capabilities

5

6

### Connection Management Functions

7

8

Core functions for managing Elasticsearch connections.

9

10

```python { .api }

11

def create_connection(alias='default', **kwargs):

12

"""

13

Create and register new Elasticsearch connection.

14

15

Args:

16

alias (str): Connection alias name (default: 'default')

17

**kwargs: Connection parameters passed to Elasticsearch client

18

19

Returns:

20

Elasticsearch: Elasticsearch client instance

21

22

Connection Parameters:

23

hosts (list or str): Elasticsearch host(s)

24

http_auth (tuple): HTTP authentication (username, password)

25

http_compress (bool): Enable HTTP compression

26

timeout (int): Request timeout in seconds

27

max_retries (int): Maximum number of retries

28

retry_on_timeout (bool): Retry on timeout errors

29

retry_on_status (list): HTTP status codes to retry on

30

sniff_on_start (bool): Sniff nodes on startup

31

sniff_on_connection_fail (bool): Sniff on connection failure

32

sniff_timeout (int): Sniffer timeout

33

sniffer_delay (int): Delay between sniff operations

34

randomize_hosts (bool): Randomize host selection

35

use_ssl (bool): Use SSL/TLS

36

verify_certs (bool): Verify SSL certificates

37

ssl_show_warn (bool): Show SSL warnings

38

ca_certs (str): Path to CA certificates

39

client_cert (str): Path to client certificate

40

client_key (str): Path to client private key

41

ssl_version (str): SSL/TLS version

42

ssl_assert_hostname (bool): Assert hostname in certificate

43

ssl_assert_fingerprint (str): Assert certificate fingerprint

44

headers (dict): Default HTTP headers

45

connections_per_node (int): Connections per node

46

http_compress (bool): Enable gzip compression

47

cloud_id (str): Elastic Cloud cluster ID

48

api_key (tuple): API key authentication (id, key)

49

bearer_auth (str): Bearer token authentication

50

opaque_id (str): Opaque ID for request identification

51

"""

52

53

def add_connection(alias, connection):

54

"""

55

Add existing connection to registry.

56

57

Args:

58

alias (str): Connection alias name

59

connection: Elasticsearch client instance

60

"""

61

62

def remove_connection(alias):

63

"""

64

Remove connection from registry.

65

66

Args:

67

alias (str): Connection alias name

68

69

Raises:

70

KeyError: If connection alias doesn't exist

71

"""

72

73

def get_connection(alias='default'):

74

"""

75

Retrieve connection by alias.

76

77

Args:

78

alias (str): Connection alias name (default: 'default')

79

80

Returns:

81

Elasticsearch: Elasticsearch client instance

82

83

Raises:

84

KeyError: If connection alias doesn't exist

85

"""

86

87

def configure(**kwargs):

88

"""

89

Configure default connection with given parameters.

90

91

Args:

92

**kwargs: Connection parameters (same as create_connection)

93

94

Returns:

95

Elasticsearch: Configured default connection

96

"""

97

```

98

99

### Connection Registry

100

101

Global connection registry for managing multiple connections.

102

103

```python { .api }

104

class Connections:

105

"""

106

Global connection registry singleton.

107

"""

108

109

def create_connection(self, alias='default', **kwargs):

110

"""Create and register connection (same as module function)."""

111

112

def add_connection(self, alias, connection):

113

"""Add existing connection (same as module function)."""

114

115

def remove_connection(self, alias):

116

"""Remove connection (same as module function)."""

117

118

def get_connection(self, alias='default'):

119

"""Get connection (same as module function)."""

120

121

def configure(self, **kwargs):

122

"""Configure default connection (same as module function)."""

123

124

def all(self):

125

"""

126

Get all registered connections.

127

128

Returns:

129

dict: Mapping of alias to connection

130

"""

131

132

# Global connections instance

133

connections: Connections

134

```

135

136

### Async Connection Management

137

138

Asynchronous connection management for async/await operations.

139

140

```python { .api }

141

def create_async_connection(alias='default', **kwargs):

142

"""

143

Create and register new async Elasticsearch connection.

144

145

Args:

146

alias (str): Connection alias name (default: 'default')

147

**kwargs: Connection parameters (same as create_connection)

148

149

Returns:

150

AsyncElasticsearch: Async Elasticsearch client instance

151

"""

152

153

def add_async_connection(alias, connection):

154

"""

155

Add existing async connection to registry.

156

157

Args:

158

alias (str): Connection alias name

159

connection: AsyncElasticsearch client instance

160

"""

161

162

def remove_async_connection(alias):

163

"""

164

Remove async connection from registry.

165

166

Args:

167

alias (str): Connection alias name

168

169

Raises:

170

KeyError: If connection alias doesn't exist

171

"""

172

173

def get_async_connection(alias='default'):

174

"""

175

Retrieve async connection by alias.

176

177

Args:

178

alias (str): Connection alias name (default: 'default')

179

180

Returns:

181

AsyncElasticsearch: Async Elasticsearch client instance

182

183

Raises:

184

KeyError: If connection alias doesn't exist

185

"""

186

187

def configure_async(**kwargs):

188

"""

189

Configure default async connection with given parameters.

190

191

Args:

192

**kwargs: Connection parameters (same as create_connection)

193

194

Returns:

195

AsyncElasticsearch: Configured default async connection

196

"""

197

```

198

199

### Async Connection Registry

200

201

Async connection registry for managing multiple async connections.

202

203

```python { .api }

204

class AsyncConnections:

205

"""

206

Global async connection registry singleton.

207

"""

208

209

def create_connection(self, alias='default', **kwargs):

210

"""Create and register async connection."""

211

212

def add_connection(self, alias, connection):

213

"""Add existing async connection."""

214

215

def remove_connection(self, alias):

216

"""Remove async connection."""

217

218

def get_connection(self, alias='default'):

219

"""Get async connection."""

220

221

def configure(self, **kwargs):

222

"""Configure default async connection."""

223

224

def all(self):

225

"""

226

Get all registered async connections.

227

228

Returns:

229

dict: Mapping of alias to async connection

230

"""

231

232

# Global async connections instance

233

async_connections: AsyncConnections

234

```

235

236

## Usage Examples

237

238

### Basic Connection Setup

239

240

```python

241

from elasticsearch_dsl import connections, Document, Text

242

243

# Simple connection to localhost

244

connections.create_connection(hosts=['localhost:9200'])

245

246

# Connection with authentication

247

connections.create_connection(

248

alias='secure',

249

hosts=['https://elasticsearch.example.com:9200'],

250

http_auth=('username', 'password'),

251

use_ssl=True,

252

verify_certs=True

253

)

254

255

# Use connection in document operations

256

class Article(Document):

257

title = Text()

258

content = Text()

259

260

class Index:

261

name = 'articles'

262

263

# Document operations will use default connection

264

article = Article(title='Test', content='Content')

265

article.save()

266

267

# Use specific connection

268

article.save(using='secure')

269

```

270

271

### Multiple Cluster Configuration

272

273

```python

274

from elasticsearch_dsl import connections

275

276

# Production cluster

277

connections.create_connection(

278

alias='production',

279

hosts=['prod-es-1.example.com:9200', 'prod-es-2.example.com:9200'],

280

http_auth=('prod_user', 'prod_password'),

281

use_ssl=True,

282

verify_certs=True,

283

ca_certs='/path/to/ca.pem',

284

timeout=30,

285

max_retries=3,

286

retry_on_timeout=True,

287

sniff_on_start=True,

288

sniff_on_connection_fail=True,

289

sniff_timeout=10,

290

randomize_hosts=True

291

)

292

293

# Development cluster

294

connections.create_connection(

295

alias='development',

296

hosts=['dev-es.example.com:9200'],

297

http_auth=('dev_user', 'dev_password'),

298

timeout=10,

299

max_retries=1

300

)

301

302

# Analytics cluster (read-only)

303

connections.create_connection(

304

alias='analytics',

305

hosts=['analytics-es.example.com:9200'],

306

http_auth=('analytics_user', 'analytics_password'),

307

use_ssl=True,

308

timeout=60 # Longer timeout for analytics queries

309

)

310

311

# Use different connections for different operations

312

from elasticsearch_dsl import Search

313

314

# Search production data

315

search = Search(using='production', index='logs')

316

response = search.execute()

317

318

# Run analytics on dedicated cluster

319

analytics_search = Search(using='analytics', index='metrics')

320

analytics_response = analytics_search.execute()

321

```

322

323

### SSL/TLS and Authentication

324

325

```python

326

from elasticsearch_dsl import connections

327

328

# SSL with client certificates

329

connections.create_connection(

330

alias='ssl_client_cert',

331

hosts=['https://secure-es.example.com:9200'],

332

use_ssl=True,

333

verify_certs=True,

334

ca_certs='/path/to/ca-certificates.crt',

335

client_cert='/path/to/client.crt',

336

client_key='/path/to/client.key',

337

ssl_assert_hostname=True

338

)

339

340

# API Key authentication

341

connections.create_connection(

342

alias='api_key',

343

hosts=['https://es.example.com:9200'],

344

api_key=('api_key_id', 'api_key_secret'),

345

use_ssl=True,

346

verify_certs=True

347

)

348

349

# Bearer token authentication

350

connections.create_connection(

351

alias='bearer_token',

352

hosts=['https://es.example.com:9200'],

353

bearer_auth='your_bearer_token_here',

354

use_ssl=True

355

)

356

357

# Elastic Cloud connection

358

connections.create_connection(

359

alias='elastic_cloud',

360

cloud_id='cluster_name:base64_encoded_endpoint',

361

http_auth=('elastic_username', 'elastic_password')

362

)

363

364

# Custom headers and opaque ID

365

connections.create_connection(

366

alias='custom_headers',

367

hosts=['https://es.example.com:9200'],

368

headers={'Custom-Header': 'value'},

369

opaque_id='my-application-v1.0',

370

http_auth=('username', 'password')

371

)

372

```

373

374

### Connection Pooling and Performance

375

376

```python

377

from elasticsearch_dsl import connections

378

379

# High-performance configuration

380

connections.create_connection(

381

alias='high_performance',

382

hosts=[

383

'es-node-01.example.com:9200',

384

'es-node-02.example.com:9200',

385

'es-node-03.example.com:9200'

386

],

387

# Connection pooling

388

connections_per_node=10,

389

390

# Retry configuration

391

max_retries=5,

392

retry_on_timeout=True,

393

retry_on_status=[429, 502, 503, 504],

394

395

# Sniffing for node discovery

396

sniff_on_start=True,

397

sniff_on_connection_fail=True,

398

sniff_timeout=5,

399

sniffer_delay=60,

400

401

# Performance optimizations

402

http_compress=True,

403

randomize_hosts=True,

404

405

# Timeouts

406

timeout=20,

407

408

# Authentication

409

http_auth=('username', 'password'),

410

use_ssl=True,

411

verify_certs=True

412

)

413

414

# Bulk operations configuration

415

connections.create_connection(

416

alias='bulk_operations',

417

hosts=['bulk-es.example.com:9200'],

418

timeout=300, # 5 minute timeout for bulk operations

419

max_retries=1, # Fewer retries for bulk

420

http_compress=True, # Important for bulk data

421

connections_per_node=20, # More connections for concurrent bulk

422

http_auth=('bulk_user', 'bulk_password')

423

)

424

```

425

426

### Async Connection Setup

427

428

```python

429

from elasticsearch_dsl import async_connections

430

import asyncio

431

432

async def setup_async_connections():

433

# Create async connection

434

async_connections.create_connection(

435

alias='async_default',

436

hosts=['localhost:9200'],

437

timeout=30,

438

max_retries=3

439

)

440

441

# Async connection with authentication

442

async_connections.create_connection(

443

alias='async_secure',

444

hosts=['https://es.example.com:9200'],

445

http_auth=('username', 'password'),

446

use_ssl=True,

447

verify_certs=True,

448

timeout=60

449

)

450

451

# Run async setup

452

asyncio.run(setup_async_connections())

453

454

# Use async connections with AsyncDocument and AsyncSearch

455

from elasticsearch_dsl import AsyncDocument, AsyncSearch, Text

456

457

class AsyncArticle(AsyncDocument):

458

title = Text()

459

content = Text()

460

461

class Index:

462

name = 'async_articles'

463

464

async def async_operations():

465

# Create and save document

466

article = AsyncArticle(title='Async Test', content='Async content')

467

await article.save(using='async_default')

468

469

# Async search

470

search = AsyncSearch(using='async_secure', index='logs')

471

search = search.query('match', message='error')

472

response = await search.execute()

473

474

for hit in response:

475

print(f"Log: {hit.message}")

476

477

# Run async operations

478

asyncio.run(async_operations())

479

```

480

481

### Connection Health and Monitoring

482

483

```python

484

from elasticsearch_dsl import connections

485

from elasticsearch.exceptions import ConnectionError, TransportError

486

487

def check_connection_health(alias='default'):

488

"""Check health of Elasticsearch connection."""

489

try:

490

client = connections.get_connection(alias)

491

492

# Check cluster health

493

health = client.cluster.health()

494

print(f"Cluster status: {health['status']}")

495

print(f"Number of nodes: {health['number_of_nodes']}")

496

print(f"Active shards: {health['active_shards']}")

497

498

# Check node info

499

nodes = client.nodes.info()

500

for node_id, node_info in nodes['nodes'].items():

501

print(f"Node {node_id}: {node_info['name']} ({node_info['version']})")

502

503

# Test with simple search

504

client.search(index='_all', body={'query': {'match_all': {}}}, size=1)

505

print("Connection test successful!")

506

507

return True

508

509

except ConnectionError as e:

510

print(f"Connection error: {e}")

511

return False

512

except TransportError as e:

513

print(f"Transport error: {e}")

514

return False

515

except Exception as e:

516

print(f"Unexpected error: {e}")

517

return False

518

519

# Check all connections

520

for alias in connections.all():

521

print(f"Checking connection '{alias}':")

522

check_connection_health(alias)

523

print("-" * 40)

524

```

525

526

### Dynamic Connection Management

527

528

```python

529

from elasticsearch_dsl import connections, Document, Text

530

import os

531

532

class ConfigurableDocument(Document):

533

"""Document that uses environment-based connection selection."""

534

535

title = Text()

536

content = Text()

537

538

class Index:

539

name = 'configurable_docs'

540

541

def save(self, **kwargs):

542

# Use environment-specific connection

543

env = os.getenv('ENVIRONMENT', 'development')

544

connection_alias = f'es_{env}'

545

546

if connection_alias not in connections.all():

547

self._setup_connection(env)

548

549

return super().save(using=connection_alias, **kwargs)

550

551

def _setup_connection(self, env):

552

"""Setup connection based on environment."""

553

if env == 'production':

554

connections.create_connection(

555

alias='es_production',

556

hosts=os.getenv('ES_PROD_HOSTS', 'localhost:9200').split(','),

557

http_auth=(

558

os.getenv('ES_PROD_USER'),

559

os.getenv('ES_PROD_PASSWORD')

560

),

561

use_ssl=True,

562

verify_certs=True,

563

timeout=30

564

)

565

elif env == 'staging':

566

connections.create_connection(

567

alias='es_staging',

568

hosts=os.getenv('ES_STAGING_HOSTS', 'localhost:9200').split(','),

569

http_auth=(

570

os.getenv('ES_STAGING_USER'),

571

os.getenv('ES_STAGING_PASSWORD')

572

),

573

timeout=20

574

)

575

else: # development

576

connections.create_connection(

577

alias='es_development',

578

hosts=['localhost:9200'],

579

timeout=10

580

)

581

582

# Usage

583

doc = ConfigurableDocument(title='Test', content='Content')

584

doc.save() # Will use appropriate connection based on ENVIRONMENT variable

585

```

586

587

### Connection Error Handling

588

589

```python

590

from elasticsearch_dsl import connections, Search

591

from elasticsearch.exceptions import (

592

ConnectionError, ConnectionTimeout, TransportError,

593

NotFoundError, RequestError

594

)

595

import time

596

597

def robust_search_with_retry(index, query, max_retries=3, delay=1):

598

"""Perform search with connection retry logic."""

599

600

for attempt in range(max_retries):

601

try:

602

search = Search(index=index)

603

search = search.query('match', content=query)

604

response = search.execute()

605

return response

606

607

except ConnectionTimeout:

608

print(f"Attempt {attempt + 1}: Connection timeout")

609

if attempt < max_retries - 1:

610

time.sleep(delay * (2 ** attempt)) # Exponential backoff

611

continue

612

raise

613

614

except ConnectionError as e:

615

print(f"Attempt {attempt + 1}: Connection error - {e}")

616

if attempt < max_retries - 1:

617

# Try to recreate connection

618

try:

619

connections.remove_connection('default')

620

except KeyError:

621

pass

622

connections.create_connection(hosts=['localhost:9200'])

623

time.sleep(delay)

624

continue

625

raise

626

627

except TransportError as e:

628

if e.status_code in [429, 502, 503, 504]: # Retry on these errors

629

print(f"Attempt {attempt + 1}: Transport error {e.status_code}")

630

if attempt < max_retries - 1:

631

time.sleep(delay * (2 ** attempt))

632

continue

633

raise

634

635

except (NotFoundError, RequestError):

636

# Don't retry on client errors

637

raise

638

639

except Exception as e:

640

print(f"Unexpected error: {e}")

641

raise

642

643

# Usage

644

try:

645

results = robust_search_with_retry('articles', 'elasticsearch')

646

print(f"Found {len(results)} results")

647

except Exception as e:

648

print(f"Search failed after retries: {e}")

649

```