A self-contained Python driver for communicating with MySQL servers, using an API that is compliant with the Python Database API Specification v2.0 (PEP 249).
—
Comprehensive exception hierarchy for robust error handling and debugging, providing specific error types that match MySQL server and client error conditions.
class Error(Exception):
"""
Base exception class for all MySQL Connector errors.
Attributes:
msg: Error message
errno: MySQL error number
sqlstate: SQL state code
"""
def __init__(self, msg: Optional[str] = None, errno: Optional[int] = None, sqlstate: Optional[str] = None) -> None:
pass
@property
def msg(self) -> str:
"""Error message."""
pass
@property
def errno(self) -> Optional[int]:
"""MySQL error number."""
pass
@property
def sqlstate(self) -> Optional[str]:
"""SQL state code."""
pass
class Warning(Exception):
"""
Warning exception class for non-critical issues.
Raised for MySQL warnings when raise_on_warnings is True.
"""
passclass InterfaceError(Error):
"""
Interface-related errors.
Problems with the database interface rather than the database itself.
"""
pass
class DatabaseError(Error):
"""
Database-related errors.
Base class for errors related to database operations.
"""
passclass DataError(DatabaseError):
"""
Data processing errors.
Problems with data types, values, or format.
"""
pass
class OperationalError(DatabaseError):
"""
Operation-related errors.
Connection issues, query execution problems, server errors.
"""
pass
class IntegrityError(DatabaseError):
"""
Database constraint violations.
Foreign key, unique constraint, check constraint violations.
"""
pass
class InternalError(DatabaseError):
"""
Internal database errors.
MySQL server internal errors.
"""
pass
class ProgrammingError(DatabaseError):
"""
Programming errors.
SQL syntax errors, parameter issues, API misuse.
"""
pass
class NotSupportedError(DatabaseError):
"""
Unsupported operation errors.
Features not supported by MySQL or connector version.
"""
passclass PoolError(Error):
"""
Connection pooling errors.
Pool configuration, exhaustion, and management issues.
"""
passclass ConnectionTimeoutError(OperationalError):
"""Connection timeout errors."""
pass
class ReadTimeoutError(OperationalError):
"""Read operation timeout errors."""
pass
class WriteTimeoutError(OperationalError):
"""Write operation timeout errors."""
passdef custom_error_exception(error: Dict[str, Any] = None, **kwargs) -> Type[Error]:
"""
Create custom error exceptions.
Args:
error: Error information dictionary
**kwargs: Additional error parameters
Returns:
Appropriate Error subclass instance
"""
pass
def get_mysql_exception(packet: bytes) -> Error:
"""
Extract MySQL exception from error packet.
Args:
packet: MySQL error packet bytes
Returns:
Appropriate Error subclass instance
"""
pass
def get_exception(packet: bytes) -> Error:
"""
Get appropriate exception from error packet.
Args:
packet: Error packet from MySQL server
Returns:
Appropriate Error subclass instance
"""
passCommon MySQL server errors that map to specific exception types:
# Connection errors (OperationalError)
ER_ACCESS_DENIED_ERROR = 1045 # Access denied for user
ER_BAD_HOST_ERROR = 1042 # Can't get hostname for your address
ER_CON_COUNT_ERROR = 1040 # Too many connections
ER_HOST_IS_BLOCKED = 1129 # Host is blocked
ER_HOST_NOT_PRIVILEGED = 1130 # Host not privileged to connect
# Syntax errors (ProgrammingError)
ER_PARSE_ERROR = 1064 # SQL syntax error
ER_BAD_FIELD_ERROR = 1054 # Unknown column
ER_BAD_TABLE_ERROR = 1051 # Unknown table
ER_NO_SUCH_TABLE = 1146 # Table doesn't exist
ER_BAD_DB_ERROR = 1049 # Unknown database
# Constraint violations (IntegrityError)
ER_DUP_ENTRY = 1062 # Duplicate entry for key
ER_NO_REFERENCED_ROW = 1216 # Foreign key constraint fails (insert/update)
ER_ROW_IS_REFERENCED = 1217 # Foreign key constraint fails (delete/update)
ER_NO_DEFAULT_FOR_FIELD = 1364 # Field doesn't have default value
# Data errors (DataError)
ER_BAD_NULL_ERROR = 1048 # Column cannot be null
ER_DATA_TOO_LONG = 1406 # Data too long for column
ER_TRUNCATED_WRONG_VALUE = 1292 # Truncated incorrect value
ER_WARN_DATA_OUT_OF_RANGE = 1264 # Out of range value for column
# Lock errors (OperationalError)
ER_LOCK_WAIT_TIMEOUT = 1205 # Lock wait timeout exceeded
ER_LOCK_DEADLOCK = 1213 # Deadlock found when trying to get lock
# Server errors (InternalError)
ER_DISK_FULL = 1021 # Disk full
ER_OUT_OF_MEMORY = 1037 # Out of memory
ER_SERVER_SHUTDOWN = 1053 # Server shutdown in progressMySQL client library errors:
# Connection errors
CR_CONNECTION_ERROR = 2003 # Can't connect to MySQL server
CR_CONN_HOST_ERROR = 2005 # Unknown MySQL server host
CR_SERVER_GONE_ERROR = 2006 # MySQL server has gone away
CR_SERVER_LOST = 2013 # Lost connection to MySQL server during query
# Protocol errors
CR_COMMANDS_OUT_OF_SYNC = 2014 # Commands out of sync
CR_UNKNOWN_ERROR = 2000 # Unknown MySQL error
CR_MALFORMED_PACKET = 2027 # Malformed packet
# SSL errors
CR_SSL_CONNECTION_ERROR = 2026 # SSL connection errorimport mysql.connector
from mysql.connector import Error, OperationalError, ProgrammingError
try:
connection = mysql.connector.connect(
host='localhost',
user='myuser',
password='mypassword',
database='mydatabase'
)
cursor = connection.cursor()
cursor.execute("SELECT * FROM users WHERE id = %s", (123,))
result = cursor.fetchone()
print(result)
except Error as err:
print(f"Database error: {err}")
print(f"Error code: {err.errno}")
print(f"SQL state: {err.sqlstate}")
finally:
if 'cursor' in locals():
cursor.close()
if 'connection' in locals():
connection.close()import mysql.connector
from mysql.connector import (
Error, InterfaceError, DatabaseError, OperationalError,
ProgrammingError, IntegrityError, DataError, PoolError
)
def handle_database_operation():
try:
connection = mysql.connector.connect(
host='localhost',
user='myuser',
password='wrongpassword', # Intentional error
database='mydatabase'
)
cursor = connection.cursor()
cursor.execute("INSERT INTO users (email) VALUES (%s)", ('duplicate@example.com',))
connection.commit()
except InterfaceError as err:
print(f"Interface error: {err}")
print("Check connector installation or configuration")
except OperationalError as err:
if err.errno == 1045: # Access denied
print("Authentication failed - check username/password")
elif err.errno == 2003: # Can't connect
print("Connection failed - check host/port")
elif err.errno == 1042: # Bad host
print("Host resolution failed")
else:
print(f"Operational error: {err}")
except ProgrammingError as err:
if err.errno == 1064: # Syntax error
print(f"SQL syntax error: {err}")
elif err.errno == 1054: # Unknown column
print(f"Column not found: {err}")
elif err.errno == 1146: # Table doesn't exist
print(f"Table not found: {err}")
else:
print(f"Programming error: {err}")
except IntegrityError as err:
if err.errno == 1062: # Duplicate entry
print(f"Duplicate key violation: {err}")
elif err.errno == 1216: # Foreign key constraint
print(f"Foreign key constraint failed: {err}")
else:
print(f"Integrity constraint violation: {err}")
except DataError as err:
if err.errno == 1048: # Column cannot be null
print(f"Required field missing: {err}")
elif err.errno == 1406: # Data too long
print(f"Data exceeds column length: {err}")
else:
print(f"Data error: {err}")
except DatabaseError as err:
print(f"General database error: {err}")
except Error as err:
print(f"MySQL Connector error: {err}")
handle_database_operation()import mysql.connector
from mysql.connector import PoolError, Error
import time
def handle_pool_operations():
try:
# Create small pool for demonstration
config = {
'host': 'localhost',
'user': 'myuser',
'password': 'mypassword',
'database': 'mydatabase',
'pool_name': 'test_pool',
'pool_size': 2,
'pool_timeout': 5 # 5 second timeout
}
# Get connections to exhaust pool
connections = []
for i in range(3): # Try to get more than pool size
try:
conn = mysql.connector.connect(**config)
connections.append(conn)
print(f"Got connection {i+1}")
except PoolError as err:
print(f"Pool error: {err}")
break
except Error as err:
print(f"Database error: {err}")
finally:
# Return connections to pool
for conn in connections:
conn.close()
handle_pool_operations()import mysql.connector
from mysql.connector import Error, IntegrityError, OperationalError
def transfer_with_error_handling(from_account: int, to_account: int, amount: float):
connection = None
try:
connection = mysql.connector.connect(
host='localhost',
user='myuser',
password='mypassword',
database='mydatabase'
)
connection.start_transaction()
cursor = connection.cursor()
# Check source account balance
cursor.execute("SELECT balance FROM accounts WHERE id = %s FOR UPDATE", (from_account,))
result = cursor.fetchone()
if not result:
raise ValueError(f"Account {from_account} not found")
current_balance = result[0]
if current_balance < amount:
raise ValueError(f"Insufficient funds: {current_balance} < {amount}")
# Perform transfer
cursor.execute("UPDATE accounts SET balance = balance - %s WHERE id = %s",
(amount, from_account))
cursor.execute("UPDATE accounts SET balance = balance + %s WHERE id = %s",
(amount, to_account))
# Commit transaction
connection.commit()
print(f"Successfully transferred {amount} from {from_account} to {to_account}")
except ValueError as err:
print(f"Business logic error: {err}")
if connection:
connection.rollback()
except IntegrityError as err:
print(f"Database constraint violation: {err}")
if connection:
connection.rollback()
except OperationalError as err:
if err.errno == 1205: # Lock timeout
print("Transaction timed out waiting for lock - try again later")
elif err.errno == 1213: # Deadlock
print("Deadlock detected - transaction aborted")
else:
print(f"Operational error: {err}")
if connection:
connection.rollback()
except Error as err:
print(f"Database error during transfer: {err}")
if connection:
connection.rollback()
finally:
if connection:
cursor.close()
connection.close()
# Test transfer with error handling
transfer_with_error_handling(1, 2, 100.0)import mysql.connector
from mysql.connector import Warning
def handle_warnings():
try:
connection = mysql.connector.connect(
host='localhost',
user='myuser',
password='mypassword',
database='mydatabase',
get_warnings=True, # Automatically fetch warnings
raise_on_warnings=False # Don't raise exceptions for warnings
)
cursor = connection.cursor()
# Execute statement that may generate warnings
cursor.execute("INSERT INTO users (age) VALUES (%s)", (150,)) # Age out of normal range
# Check for warnings
if connection.warning_count > 0:
warnings = connection.get_warnings()
for level, code, message in warnings:
print(f"Warning {code}: {message} (Level: {level})")
connection.commit()
except Warning as warn:
print(f"Warning raised as exception: {warn}")
except Error as err:
print(f"Database error: {err}")
finally:
cursor.close()
connection.close()
handle_warnings()import mysql.connector
from mysql.connector import Error
import logging
# Configure logging
logging.basicConfig(level=logging.ERROR, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def execute_with_logging(query: str, params: tuple = None):
"""Execute query with comprehensive error logging."""
connection = None
cursor = None
try:
connection = mysql.connector.connect(
host='localhost',
user='myuser',
password='mypassword',
database='mydatabase'
)
cursor = connection.cursor()
cursor.execute(query, params)
if cursor.with_rows:
results = cursor.fetchall()
logger.info(f"Query returned {len(results)} rows")
return results
else:
logger.info(f"Query affected {cursor.rowcount} rows")
return cursor.rowcount
except Error as err:
# Log error with context
logger.error(f"MySQL Error {err.errno}: {err.msg}")
logger.error(f"SQL State: {err.sqlstate}")
logger.error(f"Query: {query}")
logger.error(f"Parameters: {params}")
# Log specific error type
error_type = type(err).__name__
logger.error(f"Error Type: {error_type}")
# Re-raise for caller to handle
raise
except Exception as err:
logger.error(f"Unexpected error: {err}")
raise
finally:
if cursor:
cursor.close()
if connection:
connection.close()
# Usage with error logging
try:
results = execute_with_logging("SELECT * FROM users WHERE id = %s", (123,))
print(results)
except Error as err:
print(f"Database operation failed: {err}")import mysql.connector
from mysql.connector import Error, OperationalError
import time
import random
def execute_with_retry(query: str, params: tuple = None, max_retries: int = 3):
"""Execute query with retry logic for transient errors."""
# Error codes that should trigger retry
RETRYABLE_ERRORS = {
1205, # Lock wait timeout
1213, # Deadlock
2006, # Server has gone away
2013, # Lost connection during query
}
for attempt in range(max_retries + 1):
try:
connection = mysql.connector.connect(
host='localhost',
user='myuser',
password='mypassword',
database='mydatabase'
)
cursor = connection.cursor()
cursor.execute(query, params)
if cursor.with_rows:
results = cursor.fetchall()
else:
results = cursor.rowcount
connection.commit()
cursor.close()
connection.close()
return results
except OperationalError as err:
cursor.close() if 'cursor' in locals() else None
connection.close() if 'connection' in locals() else None
if err.errno in RETRYABLE_ERRORS and attempt < max_retries:
# Calculate exponential backoff with jitter
backoff = (2 ** attempt) + random.uniform(0, 1)
print(f"Attempt {attempt + 1} failed with retryable error {err.errno}. "
f"Retrying in {backoff:.2f} seconds...")
time.sleep(backoff)
continue
else:
print(f"Non-retryable error or max retries exceeded: {err}")
raise
except Error as err:
cursor.close() if 'cursor' in locals() else None
connection.close() if 'connection' in locals() else None
print(f"Non-retryable database error: {err}")
raise
# Usage with retry logic
try:
result = execute_with_retry("UPDATE accounts SET balance = balance + 100 WHERE id = 1")
print(f"Update successful, {result} rows affected")
except Error as err:
print(f"Operation failed after retries: {err}")Install with Tessl CLI
npx tessl i tessl/pypi-mysql-connector-python