CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-pytest-postgresql

Postgresql fixtures and fixture factories for Pytest.

Pending
Overview
Eval results
Files

docs/data-loading.md

Data Loading

Flexible data loading system for database initialization and test data setup, supporting SQL files, Python callables, and import strings with comprehensive error handling and retry logic.

Capabilities

Loader Builder

Converts various input types into callable loaders for database initialization.

def build_loader(load: Union[Callable, str, Path]) -> Callable:
    """
    Turn *load* into a callable database loader.

    Accepted forms:
    - ``Path``: treated as an SQL file and wrapped so the sql() function
      executes it
    - ``str``: dotted import path such as 'module.function' or
      'module:function'
    - ``Callable``: passed through unchanged

    Parameters:
    - load: SQL file path, import string, or callable

    Returns:
    A callable that accepts database connection parameters as keyword
    arguments
    """

SQL File Loader

Loads and executes SQL files into the database.

def sql(sql_filename: Path, **kwargs: Any) -> None:
    """
    Execute the contents of an SQL file against the database.

    Opens a connection with the supplied keyword parameters, reads
    *sql_filename*, and runs the statements it contains.

    Parameters:
    - sql_filename: path of the SQL script to run
    - **kwargs: connection parameters forwarded to the driver
      (host, port, user, etc.)

    Returns:
    None

    Raises:
    - FileNotFoundError: when the SQL file is missing
    - psycopg.Error: when executing the SQL fails
    """

Retry Utility

Robust retry mechanism for handling temporary connection failures.

def retry(
    func: Callable[[], T],
    timeout: int = 60,
    possible_exception: Type[Exception] = Exception,
) -> T:
    """
    Call *func* repeatedly until it succeeds or *timeout* seconds elapse.

    Useful for database connections that fail transiently while the
    server is still starting up.

    Parameters:
    - func: zero-argument callable to invoke
    - timeout: total number of seconds to keep retrying (default: 60)
    - possible_exception: exception class treated as retryable
      (default: Exception)

    Returns:
    Whatever *func* returns on its first successful call

    Raises:
    - TimeoutError: when no call succeeds within *timeout* seconds
    - Exception: when *func* raises something other than
      *possible_exception*
    """

def get_current_datetime() -> datetime.datetime:
    """
    Return the current UTC datetime, portable across Python 3.8+.

    Wraps the version-specific datetime APIs so callers get consistent
    UTC handling regardless of interpreter version.

    Returns:
    Current datetime in UTC
    """

Usage Examples

SQL File Loading

from pytest_postgresql import factories
from pathlib import Path

# Load single SQL file
# (Path entries are loaded with the library's sql() file loader)
postgresql_with_schema = factories.postgresql_proc(
    load=[Path('/path/to/schema.sql')]
)

# Load multiple SQL files in order
# (entries run sequentially, so the schema file must come first)
postgresql_with_data = factories.postgresql_proc(
    load=[
        Path('/path/to/schema.sql'),
        Path('/path/to/seed_data.sql'),
        Path('/path/to/test_fixtures.sql')
    ]
)

def test_sql_loading(postgresql_with_data):
    """Test database loaded with SQL files."""
    # The SQL files listed in the factory ran before this test body.
    cursor = postgresql_with_data.cursor()
    cursor.execute("SELECT COUNT(*) FROM users;")
    (count,) = cursor.fetchone()
    assert count > 0  # Data was loaded
    cursor.close()

Python Callable Loading

from pytest_postgresql import factories
import psycopg

def create_test_schema(**kwargs):
    """Create the test_schema namespace: users and posts tables plus an index.

    kwargs are forwarded to psycopg.connect(), so they must be valid
    connection parameters (host, port, user, dbname, ...).
    """
    with psycopg.connect(**kwargs) as connection:
        with connection.cursor() as cursor:
            cursor.execute("""
                CREATE SCHEMA IF NOT EXISTS test_schema;
                
                CREATE TABLE test_schema.users (
                    id SERIAL PRIMARY KEY,
                    username VARCHAR(50) UNIQUE NOT NULL,
                    email VARCHAR(100) UNIQUE NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
                
                CREATE TABLE test_schema.posts (
                    id SERIAL PRIMARY KEY,
                    user_id INTEGER REFERENCES test_schema.users(id),
                    title VARCHAR(200) NOT NULL,
                    content TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
                
                CREATE INDEX idx_posts_user_id ON test_schema.posts(user_id);
            """)
        connection.commit()

def insert_test_data(**kwargs):
    """Populate test_schema with three users and four posts."""
    with psycopg.connect(**kwargs) as connection:
        with connection.cursor() as cursor:
            # Users first: posts reference users(id).
            cursor.execute("""
                INSERT INTO test_schema.users (username, email) VALUES
                ('alice', 'alice@example.com'),
                ('bob', 'bob@example.com'),
                ('charlie', 'charlie@example.com');
            """)
            
            # Posts keyed to the serial user ids assigned above.
            cursor.execute("""
                INSERT INTO test_schema.posts (user_id, title, content) VALUES
                (1, 'First Post', 'This is Alice''s first post'),
                (1, 'Second Post', 'Alice''s follow-up post'),
                (2, 'Bob''s Thoughts', 'Bob shares his ideas'),
                (3, 'Charlie''s Update', 'Latest from Charlie');
            """)
        connection.commit()

# Loaders run in list order: the schema must exist before data is inserted.
postgresql_with_python_data = factories.postgresql_proc(
    load=[create_test_schema, insert_test_data]
)

def test_python_callable_loading(postgresql_with_python_data):
    """Test database loaded with Python callables."""
    cursor = postgresql_with_python_data.cursor()

    # Schema: both tables must exist under test_schema.
    cursor.execute("""
        SELECT table_name FROM information_schema.tables 
        WHERE table_schema = 'test_schema';
    """)
    tables = {row[0] for row in cursor.fetchall()}
    assert 'users' in tables
    assert 'posts' in tables

    # Data: the loaders inserted three users and four posts.
    cursor.execute("SELECT COUNT(*) FROM test_schema.users;")
    assert cursor.fetchone()[0] == 3

    cursor.execute("SELECT COUNT(*) FROM test_schema.posts;")
    assert cursor.fetchone()[0] == 4

    cursor.close()

Import String Loading

# myapp/fixtures.py
import psycopg

def load_application_schema(**kwargs):
    """Create the applications table used by the import-string examples."""
    with psycopg.connect(**kwargs) as connection:
        with connection.cursor() as cursor:
            cursor.execute("""
                CREATE TABLE applications (
                    id SERIAL PRIMARY KEY,
                    name VARCHAR(100) NOT NULL,
                    version VARCHAR(20) NOT NULL,
                    status VARCHAR(20) DEFAULT 'active'
                );
            """)
        connection.commit()

def load_sample_applications(**kwargs):
    """Insert three sample rows into the applications table."""
    with psycopg.connect(**kwargs) as connection:
        with connection.cursor() as cursor:
            cursor.execute("""
                INSERT INTO applications (name, version, status) VALUES
                ('MyApp', '1.0.0', 'active'),
                ('TestApp', '0.9.0', 'beta'),
                ('LegacyApp', '2.1.0', 'deprecated');
            """)
        connection.commit()

# Use import strings in test configuration
from pytest_postgresql import factories

# Each string is resolved as a 'module.function' import path by build_loader.
postgresql_with_imports = factories.postgresql_proc(
    load=[
        'myapp.fixtures.load_application_schema',
        'myapp.fixtures.load_sample_applications'
    ]
)

def test_import_string_loading(postgresql_with_imports):
    """Test database loaded with import strings."""
    cursor = postgresql_with_imports.cursor()
    cursor.execute("SELECT name, version FROM applications ORDER BY id;")
    rows = cursor.fetchall()

    # Both fixture loaders ran: one row per sample application, in id order.
    assert len(rows) == 3
    expected = [('MyApp', '1.0.0'), ('TestApp', '0.9.0'), ('LegacyApp', '2.1.0')]
    for row, want in zip(rows, expected):
        assert row == want

    cursor.close()

Mixed Loading Types

from pytest_postgresql import factories
from pathlib import Path

def custom_indexes(**kwargs):
    """Add custom indexes after schema creation.

    Fix vs. the original: ``CREATE INDEX CONCURRENTLY`` refuses to run
    inside a transaction block, and psycopg opens an implicit transaction
    on the first statement — so the connection must be in autocommit mode.
    The two statements are also issued as separate execute() calls,
    because multiple statements sent in one batch share an implicit
    transaction, which would trigger the same error.
    """
    with psycopg.connect(**kwargs) as conn:
        conn.autocommit = True  # CONCURRENTLY cannot run in a transaction
        with conn.cursor() as cur:
            cur.execute(
                "CREATE INDEX CONCURRENTLY idx_users_email_lower "
                "ON users (LOWER(email));"
            )
            cur.execute(
                "CREATE INDEX CONCURRENTLY idx_posts_created_at "
                "ON posts (created_at DESC);"
            )

# Loader kinds can be mixed freely; build_loader normalizes each entry,
# and all loaders execute in the order listed.
postgresql_mixed_loading = factories.postgresql_proc(
    load=[
        Path('/path/to/base_schema.sql'),          # SQL file
        'myapp.fixtures.load_reference_data',     # Import string
        custom_indexes,                           # Python callable
        Path('/path/to/test_data.sql')           # Another SQL file
    ]
)

def test_mixed_loading(postgresql_mixed_loading):
    """Test mixed loading types."""
    # All loaders executed in order
    pass

Error Handling and Retry

from pytest_postgresql.loader import build_loader, sql
from pytest_postgresql.retry import retry
from pathlib import Path
import psycopg

def test_loader_error_handling():
    """Exercise the failure paths of sql() and build_loader().

    Fix vs. the original: build_loader() alone may succeed and defer the
    import failure until the returned loader is invoked (the original's
    own comment said as much), so the loader is now actually called —
    otherwise the invalid-import branch could pass without exercising
    anything.
    """

    # Missing SQL file: sql() must raise FileNotFoundError.
    try:
        sql(Path('/nonexistent/file.sql'),
            host='localhost', port=5432, user='postgres', dbname='test')
    except FileNotFoundError:
        pass  # Expected

    # Invalid import string: fails either at build time or on first call.
    try:
        loader = build_loader('nonexistent.module.function')
        loader()  # force import resolution if build_loader deferred it
    except (ImportError, AttributeError):
        pass  # Expected

def test_retry_mechanism():
    """Test retry functionality for unreliable operations."""
    calls = 0

    def flaky_function():
        nonlocal calls
        calls += 1
        if calls < 3:
            raise psycopg.OperationalError("Connection failed")
        return "success"

    # The first two invocations raise; retry() keeps going and the
    # third invocation returns normally.
    outcome = retry(flaky_function, timeout=10, possible_exception=psycopg.OperationalError)
    assert outcome == "success"
    assert calls == 3

def robust_data_loader(**kwargs):
    """Robust data loader with retry logic."""
    def _create_table():
        with psycopg.connect(**kwargs) as conn:
            with conn.cursor() as cur:
                cur.execute("CREATE TABLE IF NOT EXISTS robust_test (id INT);")
            conn.commit()

    # Keep retrying the initial connection for up to 30 seconds.
    retry(_create_table, timeout=30, possible_exception=psycopg.OperationalError)

# NOTE(review): this snippet assumes `factories` was imported as in the
# earlier examples (`from pytest_postgresql import factories`) — the import
# block just above only pulls in the loader/retry helpers. Confirm imports
# when copying.
postgresql_robust = factories.postgresql_proc(
    load=[robust_data_loader]
)

Advanced Loading Patterns

Conditional Loading

def conditional_loader(**kwargs):
    """Load data conditionally based on environment.

    Creates the ``environments`` table, then inserts one row whose
    name/config depend on the TEST_ENV environment variable
    ('development', 'production', or anything else -> test defaults).

    Fix vs. the original: the INSERT was written out three times differing
    only in the literal values, and TEST_ENV was read twice; the values
    are now chosen from a mapping and inserted once with a parameterized
    statement.
    """
    import os

    # name/config payload per environment; unknown env falls back to 'test'.
    env_rows = {
        'development': ('dev', '{"debug": true, "logging": "verbose"}'),
        'production': ('prod', '{"debug": false, "logging": "error"}'),
    }
    name, config = env_rows.get(
        os.getenv('TEST_ENV'),
        ('test', '{"debug": true, "logging": "info"}'),
    )

    with psycopg.connect(**kwargs) as conn:
        with conn.cursor() as cur:
            # Always create basic schema
            cur.execute("""
                CREATE TABLE environments (
                    id SERIAL PRIMARY KEY,
                    name VARCHAR(50),
                    config JSONB
                );
            """)

            # One parameterized insert instead of three literal variants.
            cur.execute(
                "INSERT INTO environments (name, config) VALUES (%s, %s);",
                (name, config),
            )
        conn.commit()

Parameterized Loading

def create_parameterized_loader(table_count=5, rows_per_table=100):
    """Create a parameterized data loader.

    The returned loader creates *table_count* tables named
    ``test_table_<i>`` and fills each with *rows_per_table* rows.

    Fix vs. the original: row values were interpolated directly into the
    SQL text with f-strings (an injection-prone pattern) and inserted one
    execute() per row; values now go through driver parameters via a
    single executemany() per table.

    Parameters:
    - table_count: number of tables to create (default: 5)
    - rows_per_table: rows inserted into each table (default: 100)

    Returns:
    Callable accepting database connection parameters as kwargs
    """
    def parameterized_loader(**kwargs):
        # Rows are identical for every table; build them once.
        rows = [(f"data_{j}", j) for j in range(rows_per_table)]
        with psycopg.connect(**kwargs) as conn:
            with conn.cursor() as cur:
                for i in range(table_count):
                    # Table name comes from the trusted integer loop index,
                    # so f-string interpolation is safe for the identifier.
                    table_name = f"test_table_{i}"
                    cur.execute(f"""
                        CREATE TABLE {table_name} (
                            id SERIAL PRIMARY KEY,
                            data VARCHAR(100),
                            value INTEGER
                        );
                    """)

                    # Parameterized batch insert: no values in the SQL text.
                    cur.executemany(
                        f"INSERT INTO {table_name} (data, value) VALUES (%s, %s);",
                        rows,
                    )
            conn.commit()

    return parameterized_loader

# The factory receives a loader pre-configured for 3 tables of 50 rows each.
postgresql_parameterized = factories.postgresql_proc(
    load=[create_parameterized_loader(table_count=3, rows_per_table=50)]
)

Transaction-Safe Loading

def transaction_safe_loader(**kwargs):
    """Load accounts/transactions seed data in a single atomic transaction.

    Fix vs. the original: psycopg starts a transaction implicitly on the
    first statement, so issuing ``BEGIN;`` through cur.execute() was
    redundant (PostgreSQL warns about BEGIN inside an open transaction),
    and committing/rolling back via SQL strings bypassed psycopg's own
    transaction-state tracking.  ``conn.transaction()`` commits when the
    block exits cleanly and rolls back automatically if any statement
    raises, so no manual try/rollback is needed.
    """
    with psycopg.connect(**kwargs) as conn:
        # Everything inside this block is one transaction: committed on
        # clean exit, rolled back if an exception propagates out.
        with conn.transaction():
            with conn.cursor() as cur:
                # Create tables
                cur.execute("""
                    CREATE TABLE accounts (
                        id SERIAL PRIMARY KEY,
                        name VARCHAR(100),
                        balance DECIMAL(10,2)
                    );
                """)

                cur.execute("""
                    CREATE TABLE transactions (
                        id SERIAL PRIMARY KEY,
                        account_id INTEGER REFERENCES accounts(id),
                        amount DECIMAL(10,2),
                        transaction_type VARCHAR(20)
                    );
                """)

                # Insert data
                cur.execute("""
                    INSERT INTO accounts (name, balance) VALUES
                    ('Account A', 1000.00),
                    ('Account B', 500.00);
                """)

                cur.execute("""
                    INSERT INTO transactions (account_id, amount, transaction_type) VALUES
                    (1, -100.00, 'withdrawal'),
                    (2, 100.00, 'deposit');
                """)
Install with Tessl CLI

npx tessl i tessl/pypi-pytest-postgresql

docs

configuration.md

data-loading.md

database-connections.md

external-server.md

index.md

process-management.md

tile.json