or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced-features.md · connection-pooling.md · connections-cursors.md · cursors-rows.md · error-handling.md · index.md · sql-composition.md · types-adaptation.md

connection-pooling.mddocs/

0

# Connection Pooling

1

2

Simple and thread-safe connection pools for managing database connections in single-threaded and multi-threaded applications. Connection pooling reduces connection overhead, enforces connection limits, and recycles connections within a configurable minimum and maximum pool size.

3

4

## Capabilities

5

6

### Abstract Connection Pool

7

8

Base class providing core pooling functionality with connection management, recycling, and cleanup.

9

10

```python { .api }

11

class AbstractConnectionPool:

12

"""Generic key-based pooling code."""

13

14

def __init__(self, minconn, maxconn, *args, **kwargs):

15

"""

16

Initialize connection pool.

17

18

Parameters:

19

- minconn (int): Minimum number of connections to maintain

20

- maxconn (int): Maximum number of connections allowed

21

- *args: Arguments passed to psycopg2.connect()

22

- **kwargs: Keyword arguments passed to psycopg2.connect()

23

"""

24

25

def _connect(self, key=None):

26

"""

27

Create new connection.

28

29

Parameters:

30

- key: Optional connection key

31

32

Returns:

33

connection: New database connection

34

"""

35

36

def _getconn(self, key=None):

37

"""

38

Get connection from pool.

39

40

Parameters:

41

- key: Optional connection key

42

43

Returns:

44

connection: Available connection

45

"""

46

47

def _putconn(self, conn, key=None, close=False):

48

"""

49

Return connection to pool.

50

51

Parameters:

52

- conn (connection): Connection to return

53

- key: Connection key

54

- close (bool): Force close connection

55

"""

56

57

def _closeall(self):

58

"""Close all connections in pool."""

59

60

@property

61

def closed(self):

62

"""Pool closed status."""

63

64

@property

65

def minconn(self):

66

"""Minimum connections maintained."""

67

68

@property

69

def maxconn(self):

70

"""Maximum connections allowed."""

71

```

72

73

### Simple Connection Pool

74

75

Connection pool without any locking, intended for single-threaded applications.

76

77

```python { .api }

78

class SimpleConnectionPool(AbstractConnectionPool):

79

"""Connection pool for single-threaded applications."""

80

81

def getconn(self, key=None):

82

"""

83

Get connection from pool.

84

85

Parameters:

86

- key: Optional connection identifier

87

88

Returns:

89

connection: Available database connection

90

91

Raises:

92

PoolError: If pool is closed or exhausted

93

"""

94

95

def putconn(self, conn, key=None, close=False):

96

"""

97

Return connection to pool.

98

99

Parameters:

100

- conn (connection): Connection to return

101

- key: Connection key (auto-detected if None)

102

- close (bool): Force close instead of recycling

103

104

Raises:

105

PoolError: If pool is closed or connection invalid

106

"""

107

108

def closeall(self):

109

"""

110

Close all connections in pool.

111

112

Raises:

113

PoolError: If pool already closed

114

"""

115

```

116

117

Usage examples:

118

119

```python

120

from psycopg2.pool import SimpleConnectionPool

121

122

# Create simple pool

123

pool = SimpleConnectionPool(

124

2, # minimum connections

125

10, # maximum connections

126

host="localhost",

127

database="mydb",

128

user="postgres",

129

password="secret"

130

)

131

132

# Get connection

133

conn = pool.getconn()

134

135

try:

136

# Use connection

137

cur = conn.cursor()

138

cur.execute("SELECT * FROM users")

139

users = cur.fetchall()

140

cur.close()

141

finally:

142

# Always return connection

143

pool.putconn(conn)

144

145

# Keyed connections

146

conn1 = pool.getconn("user_queries")

147

conn2 = pool.getconn("admin_queries")

148

149

# Return keyed connections

150

pool.putconn(conn1, "user_queries")

151

pool.putconn(conn2, "admin_queries")

152

153

# Close pool when done

154

pool.closeall()

155

```

156

157

### Threaded Connection Pool

158

159

Thread-safe connection pool with locking for multi-threaded applications.

160

161

```python { .api }

162

class ThreadedConnectionPool(AbstractConnectionPool):

163

"""Thread-safe connection pool."""

164

165

def __init__(self, minconn, maxconn, *args, **kwargs):

166

"""

167

Initialize threaded pool.

168

169

Parameters:

170

- minconn (int): Minimum connections to maintain

171

- maxconn (int): Maximum connections allowed

172

- *args: Arguments for psycopg2.connect()

173

- **kwargs: Keyword arguments for psycopg2.connect()

174

"""

175

176

def getconn(self, key=None):

177

"""

178

Thread-safe get connection from pool.

179

180

Parameters:

181

- key: Optional connection identifier

182

183

Returns:

184

connection: Available database connection

185

186

Raises:

187

PoolError: If pool is closed or exhausted

188

"""

189

190

def putconn(self, conn=None, key=None, close=False):

191

"""

192

Thread-safe return connection to pool.

193

194

Parameters:

195

- conn (connection, optional): Connection to return

196

- key: Connection key (auto-detected from conn if key is None)

197

- close (bool): Force close instead of recycling

198

199

Raises:

200

PoolError: If pool is closed or connection invalid

201

"""

202

203

def closeall(self):

204

"""

205

Thread-safe close all connections.

206

207

Raises:

208

PoolError: If pool already closed

209

"""

210

```

211

212

Usage examples:

213

214

```python

215

from psycopg2.pool import ThreadedConnectionPool

216

import threading

217

218

# Create thread-safe pool

219

pool = ThreadedConnectionPool(

220

1, # minimum connections

221

20, # maximum connections

222

host="localhost",

223

database="mydb",

224

user="postgres",

225

password="secret"

226

)

227

228

def worker_function(worker_id):

229

"""Worker function for threading example."""

230

conn = None

231

try:

232

# Get connection (thread-safe)

233

conn = pool.getconn()

234

235

# Use connection

236

cur = conn.cursor()

237

cur.execute("SELECT COUNT(*) FROM large_table WHERE worker_id = %s",

238

(worker_id,))

239

count = cur.fetchone()[0]

240

cur.close()

241

242

print(f"Worker {worker_id}: {count} records")

243

244

finally:

245

if conn:

246

# Return connection (thread-safe)

247

pool.putconn(conn)

248

249

# Create multiple threads

250

threads = []

251

for i in range(5):

252

thread = threading.Thread(target=worker_function, args=(i,))

253

threads.append(thread)

254

thread.start()

255

256

# Wait for completion

257

for thread in threads:

258

thread.join()

259

260

# Cleanup

261

pool.closeall()

262

```

263

264

### Pool Context Managers

265

266

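Connection pools can be combined with context managers for automatic cleanup. Note that psycopg2's pool classes do not define `__enter__`/`__exit__` themselves, so the `with ThreadedConnectionPool(...) as pool:` form below only works with a subclass or wrapper that adds them and calls `closeall()` on exit; likewise, the `get_pooled_connection()` generator must be decorated with `@contextlib.contextmanager` before it can be used in a `with` statement.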

267

268

```python

269

# Pool context manager

270

with ThreadedConnectionPool(1, 10, **conn_params) as pool:

271

conn = pool.getconn()

272

try:

273

# Use connection

274

cur = conn.cursor()

275

cur.execute("SELECT 1")

276

result = cur.fetchone()

277

finally:

278

pool.putconn(conn)

279

# Pool automatically closed

280

281

# Connection context manager with pool (the generator below must be decorated with @contextlib.contextmanager)

282

def get_pooled_connection():

283

conn = pool.getconn()

284

try:

285

yield conn

286

finally:

287

pool.putconn(conn)

288

289

# Usage

290

with get_pooled_connection() as conn:

291

cur = conn.cursor()

292

cur.execute("SELECT * FROM users")

293

users = cur.fetchall()

294

```

295

296

### Advanced Pool Management

297

298

Connection pools provide advanced features for monitoring and management.

299

300

```python

301

# Pool status monitoring

302

print(f"Pool closed: {pool.closed}")

303

print(f"Min connections: {pool.minconn}")

304

print(f"Max connections: {pool.maxconn}")

305

306

# Force close connections

307

conn = pool.getconn()

308

pool.putconn(conn, close=True) # Force close instead of recycling

309

310

# Connection validation and recycling

311

# Pools automatically handle:

312

# - Connection state check when a connection is returned (not before reuse)

313

# - Transaction rollback on return

314

# - Connection replacement for closed connections

315

# - Closing returned connections once minconn idle connections are already pooled

316

```

317

318

### Pool Error Handling

319

320

Comprehensive error handling for pool operations.

321

322

```python

323

from psycopg2.pool import PoolError

324

325

try:

326

conn = pool.getconn()

327

# Use connection

328

except PoolError as e:

329

if "exhausted" in str(e):

330

print("No connections available - consider increasing maxconn")

331

elif "closed" in str(e):

332

print("Pool has been closed")

333

else:

334

print(f"Pool error: {e}")

335

336

# Handling connection failures

337

try:

338

pool.putconn(invalid_conn)

339

except PoolError as e:

340

print(f"Cannot return connection: {e}")

341

```

342

343

### Custom Connection Factories with Pools

344

345

Use custom connection classes with connection pools.

346

347

```python

348

from psycopg2.extras import DictConnection

349

350

# Pool with custom connection factory

351

class DictConnectionPool(ThreadedConnectionPool):

352

def _connect(self, key=None):

353

"""Create connection with DictConnection factory."""

354

conn = psycopg2.connect(*self._args,

355

connection_factory=DictConnection,

356

**self._kwargs)

357

if key is not None:

358

self._used[key] = conn

359

self._rused[id(conn)] = key

360

else:

361

self._pool.append(conn)

362

return conn

363

364

# Use custom pool

365

dict_pool = DictConnectionPool(1, 10, **conn_params)

366

conn = dict_pool.getconn() # Returns DictConnection

367

cur = conn.cursor() # Returns DictCursor by default

368

```

369

370

## Types

371

372

### Pool Exceptions

373

374

```python { .api }

375

class PoolError(psycopg2.Error):

376

"""Exception raised by connection pool operations."""

377

pass

378

```

379

380

### Pool Configuration

381

382

```python { .api }

383

PoolConfig = {

384

'minconn': int, # Minimum connections to maintain

385

'maxconn': int, # Maximum connections allowed

386

'host': str, # Database host

387

'port': int, # Database port

388

'database': str, # Database name

389

'user': str, # Username

390

'password': str, # Password

391

# ... other psycopg2.connect() parameters

392

}

393

```

394

395

### Pool State

396

397

```python { .api }

398

PoolState = {

399

'closed': bool, # Pool closed status

400

'minconn': int, # Configured minimum connections

401

'maxconn': int, # Configured maximum connections

402

'_pool': list, # Available connections

403

'_used': dict, # Connections in use (key -> connection)

404

'_rused': dict, # Reverse mapping (connection_id -> key)

405

'_keys': int # Key counter

406

}

407

```

408

409

### Thread Safety

410

411

```python

412

# ThreadedConnectionPool uses threading.Lock for:

413

# - getconn() operations

414

# - putconn() operations

415

# - closeall() operations

416

# - Internal pool state management

417

418

# SimpleConnectionPool provides no locking:

419

# - Suitable for single-threaded applications

420

# - Higher performance due to no locking overhead

421

# - Must not be shared between threads

422

```