Python-PostgreSQL Database Adapter
—
Efficient batch execution functions and utility operations for improved performance with multiple queries, connection string management, and specialized database operations.
High-performance batch execution for multiple parameter sets, optimizing network round-trips and improving throughput.
def execute_batch(cur, sql, argslist, page_size=100):
    """
    Execute one SQL statement against many parameter sets in batches.

    Grouping the executions is meant to reduce network round-trips
    compared with running the statement once per parameter set.

    Parameters:
    - cur: Database cursor
    - sql (str): SQL statement with placeholders
    - argslist (sequence): Sequence of parameter tuples, one per execution
    - page_size (int): Number of parameter sets to execute per batch
      (default: 100)
    """
def execute_values(cur, sql, argslist, template=None, page_size=100, fetch=False):
    """
    Execute an INSERT with a multi-row VALUES clause efficiently.

    Parameters:
    - cur: Database cursor
    - sql (str): SQL statement with a single %s placeholder where the
      VALUES list goes (e.g. "INSERT INTO t (a, b) VALUES %s")
    - argslist (sequence): Sequence of parameter tuples, one per row
    - template (str, optional): Values template for one row
      (default: None, meaning plain %s placeholders)
    - page_size (int): Number of rows per batch (default: 100)
    - fetch (bool): Return results from a RETURNING clause (default: False)

    Returns:
    list: Results if fetch=True
    """

Usage Example:
import time

import psycopg2
from psycopg2.extras import execute_batch, execute_values

conn = psycopg2.connect("host=localhost dbname=mydb user=myuser")

# Prepare batch data
users = [
    ("Alice", "alice@example.com", 28),
    ("Bob", "bob@example.com", 32),
    ("Charlie", "charlie@example.com", 24),
    ("Diana", "diana@example.com", 30),
]

# execute_batch - efficient multiple executions
with conn.cursor() as cur:
    execute_batch(
        cur,
        "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)",
        users,
        page_size=2,  # Process 2 rows at a time
    )
    # cur.rowcount only reflects the last statement that was executed, not
    # the whole batch, so report the number of rows submitted instead.
    print(f"Inserted {len(users)} users with execute_batch")

# execute_values - efficient VALUES clause insertion
with conn.cursor() as cur:
    execute_values(
        cur,
        "INSERT INTO users (name, email, age) VALUES %s",
        users,
        template=None,  # Use default (%s, %s, %s)
        page_size=100,
    )
    # As above: with more rows than page_size, cur.rowcount would only cover
    # the last page, so it is not a reliable total.
    print(f"Inserted {len(users)} users with execute_values")

# execute_values with RETURNING clause
with conn.cursor() as cur:
    results = execute_values(
        cur,
        "INSERT INTO users (name, email, age) VALUES %s RETURNING id, name",
        users,
        fetch=True,  # Collect the RETURNING rows from every page
    )
    for user_id, name in results:
        print(f"Created user {name} with ID {user_id}")

# Performance comparison with large datasets
large_dataset = [
    (f"User {i}", f"user{i}@example.com", 25 + i % 40) for i in range(10000)
]

# Regular executemany (slower)
# perf_counter() is monotonic, which makes it the right clock for intervals.
start_time = time.perf_counter()
with conn.cursor() as cur:
    cur.executemany(
        "INSERT INTO users_test (name, email, age) VALUES (%s, %s, %s)",
        large_dataset,
    )
executemany_time = time.perf_counter() - start_time

# execute_batch (faster)
start_time = time.perf_counter()
with conn.cursor() as cur:
    execute_batch(
        cur,
        "INSERT INTO users_test2 (name, email, age) VALUES (%s, %s, %s)",
        large_dataset,
        page_size=1000,
    )
batch_time = time.perf_counter() - start_time

# execute_values (fastest for inserts)
start_time = time.perf_counter()
with conn.cursor() as cur:
    execute_values(
        cur,
        "INSERT INTO users_test3 (name, email, age) VALUES %s",
        large_dataset,
        page_size=1000,
    )
values_time = time.perf_counter() - start_time

print(f"executemany: {executemany_time:.2f}s")
print(f"execute_batch: {batch_time:.2f}s")
print(f"execute_values: {values_time:.2f}s")

# Leaving a cursor's `with` block does not commit; do it explicitly.
conn.commit()
conn.close()

Functions for building and parsing PostgreSQL connection strings programmatically.
def make_dsn(dsn=None, **kwargs):
    """
    Build a connection string from a base DSN and/or keyword parameters.

    Parameters:
    - dsn (str, optional): Base connection string (default: None)
    - **kwargs: Connection parameters to add/override, e.g. host, port,
      dbname, user, password

    Returns:
    str: Complete connection string
    """
def parse_dsn(dsn):
    """
    Parse a connection string into its individual parameters.

    Parameters:
    - dsn (str): Connection string to parse,
      e.g. "host=localhost port=5432 dbname=mydb"

    Returns:
    dict: Dictionary of connection parameters keyed by name
    """

Usage Example:
from psycopg2.extensions import make_dsn, parse_dsn

# Assemble a DSN purely from keyword parameters
dsn = make_dsn(
    host="localhost", port=5432, dbname="mydb", user="myuser", password="secret"
)
print(dsn)  # "host=localhost port=5432 dbname=mydb user=myuser password=secret"

# Extend an existing DSN with extra keyword parameters
base_dsn = "host=localhost dbname=mydb"
full_dsn = make_dsn(
    base_dsn,
    user="newuser",
    password="newpass",
    connect_timeout=10,
)
print(full_dsn)

# Break a DSN down into a dictionary of its components
params = parse_dsn(
    "host=localhost port=5432 dbname=mydb user=myuser sslmode=require"
)
print(params)
# {'host': 'localhost', 'port': '5432', 'dbname': 'mydb', 'user': 'myuser', 'sslmode': 'require'}

# Adjust the parsed parameters, then rebuild the DSN from them
params.update(password='newsecret', connect_timeout='30')
new_dsn = make_dsn(**params)

# Connect using built DSN
conn = psycopg2.connect(new_dsn)

Safe quoting of SQL identifiers to prevent injection and handle special characters.
def quote_ident(name, scope=None):
"""
Quote SQL identifier safely.
Parameters:
- name (str): Identifier name to quote
- scope (connection/cursor, optional): Quoting context
Returns:
str: Properly quoted identifier
"""Usage Example:
from psycopg2.extensions import quote_ident
# Quote table and column names safely
table_name = quote_ident("user data") # "user data"
column_name = quote_ident("email-address") # "email-address"
# Safe dynamic query building
def build_select_query(table, columns, condition_column=None):
"""Build SELECT query with safe identifier quoting."""
quoted_table = quote_ident(table)
quoted_columns = [quote_ident(col) for col in columns]
query = f"SELECT {', '.join(quoted_columns)} FROM {quoted_table}"
if condition_column:
quoted_condition = quote_ident(condition_column)
query += f" WHERE {quoted_condition} = %s"
return query
# Usage
query = build_select_query("user accounts", ["full name", "email-addr"], "user id")
print(query)
# SELECT "full name", "email-addr" FROM "user accounts" WHERE "user id" = %s
with conn.cursor() as cur:
cur.execute(query, (123,))
results = cur.fetchall()Custom wait callback functions for asynchronous operations and connection polling.
def wait_select(conn):
    """
    Wait callback for select-based waiting.

    Takes the same single argument as a custom wait callback, so it can be
    called from inside one.

    Parameters:
    - conn: Database connection to wait on
    """
def set_wait_callback(f):
    """
    Set the global wait callback function.

    Parameters:
    - f (callable): Wait callback function; it is called with the
      connection as its only argument
    """
def get_wait_callback():
    """
    Get the current global wait callback function.

    Useful for saving the callback before replacing it, so it can be
    restored later with set_wait_callback().

    Returns:
    callable: Current wait callback
    """

Usage Example:
import psycopg2
from psycopg2.extensions import set_wait_callback, get_wait_callback, wait_select
import select
# Custom wait callback with logging
def logging_wait_callback(conn):
"""Wait callback that logs polling activity."""
print(f"Waiting for connection {conn}")
wait_select(conn)
print(f"Connection {conn} ready")
# Set custom wait callback
original_callback = get_wait_callback()
set_wait_callback(logging_wait_callback)
# Asynchronous connection (will use custom callback)
async_conn = psycopg2.connect(
"host=localhost dbname=mydb user=myuser",
async_=True
)
# Poll until connection is ready
while async_conn.poll() != psycopg2.extensions.POLL_OK:
# Custom callback is used during polling
pass
print("Async connection established")
# Restore original callback
set_wait_callback(original_callback)
async_conn.close()Utility for encrypting passwords using PostgreSQL's password encryption methods.
def encrypt_password(password, user, scope=None, algorithm=None):
    """
    Encrypt a password using PostgreSQL's password encryption methods.

    Parameters:
    - password (str): Plain text password
    - user (str): Username the password is encrypted for
    - scope (connection, optional): Connection for server-side encryption
      (default: None)
    - algorithm (str, optional): Encryption algorithm (default: None)

    NOTE(review): psycopg2's documentation says scope may only be left as
    None when algorithm is "md5" — confirm and state it here.

    Returns:
    str: Encrypted password string
    """

Usage Example:
import psycopg2
from psycopg2 import sql
from psycopg2.extensions import encrypt_password

conn = psycopg2.connect("host=localhost dbname=postgres user=admin password=adminpass")

new_user = "newuser"

# Encrypt for the role that will own the password: MD5 hashes are salted
# with the role name, so a hash made for another user would never match.
# Passing the connection lets libpq follow the server's password_encryption
# setting; without a connection only algorithm="md5" can be used.
encrypted = encrypt_password("mysecret", new_user, scope=conn)
print(f"Encrypted password: {encrypted}")

# Use encrypted password in user management
with conn.cursor() as cur:
    # Create user with encrypted password.
    # An identifier cannot be sent as a bound parameter, so the role name is
    # composed with psycopg2.sql; the password remains a normal %s parameter.
    cur.execute(
        sql.SQL("CREATE USER {} WITH PASSWORD %s").format(sql.Identifier(new_user)),
        (encrypted,),
    )
conn.commit()
conn.close()

# execute_batch parameters
cur: cursor # Database cursor
sql: str # SQL statement with placeholders
argslist: sequence # Sequence of parameter tuples
page_size: int # Batch size (default: 100)
# execute_values parameters
cur: cursor # Database cursor
sql: str # SQL with VALUES placeholder
argslist: sequence # Parameter tuples sequence
template: str | None # Values template (default: None)
page_size: int # Batch size (default: 100)
fetch: bool # Return results (default: False)

# make_dsn parameters
dsn: str | None # Base DSN (default: None)
**kwargs: dict # Connection parameters
# Connection parameter names
host: str # Database host
port: int # Database port
dbname: str # Database name
user: str # Username
password: str # Password
sslmode: str # SSL mode
connect_timeout: int # Connection timeout
application_name: str # Application name

# Wait callback function signature
def wait_callback(conn) -> None:
    """Custom wait callback for async operations.

    Shape of a callable suitable for set_wait_callback(): it receives the
    connection as its only argument and returns nothing.
    """
# Polling result constants
POLL_OK: int # 0 - Operation complete
POLL_READ: int # 1 - Wait for read
POLL_WRITE: int # 2 - Wait for write
POLL_ERROR: int # 3 - Error occurred

Install with Tessl CLI
npx tessl i tessl/pypi-psycopg2