PostgreSQL database adapter for Python
—
Complete DB-API 2.0 compliant exception hierarchy with PostgreSQL-specific error information, diagnostic details, and proper error classification for robust error handling.
Complete DB-API 2.0 compliant exception hierarchy for proper error handling and classification.
class Error(Exception):
    """
    Base exception class for all psycopg errors.

    Root of the error hierarchy; inherits from Python's Exception.
    Every exception in this section except ``Warning`` derives from it,
    so ``except Error`` catches any interface or database error.
    """


class Warning(Exception):
    """
    Warning exception for important database warnings.

    Raised for database warning messages that don't prevent
    operation completion but indicate potential issues.

    Note: derives directly from ``Exception`` (not from ``Error``) and
    shadows the builtin ``Warning`` inside this namespace.
    """


class InterfaceError(Error):
    """
    Error related to database interface rather than database itself.

    Raised for problems with the psycopg interface, such as:
    - Invalid connection parameters
    - Interface misuse
    - Driver configuration issues
    """


class DatabaseError(Error):
    """
    Base class for database-related errors.

    Raised for errors that are related to the database rather
    than the interface. All specific database errors inherit from this.
    """


class DataError(DatabaseError):
    """
    Error due to problems with processed data.

    Raised when:
    - Data values are out of range
    - Type conversion fails
    - Data format is invalid
    - Constraint violations due to data content
    """


class OperationalError(DatabaseError):
    """
    Error related to database operation, not under programmer control.

    Raised for database operational issues:
    - Connection failures
    - Memory allocation errors
    - Database server errors
    - Network timeouts
    """


class IntegrityError(DatabaseError):
    """
    Error when database relational integrity is compromised.

    Raised for constraint violations:
    - Primary key violations
    - Foreign key violations
    - Unique constraint violations
    - Check constraint violations
    """


class InternalError(DatabaseError):
    """
    Error when database encounters internal error.

    Raised when database system encounters unexpected internal errors:
    - Internal database errors
    - Transaction state errors
    - System resource errors
    """


class ProgrammingError(DatabaseError):
    """
    Error due to programmer error.

    Raised for programming mistakes:
    - SQL syntax errors
    - Table or column doesn't exist
    - Wrong number of parameters
    - Invalid operation for current state
    """


class NotSupportedError(DatabaseError):
    """
    Error when using unsupported database feature.

    Raised when attempting to use features not supported by:
    - Current PostgreSQL version
    - Current psycopg configuration
    - Database permissions
    """


# Additional exceptions specific to PostgreSQL functionality and psycopg features.
# Subclasses of OperationalError: failures outside the programmer's control.
class ConnectionTimeout(OperationalError):
    """Raised when connection attempt times out"""


class QueryCanceled(OperationalError):
    """Raised when query is canceled by user or timeout"""


class TransactionRollbackError(OperationalError):
    """Raised when transaction is rolled back by server"""


class DeadlockDetected(OperationalError):
    """Raised when deadlock is detected and resolved"""


class SerializationFailure(OperationalError):
    """Raised when serializable transaction fails due to conflicts"""


class PipelineAborted(OperationalError):
    """Raised when pipeline operation fails due to pipeline being in aborted state"""


# Subclasses of ProgrammingError: mistakes in the SQL or in how it is used.
class UndefinedTable(ProgrammingError):
    """Raised when referencing non-existent table"""


class UndefinedColumn(ProgrammingError):
    """Raised when referencing non-existent column"""


# NOTE: shadows the builtin SyntaxError inside this namespace; refer to it
# through its module (e.g. ``errors.SyntaxError``) to avoid ambiguity.
class SyntaxError(ProgrammingError):
    """Raised for SQL syntax errors"""


class InvalidName(ProgrammingError):
    """Raised for invalid database object names"""


class InsufficientPrivilege(ProgrammingError):
    """Raised when operation requires higher privileges"""


# Detailed error information and diagnostic data from PostgreSQL server.
class Diagnostic:
"""
Container for PostgreSQL error diagnostic information.
Provides detailed information about database errors including
error codes, messages, context, and location information.
"""
@property
def severity(self) -> str | None:
"""Error severity level (ERROR, FATAL, PANIC, WARNING, NOTICE, etc.)"""
@property
def severity_nonlocalized(self) -> str | None:
"""Error severity in English (not localized)"""
@property
def sqlstate(self) -> str | None:
"""
SQLSTATE error code.
Standard 5-character error code identifying error type.
See PostgreSQL documentation for complete list.
"""
@property
def message_primary(self) -> str | None:
"""Primary error message"""
@property
def message_detail(self) -> str | None:
"""Detailed error message with additional context"""
@property
def message_hint(self) -> str | None:
"""Hint message suggesting how to fix the error"""
@property
def statement_position(self) -> int | None:
"""Character position in SQL statement where error occurred"""
@property
def internal_position(self) -> int | None:
"""Character position in internal query where error occurred"""
@property
def internal_query(self) -> str | None:
"""Internal query that caused the error"""
@property
def context(self) -> str | None:
"""Error context information"""
@property
def schema_name(self) -> str | None:
"""Schema name related to the error"""
@property
def table_name(self) -> str | None:
"""Table name related to the error"""
@property
def column_name(self) -> str | None:
"""Column name related to the error"""
@property
def datatype_name(self) -> str | None:
"""Data type name related to the error"""
@property
def constraint_name(self) -> str | None:
"""Constraint name related to the error"""
@property
def source_file(self) -> str | None:
"""PostgreSQL source file where error occurred"""
@property
def source_line(self) -> int | None:
"""Line number in PostgreSQL source file"""
@property
def source_function(self) -> str | None:
"""Function name in PostgreSQL source code"""
# Exception classes have diagnostic property
class DatabaseError(Error):
    """Database error exposing server diagnostics through ``diag``."""

    @property
    def diag(self) -> Diagnostic:
        """Diagnostic information for this error"""


# Exceptions specific to connection pooling functionality.
# Note: Requires psycopg-pool package
from psycopg_pool import PoolError


# NOTE(review): the class statement below rebinds the name imported on the
# previous line, so the imported object is not what the subclasses inherit
# from. The upstream psycopg_pool documentation appears to describe
# PoolClosed / PoolTimeout / TooManyRequests as OperationalError subclasses;
# confirm which hierarchy is intended before relying on ``except PoolError``.
class PoolError(Error):
    """Base class for connection pool errors"""


class PoolClosed(PoolError):
    """Raised when operating on closed pool"""


class PoolTimeout(PoolError):
    """Raised when pool operation times out"""


class TooManyRequests(PoolError):
    """Raised when pool cannot satisfy connection request"""


import psycopg
from psycopg import errors
# Basic usage: list handlers from most specific to most general, because the
# first matching ``except`` clause wins.
try:
    with psycopg.connect("dbname=test user=postgres") as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT * FROM nonexistent_table")
except errors.UndefinedTable as e:
    print(f"Table not found: {e}")
    print(f"SQLSTATE: {e.diag.sqlstate}")
    print(f"Detail: {e.diag.message_detail}")
except errors.ProgrammingError as e:
    print(f"Programming error: {e}")
except errors.OperationalError as e:
    print(f"Database operational error: {e}")
except errors.Error as e:
    print(f"Database error: {e}")


def analyze_database_error(error):
    """Analyze database error and provide detailed information.

    Args:
        error: Any exception object.

    Returns:
        A multi-line string, one ``Label: value`` entry per line, built from
        ``error.diag``. If ``error`` is not a ``psycopg.DatabaseError`` the
        single line ``"Non-database error: ..."`` is returned instead.
    """
    if not isinstance(error, psycopg.DatabaseError):
        return f"Non-database error: {error}"

    diag = error.diag

    # Basic error info: always reported, even when the value is None.
    analysis = [
        f"Error Type: {type(error).__name__}",
        f"SQLSTATE: {diag.sqlstate}",
        f"Severity: {diag.severity}",
        f"Message: {diag.message_primary}",
    ]

    # Additional details: only reported when present.
    if diag.message_detail:
        analysis.append(f"Detail: {diag.message_detail}")
    if diag.message_hint:
        analysis.append(f"Hint: {diag.message_hint}")

    # Location information. Compare against None explicitly so that a
    # numeric position is never dropped just because it is falsy.
    if diag.statement_position is not None:
        analysis.append(f"Error at position: {diag.statement_position}")

    # Object information: collect whichever related object names are set.
    objects = [
        f"{label}: {value}"
        for label, value in (
            ("schema", diag.schema_name),
            ("table", diag.table_name),
            ("column", diag.column_name),
            ("constraint", diag.constraint_name),
        )
        if value
    ]
    if objects:
        analysis.append(f"Related objects: {', '.join(objects)}")

    # Context
    if diag.context:
        analysis.append(f"Context: {diag.context}")

    return "\n".join(analysis)
# Usage (assumes ``conn`` is an open connection created earlier)
try:
    with conn.cursor() as cur:
        cur.execute("INSERT INTO users (id, email) VALUES (1, 'duplicate@example.com')")
except psycopg.IntegrityError as e:
    print(analyze_database_error(e))

# Output:
# Error Type: IntegrityError
# SQLSTATE: 23505
# Severity: ERROR
# Message: duplicate key value violates unique constraint "users_email_key"
# Detail: Key (email)=(duplicate@example.com) already exists.
# Related objects: table: users, constraint: users_email_key

# The import below was fused onto the comment line above, which turned it
# into comment text; it must be a statement of its own to take effect.
import time
from psycopg import errors
def execute_with_retry(conn, query, params=None, max_retries=3):
    """Execute query with automatic retry for transient errors.

    Args:
        conn: Connection whose ``cursor()`` returns a context manager
            providing ``execute`` and ``fetchall``.
        query: SQL text passed to ``cursor.execute``.
        params: Optional query parameters passed to ``cursor.execute``.
        max_retries: Number of retries after the first attempt, so the query
            runs at most ``max_retries + 1`` times. A negative value means
            the query is never run and ``None`` is returned.

    Returns:
        The ``fetchall()`` result of the first successful attempt.

    Raises:
        errors.SerializationFailure, errors.DeadlockDetected,
        errors.ConnectionTimeout: re-raised once the retries are exhausted.
        errors.DataError, errors.ProgrammingError: re-raised immediately.
        errors.OperationalError: re-raised immediately unless the message
            contains "server closed the connection" and retries remain.

    Note:
        No rollback is issued between attempts. NOTE(review): confirm the
        connection is in a state where re-executing is possible (for example
        autocommit mode) before relying on the retries.
    """
    # Local import: the deadlock branch needs ``random`` and the file's import
    # lines do not provide it (the original raised NameError on that path).
    import random

    for attempt in range(max_retries + 1):
        retries_left = attempt < max_retries
        try:
            with conn.cursor() as cur:
                cur.execute(query, params)
                return cur.fetchall()
        # The specific OperationalError subclasses must be listed before the
        # generic OperationalError handler, otherwise it would shadow them.
        except errors.SerializationFailure:
            if retries_left:
                # Retry serialization failures
                time.sleep(0.1 * (2 ** attempt))  # Exponential backoff
                continue
            raise
        except errors.DeadlockDetected:
            if retries_left:
                # Retry deadlocks with random delay so competing sessions
                # do not wake up and collide again at the same moment.
                time.sleep(0.1 + random.random() * 0.2)
                continue
            raise
        except errors.ConnectionTimeout:
            if retries_left:
                # Retry connection timeouts
                time.sleep(1.0)
                continue
            raise
        except (errors.DataError, errors.ProgrammingError):
            # Don't retry programming errors: the same input fails again.
            raise
        except errors.OperationalError as e:
            # Retry some operational errors
            if "server closed the connection" in str(e).lower():
                if retries_left:
                    time.sleep(1.0)
                    continue
            raise
def safe_database_operation(conn, operation_func, *args, **kwargs):
    """Safely execute database operation with comprehensive error handling.

    Calls ``operation_func(conn, *args, **kwargs)`` and translates database
    exceptions into builtin ones. The original exception is attached as
    ``__cause__`` so the database diagnostics stay reachable.

    Returns:
        Whatever ``operation_func`` returns.

    Raises:
        ValueError: for ``errors.IntegrityError`` and ``errors.DataError``.
        RuntimeError: for ``errors.ProgrammingError``.
        ConnectionError: for ``errors.OperationalError``.
    """
    try:
        return operation_func(conn, *args, **kwargs)
    except errors.IntegrityError as e:
        # Handle constraint violations
        if e.diag.sqlstate == "23505":  # Unique violation
            raise ValueError(f"Duplicate value: {e.diag.message_detail}") from e
        elif e.diag.sqlstate == "23503":  # Foreign key violation
            raise ValueError(f"Reference error: {e.diag.message_detail}") from e
        else:
            raise ValueError(f"Data integrity error: {e}") from e
    except errors.DataError as e:
        # Handle data format/type errors
        raise ValueError(f"Invalid data: {e.diag.message_primary}") from e
    except errors.ProgrammingError as e:
        # Handle SQL/programming errors
        if e.diag.sqlstate == "42P01":  # Undefined table
            raise RuntimeError(f"Table not found: {e.diag.table_name}") from e
        elif e.diag.sqlstate == "42703":  # Undefined column
            raise RuntimeError(f"Column not found: {e.diag.column_name}") from e
        else:
            raise RuntimeError(f"SQL error: {e}") from e
    except errors.OperationalError as e:
        # Handle operational errors
        raise ConnectionError(f"Database unavailable: {e}") from e


def _close_quietly(conn):
    """Close ``conn`` if it was opened; errors from ``close`` are ignored."""
    if conn is None:
        return
    try:
        conn.close()
    except Exception:
        # Best-effort cleanup: the connection is being discarded because of
        # an earlier failure, and that failure is what the caller must see.
        pass


def create_robust_connection(conninfo, max_attempts=3):
    """Create database connection with retry logic.

    Each attempt opens a connection and probes it with ``SELECT 1``. After an
    ``errors.OperationalError`` the attempt is repeated with exponential
    backoff (1s, 2s, 4s, ...) until ``max_attempts`` is reached. A connection
    that was opened but failed the probe is closed before retrying, so failed
    attempts do not leak connections.

    Returns:
        The open connection, or ``None`` when ``max_attempts`` is not positive
        (the loop body never runs).

    Raises:
        ConnectionError: after the last failed attempt, with a message chosen
            from the error text, or immediately on ``errors.InterfaceError``.
    """
    for attempt in range(max_attempts):
        conn = None
        try:
            conn = psycopg.connect(conninfo)
            # Test connection
            with conn.cursor() as cur:
                cur.execute("SELECT 1")
                cur.fetchone()
            return conn
        except errors.OperationalError as e:
            _close_quietly(conn)
            if attempt < max_attempts - 1:
                print(f"Connection attempt {attempt + 1} failed: {e}")
                time.sleep(2 ** attempt)  # Exponential backoff
                continue
            # Analyze connection error (only reached on the final attempt)
            error_msg = str(e).lower()
            if "connection refused" in error_msg:
                raise ConnectionError("Database server is not running") from e
            elif "authentication failed" in error_msg:
                raise ConnectionError("Invalid credentials") from e
            elif "database" in error_msg and "does not exist" in error_msg:
                raise ConnectionError("Database does not exist") from e
            else:
                raise ConnectionError(f"Cannot connect to database: {e}") from e
        except errors.InterfaceError as e:
            _close_quietly(conn)
            raise ConnectionError(f"Connection interface error: {e}") from e
def connection_health_check(conn):
    """Check if connection is healthy and usable.

    Returns:
        A ``(healthy, message)`` tuple. ``healthy`` is False when the
        connection reports ``closed`` or ``broken``, when ``SELECT 1`` does
        not return ``(1,)``, or when any exception is raised during the check
        (the exception text is included in the message). This function never
        raises for ``Exception`` subclasses.
    """
    try:
        if conn.closed:
            return False, "Connection is closed"
        if conn.broken:
            return False, "Connection is broken"
        # Test with simple query
        with conn.cursor() as cur:
            cur.execute("SELECT 1")
            result = cur.fetchone()
        if result != (1,):
            return False, "Unexpected query result"
        return True, "Connection is healthy"
    except Exception as e:
        # Deliberately broad: a health check reports failure, it does not raise.
        return False, f"Health check failed: {e}"


def _rollback_work(conn, savepoint_name):
    """Undo the current unit of work: back to the savepoint if one is set, else the whole transaction."""
    if savepoint_name is None:
        conn.rollback()
        return
    try:
        with conn.cursor() as cur:
            cur.execute(f"ROLLBACK TO SAVEPOINT {savepoint_name}")
    except Exception:
        conn.rollback()  # Full rollback if savepoint fails


def safe_transaction(conn, transaction_func, *args, **kwargs):
    """Execute function within transaction with proper error handling.

    Runs ``transaction_func(conn, *args, **kwargs)``. If the connection is
    already inside a transaction, the call is wrapped in a savepoint that is
    released on success; otherwise the transaction is committed on success.
    On any ``Exception`` (including serialization failures and deadlocks) the
    work is rolled back and the exception is re-raised unchanged, so callers
    can still catch specific error types and retry.

    Side effect: if ``conn.autocommit`` is true it is set to False and is not
    restored afterwards.

    Returns:
        Whatever ``transaction_func`` returns.
    """
    savepoint_name = None
    try:
        # Start transaction
        if conn.autocommit:
            conn.autocommit = False

        # Create savepoint for nested transactions.
        # NOTE(review): 0 is presumably the "idle" transaction status; confirm
        # against the driver's TransactionStatus enum.
        if conn.info.transaction_status != 0:  # Already in transaction
            savepoint_name = f"sp_{int(time.time() * 1000000)}"
            with conn.cursor() as cur:
                cur.execute(f"SAVEPOINT {savepoint_name}")

        # Execute transaction function
        result = transaction_func(conn, *args, **kwargs)

        # Commit or release savepoint
        if savepoint_name:
            with conn.cursor() as cur:
                cur.execute(f"RELEASE SAVEPOINT {savepoint_name}")
        else:
            conn.commit()
        return result
    except Exception:
        # One rollback path for every error type, so the fallback to a full
        # rollback applies to serialization failures and deadlocks as well.
        _rollback_work(conn, savepoint_name)
        raise
# Usage
def transfer_money(conn, from_account, to_account, amount):
    """Transfer money between accounts safely.

    The debit and the credit run inside one ``safe_transaction`` call, so an
    exception raised at any step rolls both back.

    Args:
        conn: Open database connection.
        from_account: Id of the account to debit.
        to_account: Id of the account to credit.
        amount: Amount to move; must be greater than zero.

    Returns:
        True when the transfer was committed.

    Raises:
        ValueError: if ``amount`` is not positive, if the source account is
            missing or has insufficient funds, or if the destination account
            does not exist.
    """
    # A zero or negative amount would pass the balance check below and move
    # money in the opposite direction, so reject it before touching the DB.
    if amount <= 0:
        raise ValueError("Transfer amount must be positive")

    def transfer_operation(conn):
        with conn.cursor() as cur:
            # Check source account balance; FOR UPDATE locks the row so the
            # balance cannot change between this check and the debit.
            cur.execute(
                "SELECT balance FROM accounts WHERE id = %s FOR UPDATE",
                (from_account,)
            )
            balance = cur.fetchone()
            if not balance or balance[0] < amount:
                raise ValueError("Insufficient funds")

            # Perform transfer
            cur.execute(
                "UPDATE accounts SET balance = balance - %s WHERE id = %s",
                (amount, from_account)
            )
            cur.execute(
                "UPDATE accounts SET balance = balance + %s WHERE id = %s",
                (amount, to_account)
            )
            # If the credit matched no row, the debit above would otherwise be
            # committed with nothing credited. Raising here rolls it back.
            if cur.rowcount != 1:
                raise ValueError("Destination account not found")
        return True

    return safe_transaction(conn, transfer_operation)


# Common PostgreSQL SQLSTATE codes for error classification:
# Each SQLSTATE is a 5-character string; the first two characters identify
# the error class named in the group headings below.

# Class 08 - Connection Exception
CONNECTION_EXCEPTION = "08000"
CONNECTION_FAILURE = "08006"
CONNECTION_DOES_NOT_EXIST = "08003"

# Class 23 - Integrity Constraint Violation
INTEGRITY_CONSTRAINT_VIOLATION = "23000"
RESTRICT_VIOLATION = "23001"
NOT_NULL_VIOLATION = "23502"
FOREIGN_KEY_VIOLATION = "23503"
UNIQUE_VIOLATION = "23505"
CHECK_VIOLATION = "23514"

# Class 42 - Syntax Error or Access Rule Violation
SYNTAX_ERROR_OR_ACCESS_RULE_VIOLATION = "42000"
SYNTAX_ERROR = "42601"
INSUFFICIENT_PRIVILEGE = "42501"
UNDEFINED_TABLE = "42P01"
UNDEFINED_COLUMN = "42703"
UNDEFINED_FUNCTION = "42883"
DUPLICATE_TABLE = "42P07"
DUPLICATE_COLUMN = "42701"

# Class 40 - Transaction Rollback
TRANSACTION_ROLLBACK = "40000"
SERIALIZATION_FAILURE = "40001"
DEADLOCK_DETECTED = "40P01"

# Class 53 - Insufficient Resources
INSUFFICIENT_RESOURCES = "53000"
DISK_FULL = "53100"
OUT_OF_MEMORY = "53200"
TOO_MANY_CONNECTIONS = "53300"

# Class 57 - Operator Intervention
OPERATOR_INTERVENTION = "57000"
QUERY_CANCELED = "57014"
ADMIN_SHUTDOWN = "57P01"
CRASH_SHUTDOWN = "57P02"
CANNOT_CONNECT_NOW = "57P03"

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-psycopg