An asyncio PostgreSQL driver for high-performance database connectivity with Python async/await syntax
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Comprehensive exception hierarchy mapping all PostgreSQL error codes to Python exceptions with detailed error information, proper inheritance structure, and practical error handling patterns.
AsyncPG provides a complete mapping of PostgreSQL SQLSTATE codes to specific Python exception classes, enabling precise error handling.
class PostgresError(Exception):
"""Base class for all PostgreSQL server errors."""
# Error details available as attributes
severity: str # Error severity (ERROR, FATAL, etc.)
severity_en: str # English severity
sqlstate: str # PostgreSQL SQLSTATE code
message: str # Primary error message
detail: str # Detailed error information
hint: str # Suggestion for fixing the error
position: str # Character position in query (if applicable)
internal_position: str # Internal query position
internal_query: str # Internal query text
context: str # Error context
schema_name: str # Schema name (if applicable)
table_name: str # Table name (if applicable)
column_name: str # Column name (if applicable)
data_type_name: str # Data type name (if applicable)
constraint_name: str # Constraint name (if applicable)
class FatalPostgresError(PostgresError):
"""A fatal error that should result in server disconnection."""
class UnknownPostgresError(FatalPostgresError):
"""An error with an unknown SQLSTATE code."""Exceptions originating from the asyncpg client rather than PostgreSQL server.
class InterfaceError(Exception):
"""An error caused by improper use of asyncpg API."""
class ClientConfigurationError(InterfaceError, ValueError):
"""An error caused by improper client configuration."""
class InterfaceWarning(UserWarning):
"""A warning for asyncpg API usage."""
class DataError(InterfaceError, ValueError):
"""Invalid query input data."""
class InternalClientError(Exception):
"""All unexpected errors not classified otherwise."""
class ProtocolError(InternalClientError):
"""Unexpected condition in PostgreSQL protocol handling."""
class UnsupportedClientFeatureError(InterfaceError):
"""Requested feature is unsupported by asyncpg."""
class UnsupportedServerFeatureError(InterfaceError):
"""Requested feature is unsupported by PostgreSQL server."""
class OutdatedSchemaCacheError(InternalClientError):
"""A value decoding error caused by a schema change."""
class TargetServerAttributeNotMatched(InternalClientError):
"""Could not find a host satisfying target attribute requirement."""Errors related to database connectivity and authentication.
class PostgresConnectionError(PostgresError):
"""Base class for connection-related errors."""
class ConnectionDoesNotExistError(PostgresConnectionError):
"""The connection does not exist (SQLSTATE: 08003)."""
class ConnectionFailureError(PostgresConnectionError):
"""Connection failure (SQLSTATE: 08006)."""
class ClientCannotConnectError(PostgresConnectionError):
"""Client cannot connect to server (SQLSTATE: 08001)."""
class ConnectionRejectionError(PostgresConnectionError):
"""Server rejected connection (SQLSTATE: 08004)."""
class ProtocolViolationError(PostgresConnectionError):
"""Protocol violation (SQLSTATE: 08P01)."""
class InvalidAuthorizationSpecificationError(PostgresError):
"""Authentication failed (SQLSTATE: 28000)."""
class InvalidPasswordError(InvalidAuthorizationSpecificationError):
"""Invalid password (SQLSTATE: 28P01)."""import asyncpg
import asyncio
async def connect_with_retry(dsn, max_retries=3):
"""Connect with automatic retry for transient failures."""
for attempt in range(max_retries):
try:
return await asyncpg.connect(dsn, timeout=10.0)
except asyncpg.ConnectionFailureError:
print(f"Connection failed, attempt {attempt + 1}/{max_retries}")
if attempt == max_retries - 1:
raise
await asyncio.sleep(2 ** attempt) # Exponential backoff
except asyncpg.InvalidPasswordError:
print("Authentication failed - check credentials")
raise # Don't retry authentication errors
except asyncpg.ClientCannotConnectError:
print("Cannot reach server - check host and port")
raise # Don't retry unreachable serverErrors related to data validation, type conversion, and constraint violations.
class PostgresDataError(PostgresError):
"""Base class for PostgreSQL data-related errors (SQLSTATE: 22000)."""
class InvalidTextRepresentationError(PostgresDataError):
"""Invalid input syntax for data type (SQLSTATE: 22P02)."""
class InvalidBinaryRepresentationError(PostgresDataError):
"""Invalid binary representation (SQLSTATE: 22P03)."""
class NumericValueOutOfRangeError(PostgresDataError):
"""Numeric value out of range (SQLSTATE: 22003)."""
class DivisionByZeroError(PostgresDataError):
"""Division by zero (SQLSTATE: 22012)."""
class StringDataRightTruncationError(PostgresDataError):
"""String data right truncation (SQLSTATE: 22001)."""
class DatetimeFieldOverflowError(PostgresDataError):
"""Datetime field overflow (SQLSTATE: 22008)."""
class InvalidDatetimeFormatError(PostgresDataError):
"""Invalid datetime format (SQLSTATE: 22007)."""
class InvalidTimeZoneDisplacementValueError(PostgresDataError):
"""Invalid timezone displacement (SQLSTATE: 22009)."""async def safe_insert_user(conn, name, age, email):
"""Insert user with comprehensive data validation error handling."""
try:
return await conn.execute(
"INSERT INTO users(name, age, email) VALUES($1, $2, $3)",
name, age, email
)
except asyncpg.InvalidTextRepresentationError as e:
print(f"Invalid data format: {e}")
if 'age' in str(e):
raise ValueError("Age must be a valid integer")
elif 'email' in str(e):
raise ValueError("Email format is invalid")
else:
raise ValueError(f"Invalid data format: {e}")
except asyncpg.NumericValueOutOfRangeError:
raise ValueError("Age value is out of valid range")
except asyncpg.StringDataRightTruncationError as e:
if 'name' in e.column_name:
raise ValueError("Name is too long (maximum 100 characters)")
elif 'email' in e.column_name:
raise ValueError("Email is too long (maximum 255 characters)")Errors related to database constraints and referential integrity.
class IntegrityConstraintViolationError(PostgresError):
"""Base class for constraint violations (SQLSTATE: 23000)."""
class NotNullViolationError(IntegrityConstraintViolationError):
"""NOT NULL constraint violation (SQLSTATE: 23502)."""
class ForeignKeyViolationError(IntegrityConstraintViolationError):
"""Foreign key constraint violation (SQLSTATE: 23503)."""
class UniqueViolationError(IntegrityConstraintViolationError):
"""Unique constraint violation (SQLSTATE: 23505)."""
class CheckViolationError(IntegrityConstraintViolationError):
"""Check constraint violation (SQLSTATE: 23514)."""
class ExclusionViolationError(IntegrityConstraintViolationError):
"""Exclusion constraint violation (SQLSTATE: 23P01)."""async def create_user_safely(conn, user_data):
"""Create user with proper constraint violation handling."""
try:
user_id = await conn.fetchval(
"""
INSERT INTO users(username, email, age)
VALUES($1, $2, $3)
RETURNING id
""",
user_data['username'],
user_data['email'],
user_data['age']
)
return user_id
except asyncpg.UniqueViolationError as e:
if 'username' in e.constraint_name:
raise ValueError("Username already exists")
elif 'email' in e.constraint_name:
raise ValueError("Email address already registered")
else:
raise ValueError("Duplicate value in unique field")
except asyncpg.NotNullViolationError as e:
raise ValueError(f"Required field missing: {e.column_name}")
except asyncpg.CheckViolationError as e:
if 'age' in e.constraint_name:
raise ValueError("Age must be between 13 and 120")
else:
raise ValueError(f"Data validation failed: {e.constraint_name}")
except asyncpg.ForeignKeyViolationError as e:
raise ValueError(f"Referenced record does not exist: {e.constraint_name}")
async def update_order_status(conn, order_id, status, user_id):
"""Update order with referential integrity checks."""
try:
await conn.execute(
"UPDATE orders SET status = $1, updated_by = $2 WHERE id = $3",
status, user_id, order_id
)
except asyncpg.ForeignKeyViolationError as e:
if 'updated_by' in e.constraint_name:
raise ValueError("Invalid user ID")
else:
raise ValueError("Referenced record does not exist")Errors related to transaction state and concurrency control.
class InvalidTransactionStateError(PostgresError):
"""Base class for transaction state errors (SQLSTATE: 25000)."""
class ActiveSQLTransactionError(InvalidTransactionStateError):
"""Cannot start transaction - already in one (SQLSTATE: 25001)."""
class NoActiveSQLTransactionError(InvalidTransactionStateError):
"""No active transaction (SQLSTATE: 25P01)."""
class InFailedSQLTransactionError(InvalidTransactionStateError):
"""Current transaction is aborted (SQLSTATE: 25P02)."""
class ReadOnlySQLTransactionError(InvalidTransactionStateError):
"""Cannot execute in read-only transaction (SQLSTATE: 25006)."""
class TransactionRollbackError(PostgresError):
"""Base class for transaction rollback errors (SQLSTATE: 40000)."""
class SerializationError(TransactionRollbackError):
"""Serialization failure (SQLSTATE: 40001)."""
class DeadlockDetectedError(TransactionRollbackError):
"""Deadlock detected (SQLSTATE: 40P01)."""async def transfer_money_with_retry(conn, from_account, to_account, amount, max_retries=3):
"""Money transfer with serialization error retry."""
for attempt in range(max_retries):
try:
async with conn.transaction(isolation='serializable'):
# Check balance
balance = await conn.fetchval(
"SELECT balance FROM accounts WHERE id = $1",
from_account
)
if balance < amount:
raise ValueError("Insufficient funds")
# Perform transfer
await conn.execute(
"UPDATE accounts SET balance = balance - $1 WHERE id = $2",
amount, from_account
)
await conn.execute(
"UPDATE accounts SET balance = balance + $1 WHERE id = $2",
amount, to_account
)
return True # Success
except asyncpg.SerializationError:
if attempt == max_retries - 1:
raise TransferFailedError("Transfer failed due to concurrent modifications")
# Retry with exponential backoff
await asyncio.sleep(0.1 * (2 ** attempt))
except asyncpg.DeadlockDetectedError:
if attempt == max_retries - 1:
raise TransferFailedError("Transfer failed due to deadlock")
await asyncio.sleep(0.05 * (2 ** attempt))
except asyncpg.ReadOnlySQLTransactionError:
raise TransferFailedError("Cannot perform transfer in read-only transaction")
async def safe_transaction_operation(conn, operation):
"""Execute operation with transaction error handling."""
try:
async with conn.transaction():
return await operation(conn)
except asyncpg.ActiveSQLTransactionError:
# Already in transaction, execute directly
return await operation(conn)
except asyncpg.InFailedSQLTransactionError:
# Transaction is in failed state, must rollback
print("Transaction failed, rolling back and retrying")
raise # Let caller handle retry logicErrors related to SQL syntax, permissions, and schema objects.
class SyntaxOrAccessError(PostgresError):
"""Base class for syntax and access errors (SQLSTATE: 42000)."""
class PostgresSyntaxError(SyntaxOrAccessError):
"""SQL syntax error (SQLSTATE: 42601)."""
class InsufficientPrivilegeError(SyntaxOrAccessError):
"""Insufficient privilege (SQLSTATE: 42501)."""
class UndefinedTableError(SyntaxOrAccessError):
"""Table does not exist (SQLSTATE: 42P01)."""
class UndefinedColumnError(SyntaxOrAccessError):
"""Column does not exist (SQLSTATE: 42703)."""
class UndefinedFunctionError(SyntaxOrAccessError):
"""Function does not exist (SQLSTATE: 42883)."""
class DuplicateTableError(SyntaxOrAccessError):
"""Table already exists (SQLSTATE: 42P07)."""
class DuplicateColumnError(SyntaxOrAccessError):
"""Column already exists (SQLSTATE: 42701)."""
class AmbiguousColumnError(SyntaxOrAccessError):
"""Column reference is ambiguous (SQLSTATE: 42702)."""async def dynamic_query_executor(conn, table_name, columns, conditions):
"""Execute dynamic queries with comprehensive error handling."""
# Build query dynamically
column_list = ', '.join(columns)
where_clause = ' AND '.join(f"{k} = ${i+1}" for i, k in enumerate(conditions.keys()))
query = f"SELECT {column_list} FROM {table_name} WHERE {where_clause}"
try:
return await conn.fetch(query, *conditions.values())
except asyncpg.UndefinedTableError:
raise ValueError(f"Table '{table_name}' does not exist")
except asyncpg.UndefinedColumnError as e:
raise ValueError(f"Column does not exist: {e.message}")
except asyncpg.PostgresSyntaxError as e:
raise ValueError(f"Invalid SQL syntax: {e.message}")
except asyncpg.InsufficientPrivilegeError:
raise PermissionError(f"Access denied to table '{table_name}'")
except asyncpg.AmbiguousColumnError as e:
raise ValueError(f"Ambiguous column reference: {e.message}")
async def safe_table_creation(conn, table_name, schema):
"""Create table with proper error handling."""
try:
await conn.execute(f"CREATE TABLE {table_name} ({schema})")
return True
except asyncpg.DuplicateTableError:
print(f"Table {table_name} already exists")
return False
except asyncpg.PostgresSyntaxError as e:
raise ValueError(f"Invalid table schema: {e.message}")
except asyncpg.InsufficientPrivilegeError:
raise PermissionError("Cannot create table - insufficient privileges")Errors related to system resources and server limitations.
class InsufficientResourcesError(PostgresError):
"""Base class for resource errors (SQLSTATE: 53000)."""
class DiskFullError(InsufficientResourcesError):
"""Disk full (SQLSTATE: 53100)."""
class OutOfMemoryError(InsufficientResourcesError):
"""Out of memory (SQLSTATE: 53200)."""
class TooManyConnectionsError(InsufficientResourcesError):
"""Too many connections (SQLSTATE: 53300)."""
class ProgramLimitExceededError(PostgresError):
"""Base class for program limit errors (SQLSTATE: 54000)."""
class StatementTooComplexError(ProgramLimitExceededError):
"""Statement too complex (SQLSTATE: 54001)."""
class TooManyColumnsError(ProgramLimitExceededError):
"""Too many columns (SQLSTATE: 54011)."""Access detailed error information for logging and debugging.
async def detailed_error_handler():
"""Demonstrate accessing detailed error information."""
try:
await conn.execute("INSERT INTO users(id, name) VALUES(1, 'Alice')")
except asyncpg.PostgresError as e:
# Access all available error details
error_info = {
'sqlstate': e.sqlstate,
'severity': e.severity,
'message': e.message,
'detail': e.detail,
'hint': e.hint,
'position': e.position,
'context': e.context,
'schema_name': e.schema_name,
'table_name': e.table_name,
'column_name': e.column_name,
'constraint_name': e.constraint_name,
}
# Log comprehensive error information
print(f"PostgreSQL Error {e.sqlstate}: {e.message}")
if e.detail:
print(f"Detail: {e.detail}")
if e.hint:
print(f"Hint: {e.hint}")
if e.position:
print(f"Position: {e.position}")
# Error-specific handling
if isinstance(e, asyncpg.UniqueViolationError):
handle_duplicate_key(e.constraint_name)
elif isinstance(e, asyncpg.ForeignKeyViolationError):
handle_referential_integrity(e.constraint_name)# Base exception types
class PostgresError(Exception):
"""Base PostgreSQL error with detailed attributes."""
class FatalPostgresError(PostgresError):
"""Fatal error requiring disconnection."""
class UnknownPostgresError(FatalPostgresError):
"""Error with unknown SQLSTATE code."""
class InterfaceError(Exception):
"""Client-side API usage error."""
class InterfaceWarning(UserWarning):
"""Client-side API usage warning."""
class DataError(InterfaceError, ValueError):
"""Invalid query input data."""
class InternalClientError(Exception):
"""Internal asyncpg error."""
# Error detail attributes available on PostgresError instances
ErrorDetails = typing.TypedDict('ErrorDetails', {
'severity': str,
'severity_en': str,
'sqlstate': str,
'message': str,
'detail': str,
'hint': str,
'position': str,
'internal_position': str,
'internal_query': str,
'context': str,
'schema_name': str,
'table_name': str,
'column_name': str,
'data_type_name': str,
'constraint_name': str,
})