A self-contained Python driver for communicating with MySQL servers, using an API that is compliant with the Python Database API Specification v2.0 (PEP 249).
—
Comprehensive authentication support for all MySQL authentication methods including enterprise security features, SSL/TLS encryption, and advanced authentication plugins.
MySQL Connector/Python supports all MySQL authentication plugins through a flexible plugin system that handles various authentication methods from basic password authentication to enterprise features like Kerberos, LDAP, and WebAuthn.
class MySQLAuthenticationPlugin:
"""
Base class for MySQL authentication plugins.
Provides interface for implementing authentication methods.
"""
plugin_name: str = None # Plugin name identifier
requires_ssl: bool = False # Whether plugin requires SSL connection
def prepare_password(self, password: str) -> bytes:
"""
Prepare password for authentication.
Args:
password: Plain text password
Returns:
Prepared password bytes
"""
pass
def auth_response(self, auth_data: bytes, **kwargs) -> bytes:
"""
Generate authentication response.
Args:
auth_data: Server authentication challenge data
**kwargs: Additional authentication parameters
Returns:
Authentication response bytes
"""
passclass mysql_native_password:
"""
MySQL native password authentication plugin.
Uses SHA1-based hashing for password authentication.
"""
plugin_name: str = 'mysql_native_password'
requires_ssl: bool = False
@staticmethod
def prepare_password(password: str) -> bytes:
"""Prepare password using SHA1 hashing algorithm."""
pass
def auth_response(self, auth_data: bytes, password: str = '', **kwargs) -> bytes:
"""Generate native password authentication response."""
passclass caching_sha2_password:
"""
Caching SHA2 password authentication plugin.
Default authentication method for MySQL 8.0+.
Uses SHA256-based hashing with server-side caching.
"""
plugin_name: str = 'caching_sha2_password'
requires_ssl: bool = False # SSL recommended but not required
@staticmethod
def prepare_password(password: str) -> bytes:
"""Prepare password using SHA256 hashing algorithm."""
pass
def auth_response(self, auth_data: bytes, password: str = '', ssl_enabled: bool = False, **kwargs) -> bytes:
"""Generate caching SHA2 authentication response."""
passclass sha256_password:
"""
SHA256 password authentication plugin.
Uses RSA encryption over SSL or plain connection with public key.
"""
plugin_name: str = 'sha256_password'
requires_ssl: bool = True # Requires SSL for secure password transmission
@staticmethod
def prepare_password(password: str) -> bytes:
"""Prepare password for SHA256 authentication."""
pass
def auth_response(self, auth_data: bytes, password: str = '', ssl_enabled: bool = False, **kwargs) -> bytes:
"""Generate SHA256 authentication response."""
passclass mysql_clear_password:
"""
Clear text password authentication plugin.
Sends password in plain text - requires SSL for security.
"""
plugin_name: str = 'mysql_clear_password'
requires_ssl: bool = True # SSL required for security
def auth_response(self, auth_data: bytes, password: str = '', **kwargs) -> bytes:
"""Send clear text password (requires SSL)."""
passclass authentication_kerberos_client:
"""
Kerberos authentication plugin for MySQL Enterprise.
Integrates with Active Directory and Kerberos infrastructure.
"""
plugin_name: str = 'authentication_kerberos_client'
requires_ssl: bool = False
def __init__(self, **kwargs) -> None:
"""Initialize Kerberos authentication with configuration."""
pass
def auth_response(self, auth_data: bytes, **kwargs) -> bytes:
"""Perform Kerberos authentication handshake."""
passclass authentication_ldap_sasl_client:
"""
LDAP SASL authentication plugin for MySQL Enterprise.
Supports LDAP authentication with SASL mechanisms.
"""
plugin_name: str = 'authentication_ldap_sasl_client'
requires_ssl: bool = True
def __init__(self, **kwargs) -> None:
"""Initialize LDAP SASL authentication."""
pass
def auth_response(self, auth_data: bytes, password: str = '', **kwargs) -> bytes:
"""Perform LDAP SASL authentication."""
passclass authentication_oci_client:
"""
Oracle Cloud Infrastructure (OCI) authentication plugin.
Uses OCI Identity and Access Management for authentication.
"""
plugin_name: str = 'authentication_oci_client'
requires_ssl: bool = True
def __init__(self, **kwargs) -> None:
"""Initialize OCI authentication with configuration."""
pass
def auth_response(self, auth_data: bytes, **kwargs) -> bytes:
"""Perform OCI authentication using IAM tokens."""
passclass authentication_webauthn_client:
"""
WebAuthn authentication plugin for MySQL Enterprise.
Supports FIDO2/WebAuthn multi-factor authentication.
"""
plugin_name: str = 'authentication_webauthn_client'
requires_ssl: bool = True
def __init__(self, **kwargs) -> None:
"""Initialize WebAuthn authentication."""
pass
def auth_response(self, auth_data: bytes, **kwargs) -> bytes:
"""Perform WebAuthn authentication challenge."""
passclass authentication_openid_connect_client:
"""
OpenID Connect authentication plugin for MySQL Enterprise.
Supports OAuth 2.0/OpenID Connect authentication flows.
"""
plugin_name: str = 'authentication_openid_connect_client'
requires_ssl: bool = True
def __init__(self, **kwargs) -> None:
"""Initialize OpenID Connect authentication."""
pass
def auth_response(self, auth_data: bytes, **kwargs) -> bytes:
"""Perform OpenID Connect authentication flow."""
passssl_config = {
'ssl_disabled': bool, # Disable SSL/TLS (default: False)
'ssl_cert': str, # Client certificate file path
'ssl_key': str, # Client private key file path
'ssl_ca': str, # Certificate authority file path
'ssl_capath': str, # Certificate authority directory path
'ssl_cipher': str, # SSL cipher specification
'ssl_verify_cert': bool, # Verify server certificate (default: False)
'ssl_verify_identity': bool, # Verify server identity (default: False)
'tls_versions': List[str], # Allowed TLS versions (['TLSv1.2', 'TLSv1.3'])
'tls_ciphersuites': List[str], # Allowed TLS 1.3 cipher suites
}# SSL verification levels
SSL_MODES = {
'DISABLED': {
'ssl_disabled': True
},
'PREFERRED': {
'ssl_disabled': False,
'ssl_verify_cert': False,
'ssl_verify_identity': False
},
'REQUIRED': {
'ssl_disabled': False,
'ssl_verify_cert': False,
'ssl_verify_identity': False
},
'VERIFY_CA': {
'ssl_disabled': False,
'ssl_verify_cert': True,
'ssl_verify_identity': False
},
'VERIFY_IDENTITY': {
'ssl_disabled': False,
'ssl_verify_cert': True,
'ssl_verify_identity': True
}
}mfa_config = {
'password1': str, # First factor password
'password2': str, # Second factor password (MFA)
'password3': str, # Third factor password (MFA)
'auth_plugin': str, # Primary authentication plugin
'auth_plugin_map': Dict[str, Type], # Plugin mapping for different factors
}import mysql.connector
# Native password authentication (MySQL 5.7 and earlier default)
connection = mysql.connector.connect(
host='localhost',
user='myuser',
password='mypassword',
database='mydatabase',
auth_plugin='mysql_native_password'
)
cursor = connection.cursor()
cursor.execute("SELECT USER(), @@authentication_windows_use_principal_name")
result = cursor.fetchone()
print(f"Connected as: {result[0]}")
cursor.close()
connection.close()import mysql.connector
# Caching SHA2 password authentication (MySQL 8.0+ default)
connection = mysql.connector.connect(
host='localhost',
user='myuser',
password='mypassword',
database='mydatabase',
auth_plugin='caching_sha2_password'
)
print("Connected using caching_sha2_password")
connection.close()import mysql.connector
# SSL configuration with client certificates
ssl_config = {
'ssl_cert': '/path/to/client-cert.pem',
'ssl_key': '/path/to/client-key.pem',
'ssl_ca': '/path/to/ca-cert.pem',
'ssl_verify_cert': True,
'ssl_verify_identity': True,
'tls_versions': ['TLSv1.2', 'TLSv1.3']
}
connection = mysql.connector.connect(
host='mysql.example.com',
user='myuser',
password='mypassword',
database='mydatabase',
**ssl_config
)
print("Secure SSL connection established with certificate verification")
connection.close()import mysql.connector
# Multi-factor authentication setup
mfa_config = {
'host': 'localhost',
'user': 'mfa_user',
'password1': 'first_factor_password', # Primary password
'password2': 'second_factor_password', # Second factor (e.g., SMS/TOTP)
'password3': 'third_factor_password', # Third factor (e.g., hardware token)
'database': 'mydatabase',
'auth_plugin': 'caching_sha2_password'
}
try:
connection = mysql.connector.connect(**mfa_config)
print("Multi-factor authentication successful")
connection.close()
except mysql.connector.Error as err:
print(f"MFA authentication failed: {err}")import mysql.connector
# Kerberos authentication configuration
kerberos_config = {
'host': 'mysql.corporate.com',
'user': 'kerberos_user@CORPORATE.COM',
'database': 'mydatabase',
'auth_plugin': 'authentication_kerberos_client',
'authentication_kerberos_client_mode': 1, # GSSAPI mode
'ssl_disabled': False, # SSL recommended with Kerberos
'ssl_verify_cert': True
}
try:
connection = mysql.connector.connect(**kerberos_config)
print("Kerberos authentication successful")
cursor = connection.cursor()
cursor.execute("SELECT USER()")
result = cursor.fetchone()
print(f"Authenticated as: {result[0]}")
cursor.close()
connection.close()
except mysql.connector.Error as err:
print(f"Kerberos authentication failed: {err}")import mysql.connector
# LDAP SASL authentication configuration
ldap_config = {
'host': 'mysql.corporate.com',
'user': 'ldap_user',
'password': 'ldap_password',
'database': 'mydatabase',
'auth_plugin': 'authentication_ldap_sasl_client',
'authentication_ldap_sasl_client_mode': 1, # SCRAM-SHA-1 mode
'ssl_disabled': False, # SSL required for LDAP SASL
'ssl_verify_cert': True
}
try:
connection = mysql.connector.connect(**ldap_config)
print("LDAP SASL authentication successful")
connection.close()
except mysql.connector.Error as err:
print(f"LDAP authentication failed: {err}")import mysql.connector
# OCI authentication configuration
oci_config = {
'host': 'mysql.oraclecloud.com',
'user': 'oci_user',
'database': 'mydatabase',
'auth_plugin': 'authentication_oci_client',
'oci_config_file': '~/.oci/config', # OCI config file
'oci_config_profile': 'DEFAULT', # OCI profile name
'ssl_disabled': False,
'ssl_verify_cert': True
}
try:
connection = mysql.connector.connect(**oci_config)
print("OCI authentication successful")
connection.close()
except mysql.connector.Error as err:
print(f"OCI authentication failed: {err}")import mysql.connector
# WebAuthn authentication configuration
webauthn_config = {
'host': 'mysql.enterprise.com',
'user': 'webauthn_user',
'database': 'mydatabase',
'auth_plugin': 'authentication_webauthn_client',
'authentication_webauthn_client_mode': 1, # Platform authenticator mode
'ssl_disabled': False, # SSL required for WebAuthn
'ssl_verify_cert': True
}
try:
# Note: WebAuthn would typically require user interaction with hardware token
connection = mysql.connector.connect(**webauthn_config)
print("WebAuthn authentication successful")
connection.close()
except mysql.connector.Error as err:
print(f"WebAuthn authentication failed: {err}")import mysql.connector
from mysql.connector.authentication import MySQLAuthenticationPlugin
class CustomAuthPlugin(MySQLAuthenticationPlugin):
"""Custom authentication plugin implementation."""
plugin_name = 'custom_auth_plugin'
requires_ssl = True
def prepare_password(self, password: str) -> bytes:
"""Custom password preparation logic."""
# Implement custom password hashing/preparation
return password.encode('utf-8')
def auth_response(self, auth_data: bytes, password: str = '', **kwargs) -> bytes:
"""Custom authentication response logic."""
# Implement custom authentication challenge response
prepared_password = self.prepare_password(password)
# Custom logic to combine with auth_data
return prepared_password
# Register custom plugin
auth_plugin_map = {
'custom_auth_plugin': CustomAuthPlugin
}
connection = mysql.connector.connect(
host='localhost',
user='custom_user',
password='custom_password',
database='mydatabase',
auth_plugin='custom_auth_plugin',
auth_plugin_map=auth_plugin_map
)
print("Custom authentication successful")
connection.close()import mysql.connector
# Comprehensive security configuration
secure_config = {
'host': 'mysql.example.com',
'port': 3306,
'user': 'secure_user',
'password': 'strong_password_123!',
'database': 'mydatabase',
# Authentication
'auth_plugin': 'caching_sha2_password',
# SSL/TLS security
'ssl_disabled': False,
'ssl_verify_cert': True,
'ssl_verify_identity': True,
'ssl_cert': '/path/to/client-cert.pem',
'ssl_key': '/path/to/client-key.pem',
'ssl_ca': '/path/to/ca-cert.pem',
'tls_versions': ['TLSv1.2', 'TLSv1.3'],
'tls_ciphersuites': [
'TLS_AES_256_GCM_SHA384',
'TLS_CHACHA20_POLY1305_SHA256'
],
# Connection security
'connect_timeout': 10,
'read_timeout': 30,
'write_timeout': 30,
'compress': False, # Disable compression for security
# Application security
'autocommit': False, # Explicit transaction management
'allow_local_infile': False, # Disable LOCAL INFILE for security
'allow_local_infile_in_path': None,
# Session security
'sql_mode': 'STRICT_TRANS_TABLES,NO_ZERO_DATE,NO_ZERO_IN_DATE,ERROR_FOR_DIVISION_BY_ZERO'
}
try:
connection = mysql.connector.connect(**secure_config)
print("Secure connection established successfully")
# Verify connection security
cursor = connection.cursor()
cursor.execute("SHOW STATUS LIKE 'Ssl_cipher'")
result = cursor.fetchone()
if result:
print(f"SSL Cipher: {result[1]}")
cursor.execute("SELECT @@ssl_ca, @@ssl_cert, @@ssl_key")
ssl_info = cursor.fetchone()
print(f"SSL Configuration: CA={ssl_info[0]}, Cert={ssl_info[1]}, Key={ssl_info[2]}")
cursor.close()
connection.close()
except mysql.connector.Error as err:
print(f"Secure connection failed: {err}")import mysql.connector
def connect_with_fallback_auth(host: str, user: str, password: str, database: str):
"""Connect with authentication plugin fallback."""
# Try authentication methods in order of preference
auth_methods = [
'caching_sha2_password', # MySQL 8.0+ default
'mysql_native_password', # Legacy compatibility
'sha256_password' # SHA256 for secure environments
]
for auth_plugin in auth_methods:
try:
connection = mysql.connector.connect(
host=host,
user=user,
password=password,
database=database,
auth_plugin=auth_plugin,
ssl_disabled=False,
ssl_verify_cert=True
)
print(f"Connected successfully using {auth_plugin}")
return connection
except mysql.connector.Error as err:
print(f"Authentication with {auth_plugin} failed: {err}")
continue
raise mysql.connector.Error("All authentication methods failed")
# Usage
try:
connection = connect_with_fallback_auth(
host='localhost',
user='myuser',
password='mypassword',
database='mydatabase'
)
# Use connection
cursor = connection.cursor()
cursor.execute("SELECT 'Authentication successful' as message")
result = cursor.fetchone()
print(result[0])
cursor.close()
connection.close()
except mysql.connector.Error as err:
print(f"Connection failed: {err}")Install with Tessl CLI
npx tessl i tessl/pypi-mysql-connector-python