0
# Connection Pooling
1
2
Advanced connection pooling with configurable pool sizes, automatic connection lifecycle management, load balancing, and transparent query execution through pooled connections.
3
4
## Capabilities
5
6
### Pool Creation
7
8
Create connection pools with comprehensive configuration options for optimal resource utilization and performance.
9
10
```python { .api }
11
def create_pool(
12
dsn: str = None,
13
*,
14
min_size: int = 10,
15
max_size: int = 10,
16
max_queries: int = 50000,
17
max_inactive_connection_lifetime: float = 300.0,
18
connect: callable = None,
19
setup: callable = None,
20
init: callable = None,
21
reset: callable = None,
22
loop = None,
23
connection_class: typing.Type[Connection] = Connection,
24
record_class: typing.Type[Record] = Record,
25
**connect_kwargs
26
) -> Pool:
27
"""
28
Create a connection pool.
29
30
Parameters:
31
dsn: PostgreSQL connection string
32
min_size: Minimum number of connections in the pool
33
max_size: Maximum number of connections in the pool
34
max_queries: Maximum queries per connection before replacement
35
max_inactive_connection_lifetime: Maximum idle time before connection closure
36
connect: Custom connection factory function
37
setup: Function called on each new connection
38
init: Function called on each acquired connection
39
reset: Function called when connection is released
40
loop: Event loop to use
41
connection_class: Custom connection class
42
record_class: Custom record class
43
**connect_kwargs: Additional connection parameters
44
45
Returns:
46
Pool instance
47
"""
48
```
49
50
#### Example Usage
51
52
```python
53
# Basic pool
54
pool = asyncpg.create_pool('postgresql://user:pass@localhost/mydb')
55
56
# Advanced pool configuration
57
pool = asyncpg.create_pool(
58
'postgresql://user:pass@localhost/mydb',
59
min_size=5,
60
max_size=20,
61
max_queries=10000,
62
max_inactive_connection_lifetime=600.0,
63
command_timeout=60.0
64
)
65
66
# Pool with custom setup
67
async def setup_connection(conn):
68
await conn.set_type_codec('json', encoder=json.dumps, decoder=json.loads)
69
await conn.execute("SET timezone = 'UTC'")
70
71
pool = asyncpg.create_pool(
72
dsn,
73
setup=setup_connection,
74
min_size=2,
75
max_size=10
76
)
77
78
await pool # Initialize the pool
79
```
80
81
### Connection Acquisition
82
83
Acquire and release connections from the pool with automatic lifecycle management.
84
85
```python { .api }
86
def acquire(self, *, timeout: float = None) -> PoolAcquireContext:
87
"""
88
Acquire a database connection from the pool.
89
90
Parameters:
91
timeout: Maximum time to wait for a connection
92
93
Returns:
94
Connection proxy context manager
95
"""
96
97
async def release(self, connection, *, timeout: float = None) -> None:
98
"""
99
Release a database connection back to the pool.
100
101
Parameters:
102
connection: Connection to release
103
timeout: Maximum time to wait for release
104
"""
105
```
106
107
#### Example Usage
108
109
```python
110
# Context manager (recommended)
111
async with pool.acquire() as conn:
112
result = await conn.fetch("SELECT * FROM users")
113
# Connection automatically released
114
115
# Manual acquisition/release
116
conn = await pool.acquire()
117
try:
118
result = await conn.fetch("SELECT * FROM users")
119
finally:
120
await pool.release(conn)
121
122
# With timeout
123
try:
124
async with pool.acquire(timeout=5.0) as conn:
125
result = await conn.execute("LONG RUNNING QUERY")
126
except asyncio.TimeoutError:
127
print("Could not acquire connection within timeout")
128
```
129
130
### Pool Query Methods
131
132
Execute queries directly through the pool without explicit connection management.
133
134
```python { .api }
135
async def execute(self, query: str, *args, timeout: float = None) -> str:
136
"""Execute SQL command using a pool connection."""
137
138
async def executemany(self, command: str, args, *, timeout: float = None) -> None:
139
"""Execute SQL command for multiple argument sets using a pool connection."""
140
141
async def fetch(self, query: str, *args, timeout: float = None, record_class = None) -> list[Record]:
142
"""Fetch all results using a pool connection."""
143
144
async def fetchval(self, query: str, *args, column: int = 0, timeout: float = None):
145
"""Fetch single value using a pool connection."""
146
147
async def fetchrow(self, query: str, *args, timeout: float = None, record_class = None) -> Record:
148
"""Fetch first row using a pool connection."""
149
150
async def fetchmany(self, query: str, args, *, timeout: float = None, record_class = None) -> list[list[Record]]:
151
"""Execute query for multiple argument sets using a pool connection."""
152
```
153
154
#### Example Usage
155
156
```python
157
# Direct pool queries (connection handled automatically)
158
users = await pool.fetch("SELECT * FROM users WHERE active = $1", True)
159
160
count = await pool.fetchval("SELECT COUNT(*) FROM orders")
161
162
await pool.execute(
163
"INSERT INTO logs(message, timestamp) VALUES($1, $2)",
164
"User logged in", datetime.now()
165
)
166
167
# Batch operations
168
orders = [(100, 'pending'), (200, 'shipped'), (300, 'delivered')]
169
await pool.executemany(
170
"INSERT INTO orders(amount, status) VALUES($1, $2)",
171
orders
172
)
173
```
174
175
### Pool COPY Operations
176
177
High-performance bulk operations using the pool's COPY functionality.
178
179
```python { .api }
180
async def copy_from_table(
181
self,
182
table_name: str,
183
*,
184
output,
185
columns: list = None,
186
schema_name: str = None,
187
timeout: float = None,
188
**kwargs
189
) -> str:
190
"""Copy table data to output using a pool connection."""
191
192
async def copy_to_table(
193
self,
194
table_name: str,
195
*,
196
source,
197
columns: list = None,
198
schema_name: str = None,
199
timeout: float = None,
200
**kwargs
201
) -> str:
202
"""Copy data from source to table using a pool connection."""
203
204
async def copy_records_to_table(
205
self,
206
table_name: str,
207
*,
208
records,
209
columns: list = None,
210
schema_name: str = None,
211
timeout: float = None,
212
where: str = None
213
) -> str:
214
"""Copy records to table using a pool connection."""
215
```
216
217
### Pool Management
218
219
Control pool lifecycle, monitor status, and manage configuration.
220
221
```python { .api }
222
async def close(self) -> None:
223
"""Attempt to gracefully close all connections in the pool."""
224
225
def terminate(self) -> None:
226
"""Terminate all connections in the pool immediately."""
227
228
async def expire_connections(self) -> None:
229
"""Expire all currently open connections."""
230
231
def is_closing(self) -> bool:
232
"""Return True if the pool is closing or closed."""
233
234
def set_connect_args(self, dsn: str = None, **connect_kwargs) -> None:
235
"""Update connection arguments for new connections."""
236
```
237
238
#### Example Usage
239
240
```python
241
# Graceful shutdown
242
await pool.close()
243
244
# Force shutdown
245
pool.terminate()
246
247
# Expire old connections (useful after schema changes)
248
await pool.expire_connections()
249
250
# Update connection parameters
251
pool.set_connect_args(
252
command_timeout=30.0,
253
server_settings={'timezone': 'America/New_York'}
254
)
255
```
256
257
### Pool Status Monitoring
258
259
Monitor pool health, connection usage, and performance metrics.
260
261
```python { .api }
262
def get_size(self) -> int:
263
"""Return the current number of connections in the pool."""
264
265
def get_min_size(self) -> int:
266
"""Return the minimum pool size."""
267
268
def get_max_size(self) -> int:
269
"""Return the maximum pool size."""
270
271
def get_idle_size(self) -> int:
272
"""Return the number of idle connections."""
273
```
274
275
#### Example Usage
276
277
```python
278
# Pool status monitoring
279
print(f"Pool size: {pool.get_size()}")
280
print(f"Idle connections: {pool.get_idle_size()}")
281
print(f"Active connections: {pool.get_size() - pool.get_idle_size()}")
282
283
# Health check
284
if pool.get_idle_size() == 0 and pool.get_size() == pool.get_max_size():
285
print("Warning: Pool is at maximum capacity with no idle connections")
286
287
# Auto-scaling logic
288
if pool.get_idle_size() < 2:
289
print("Consider increasing pool size")
290
```
291
292
### Pool Configuration Patterns
293
294
Common pool configuration patterns for different use cases.
295
296
#### Web Application Pool
297
298
```python
299
# Web application with variable load
300
pool = asyncpg.create_pool(
301
dsn,
302
min_size=5, # Always keep some connections ready
303
max_size=50, # Handle traffic spikes
304
max_queries=10000, # Prevent connection reuse issues
305
max_inactive_connection_lifetime=300, # 5 minutes
306
command_timeout=30.0 # Prevent hanging requests
307
)
308
```
309
310
#### Batch Processing Pool
311
312
```python
313
# Batch processing with long-running queries
314
pool = asyncpg.create_pool(
315
dsn,
316
min_size=2, # Minimal overhead
317
max_size=10, # Limited parallelism
318
max_queries=1000, # More connection reuse
319
max_inactive_connection_lifetime=1800, # 30 minutes
320
command_timeout=3600.0 # Long-running queries
321
)
322
```
323
324
#### High-Throughput Pool
325
326
```python
327
# High-throughput OLTP system
328
pool = asyncpg.create_pool(
329
dsn,
330
min_size=20, # Keep many connections ready
331
max_size=100, # High concurrency
332
max_queries=50000, # Longer connection lifetime
333
max_inactive_connection_lifetime=60, # Quick recycling
334
command_timeout=5.0 # Fast queries only
335
)
336
```
337
338
### Error Handling
339
340
Handle pool-specific errors and connection acquisition failures.
341
342
```python
343
try:
344
async with pool.acquire() as conn:
345
result = await conn.fetch("SELECT * FROM users")
346
except asyncio.TimeoutError:
347
print("Timeout acquiring connection from pool")
348
except asyncpg.TooManyConnectionsError:
349
print("Pool has reached maximum size")
350
except asyncpg.PostgresConnectionError:
351
print("Connection error in pool")
352
353
# Check pool state before operations
354
if pool.is_closing():
355
print("Pool is shutting down")
356
else:
357
result = await pool.fetchval("SELECT 1")
358
```
359
360
## Types
361
362
```python { .api }
363
class Pool:
364
"""A connection pool."""
365
366
class PoolAcquireContext:
367
"""Context manager for acquiring pool connections."""
368
369
async def __aenter__(self) -> Connection:
370
"""Acquire connection from pool."""
371
372
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
373
"""Release connection back to pool."""
374
```