CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-asyncpg

An asyncio PostgreSQL driver for high-performance database connectivity with Python async/await syntax

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

exception-handling.mddocs/

Exception Handling

Comprehensive exception hierarchy mapping all PostgreSQL error codes to Python exceptions with detailed error information, proper inheritance structure, and practical error handling patterns.

Capabilities

Exception Hierarchy

AsyncPG provides a complete mapping of PostgreSQL SQLSTATE codes to specific Python exception classes, enabling precise error handling.

class PostgresError(Exception):
    """Base class for all PostgreSQL server errors."""
    
    # Error details available as attributes
    severity: str           # Error severity (ERROR, FATAL, etc.)
    severity_en: str        # English severity
    sqlstate: str          # PostgreSQL SQLSTATE code
    message: str           # Primary error message
    detail: str            # Detailed error information
    hint: str              # Suggestion for fixing the error
    position: str          # Character position in query (if applicable)
    internal_position: str # Internal query position
    internal_query: str    # Internal query text
    context: str           # Error context
    schema_name: str       # Schema name (if applicable)
    table_name: str        # Table name (if applicable) 
    column_name: str       # Column name (if applicable)
    data_type_name: str    # Data type name (if applicable)
    constraint_name: str   # Constraint name (if applicable)

class FatalPostgresError(PostgresError):
    """A fatal error that should result in server disconnection."""

class UnknownPostgresError(FatalPostgresError):
    """An error with an unknown SQLSTATE code."""

Client-Side Exceptions

Exceptions originating from the asyncpg client rather than PostgreSQL server.

class InterfaceError(Exception):
    """An error caused by improper use of asyncpg API."""

class ClientConfigurationError(InterfaceError, ValueError):
    """An error caused by improper client configuration."""

class InterfaceWarning(UserWarning):
    """A warning for asyncpg API usage."""

class DataError(InterfaceError, ValueError):
    """Invalid query input data."""

class InternalClientError(Exception):
    """All unexpected errors not classified otherwise."""

class ProtocolError(InternalClientError):
    """Unexpected condition in PostgreSQL protocol handling."""

class UnsupportedClientFeatureError(InterfaceError):
    """Requested feature is unsupported by asyncpg."""

class UnsupportedServerFeatureError(InterfaceError):
    """Requested feature is unsupported by PostgreSQL server."""

class OutdatedSchemaCacheError(InternalClientError):
    """A value decoding error caused by a schema change."""

class TargetServerAttributeNotMatched(InternalClientError):
    """Could not find a host satisfying target attribute requirement."""

Connection Errors

Errors related to database connectivity and authentication.

class PostgresConnectionError(PostgresError):
    """Base class for connection-related errors."""

class ConnectionDoesNotExistError(PostgresConnectionError):
    """The connection does not exist (SQLSTATE: 08003)."""

class ConnectionFailureError(PostgresConnectionError):
    """Connection failure (SQLSTATE: 08006)."""

class ClientCannotConnectError(PostgresConnectionError):
    """Client cannot connect to server (SQLSTATE: 08001)."""

class ConnectionRejectionError(PostgresConnectionError):
    """Server rejected connection (SQLSTATE: 08004)."""

class ProtocolViolationError(PostgresConnectionError):
    """Protocol violation (SQLSTATE: 08P01)."""

class InvalidAuthorizationSpecificationError(PostgresError):
    """Authentication failed (SQLSTATE: 28000)."""

class InvalidPasswordError(InvalidAuthorizationSpecificationError):
    """Invalid password (SQLSTATE: 28P01)."""

Example Usage

import asyncpg
import asyncio

async def connect_with_retry(dsn, max_retries=3):
    """Connect with automatic retry for transient failures."""
    
    for attempt in range(max_retries):
        try:
            return await asyncpg.connect(dsn, timeout=10.0)
            
        except asyncpg.ConnectionFailureError:
            print(f"Connection failed, attempt {attempt + 1}/{max_retries}")
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # Exponential backoff
            
        except asyncpg.InvalidPasswordError:
            print("Authentication failed - check credentials")
            raise  # Don't retry authentication errors
            
        except asyncpg.ClientCannotConnectError:
            print("Cannot reach server - check host and port")
            raise  # Don't retry unreachable server

Data Errors

Errors related to data validation, type conversion, and constraint violations.

class PostgresDataError(PostgresError):
    """Base class for PostgreSQL data-related errors (SQLSTATE: 22000)."""

class InvalidTextRepresentationError(PostgresDataError):
    """Invalid input syntax for data type (SQLSTATE: 22P02)."""

class InvalidBinaryRepresentationError(PostgresDataError):
    """Invalid binary representation (SQLSTATE: 22P03)."""

class NumericValueOutOfRangeError(PostgresDataError):
    """Numeric value out of range (SQLSTATE: 22003)."""

class DivisionByZeroError(PostgresDataError):
    """Division by zero (SQLSTATE: 22012)."""

class StringDataRightTruncationError(PostgresDataError):
    """String data right truncation (SQLSTATE: 22001)."""

class DatetimeFieldOverflowError(PostgresDataError):
    """Datetime field overflow (SQLSTATE: 22008)."""

class InvalidDatetimeFormatError(PostgresDataError):
    """Invalid datetime format (SQLSTATE: 22007)."""

class InvalidTimeZoneDisplacementValueError(PostgresDataError):
    """Invalid timezone displacement (SQLSTATE: 22009)."""

Example Usage

async def safe_insert_user(conn, name, age, email):
    """Insert user with comprehensive data validation error handling."""
    
    try:
        return await conn.execute(
            "INSERT INTO users(name, age, email) VALUES($1, $2, $3)",
            name, age, email
        )
        
    except asyncpg.InvalidTextRepresentationError as e:
        print(f"Invalid data format: {e}")
        if 'age' in str(e):
            raise ValueError("Age must be a valid integer")
        elif 'email' in str(e):
            raise ValueError("Email format is invalid")
        else:
            raise ValueError(f"Invalid data format: {e}")
            
    except asyncpg.NumericValueOutOfRangeError:
        raise ValueError("Age value is out of valid range")
        
    except asyncpg.StringDataRightTruncationError as e:
        if 'name' in e.column_name:
            raise ValueError("Name is too long (maximum 100 characters)")
        elif 'email' in e.column_name:
            raise ValueError("Email is too long (maximum 255 characters)")

Integrity Constraint Violations

Errors related to database constraints and referential integrity.

class IntegrityConstraintViolationError(PostgresError):
    """Base class for constraint violations (SQLSTATE: 23000)."""

class NotNullViolationError(IntegrityConstraintViolationError):
    """NOT NULL constraint violation (SQLSTATE: 23502)."""

class ForeignKeyViolationError(IntegrityConstraintViolationError):
    """Foreign key constraint violation (SQLSTATE: 23503)."""

class UniqueViolationError(IntegrityConstraintViolationError):
    """Unique constraint violation (SQLSTATE: 23505)."""

class CheckViolationError(IntegrityConstraintViolationError):
    """Check constraint violation (SQLSTATE: 23514)."""

class ExclusionViolationError(IntegrityConstraintViolationError):
    """Exclusion constraint violation (SQLSTATE: 23P01)."""

Example Usage

async def create_user_safely(conn, user_data):
    """Create user with proper constraint violation handling."""
    
    try:
        user_id = await conn.fetchval(
            """
            INSERT INTO users(username, email, age)
            VALUES($1, $2, $3)
            RETURNING id
            """,
            user_data['username'],
            user_data['email'], 
            user_data['age']
        )
        return user_id
        
    except asyncpg.UniqueViolationError as e:
        if 'username' in e.constraint_name:
            raise ValueError("Username already exists")
        elif 'email' in e.constraint_name:
            raise ValueError("Email address already registered")
        else:
            raise ValueError("Duplicate value in unique field")
            
    except asyncpg.NotNullViolationError as e:
        raise ValueError(f"Required field missing: {e.column_name}")
        
    except asyncpg.CheckViolationError as e:
        if 'age' in e.constraint_name:
            raise ValueError("Age must be between 13 and 120")
        else:
            raise ValueError(f"Data validation failed: {e.constraint_name}")
            
    except asyncpg.ForeignKeyViolationError as e:
        raise ValueError(f"Referenced record does not exist: {e.constraint_name}")

async def update_order_status(conn, order_id, status, user_id):
    """Update order with referential integrity checks."""
    
    try:
        await conn.execute(
            "UPDATE orders SET status = $1, updated_by = $2 WHERE id = $3",
            status, user_id, order_id
        )
        
    except asyncpg.ForeignKeyViolationError as e:
        if 'updated_by' in e.constraint_name:
            raise ValueError("Invalid user ID")
        else:
            raise ValueError("Referenced record does not exist")

Transaction Errors

Errors related to transaction state and concurrency control.

class InvalidTransactionStateError(PostgresError):
    """Base class for transaction state errors (SQLSTATE: 25000)."""

class ActiveSQLTransactionError(InvalidTransactionStateError):
    """Cannot start transaction - already in one (SQLSTATE: 25001)."""

class NoActiveSQLTransactionError(InvalidTransactionStateError):
    """No active transaction (SQLSTATE: 25P01)."""

class InFailedSQLTransactionError(InvalidTransactionStateError):
    """Current transaction is aborted (SQLSTATE: 25P02)."""

class ReadOnlySQLTransactionError(InvalidTransactionStateError):
    """Cannot execute in read-only transaction (SQLSTATE: 25006)."""

class TransactionRollbackError(PostgresError):
    """Base class for transaction rollback errors (SQLSTATE: 40000)."""

class SerializationError(TransactionRollbackError):
    """Serialization failure (SQLSTATE: 40001)."""

class DeadlockDetectedError(TransactionRollbackError):
    """Deadlock detected (SQLSTATE: 40P01)."""

Example Usage

async def transfer_money_with_retry(conn, from_account, to_account, amount, max_retries=3):
    """Money transfer with serialization error retry."""
    
    for attempt in range(max_retries):
        try:
            async with conn.transaction(isolation='serializable'):
                # Check balance
                balance = await conn.fetchval(
                    "SELECT balance FROM accounts WHERE id = $1",
                    from_account
                )
                
                if balance < amount:
                    raise ValueError("Insufficient funds")
                
                # Perform transfer
                await conn.execute(
                    "UPDATE accounts SET balance = balance - $1 WHERE id = $2",
                    amount, from_account
                )
                await conn.execute(  
                    "UPDATE accounts SET balance = balance + $1 WHERE id = $2",
                    amount, to_account
                )
                
                return True  # Success
                
        except asyncpg.SerializationError:
            if attempt == max_retries - 1:
                raise TransferFailedError("Transfer failed due to concurrent modifications")
            # Retry with exponential backoff
            await asyncio.sleep(0.1 * (2 ** attempt))
            
        except asyncpg.DeadlockDetectedError:
            if attempt == max_retries - 1:
                raise TransferFailedError("Transfer failed due to deadlock")
            await asyncio.sleep(0.05 * (2 ** attempt))
            
        except asyncpg.ReadOnlySQLTransactionError:
            raise TransferFailedError("Cannot perform transfer in read-only transaction")

async def safe_transaction_operation(conn, operation):
    """Execute operation with transaction error handling."""
    
    try:
        async with conn.transaction():
            return await operation(conn)
            
    except asyncpg.ActiveSQLTransactionError:
        # Already in transaction, execute directly
        return await operation(conn)
        
    except asyncpg.InFailedSQLTransactionError:
        # Transaction is in failed state, must rollback
        print("Transaction failed, rolling back and retrying")
        raise  # Let caller handle retry logic

Syntax and Access Errors

Errors related to SQL syntax, permissions, and schema objects.

class SyntaxOrAccessError(PostgresError):
    """Base class for syntax and access errors (SQLSTATE: 42000)."""

class PostgresSyntaxError(SyntaxOrAccessError):
    """SQL syntax error (SQLSTATE: 42601)."""

class InsufficientPrivilegeError(SyntaxOrAccessError):
    """Insufficient privilege (SQLSTATE: 42501)."""

class UndefinedTableError(SyntaxOrAccessError):
    """Table does not exist (SQLSTATE: 42P01)."""

class UndefinedColumnError(SyntaxOrAccessError):
    """Column does not exist (SQLSTATE: 42703)."""

class UndefinedFunctionError(SyntaxOrAccessError):
    """Function does not exist (SQLSTATE: 42883)."""

class DuplicateTableError(SyntaxOrAccessError):
    """Table already exists (SQLSTATE: 42P07)."""

class DuplicateColumnError(SyntaxOrAccessError):
    """Column already exists (SQLSTATE: 42701)."""

class AmbiguousColumnError(SyntaxOrAccessError):
    """Column reference is ambiguous (SQLSTATE: 42702)."""

Example Usage

async def dynamic_query_executor(conn, table_name, columns, conditions):
    """Execute dynamic queries with comprehensive error handling."""
    
    # Build query dynamically
    column_list = ', '.join(columns)
    where_clause = ' AND '.join(f"{k} = ${i+1}" for i, k in enumerate(conditions.keys()))
    query = f"SELECT {column_list} FROM {table_name} WHERE {where_clause}"
    
    try:
        return await conn.fetch(query, *conditions.values())
        
    except asyncpg.UndefinedTableError:
        raise ValueError(f"Table '{table_name}' does not exist")
        
    except asyncpg.UndefinedColumnError as e:
        raise ValueError(f"Column does not exist: {e.message}")
        
    except asyncpg.PostgresSyntaxError as e:
        raise ValueError(f"Invalid SQL syntax: {e.message}")
        
    except asyncpg.InsufficientPrivilegeError:
        raise PermissionError(f"Access denied to table '{table_name}'")
        
    except asyncpg.AmbiguousColumnError as e:
        raise ValueError(f"Ambiguous column reference: {e.message}")

async def safe_table_creation(conn, table_name, schema):
    """Create table with proper error handling."""
    
    try:
        await conn.execute(f"CREATE TABLE {table_name} ({schema})")
        return True
        
    except asyncpg.DuplicateTableError:
        print(f"Table {table_name} already exists")
        return False
        
    except asyncpg.PostgresSyntaxError as e:
        raise ValueError(f"Invalid table schema: {e.message}")
        
    except asyncpg.InsufficientPrivilegeError:
        raise PermissionError("Cannot create table - insufficient privileges")

System Errors

Errors related to system resources and server limitations.

class InsufficientResourcesError(PostgresError):
    """Base class for resource errors (SQLSTATE: 53000)."""

class DiskFullError(InsufficientResourcesError):
    """Disk full (SQLSTATE: 53100)."""

class OutOfMemoryError(InsufficientResourcesError):
    """Out of memory (SQLSTATE: 53200)."""

class TooManyConnectionsError(InsufficientResourcesError):
    """Too many connections (SQLSTATE: 53300)."""

class ProgramLimitExceededError(PostgresError):
    """Base class for program limit errors (SQLSTATE: 54000)."""

class StatementTooComplexError(ProgramLimitExceededError):
    """Statement too complex (SQLSTATE: 54001)."""

class TooManyColumnsError(ProgramLimitExceededError):
    """Too many columns (SQLSTATE: 54011)."""

Error Information Access

Access detailed error information for logging and debugging.

async def detailed_error_handler():
    """Demonstrate accessing detailed error information."""
    
    try:
        await conn.execute("INSERT INTO users(id, name) VALUES(1, 'Alice')")
        
    except asyncpg.PostgresError as e:
        # Access all available error details
        error_info = {
            'sqlstate': e.sqlstate,
            'severity': e.severity,
            'message': e.message,
            'detail': e.detail,
            'hint': e.hint,
            'position': e.position,
            'context': e.context,
            'schema_name': e.schema_name,
            'table_name': e.table_name,
            'column_name': e.column_name,
            'constraint_name': e.constraint_name,
        }
        
        # Log comprehensive error information
        print(f"PostgreSQL Error {e.sqlstate}: {e.message}")
        if e.detail:
            print(f"Detail: {e.detail}")
        if e.hint:
            print(f"Hint: {e.hint}")
        if e.position:
            print(f"Position: {e.position}")
            
        # Error-specific handling
        if isinstance(e, asyncpg.UniqueViolationError):
            handle_duplicate_key(e.constraint_name)
        elif isinstance(e, asyncpg.ForeignKeyViolationError):
            handle_referential_integrity(e.constraint_name)

Types

# Base exception types
class PostgresError(Exception):
    """Base PostgreSQL error with detailed attributes."""
    
class FatalPostgresError(PostgresError):
    """Fatal error requiring disconnection."""
    
class UnknownPostgresError(FatalPostgresError):
    """Error with unknown SQLSTATE code."""
    
class InterfaceError(Exception):
    """Client-side API usage error."""
    
class InterfaceWarning(UserWarning):
    """Client-side API usage warning."""
    
class DataError(InterfaceError, ValueError):
    """Invalid query input data."""
    
class InternalClientError(Exception):
    """Internal asyncpg error."""
    
# Error detail attributes available on PostgresError instances
ErrorDetails = typing.TypedDict('ErrorDetails', {
    'severity': str,
    'severity_en': str, 
    'sqlstate': str,
    'message': str,
    'detail': str,
    'hint': str,
    'position': str,
    'internal_position': str,
    'internal_query': str,
    'context': str,
    'schema_name': str,
    'table_name': str,
    'column_name': str,
    'data_type_name': str,
    'constraint_name': str,
})

Install with Tessl CLI

npx tessl i tessl/pypi-asyncpg

docs

connection-management.md

connection-pooling.md

copy-operations.md

cursor-operations.md

exception-handling.md

index.md

listeners-notifications.md

prepared-statements.md

query-execution.md

transaction-management.md

type-system.md

tile.json