Postgresql fixtures and fixture factories for Pytest.
—
Flexible data loading system for database initialization and test data setup, supporting SQL files, Python callables, and import strings with comprehensive error handling and retry logic.
Converts various input types into callable loaders for database initialization.
def build_loader(load: Union[Callable, str, Path]) -> Callable:
"""
Build a loader callable from various input types.
Supports three input types:
- Path objects: Loads SQL files using the sql() function
- String: Import path like 'module.function' or 'module:function'
- Callable: Returns the callable directly
Parameters:
- load: SQL file path, import string, or callable function
Returns:
Callable that accepts connection parameters as kwargs
"""
Loads and executes SQL files into the database.
def sql(sql_filename: Path, **kwargs: Any) -> None:
"""
Load SQL file into database.
Connects to database using provided connection parameters,
reads the SQL file, and executes its contents.
Parameters:
- sql_filename: Path to SQL file to execute
- **kwargs: Database connection parameters (host, port, user, etc.)
Returns:
None
Raises:
- FileNotFoundError: If SQL file doesn't exist
- psycopg.Error: If SQL execution fails
"""
Robust retry mechanism for handling temporary connection failures.
def retry(
func: Callable[[], T],
timeout: int = 60,
possible_exception: Type[Exception] = Exception,
) -> T:
"""
Retry function execution with timeout for handling temporary failures.
Particularly useful for database connections that may fail initially
due to server startup delays.
Parameters:
- func: Function to retry
- timeout: Maximum retry time in seconds (default: 60)
- possible_exception: Exception type to catch and retry (default: Exception)
Returns:
Result of successful function execution
Raises:
- TimeoutError: If function doesn't succeed within timeout
- Exception: If function fails with non-retryable exception
"""
def get_current_datetime() -> datetime.datetime:
"""
Get current datetime with Python version compatibility.
Handles datetime retrieval across Python 3.8+ versions
with proper UTC handling.
Returns:
Current datetime in UTC
"""
from pytest_postgresql import factories
from pathlib import Path
# Load single SQL file (process fixture: starts PostgreSQL and runs the loader)
postgresql_with_schema = factories.postgresql_proc(
    load=[Path('/path/to/schema.sql')]
)

# Load multiple SQL files in order.
# A process fixture is not a database connection, so it is given its own
# ``_proc`` name and tests request the client fixture defined below.
postgresql_with_data_proc = factories.postgresql_proc(
    load=[
        Path('/path/to/schema.sql'),
        Path('/path/to/seed_data.sql'),
        Path('/path/to/test_fixtures.sql'),
    ]
)

# Client fixture bound to the process fixture above; this is the object that
# exposes ``cursor()`` inside tests.
postgresql_with_data = factories.postgresql("postgresql_with_data_proc")
def test_sql_loading(postgresql_with_data):
    """
    Test database loaded with SQL files.

    Parameters:
    - postgresql_with_data: fixture object providing ``cursor()`` on a
      database where the schema and data are already loaded
    """
    # The context manager closes the cursor even when the query fails,
    # which a trailing ``cur.close()`` would not.
    with postgresql_with_data.cursor() as cur:
        cur.execute("SELECT COUNT(*) FROM users;")
        count = cur.fetchone()[0]
    assert count > 0  # Data was loaded


from pytest_postgresql import factories
import psycopg
def create_test_schema(**kwargs):
    """
    Create ``test_schema`` together with its ``users`` and ``posts`` tables.

    Parameters:
    - **kwargs: connection parameters forwarded to ``psycopg.connect``
    """
    # All DDL is sent as one multi-statement script.
    schema_ddl = """
        CREATE SCHEMA IF NOT EXISTS test_schema;
        CREATE TABLE test_schema.users (
        id SERIAL PRIMARY KEY,
        username VARCHAR(50) UNIQUE NOT NULL,
        email VARCHAR(100) UNIQUE NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        CREATE TABLE test_schema.posts (
        id SERIAL PRIMARY KEY,
        user_id INTEGER REFERENCES test_schema.users(id),
        title VARCHAR(200) NOT NULL,
        content TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        CREATE INDEX idx_posts_user_id ON test_schema.posts(user_id);
        """
    with psycopg.connect(**kwargs) as connection:
        with connection.cursor() as cursor:
            cursor.execute(schema_ddl)
        connection.commit()
def insert_test_data(**kwargs):
    """
    Insert three users and four posts into the ``test_schema`` tables.

    Parameters:
    - **kwargs: connection parameters forwarded to ``psycopg.connect``
    """
    # Insert test users
    users_sql = """
        INSERT INTO test_schema.users (username, email) VALUES
        ('alice', 'alice@example.com'),
        ('bob', 'bob@example.com'),
        ('charlie', 'charlie@example.com');
        """
    # Insert test posts
    posts_sql = """
        INSERT INTO test_schema.posts (user_id, title, content) VALUES
        (1, 'First Post', 'This is Alice''s first post'),
        (1, 'Second Post', 'Alice''s follow-up post'),
        (2, 'Bob''s Thoughts', 'Bob shares his ideas'),
        (3, 'Charlie''s Update', 'Latest from Charlie');
        """
    with psycopg.connect(**kwargs) as connection:
        with connection.cursor() as cursor:
            # Users go first: the posts reference them by id.
            for statement in (users_sql, posts_sql):
                cursor.execute(statement)
        connection.commit()
# Process fixture: starts PostgreSQL and runs the Python loaders in order.
postgresql_with_python_data_proc = factories.postgresql_proc(
    load=[create_test_schema, insert_test_data]
)

# Client fixture bound to the process fixture above. Tests request this one,
# because the process fixture is not a connection and has no ``cursor()``.
postgresql_with_python_data = factories.postgresql(
    "postgresql_with_python_data_proc"
)
def test_python_callable_loading(postgresql_with_python_data):
    """
    Test database loaded with Python callables.

    Parameters:
    - postgresql_with_python_data: fixture object providing ``cursor()`` on
      a database prepared by the Python loaders
    """
    # The context manager closes the cursor even if a query raises.
    with postgresql_with_python_data.cursor() as cur:
        # Verify schema creation
        cur.execute("""
            SELECT table_name FROM information_schema.tables
            WHERE table_schema = 'test_schema';
            """)
        tables = [row[0] for row in cur.fetchall()]

        # Verify data insertion
        cur.execute("SELECT COUNT(*) FROM test_schema.users;")
        user_count = cur.fetchone()[0]
        cur.execute("SELECT COUNT(*) FROM test_schema.posts;")
        post_count = cur.fetchone()[0]

    assert 'users' in tables
    assert 'posts' in tables
    assert user_count == 3
    assert post_count == 4


# myapp/fixtures.py
import psycopg
def load_application_schema(**kwargs):
    """
    Create the ``applications`` table.

    Parameters:
    - **kwargs: connection parameters forwarded to ``psycopg.connect``
    """
    create_applications = """
        CREATE TABLE applications (
        id SERIAL PRIMARY KEY,
        name VARCHAR(100) NOT NULL,
        version VARCHAR(20) NOT NULL,
        status VARCHAR(20) DEFAULT 'active'
        );
        """
    with psycopg.connect(**kwargs) as connection:
        with connection.cursor() as cursor:
            cursor.execute(create_applications)
        connection.commit()
def load_sample_applications(**kwargs):
    """
    Insert three sample rows into the ``applications`` table.

    Parameters:
    - **kwargs: connection parameters forwarded to ``psycopg.connect``
    """
    sample_rows = """
        INSERT INTO applications (name, version, status) VALUES
        ('MyApp', '1.0.0', 'active'),
        ('TestApp', '0.9.0', 'beta'),
        ('LegacyApp', '2.1.0', 'deprecated');
        """
    with psycopg.connect(**kwargs) as connection:
        with connection.cursor() as cursor:
            cursor.execute(sample_rows)
        connection.commit()
# Use import strings in test configuration
from pytest_postgresql import factories
# Process fixture: the loaders are given as dotted import strings.
postgresql_with_imports_proc = factories.postgresql_proc(
    load=[
        'myapp.fixtures.load_application_schema',
        'myapp.fixtures.load_sample_applications',
    ]
)

# Client fixture bound to the process fixture above. Tests request this one,
# because the process fixture is not a connection and has no ``cursor()``.
postgresql_with_imports = factories.postgresql("postgresql_with_imports_proc")
def test_import_string_loading(postgresql_with_imports):
    """
    Test database loaded with import strings.

    Parameters:
    - postgresql_with_imports: fixture object providing ``cursor()`` on a
      database prepared by the imported loaders
    """
    # The context manager closes the cursor even if the query raises.
    with postgresql_with_imports.cursor() as cur:
        cur.execute("SELECT name, version FROM applications ORDER BY id;")
        apps = cur.fetchall()

    assert len(apps) == 3
    assert apps[0] == ('MyApp', '1.0.0')
    assert apps[1] == ('TestApp', '0.9.0')
    assert apps[2] == ('LegacyApp', '2.1.0')


from pytest_postgresql import factories
from pathlib import Path
def custom_indexes(**kwargs):
    """
    Add custom indexes after schema creation.

    Builds an index on ``LOWER(email)`` of ``users`` and a descending index
    on ``created_at`` of ``posts``; both tables must already exist.

    Parameters:
    - **kwargs: connection parameters forwarded to ``psycopg.connect``
    """
    index_statements = (
        "CREATE INDEX CONCURRENTLY idx_users_email_lower ON users (LOWER(email));",
        "CREATE INDEX CONCURRENTLY idx_posts_created_at ON posts (created_at DESC);",
    )
    with psycopg.connect(**kwargs) as conn:
        # CREATE INDEX CONCURRENTLY is rejected inside a transaction block.
        # Autocommit stops psycopg from opening one, and sending each
        # statement separately avoids the implicit transaction that wraps a
        # multi-statement query string.
        conn.autocommit = True
        with conn.cursor() as cur:
            for statement in index_statements:
                cur.execute(statement)
# Process fixture whose ``load`` list mixes all three supported loader kinds.
# The order matters here: custom_indexes indexes tables that the earlier
# entries are expected to create.
postgresql_mixed_loading = factories.postgresql_proc(
    load=[
        Path('/path/to/base_schema.sql'), # SQL file
        'myapp.fixtures.load_reference_data', # Import string
        custom_indexes, # Python callable
        Path('/path/to/test_data.sql') # Another SQL file
    ]
)
def test_mixed_loading(postgresql_mixed_loading):
    """
    Test mixed loading types.

    Requesting the fixture is the whole test: the body makes no assertions.
    """
    # All loaders executed in order


from pytest_postgresql.loader import build_loader, sql
from pytest_postgresql.retry import retry
from pathlib import Path
import psycopg
def test_loader_error_handling():
    """
    Test error handling in data loading.

    Each scenario fails explicitly when the expected exception is not
    raised, instead of passing silently.
    """
    connection_params = {
        'host': 'localhost',
        'port': 5432,
        'user': 'postgres',
        'dbname': 'test',
    }

    # Test SQL file loading with missing file
    try:
        sql(Path('/nonexistent/file.sql'), **connection_params)
    except FileNotFoundError:
        pass  # Expected
    else:
        raise AssertionError("sql() accepted a missing SQL file")

    # Test build_loader with invalid import
    try:
        loader = build_loader('nonexistent.module.function')
        # NOTE(review): if build_loader defers the import, the failure only
        # appears when the loader is invoked, so call it as well - confirm
        # against the build_loader implementation.
        loader(**connection_params)
    except (ImportError, AttributeError):
        pass  # Expected
    else:
        raise AssertionError("build_loader() accepted an invalid import string")
def test_retry_mechanism():
    """
    Test retry functionality for unreliable operations.

    The helper raises ``psycopg.OperationalError`` on its first two calls
    and returns on the third.
    """
    calls = []

    def flaky_function():
        # One entry per invocation; the list length is the attempt number.
        calls.append(None)
        if len(calls) >= 3:
            return "success"
        raise psycopg.OperationalError("Connection failed")

    # Retry will succeed on third attempt
    outcome = retry(
        flaky_function,
        timeout=10,
        possible_exception=psycopg.OperationalError,
    )
    assert outcome == "success"
    assert len(calls) == 3
def robust_data_loader(**kwargs):
    """
    Create the ``robust_test`` table, retrying on connection errors.

    Parameters:
    - **kwargs: connection parameters forwarded to ``psycopg.connect``
    """
    def _create_table():
        # Connecting happens inside the retried callable, so a failed
        # connection attempt is retried too.
        with psycopg.connect(**kwargs) as connection:
            with connection.cursor() as cursor:
                cursor.execute("CREATE TABLE IF NOT EXISTS robust_test (id INT);")
            connection.commit()

    # Use retry for initial connection
    retry(
        _create_table,
        timeout=30,
        possible_exception=psycopg.OperationalError,
    )
postgresql_robust = factories.postgresql_proc(
load=[robust_data_loader]
)def conditional_loader(**kwargs):
"""Load data conditionally based on environment."""
import os
with psycopg.connect(**kwargs) as conn:
with conn.cursor() as cur:
# Always create basic schema
cur.execute("""
CREATE TABLE environments (
id SERIAL PRIMARY KEY,
name VARCHAR(50),
config JSONB
);
""")
# Load different data based on environment
if os.getenv('TEST_ENV') == 'development':
cur.execute("""
INSERT INTO environments (name, config) VALUES
('dev', '{"debug": true, "logging": "verbose"}');
""")
elif os.getenv('TEST_ENV') == 'production':
cur.execute("""
INSERT INTO environments (name, config) VALUES
('prod', '{"debug": false, "logging": "error"}');
""")
else:
cur.execute("""
INSERT INTO environments (name, config) VALUES
('test', '{"debug": true, "logging": "info"}');
""")
conn.commit()def create_parameterized_loader(table_count=5, rows_per_table=100):
"""Create a parameterized data loader."""
def parameterized_loader(**kwargs):
with psycopg.connect(**kwargs) as conn:
with conn.cursor() as cur:
for i in range(table_count):
table_name = f"test_table_{i}"
cur.execute(f"""
CREATE TABLE {table_name} (
id SERIAL PRIMARY KEY,
data VARCHAR(100),
value INTEGER
);
""")
# Insert test data
for j in range(rows_per_table):
cur.execute(f"""
INSERT INTO {table_name} (data, value)
VALUES ('data_{j}', {j});
""")
conn.commit()
return parameterized_loader
postgresql_parameterized = factories.postgresql_proc(
load=[create_parameterized_loader(table_count=3, rows_per_table=50)]
)def transaction_safe_loader(**kwargs):
"""Load data with proper transaction handling."""
with psycopg.connect(**kwargs) as conn:
try:
with conn.cursor() as cur:
# Start transaction
cur.execute("BEGIN;")
# Create tables
cur.execute("""
CREATE TABLE accounts (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
balance DECIMAL(10,2)
);
""")
cur.execute("""
CREATE TABLE transactions (
id SERIAL PRIMARY KEY,
account_id INTEGER REFERENCES accounts(id),
amount DECIMAL(10,2),
transaction_type VARCHAR(20)
);
""")
# Insert data
cur.execute("""
INSERT INTO accounts (name, balance) VALUES
('Account A', 1000.00),
('Account B', 500.00);
""")
cur.execute("""
INSERT INTO transactions (account_id, amount, transaction_type) VALUES
(1, -100.00, 'withdrawal'),
(2, 100.00, 'deposit');
""")
# Commit transaction
cur.execute("COMMIT;")
except Exception as e:
# Rollback on error
conn.rollback()
raise eInstall with Tessl CLI
npx tessl i tessl/pypi-pytest-postgresql