MySQL driver written in Python providing comprehensive database connectivity with both traditional SQL and modern document operations
—
Integration with MySQL Fabric for high availability and scaling, supporting automatic failover, load balancing, and sharding configurations.
Create and manage MySQL Fabric connections for high availability and scalability.
def connect(**config):
"""
Create MySQL Fabric connection with high availability support.
Parameters:
- fabric (dict): Fabric configuration with host, port, user, password
- group (str): Server group name for connection routing
- key (str): Sharding key for database sharding
- tables (list): Tables to include in sharding scope
- mode (int): Connection mode (MODE_READONLY or MODE_READWRITE)
- attempts (int): Number of connection attempts (default: 3)
- attempt_delay (int): Delay between attempts in seconds (default: 1)
- **config: Additional MySQL connection parameters
Returns:
MySQLFabricConnection: Fabric-aware connection object
Raises:
InterfaceError: Fabric connection failed
"""
MODE_READONLY = 1
MODE_READWRITE = 2
class MySQLFabricConnection:
"""
MySQL Fabric connection wrapper providing high availability features.
Manages connections to MySQL Fabric infrastructure for automatic
failover, load balancing, and database sharding across multiple
MySQL server instances.
"""
def __init__(self, **config):
"""
Initialize Fabric connection.
Parameters:
- **config: Fabric and MySQL connection configuration
"""
def connect(self, **config):
"""Establish connection through Fabric"""
def disconnect(self):
"""Disconnect from Fabric and MySQL servers"""
def is_connected(self):
"""Check if connection is active"""
def reset_cache(self):
"""Reset Fabric server group cache"""
def set_property(self, **properties):
"""Set connection properties"""
@property
def fabric(self):
"""Get Fabric interface object"""
@property
def group(self):
"""Current server group name"""
@property
def mode(self):
"""Current connection mode"""Usage Example:
import mysql.connector.fabric as fabric
# Basic Fabric connection
config = {
'fabric': {
'host': 'fabric-host.example.com',
'port': 32274,
'user': 'fabric_user',
'password': 'fabric_password'
},
'user': 'mysql_user',
'password': 'mysql_password',
'database': 'myapp',
'group': 'myapp_servers',
'mode': fabric.MODE_READWRITE
}
connection = fabric.connect(**config)
cursor = connection.cursor()
# Execute queries - Fabric handles server selection
cursor.execute("SELECT COUNT(*) FROM users")
result = cursor.fetchone()
print(f"User count: {result[0]}")
cursor.close()
connection.close()Direct interface to MySQL Fabric management system.
class Fabric:
"""
Direct interface to MySQL Fabric management system.
Provides low-level access to Fabric services for server group
management, sharding operations, and high availability configuration.
"""
def __init__(self, host='localhost', port=32274, user='', password='', **kwargs):
"""
Initialize Fabric interface.
Parameters:
- host (str): Fabric server hostname
- port (int): Fabric server port (default: 32274)
- user (str): Fabric username
- password (str): Fabric password
- **kwargs: Additional connection parameters
"""
def connect(self):
"""Connect to Fabric server"""
def disconnect(self):
"""Disconnect from Fabric server"""
def is_connected(self):
"""Check if connected to Fabric"""
def get_group_servers(self, group_id, mode=None):
"""
Get servers in a group.
Parameters:
- group_id (str): Server group identifier
- mode (int): Connection mode filter
Returns:
list: Server information tuples
"""
def get_group_server(self, group_id, mode=MODE_READWRITE):
"""
Get primary server from group.
Parameters:
- group_id (str): Server group identifier
- mode (int): Connection mode
Returns:
tuple: Server information (host, port, weight, status)
"""
def report_error(self, server_uuid, error_code, error_message):
"""
Report server error to Fabric.
Parameters:
- server_uuid (str): Server UUID
- error_code (int): Error code
- error_message (str): Error description
"""
def report_failure(self, server_uuid):
"""
Report server failure to Fabric.
Parameters:
- server_uuid (str): Server UUID that failed
"""
def lookup_servers(self, tables, key=None, group=None, mode=MODE_READWRITE):
"""
Lookup servers for sharding key.
Parameters:
- tables (list): Table names for sharding
- key (str): Sharding key value
- group (str): Server group name
- mode (int): Connection mode
Returns:
list: Server information for sharding key
"""
def get_sharding_information(self, tables, scope=SCOPE_GLOBAL):
"""
Get sharding configuration information.
Parameters:
- tables (list): Table names
- scope (str): Sharding scope
Returns:
dict: Sharding configuration details
"""
# Sharding scope constants
SCOPE_GLOBAL = "GLOBAL"
SCOPE_LOCAL = "LOCAL"Database sharding configuration and management through Fabric.
def connect(group=None, key=None, tables=None, scope=SCOPE_GLOBAL, mode=MODE_READWRITE, **config):
"""
Connect with sharding support.
Parameters:
- group (str): Server group for non-sharded tables
- key (str): Sharding key value for routing
- tables (list): Tables involved in sharded operations
- scope (str): Sharding scope (GLOBAL or LOCAL)
- mode (int): Connection mode (readonly/readwrite)
- **config: Fabric and MySQL connection configuration
Returns:
MySQLFabricConnection: Sharding-aware connection
"""Usage Example:
import mysql.connector.fabric as fabric
# Sharded connection configuration
config = {
'fabric': {
'host': 'fabric.example.com',
'user': 'fabric_admin',
'password': 'fabric_secret'
},
'user': 'app_user',
'password': 'app_password',
'database': 'ecommerce'
}
# Connect for specific customer (sharding key)
customer_id = 12345
connection = fabric.connect(
key=str(customer_id),
tables=['orders', 'order_items', 'payments'],
mode=fabric.MODE_READWRITE,
**config
)
cursor = connection.cursor()
# Queries automatically routed to correct shard
cursor.execute(
"SELECT * FROM orders WHERE customer_id = %s",
(customer_id,)
)
orders = cursor.fetchall()
# Insert into sharded table
cursor.execute(
"INSERT INTO orders (customer_id, total) VALUES (%s, %s)",
(customer_id, 99.99)
)
connection.commit()
cursor.close()
connection.close()Automatic failover and load balancing configuration.
def connect(group=None, mode=MODE_READWRITE, attempts=3, attempt_delay=1, **config):
"""
Connect with high availability features.
Parameters:
- group (str): Server group name for HA
- mode (int): Connection mode for server selection
- attempts (int): Number of failover attempts
- attempt_delay (int): Delay between failover attempts
- **config: Connection configuration
Returns:
MySQLFabricConnection: HA-enabled connection
"""
class MySQLFabricConnection:
"""Connection with automatic failover support"""
def failover(self):
"""Trigger manual failover to backup server"""
def get_server_uuid(self):
"""Get UUID of currently connected server"""
def report_connection_error(self, error):
"""Report connection error for failover decision"""Usage Example:
import mysql.connector.fabric as fabric
# High availability configuration
ha_config = {
'fabric': {
'host': 'fabric-manager.example.com',
'user': 'ha_admin',
'password': 'ha_secret'
},
'user': 'app_user',
'password': 'app_password',
'database': 'production',
'group': 'production_masters',
'mode': fabric.MODE_READWRITE,
'attempts': 5,
'attempt_delay': 2
}
try:
connection = fabric.connect(**ha_config)
cursor = connection.cursor()
# Application queries with automatic failover
cursor.execute("SELECT COUNT(*) FROM active_sessions")
session_count = cursor.fetchone()[0]
print(f"Active sessions: {session_count}")
except fabric.DatabaseError as err:
print(f"Database error with failover: {err}")
finally:
if 'connection' in locals():
cursor.close()
connection.close()
# Read-only connections for reporting
readonly_config = ha_config.copy()
readonly_config.update({
'group': 'production_slaves',
'mode': fabric.MODE_READONLY
})
ro_connection = fabric.connect(**readonly_config)
ro_cursor = ro_connection.cursor()
# Read-only queries distributed across slaves
ro_cursor.execute("SELECT DATE(created_at), COUNT(*) FROM users GROUP BY DATE(created_at)")
daily_signups = ro_cursor.fetchall()
for date, count in daily_signups:
print(f"{date}: {count} signups")
ro_cursor.close()
ro_connection.close()Administrative operations for Fabric server groups.
class Fabric:
"""Administrative interface for Fabric management"""
def create_group(self, group_id, description=''):
"""
Create new server group.
Parameters:
- group_id (str): Unique group identifier
- description (str): Group description
"""
def destroy_group(self, group_id):
"""
Delete server group.
Parameters:
- group_id (str): Group identifier to delete
"""
def add_server(self, group_id, host, port, weight=1.0):
"""
Add server to group.
Parameters:
- group_id (str): Target group identifier
- host (str): MySQL server hostname
- port (int): MySQL server port
- weight (float): Server weight for load balancing
"""
def remove_server(self, server_uuid):
"""
Remove server from all groups.
Parameters:
- server_uuid (str): Server UUID to remove
"""
def promote_server(self, group_id, server_uuid):
"""
Promote server to master in group.
Parameters:
- group_id (str): Group identifier
- server_uuid (str): Server UUID to promote
"""
def set_server_weight(self, server_uuid, weight):
"""
Set server weight for load balancing.
Parameters:
- server_uuid (str): Server UUID
- weight (float): New weight value
"""FabricConfig = {
'fabric': {
'host': str,
'port': int,
'user': str,
'password': str,
'connection_timeout': int,
'connection_attempts': int,
'connection_delay': int
},
'group': str,
'key': str,
'tables': list[str],
'scope': str,
'mode': int,
'attempts': int,
'attempt_delay': int
}
ServerInfo = tuple[
str, # host
int, # port
float, # weight
str, # status
str # mode
]
ShardingInfo = {
'shard_id': int,
'group_id': str,
'global_group': str,
'state': str,
'sharding_type': str,
'sharding_key': str
}Install with Tessl CLI
npx tessl i tessl/pypi-mysql-connector