PostgreSQL database adapter for Python
—
Flexible result formatting with built-in row factories for tuples, dictionaries, named tuples, and custom objects, plus protocols for creating custom result processors.
Pre-defined row factories for common result formatting needs, providing different ways to access query results.
def tuple_row(cursor) -> RowMaker[tuple]:
"""
Default row factory returning tuples.
Args:
cursor: Database cursor with result metadata
Returns:
RowMaker that creates tuples from result rows
"""
def dict_row(cursor) -> RowMaker[dict]:
"""
Row factory returning dictionaries with column names as keys.
Args:
cursor: Database cursor with result metadata
Returns:
RowMaker that creates dicts with column names as keys
"""
def namedtuple_row(cursor) -> RowMaker[NamedTuple]:
"""
Row factory returning named tuples with column names as attributes.
Args:
cursor: Database cursor with result metadata
Returns:
RowMaker that creates named tuples with column attributes
"""
def scalar_row(cursor) -> RowMaker[Any]:
"""
Row factory for single-column results returning the value directly.
Args:
cursor: Database cursor with result metadata
Returns:
RowMaker that returns single column values
Raises:
ProgrammingError: If result has more than one column
"""Create row factories that instantiate custom classes or call functions with result data.
def class_row(cls: type) -> RowFactory:
"""
Row factory that creates instances of specified class.
Args:
cls: Class to instantiate with row values
Returns:
RowFactory that creates instances of cls
Note:
Class constructor must accept row values as positional arguments
"""
def args_row(func: Callable) -> RowFactory:
"""
Row factory that calls function with row values as positional arguments.
Args:
func: Function to call with row values
Returns:
RowFactory that calls func(*row_values)
"""
def kwargs_row(func: Callable) -> RowFactory:
"""
Row factory that calls function with column names as keyword arguments.
Args:
func: Function to call with column name/value pairs
Returns:
RowFactory that calls func(**{col_name: col_value, ...})
"""Type protocols defining the interfaces for row factories and row makers.
from typing import Protocol, Sequence, Any, TypeVar
Row = TypeVar("Row", covariant=True)
class RowMaker(Protocol[Row]):
"""
Protocol for callable that converts sequence of values to row object.
The sequence contains database values already adapted to Python types.
Return value is the object your application receives.
"""
def __call__(self, values: Sequence[Any]) -> Row:
"""
Convert sequence of column values to row object.
Args:
values: Sequence of column values from database
Returns:
Row object of type Row
"""
class RowFactory(Protocol):
"""
Protocol for callable that creates RowMaker from cursor.
Row factories inspect cursor metadata to determine how to format results.
"""
def __call__(self, cursor) -> RowMaker:
"""
Create RowMaker for given cursor.
Args:
cursor: Database cursor with result metadata
Returns:
RowMaker configured for cursor's result structure
"""Type definitions for common row formats returned by built-in factories.
from typing import Any, Dict
# Type aliases for documentation
TupleRow = tuple[Any, ...]
DictRow = Dict[str, Any]
# Row type is parameterized for custom row factories
Row = TypeVar("Row", covariant=True, default=TupleRow)from psycopg import rows
# Default tuple rows
with conn.cursor() as cur:
cur.execute("SELECT id, name, email FROM users")
for row in cur:
id, name, email = row # Tuple unpacking
print(f"User {id}: {name} <{email}>")
# Dictionary rows
with conn.cursor(row_factory=rows.dict_row) as cur:
cur.execute("SELECT id, name, email FROM users")
for row in cur:
print(f"User {row['id']}: {row['name']} <{row['email']}>")
# Named tuple rows
with conn.cursor(row_factory=rows.namedtuple_row) as cur:
cur.execute("SELECT id, name, email FROM users")
for row in cur:
print(f"User {row.id}: {row.name} <{row.email}>")
# Scalar results
with conn.cursor(row_factory=rows.scalar_row) as cur:
cur.execute("SELECT COUNT(*) FROM users")
count = cur.fetchone() # Returns integer directly, not tuple
print(f"Total users: {count}")from dataclasses import dataclass
from psycopg import rows
@dataclass
class User:
id: int
name: str
email: str
created_at: datetime
# Using class_row factory
def get_users():
with conn.cursor(row_factory=rows.class_row(User)) as cur:
cur.execute("SELECT id, name, email, created_at FROM users")
return cur.fetchall() # Returns list of User objects
users = get_users()
for user in users:
print(f"{user.name} ({user.email}) created on {user.created_at}")# Process results with function
def process_user_data(id, name, email, created_at):
return {
'user_id': id,
'display_name': name.title(),
'contact': email.lower(),
'age_days': (datetime.now() - created_at).days
}
# Using args_row factory
with conn.cursor(row_factory=rows.args_row(process_user_data)) as cur:
cur.execute("SELECT id, name, email, created_at FROM users")
processed_users = cur.fetchall()
# Using kwargs_row factory
def format_user(**kwargs):
return f"{kwargs['name']} <{kwargs['email']}> (ID: {kwargs['id']})"
with conn.cursor(row_factory=rows.kwargs_row(format_user)) as cur:
cur.execute("SELECT id, name, email FROM users")
formatted = cur.fetchall() # Returns list of formatted stringsfrom psycopg.rows import RowFactory, RowMaker
from typing import Any, Sequence
def json_row(cursor) -> RowMaker[str]:
"""Row factory that returns each row as JSON string"""
import json
# Get column names from cursor
if cursor.description is None:
raise ProgrammingError("No result to format")
column_names = [col.name for col in cursor.description]
def make_json_row(values: Sequence[Any]) -> str:
row_dict = dict(zip(column_names, values))
return json.dumps(row_dict, default=str)
return make_json_row
# Usage
with conn.cursor(row_factory=json_row) as cur:
cur.execute("SELECT id, name, email FROM users LIMIT 1")
json_result = cur.fetchone()
print(json_result) # {"id": 1, "name": "Alice", "email": "alice@example.com"}from typing import Any, Sequence
import re
def validated_email_row(cursor) -> RowMaker[dict]:
"""Row factory that validates email format"""
if cursor.description is None:
raise ProgrammingError("No result to format")
column_names = [col.name for col in cursor.description]
email_pattern = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
def make_validated_row(values: Sequence[Any]) -> dict:
row = dict(zip(column_names, values))
# Validate email if present
if 'email' in row and row['email']:
if not email_pattern.match(row['email']):
row['email_valid'] = False
row['email_error'] = 'Invalid email format'
else:
row['email_valid'] = True
row['email_error'] = None
return row
return make_validated_row
# Usage
with conn.cursor(row_factory=validated_email_row) as cur:
cur.execute("SELECT id, name, email FROM users")
for row in cur:
if not row.get('email_valid', True):
print(f"Warning: {row['name']} has invalid email: {row['email']}")def smart_row_factory(cursor) -> RowMaker:
"""Choose row factory based on result structure"""
if cursor.description is None:
raise ProgrammingError("No result to format")
column_count = len(cursor.description)
if column_count == 1:
# Single column - return scalar
return rows.scalar_row(cursor)
elif column_count <= 3:
# Few columns - use named tuple for attribute access
return rows.namedtuple_row(cursor)
else:
# Many columns - use dictionary for flexible access
return rows.dict_row(cursor)
# Usage
with conn.cursor(row_factory=smart_row_factory) as cur:
# Single column query -> scalar result
cur.execute("SELECT COUNT(*) FROM users")
count = cur.fetchone() # Returns int
# Two column query -> named tuple
cur.execute("SELECT name, email FROM users LIMIT 1")
user = cur.fetchone() # Returns named tuple with .name, .email
# Many columns -> dictionary
cur.execute("SELECT * FROM users LIMIT 1")
full_user = cur.fetchone() # Returns dict with all columnsdef fast_dict_row(cursor) -> RowMaker[dict]:
"""Optimized dictionary row factory for performance-critical code"""
if cursor.description is None:
raise ProgrammingError("No result to format")
# Pre-compute column names to avoid repeated attribute access
column_names = tuple(col.name for col in cursor.description)
# Use dict constructor with zip for better performance than dict comprehension
def make_fast_dict(values: Sequence[Any]) -> dict:
return dict(zip(column_names, values))
return make_fast_dictdef grouped_row_factory(group_by_column: str):
"""Row factory that groups results by specified column"""
def factory(cursor) -> RowMaker[dict]:
if cursor.description is None:
raise ProgrammingError("No result to format")
column_names = [col.name for col in cursor.description]
if group_by_column not in column_names:
raise ValueError(f"Group column '{group_by_column}' not in result")
group_index = column_names.index(group_by_column)
results_by_group = {}
def make_grouped_row(values: Sequence[Any]) -> dict:
group_value = values[group_index]
row_dict = dict(zip(column_names, values))
if group_value not in results_by_group:
results_by_group[group_value] = []
results_by_group[group_value].append(row_dict)
# Return current group state
return {
'current_row': row_dict,
'group_value': group_value,
'group_data': results_by_group[group_value]
}
return make_grouped_row
return factory
# Usage
with conn.cursor(row_factory=grouped_row_factory('department')) as cur:
cur.execute("SELECT name, department, salary FROM employees ORDER BY department")
for row in cur:
print(f"Employee: {row['current_row']['name']}")
print(f"Department: {row['group_value']}")
print(f"Colleagues in dept: {len(row['group_data'])}")Row factories can be set at different levels:
# Set default row factory for all cursors on connection
conn.row_factory = rows.dict_row
with conn.cursor() as cur:
cur.execute("SELECT * FROM users")
# Results are dictionaries# Set row factory for specific cursor
with conn.cursor(row_factory=rows.namedtuple_row) as cur:
cur.execute("SELECT * FROM users")
# Results are named tuples
# Change row factory on existing cursor
cur.row_factory = rows.dict_row
cur.execute("SELECT * FROM products")
# Results are now dictionaries# Different row factories for different queries on same cursor
with conn.cursor() as cur:
# Scalar results for aggregates
cur.row_factory = rows.scalar_row
cur.execute("SELECT COUNT(*) FROM users")
count = cur.fetchone()
# Dictionary results for detailed data
cur.row_factory = rows.dict_row
cur.execute("SELECT * FROM users LIMIT 10")
users = cur.fetchall()from typing import TypeVar, Generic
from psycopg.rows import RowMaker
T = TypeVar('T')
class TypedCursor(Generic[T]):
"""Type-safe cursor wrapper"""
def __init__(self, cursor, row_factory: RowMaker[T]):
self.cursor = cursor
self.cursor.row_factory = row_factory
def fetchone(self) -> T | None:
return self.cursor.fetchone()
def fetchall(self) -> list[T]:
return self.cursor.fetchall()
# Usage with type safety
@dataclass
class User:
id: int
name: str
email: str
typed_cursor = TypedCursor(conn.cursor(), rows.class_row(User))
users: list[User] = typed_cursor.fetchall() # Type checker knows this is list[User]Install with Tessl CLI
npx tessl i tessl/pypi-psycopg