or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced-cursors.md batch-operations.md connection-pooling.md connections-cursors.md error-handling.md index.md replication.md sql-composition.md timezone-support.md type-adaptation.md

docs/connection-pooling.md

0

# Connection Pooling

1

2

Thread-safe and non-thread-safe connection pools for managing database connections efficiently, reducing connection overhead and improving performance in both single-threaded and multi-threaded applications.

3

4

## Capabilities

5

6

### Pool Exception Handling

7

8

Specialized exception for pool-related errors.

9

10

```python { .api }

11

class PoolError(psycopg2.Error):

12

"""Pool-related errors."""

13

```

14

15

### Abstract Connection Pool

16

17

Base class for all connection pool implementations with core pooling functionality.

18

19

```python { .api }

20

class AbstractConnectionPool:

21

"""Base connection pool class."""

22

23

def __init__(self, minconn, maxconn, *args, **kwargs):

24

"""

25

Initialize pool.

26

27

Parameters:

28

- minconn (int): Minimum connections to maintain

29

- maxconn (int): Maximum connections allowed

30

- *args: Connection arguments passed to psycopg2.connect()

31

- **kwargs: Connection keyword arguments

32

"""

33

34

@property

35

def minconn(self):

36

"""Minimum connections to maintain."""

37

38

@property

39

def maxconn(self):

40

"""Maximum connections allowed."""

41

42

@property

43

def closed(self):

44

"""Pool closed status."""

45

46

def _connect(self, key=None):

47

"""

48

Create new connection (internal).

49

50

Parameters:

51

- key: Connection key for identification

52

53

Returns:

54

connection: New database connection

55

"""

56

57

def _getkey(self):

58

"""Generate unique key (internal)."""

59

60

def _getconn(self, key=None):

61

"""

62

Get connection (internal).

63

64

Parameters:

65

- key: Connection key

66

67

Returns:

68

connection: Database connection

69

"""

70

71

def _putconn(self, conn, key=None, close=False):

72

"""

73

Return connection (internal).

74

75

Parameters:

76

- conn: Connection to return

77

- key: Connection key

78

- close (bool): Force close connection

79

"""

80

81

def _closeall(self):

82

"""Close all connections (internal)."""

83

```

84

85

### Simple Connection Pool

86

87

Non-thread-safe connection pool for single-threaded applications with direct access to pool methods.

88

89

```python { .api }

90

class SimpleConnectionPool(AbstractConnectionPool):

91

"""Non-threadsafe connection pool."""

92

93

def __init__(self, minconn, maxconn, *args, **kwargs):

94

"""

95

Initialize simple pool.

96

97

Parameters:

98

- minconn (int): Minimum connections (1-maxconn)

99

- maxconn (int): Maximum connections

100

- *args: Connection arguments

101

- **kwargs: Connection keyword arguments

102

"""

103

104

def getconn(self, key=None):

105

"""

106

Get connection from pool.

107

108

Parameters:

109

- key: Optional connection key for tracking

110

111

Returns:

112

connection: Database connection

113

114

Raises:

115

PoolError: If no connections available

116

"""

117

118

def putconn(self, conn=None, key=None, close=False):

119

"""

120

Return connection to pool.

121

122

Parameters:

123

- conn: Connection to return (if None, uses key)

124

- key: Connection key

125

- close (bool): Force close connection instead of pooling

126

"""

127

128

def closeall(self):

129

"""Close all connections and reset pool."""

130

```

131

132

**Usage Example:**

133

134

```python

135

import psycopg2

136

from psycopg2.pool import SimpleConnectionPool

137

138

# Create connection pool

139

pool = SimpleConnectionPool(

140

minconn=1,

141

maxconn=5,

142

host="localhost",

143

database="mydb",

144

user="myuser",

145

password="mypass"

146

)

147

148

# Get connection from pool

149

conn = pool.getconn()

150

151

try:

152

with conn.cursor() as cur:

153

cur.execute("SELECT * FROM users")

154

users = cur.fetchall()

155

156

# Connection is still open, can be reused

157

with conn.cursor() as cur:

158

cur.execute("INSERT INTO logs (message) VALUES (%s)", ("User query",))

159

conn.commit()

160

161

finally:

162

# Return connection to pool (don't close it)

163

pool.putconn(conn)

164

165

# Get another connection (might be the same one)

166

conn2 = pool.getconn()

167

with conn2.cursor() as cur:

168

cur.execute("SELECT COUNT(*) FROM users")

169

count = cur.fetchone()[0]

170

print(f"Total users: {count}")

171

172

pool.putconn(conn2)

173

174

# Clean up - close all connections

175

pool.closeall()

176

```

177

178

### Threaded Connection Pool

179

180

Thread-safe connection pool for multi-threaded applications with automatic locking for concurrent access.

181

182

```python { .api }

183

class ThreadedConnectionPool(AbstractConnectionPool):

184

"""Thread-safe connection pool."""

185

186

def __init__(self, minconn, maxconn, *args, **kwargs):

187

"""

188

Initialize threaded pool with locking.

189

190

Parameters:

191

- minconn (int): Minimum connections (1-maxconn)

192

- maxconn (int): Maximum connections

193

- *args: Connection arguments

194

- **kwargs: Connection keyword arguments

195

"""

196

197

def getconn(self, key=None):

198

"""

199

Get connection (thread-safe).

200

201

Parameters:

202

- key: Optional connection key for tracking

203

204

Returns:

205

connection: Database connection

206

207

Raises:

208

PoolError: If no connections available

209

"""

210

211

def putconn(self, conn=None, key=None, close=False):

212

"""

213

Return connection (thread-safe).

214

215

Parameters:

216

- conn: Connection to return (if None, uses key)

217

- key: Connection key

218

- close (bool): Force close connection

219

"""

220

221

def closeall(self):

222

"""Close all connections (thread-safe)."""

223

```

224

225

**Usage Example:**

226

227

```python

228

import psycopg2

229

import threading

230

import time

231

from psycopg2.pool import ThreadedConnectionPool

232

233

# Create thread-safe pool

234

pool = ThreadedConnectionPool(

235

minconn=2,

236

maxconn=10,

237

host="localhost",

238

database="mydb",

239

user="myuser",

240

password="mypass"

241

)

242

243

def worker_function(worker_id):

244

"""Worker function for multi-threading."""

245

try:

246

# Get connection (thread-safe)

247

conn = pool.getconn()

248

print(f"Worker {worker_id} got connection")

249

250

with conn.cursor() as cur:

251

# Simulate work

252

cur.execute("SELECT pg_sleep(1)")

253

cur.execute("SELECT %s as worker_id", (worker_id,))

254

result = cur.fetchone()

255

print(f"Worker {worker_id} completed: {result[0]}")

256

257

except Exception as e:

258

print(f"Worker {worker_id} error: {e}")

259

finally:

260

# Always return connection to pool

261

pool.putconn(conn)

262

print(f"Worker {worker_id} returned connection")

263

264

# Create multiple threads

265

threads = []

266

for i in range(5):

267

thread = threading.Thread(target=worker_function, args=(i,))

268

threads.append(thread)

269

thread.start()

270

271

# Wait for all threads to complete

272

for thread in threads:

273

thread.join()

274

275

print("All workers completed")

276

pool.closeall()

277

```

278

279

### Pool Configuration and Management

280

281

Advanced pool configuration and monitoring patterns.

282

283

**Usage Example:**

284

285

```python

286

import psycopg2

287

from psycopg2.pool import ThreadedConnectionPool, PoolError

288

import contextlib

289

import logging

290

291

# Configure logging

292

logging.basicConfig(level=logging.INFO)

293

logger = logging.getLogger(__name__)

294

295

class ManagedConnectionPool:

296

"""Wrapper for enhanced pool management."""

297

298

def __init__(self, minconn, maxconn, **db_params):

299

self.pool = ThreadedConnectionPool(minconn, maxconn, **db_params)

300

self.logger = logger

301

302

@contextlib.contextmanager

303

def get_connection(self):

304

"""Context manager for automatic connection handling."""

305

conn = None

306

try:

307

conn = self.pool.getconn()

308

self.logger.info("Connection acquired from pool")

309

yield conn

310

except PoolError as e:

311

self.logger.error(f"Pool error: {e}")

312

raise

313

except Exception as e:

314

if conn:

315

conn.rollback()

316

self.logger.error(f"Database error: {e}")

317

raise

318

finally:

319

if conn:

320

self.pool.putconn(conn)

321

self.logger.info("Connection returned to pool")

322

323

def execute_query(self, query, params=None):

324

"""Execute query with automatic connection management."""

325

with self.get_connection() as conn:

326

with conn.cursor() as cur:

327

cur.execute(query, params)

328

if cur.description: # Statement returned rows (e.g. SELECT)

329

return cur.fetchall()

330

else: # INSERT/UPDATE/DELETE

331

conn.commit()

332

return cur.rowcount

333

334

def close(self):

335

"""Close all pool connections."""

336

self.pool.closeall()

337

self.logger.info("Pool closed")

338

339

# Usage

340

managed_pool = ManagedConnectionPool(

341

minconn=2,

342

maxconn=8,

343

host="localhost",

344

database="mydb",

345

user="myuser",

346

password="mypass"

347

)

348

349

try:

350

# Simple query execution

351

users = managed_pool.execute_query("SELECT id, name FROM users LIMIT 5")

352

for user in users:

353

print(f"User: {user[1]}")

354

355

# Insert with parameters

356

rows_affected = managed_pool.execute_query(

357

"INSERT INTO logs (message, level) VALUES (%s, %s)",

358

("Application started", "INFO")

359

)

360

print(f"Inserted {rows_affected} rows")

361

362

# Using context manager directly

363

with managed_pool.get_connection() as conn:

364

with conn.cursor() as cur:

365

cur.execute("BEGIN")

366

cur.execute("INSERT INTO users (name) VALUES (%s)", ("Alice",))

367

cur.execute("INSERT INTO profiles (user_id, bio) VALUES (currval('users_id_seq'), %s)", ("Bio",))

368

conn.commit()

369

370

finally:

371

managed_pool.close()

372

```

373

374

### Pool Monitoring and Health Checks

375

376

Monitor pool health and connection status.

377

378

**Usage Example:**

379

380

```python

381

import psycopg2

382

from psycopg2.pool import ThreadedConnectionPool

383

import threading

384

import time

385

386

class MonitoredPool:

387

"""Connection pool with monitoring capabilities."""

388

389

def __init__(self, minconn, maxconn, **db_params):

390

self.pool = ThreadedConnectionPool(minconn, maxconn, **db_params)

391

self.stats = {

392

'connections_created': 0,

393

'connections_borrowed': 0,

394

'connections_returned': 0,

395

'active_connections': 0

396

}

397

self._lock = threading.Lock()

398

399

def getconn(self, key=None):

400

"""Get connection with stats tracking."""

401

with self._lock:

402

self.stats['connections_borrowed'] += 1

403

self.stats['active_connections'] += 1

404

405

conn = self.pool.getconn(key)

406

407

# Test connection health

408

try:

409

with conn.cursor() as cur:

410

cur.execute("SELECT 1")

411

except psycopg2.Error:

412

# Connection is bad, close it and get a new one

413

self.pool.putconn(conn, close=True)

414

conn = self.pool.getconn(key)

415

with self._lock:

416

self.stats['connections_created'] += 1

417

418

return conn

419

420

def putconn(self, conn, key=None, close=False):

421

"""Return connection with stats tracking."""

422

self.pool.putconn(conn, key, close)

423

with self._lock:

424

self.stats['connections_returned'] += 1

425

self.stats['active_connections'] -= 1

426

427

def get_stats(self):

428

"""Get pool statistics."""

429

with self._lock:

430

return self.stats.copy()

431

432

def health_check(self):

433

"""Perform pool health check."""

434

try:

435

conn = self.pool.getconn()

436

with conn.cursor() as cur:

437

cur.execute("SELECT version()")

438

version = cur.fetchone()[0]

439

self.pool.putconn(conn)

440

return True, f"Pool healthy - {version}"

441

except Exception as e:

442

return False, f"Pool unhealthy - {e}"

443

444

def closeall(self):

445

"""Close all connections."""

446

self.pool.closeall()

447

448

# Usage

449

monitored_pool = MonitoredPool(

450

minconn=2,

451

maxconn=6,

452

host="localhost",

453

database="mydb",

454

user="myuser",

455

password="mypass"

456

)

457

458

# Simulate usage

459

def simulate_work():

460

conn = monitored_pool.getconn()

461

time.sleep(0.1) # Simulate work

462

monitored_pool.putconn(conn)

463

464

# Run multiple workers

465

threads = [threading.Thread(target=simulate_work) for _ in range(10)]

466

for t in threads:

467

t.start()

468

for t in threads:

469

t.join()

470

471

# Check stats

472

stats = monitored_pool.get_stats()

473

print(f"Pool stats: {stats}")

474

475

# Health check

476

healthy, message = monitored_pool.health_check()

477

print(f"Health check: {message}")

478

479

monitored_pool.closeall()

480

```

481

482

### Error Handling and Recovery

483

484

Robust error handling patterns for connection pools.

485

486

**Usage Example:**

487

488

```python

489

import psycopg2

490

from psycopg2.pool import ThreadedConnectionPool, PoolError

491

import time

492

import logging

493

494

logger = logging.getLogger(__name__)

495

496

class ResilientPool:

497

"""Connection pool with error recovery."""

498

499

def __init__(self, minconn, maxconn, **db_params):

500

self.minconn = minconn

501

self.maxconn = maxconn

502

self.db_params = db_params

503

self.pool = None

504

self._create_pool()

505

506

def _create_pool(self):

507

"""Create or recreate pool."""

508

try:

509

if self.pool:

510

self.pool.closeall()

511

self.pool = ThreadedConnectionPool(

512

self.minconn, self.maxconn, **self.db_params

513

)

514

logger.info("Pool created successfully")

515

except Exception as e:

516

logger.error(f"Failed to create pool: {e}")

517

raise

518

519

def get_connection(self, retry_count=3):

520

"""Get connection with retry logic."""

521

for attempt in range(retry_count):

522

try:

523

if not self.pool:

524

self._create_pool()

525

526

conn = self.pool.getconn()

527

528

# Test connection

529

with conn.cursor() as cur:

530

cur.execute("SELECT 1")

531

532

return conn

533

534

except PoolError as e:

535

logger.warning(f"Pool exhausted (attempt {attempt + 1}): {e}")

536

if attempt < retry_count - 1:

537

time.sleep(0.1 * (attempt + 1)) # Linear backoff: delay grows by 0.1s per attempt

538

else:

539

raise

540

541

except psycopg2.OperationalError as e:

542

logger.error(f"Database connection error (attempt {attempt + 1}): {e}")

543

if attempt < retry_count - 1:

544

# Try to recreate pool

545

self._create_pool()

546

time.sleep(0.5 * (attempt + 1))

547

else:

548

raise

549

550

def return_connection(self, conn, close_on_error=True):

551

"""Return connection with error handling."""

552

try:

553

if conn.closed:

554

logger.warning("Returning closed connection")

555

return

556

557

# Check if connection is in a transaction

558

if conn.status != 1: # STATUS_READY

559

conn.rollback()

560

logger.info("Rolled back transaction before returning connection")

561

562

self.pool.putconn(conn)

563

564

except Exception as e:

565

logger.error(f"Error returning connection: {e}")

566

if close_on_error:

567

try:

568

self.pool.putconn(conn, close=True)

569

except:

570

pass

571

572

def close(self):

573

"""Close pool."""

574

if self.pool:

575

self.pool.closeall()

576

self.pool = None

577

578

# Usage

579

resilient_pool = ResilientPool(

580

minconn=1,

581

maxconn=5,

582

host="localhost",

583

database="mydb",

584

user="myuser",

585

password="mypass"

586

)

587

588

try:

589

conn = resilient_pool.get_connection()

590

591

with conn.cursor() as cur:

592

cur.execute("SELECT * FROM users LIMIT 1")

593

result = cur.fetchone()

594

print(f"Query result: {result}")

595

596

resilient_pool.return_connection(conn)

597

598

except Exception as e:

599

print(f"Error: {e}")

600

601

finally:

602

resilient_pool.close()

603

```

604

605

## Types

606

607

### Pool Classes Hierarchy

608

609

```python { .api }

610

class AbstractConnectionPool:

611

"""Base pool class."""

612

613

minconn: int # Minimum connections

614

maxconn: int # Maximum connections

615

closed: bool # Pool status

616

617

class SimpleConnectionPool(AbstractConnectionPool):

618

"""Non-thread-safe pool."""

619

620

def getconn(self, key=None) -> connection:

621

"""Get connection."""

622

623

def putconn(self, conn=None, key=None, close=False) -> None:

624

"""Return connection."""

625

626

def closeall(self) -> None:

627

"""Close all connections."""

628

629

class ThreadedConnectionPool(AbstractConnectionPool):

630

"""Thread-safe pool."""

631

632

def getconn(self, key=None) -> connection:

633

"""Get connection (thread-safe)."""

634

635

def putconn(self, conn=None, key=None, close=False) -> None:

636

"""Return connection (thread-safe)."""

637

638

def closeall(self) -> None:

639

"""Close all connections (thread-safe)."""

640

```

641

642

### Pool Error Types

643

644

```python { .api }

645

class PoolError(psycopg2.Error):

646

"""Pool-related errors."""

647

```

648

649

### Pool Configuration Parameters

650

651

```python { .api }

652

# Pool initialization parameters

653

minconn: int # Minimum connections (1 <= minconn <= maxconn)

654

maxconn: int # Maximum connections

655

*args: tuple # Positional arguments for psycopg2.connect()

656

**kwargs: dict # Keyword arguments for psycopg2.connect()

657

658

# Common connection parameters

659

host: str # Database host

660

port: int # Database port (default: 5432)

661

database: str # Database name

662

user: str # Username

663

password: str # Password

664

```