or run

npx @tessl/cli init
Log in

Version

Title

Overview

Evals

Files

Files

docs

connection-management.md, connection-pooling.md, copy-operations.md, cursor-operations.md, exception-handling.md, index.md, listeners-notifications.md, prepared-statements.md, query-execution.md, transaction-management.md, type-system.md

docs/connection-pooling.md

0

# Connection Pooling

1

2

Advanced connection pooling with configurable pool sizes, automatic connection lifecycle management, load balancing, and transparent query execution through pooled connections.

3

4

## Capabilities

5

6

### Pool Creation

7

8

Create connection pools with comprehensive configuration options for optimal resource utilization and performance.

9

10

```python { .api }

11

def create_pool(

12

dsn: str = None,

13

*,

14

min_size: int = 10,

15

max_size: int = 10,

16

max_queries: int = 50000,

17

max_inactive_connection_lifetime: float = 300.0,

18

connect: callable = None,

19

setup: callable = None,

20

init: callable = None,

21

reset: callable = None,

22

loop = None,

23

connection_class: typing.Type[Connection] = Connection,

24

record_class: typing.Type[Record] = Record,

25

**connect_kwargs

26

) -> Pool:

27

"""

28

Create a connection pool.

29

30

Parameters:

31

dsn: PostgreSQL connection string

32

min_size: Minimum number of connections in the pool

33

max_size: Maximum number of connections in the pool

34

max_queries: Maximum queries per connection before replacement

35

max_inactive_connection_lifetime: Maximum idle time before connection closure

36

connect: Custom connection factory function

37

setup: Function called on each connection as it is acquired from the pool

38

init: Function called once on each newly created connection

39

reset: Function called when connection is released

40

loop: Event loop to use

41

connection_class: Custom connection class

42

record_class: Custom record class

43

**connect_kwargs: Additional connection parameters

44

45

Returns:

46

Pool instance

47

"""

48

```

49

50

#### Example Usage

51

52

```python

53

# Basic pool

54

pool = asyncpg.create_pool('postgresql://user:pass@localhost/mydb')

55

56

# Advanced pool configuration

57

pool = asyncpg.create_pool(

58

'postgresql://user:pass@localhost/mydb',

59

min_size=5,

60

max_size=20,

61

max_queries=10000,

62

max_inactive_connection_lifetime=600.0,

63

command_timeout=60.0

64

)

65

66

# Pool with custom setup

67

async def setup_connection(conn):

68

await conn.set_type_codec('json', encoder=json.dumps, decoder=json.loads)

69

await conn.execute("SET timezone = 'UTC'")

70

71

pool = asyncpg.create_pool(

72

dsn,

73

setup=setup_connection,

74

min_size=2,

75

max_size=10

76

)

77

78

await pool # Initialize the pool

79

```

80

81

### Connection Acquisition

82

83

Acquire and release connections from the pool with automatic lifecycle management.

84

85

```python { .api }

86

def acquire(self, *, timeout: float = None) -> PoolAcquireContext:

87

"""

88

Acquire a database connection from the pool.

89

90

Parameters:

91

timeout: Maximum time to wait for a connection

92

93

Returns:

94

Connection proxy context manager

95

"""

96

97

async def release(self, connection, *, timeout: float = None) -> None:

98

"""

99

Release a database connection back to the pool.

100

101

Parameters:

102

connection: Connection to release

103

timeout: Maximum time to wait for release

104

"""

105

```

106

107

#### Example Usage

108

109

```python

110

# Context manager (recommended)

111

async with pool.acquire() as conn:

112

result = await conn.fetch("SELECT * FROM users")

113

# Connection automatically released

114

115

# Manual acquisition/release

116

conn = await pool.acquire()

117

try:

118

result = await conn.fetch("SELECT * FROM users")

119

finally:

120

await pool.release(conn)

121

122

# With timeout

123

try:

124

async with pool.acquire(timeout=5.0) as conn:

125

result = await conn.execute("LONG RUNNING QUERY")

126

except asyncio.TimeoutError:

127

print("Could not acquire connection within timeout")

128

```

129

130

### Pool Query Methods

131

132

Execute queries directly through the pool without explicit connection management.

133

134

```python { .api }

135

async def execute(self, query: str, *args, timeout: float = None) -> str:

136

"""Execute SQL command using a pool connection."""

137

138

async def executemany(self, command: str, args, *, timeout: float = None) -> None:

139

"""Execute SQL command for multiple argument sets using a pool connection."""

140

141

async def fetch(self, query: str, *args, timeout: float = None, record_class = None) -> list[Record]:

142

"""Fetch all results using a pool connection."""

143

144

async def fetchval(self, query: str, *args, column: int = 0, timeout: float = None):

145

"""Fetch single value using a pool connection."""

146

147

async def fetchrow(self, query: str, *args, timeout: float = None, record_class = None) -> Record:

148

"""Fetch first row using a pool connection."""

149

150

async def fetchmany(self, query: str, args, *, timeout: float = None, record_class = None) -> list[list[Record]]:

151

"""Execute query for multiple argument sets using a pool connection."""

152

```

153

154

#### Example Usage

155

156

```python

157

# Direct pool queries (connection handled automatically)

158

users = await pool.fetch("SELECT * FROM users WHERE active = $1", True)

159

160

count = await pool.fetchval("SELECT COUNT(*) FROM orders")

161

162

await pool.execute(

163

"INSERT INTO logs(message, timestamp) VALUES($1, $2)",

164

"User logged in", datetime.now()

165

)

166

167

# Batch operations

168

orders = [(100, 'pending'), (200, 'shipped'), (300, 'delivered')]

169

await pool.executemany(

170

"INSERT INTO orders(amount, status) VALUES($1, $2)",

171

orders

172

)

173

```

174

175

### Pool COPY Operations

176

177

High-performance bulk operations using the pool's COPY functionality.

178

179

```python { .api }

180

async def copy_from_table(

181

self,

182

table_name: str,

183

*,

184

output,

185

columns: list = None,

186

schema_name: str = None,

187

timeout: float = None,

188

**kwargs

189

) -> str:

190

"""Copy table data to output using a pool connection."""

191

192

async def copy_to_table(

193

self,

194

table_name: str,

195

*,

196

source,

197

columns: list = None,

198

schema_name: str = None,

199

timeout: float = None,

200

**kwargs

201

) -> str:

202

"""Copy data from source to table using a pool connection."""

203

204

async def copy_records_to_table(

205

self,

206

table_name: str,

207

*,

208

records,

209

columns: list = None,

210

schema_name: str = None,

211

timeout: float = None,

212

where: str = None

213

) -> str:

214

"""Copy records to table using a pool connection."""

215

```

216

217

### Pool Management

218

219

Control pool lifecycle, monitor status, and manage configuration.

220

221

```python { .api }

222

async def close(self) -> None:

223

"""Attempt to gracefully close all connections in the pool."""

224

225

def terminate(self) -> None:

226

"""Terminate all connections in the pool immediately."""

227

228

async def expire_connections(self) -> None:

229

"""Expire all currently open connections."""

230

231

def is_closing(self) -> bool:

232

"""Return True if the pool is closing or closed."""

233

234

def set_connect_args(self, dsn: str = None, **connect_kwargs) -> None:

235

"""Update connection arguments for new connections."""

236

```

237

238

#### Example Usage

239

240

```python

241

# Graceful shutdown

242

await pool.close()

243

244

# Force shutdown

245

pool.terminate()

246

247

# Expire old connections (useful after schema changes)

248

await pool.expire_connections()

249

250

# Update connection parameters

251

pool.set_connect_args(

252

command_timeout=30.0,

253

server_settings={'timezone': 'America/New_York'}

254

)

255

```

256

257

### Pool Status Monitoring

258

259

Monitor pool health, connection usage, and performance metrics.

260

261

```python { .api }

262

def get_size(self) -> int:

263

"""Return the current number of connections in the pool."""

264

265

def get_min_size(self) -> int:

266

"""Return the minimum pool size."""

267

268

def get_max_size(self) -> int:

269

"""Return the maximum pool size."""

270

271

def get_idle_size(self) -> int:

272

"""Return the number of idle connections."""

273

```

274

275

#### Example Usage

276

277

```python

278

# Pool status monitoring

279

print(f"Pool size: {pool.get_size()}")

280

print(f"Idle connections: {pool.get_idle_size()}")

281

print(f"Active connections: {pool.get_size() - pool.get_idle_size()}")

282

283

# Health check

284

if pool.get_idle_size() == 0 and pool.get_size() == pool.get_max_size():

285

print("Warning: Pool is at maximum capacity with no idle connections")

286

287

# Auto-scaling logic

288

if pool.get_idle_size() < 2:

289

print("Consider increasing pool size")

290

```

291

292

### Pool Configuration Patterns

293

294

Common pool configuration patterns for different use cases.

295

296

#### Web Application Pool

297

298

```python

299

# Web application with variable load

300

pool = asyncpg.create_pool(

301

dsn,

302

min_size=5, # Always keep some connections ready

303

max_size=50, # Handle traffic spikes

304

max_queries=10000, # Prevent connection reuse issues

305

max_inactive_connection_lifetime=300, # 5 minutes

306

command_timeout=30.0 # Prevent hanging requests

307

)

308

```

309

310

#### Batch Processing Pool

311

312

```python

313

# Batch processing with long-running queries

314

pool = asyncpg.create_pool(

315

dsn,

316

min_size=2, # Minimal overhead

317

max_size=10, # Limited parallelism

318

max_queries=1000, # More connection reuse

319

max_inactive_connection_lifetime=1800, # 30 minutes

320

command_timeout=3600.0 # Long-running queries

321

)

322

```

323

324

#### High-Throughput Pool

325

326

```python

327

# High-throughput OLTP system

328

pool = asyncpg.create_pool(

329

dsn,

330

min_size=20, # Keep many connections ready

331

max_size=100, # High concurrency

332

max_queries=50000, # Longer connection lifetime

333

max_inactive_connection_lifetime=60, # Quick recycling

334

command_timeout=5.0 # Fast queries only

335

)

336

```

337

338

### Error Handling

339

340

Handle pool-specific errors and connection acquisition failures.

341

342

```python

343

try:

344

async with pool.acquire() as conn:

345

result = await conn.fetch("SELECT * FROM users")

346

except asyncio.TimeoutError:

347

print("Timeout acquiring connection from pool")

348

except asyncpg.TooManyConnectionsError:

349

print("Server connection limit reached (too_many_connections)")

350

except asyncpg.PostgresConnectionError:

351

print("Connection error in pool")

352

353

# Check pool state before operations

354

if pool.is_closing():

355

print("Pool is shutting down")

356

else:

357

result = await pool.fetchval("SELECT 1")

358

```

359

360

## Types

361

362

```python { .api }

363

class Pool:

364

"""A connection pool."""

365

366

class PoolAcquireContext:

367

"""Context manager for acquiring pool connections."""

368

369

async def __aenter__(self) -> Connection:

370

"""Acquire connection from pool."""

371

372

async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:

373

"""Release connection back to pool."""

374

```