0
# Batch Operations and Utilities
1
2
Efficient batch execution functions and utility operations for improved performance with multiple queries, connection string management, and specialized database operations.
3
4
## Capabilities
5
6
### Batch Execution Functions
7
8
High-performance batch execution for multiple parameter sets, optimizing network round-trips and improving throughput.
9
10
```python { .api }
11
def execute_batch(cur, sql, argslist, page_size=100):
12
"""
13
Execute SQL with multiple parameter sets efficiently.
14
15
Parameters:
16
- cur: Database cursor
17
- sql (str): SQL statement with placeholders
18
- argslist (sequence): Sequence of parameter tuples
19
- page_size (int): Number of parameters to execute per batch
20
"""
21
22
def execute_values(cur, sql, argslist, template=None, page_size=100, fetch=False):
23
"""
24
Execute INSERT with VALUES clause efficiently.
25
26
Parameters:
27
- cur: Database cursor
28
- sql (str): SQL statement with VALUES placeholder
29
- argslist (sequence): Sequence of parameter tuples
30
- template (str, optional): Values template (default: %s placeholders)
31
- page_size (int): Number of rows per batch
32
- fetch (bool): Return results from RETURNING clause
33
34
Returns:
35
list: Results if fetch=True
36
"""
37
```
38
39
**Usage Example:**
40
41
```python
42
import psycopg2
43
from psycopg2.extras import execute_batch, execute_values
44
45
conn = psycopg2.connect("host=localhost dbname=mydb user=myuser")
46
47
# Prepare batch data
48
users = [
49
("Alice", "alice@example.com", 28),
50
("Bob", "bob@example.com", 32),
51
("Charlie", "charlie@example.com", 24),
52
("Diana", "diana@example.com", 30)
53
]
54
55
# execute_batch - efficient multiple executions
56
with conn.cursor() as cur:
57
execute_batch(
58
cur,
59
"INSERT INTO users (name, email, age) VALUES (%s, %s, %s)",
60
users,
61
page_size=2 # Process 2 rows at a time
62
)
63
print(f"Inserted {cur.rowcount} users with execute_batch")
64
65
# execute_values - efficient VALUES clause insertion
66
with conn.cursor() as cur:
67
execute_values(
68
cur,
69
"INSERT INTO users (name, email, age) VALUES %s",
70
users,
71
template=None, # Use default (%s, %s, %s)
72
page_size=100
73
)
74
print(f"Inserted {cur.rowcount} users with execute_values")
75
76
# execute_values with RETURNING clause
77
with conn.cursor() as cur:
78
results = execute_values(
79
cur,
80
"INSERT INTO users (name, email, age) VALUES %s RETURNING id, name",
81
users,
82
fetch=True
83
)
84
for user_id, name in results:
85
print(f"Created user {name} with ID {user_id}")
86
87
# Performance comparison with large datasets
88
import time
89
90
large_dataset = [("User {}".format(i), f"user{i}@example.com", 25+i%40) for i in range(10000)]
91
92
# Regular executemany (slower)
93
start_time = time.time()
94
with conn.cursor() as cur:
95
cur.executemany(
96
"INSERT INTO users_test (name, email, age) VALUES (%s, %s, %s)",
97
large_dataset
98
)
99
executemany_time = time.time() - start_time
100
101
# execute_batch (faster)
102
start_time = time.time()
103
with conn.cursor() as cur:
104
execute_batch(
105
cur,
106
"INSERT INTO users_test2 (name, email, age) VALUES (%s, %s, %s)",
107
large_dataset,
108
page_size=1000
109
)
110
batch_time = time.time() - start_time
111
112
# execute_values (fastest for inserts)
113
start_time = time.time()
114
with conn.cursor() as cur:
115
execute_values(
116
cur,
117
"INSERT INTO users_test3 (name, email, age) VALUES %s",
118
large_dataset,
119
page_size=1000
120
)
121
values_time = time.time() - start_time
122
123
print(f"executemany: {executemany_time:.2f}s")
124
print(f"execute_batch: {batch_time:.2f}s")
125
print(f"execute_values: {values_time:.2f}s")
126
127
conn.commit()
128
conn.close()
129
```
130
131
### Connection String Utilities
132
133
Functions for building and parsing PostgreSQL connection strings programmatically.
134
135
```python { .api }
136
def make_dsn(dsn=None, **kwargs):
137
"""
138
Build connection string from parameters.
139
140
Parameters:
141
- dsn (str, optional): Base connection string
142
- **kwargs: Connection parameters to add/override
143
144
Returns:
145
str: Complete connection string
146
"""
147
148
def parse_dsn(dsn):
149
"""
150
Parse connection string into components.
151
152
Parameters:
153
- dsn (str): Connection string to parse
154
155
Returns:
156
dict: Dictionary of connection parameters
157
"""
158
```
159
160
**Usage Example:**
161
162
```python
163
from psycopg2.extensions import make_dsn, parse_dsn
164
165
# Build connection string from parameters
166
dsn = make_dsn(
167
host="localhost",
168
port=5432,
169
dbname="mydb",
170
user="myuser",
171
password="secret"
172
)
173
print(dsn) # "host=localhost port=5432 dbname=mydb user=myuser password=secret"
174
175
# Add to existing DSN
176
base_dsn = "host=localhost dbname=mydb"
177
full_dsn = make_dsn(base_dsn, user="newuser", password="newpass", connect_timeout=10)
178
print(full_dsn)
179
180
# Parse connection string
181
params = parse_dsn("host=localhost port=5432 dbname=mydb user=myuser sslmode=require")
182
print(params)
183
# {'host': 'localhost', 'port': '5432', 'dbname': 'mydb', 'user': 'myuser', 'sslmode': 'require'}
184
185
# Use parsed parameters to modify connection
186
params['password'] = 'newsecret'
187
params['connect_timeout'] = '30'
188
new_dsn = make_dsn(**params)
189
190
# Connect using built DSN
191
conn = psycopg2.connect(new_dsn)
192
```
193
194
### SQL Identifier Quoting
195
196
Safe quoting of SQL identifiers to prevent injection and handle special characters.
197
198
```python { .api }
199
def quote_ident(name, scope=None):
200
"""
201
Quote SQL identifier safely.
202
203
Parameters:
204
- name (str): Identifier name to quote
205
- scope (connection/cursor, optional): Quoting context
206
207
Returns:
208
str: Properly quoted identifier
209
"""
210
```
211
212
**Usage Example:**
213
214
```python
215
from psycopg2.extensions import quote_ident
216
217
# Quote table and column names safely
218
table_name = quote_ident("user data") # "user data"
219
column_name = quote_ident("email-address") # "email-address"
220
221
# Safe dynamic query building
222
def build_select_query(table, columns, condition_column=None):
223
"""Build SELECT query with safe identifier quoting."""
224
quoted_table = quote_ident(table)
225
quoted_columns = [quote_ident(col) for col in columns]
226
227
query = f"SELECT {', '.join(quoted_columns)} FROM {quoted_table}"
228
229
if condition_column:
230
quoted_condition = quote_ident(condition_column)
231
query += f" WHERE {quoted_condition} = %s"
232
233
return query
234
235
# Usage
236
query = build_select_query("user accounts", ["full name", "email-addr"], "user id")
237
print(query)
238
# SELECT "full name", "email-addr" FROM "user accounts" WHERE "user id" = %s
239
240
with conn.cursor() as cur:
241
cur.execute(query, (123,))
242
results = cur.fetchall()
243
```
244
245
### Wait Callbacks
246
247
Custom wait callback functions for asynchronous operations and connection polling.
248
249
```python { .api }
250
def wait_select(conn):
251
"""
252
Wait callback for select-based waiting.
253
254
Parameters:
255
- conn: Database connection
256
"""
257
258
def set_wait_callback(f):
259
"""
260
Set global wait callback function.
261
262
Parameters:
263
- f (callable): Wait callback function
264
"""
265
266
def get_wait_callback():
267
"""
268
Get current wait callback function.
269
270
Returns:
271
callable: Current wait callback
272
"""
273
```
274
275
**Usage Example:**
276
277
```python
278
import psycopg2
279
from psycopg2.extensions import set_wait_callback, get_wait_callback, wait_select
280
import select
281
282
# Custom wait callback with logging
283
def logging_wait_callback(conn):
284
"""Wait callback that logs polling activity."""
285
print(f"Waiting for connection {conn}")
286
wait_select(conn)
287
print(f"Connection {conn} ready")
288
289
# Set custom wait callback
290
original_callback = get_wait_callback()
291
set_wait_callback(logging_wait_callback)
292
293
# Asynchronous connection (will use custom callback)
294
async_conn = psycopg2.connect(
295
"host=localhost dbname=mydb user=myuser",
296
async_=True
297
)
298
299
# Poll until connection is ready
300
while async_conn.poll() != psycopg2.extensions.POLL_OK:
301
# Custom callback is used during polling
302
pass
303
304
print("Async connection established")
305
306
# Restore original callback
307
set_wait_callback(original_callback)
308
309
async_conn.close()
310
```
311
312
### Password Encryption
313
314
Utility for encrypting passwords using PostgreSQL's password encryption methods.
315
316
```python { .api }
317
def encrypt_password(password, user, scope=None, algorithm=None):
318
"""
319
Encrypt password using PostgreSQL methods.
320
321
Parameters:
322
- password (str): Plain text password
323
- user (str): Username
324
- scope (connection, optional): Connection for server-side encryption
325
- algorithm (str, optional): Encryption algorithm
326
327
Returns:
328
str: Encrypted password string
329
"""
330
```
331
332
**Usage Example:**
333
334
```python
335
from psycopg2.extensions import encrypt_password
336
337
# Client-side password encryption
338
encrypted = encrypt_password("mysecret", "myuser")
339
print(f"Encrypted password: {encrypted}")
340
341
# Use encrypted password in user management
342
conn = psycopg2.connect("host=localhost dbname=postgres user=admin password=adminpass")
343
344
with conn.cursor() as cur:
345
# Create user with encrypted password
346
cur.execute(
347
"CREATE USER %s WITH PASSWORD %s",
348
(psycopg2.sql.Identifier("newuser"), encrypted)
349
)
350
351
conn.commit()
352
conn.close()
353
```
354
355
## Types
356
357
### Batch Function Parameters
358
359
```python { .api }
360
# execute_batch parameters
361
cur: cursor # Database cursor
362
sql: str # SQL statement with placeholders
363
argslist: sequence # Sequence of parameter tuples
364
page_size: int # Batch size (default: 100)
365
366
# execute_values parameters
367
cur: cursor # Database cursor
368
sql: str # SQL with VALUES placeholder
369
argslist: sequence # Parameter tuples sequence
370
template: str | None # Values template (default: None)
371
page_size: int # Batch size (default: 100)
372
fetch: bool # Return results (default: False)
373
```
374
375
### DSN Utility Parameters
376
377
```python { .api }
378
# make_dsn parameters
379
dsn: str | None # Base DSN (default: None)
380
**kwargs: dict # Connection parameters
381
382
# Connection parameter names
383
host: str # Database host
384
port: int # Database port
385
dbname: str # Database name
386
user: str # Username
387
password: str # Password
388
sslmode: str # SSL mode
389
connect_timeout: int # Connection timeout
390
application_name: str # Application name
391
```
392
393
### Wait Callback Interface
394
395
```python { .api }
396
# Wait callback function signature
397
def wait_callback(conn) -> None:
398
"""Custom wait callback for async operations."""
399
400
# Polling result constants
401
POLL_OK: int # 0 - Operation complete
402
POLL_READ: int # 1 - Wait for read
403
POLL_WRITE: int # 2 - Wait for write
404
POLL_ERROR: int # 3 - Error occurred
405
```