Python interface to MySQL databases implementing the Python Database API version 2.0 specification.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Direct access to MySQL C API functionality through the _mysql module, providing fine-grained control over database operations, performance optimization, and advanced features not available through the high-level interface.
Core functions for database connectivity and utility operations at the C API level.
def connect(**kwargs):
    """
    Create low-level database connection directly to MySQL C API.

    All settings are supplied as keyword arguments.

    Parameters:
    - host (str): MySQL server hostname
    - user (str): Username for authentication
    - passwd (str): Password for authentication
    - db (str): Database name to connect to
    - port (int): TCP port number
    - unix_socket (str): Unix socket path
    - client_flag (int): Client connection flags
    - And other connection parameters...

    Returns:
    _mysql.connection: Low-level connection object
    """
def debug(s):
    """
    Set debug options for MySQL client library.

    Parameters:
    - s (str): Debug option string (e.g., 'd:t:o,/tmp/mysql.trace')
    """
def escape(s, dict):
    """
    Escape string using character set information from connection.

    Parameters:
    - s (str): String to escape
    - dict (dict): Character set mapping dictionary
      (the parameter name shadows the ``dict`` builtin; it is kept because
      it is the documented signature)

    Returns:
    str: Escaped string safe for SQL queries
    """
def escape_dict(dict, dict2):
    """
    Escape all string values in a dictionary.

    Parameters:
    - dict (dict): Dictionary with values to escape
    - dict2 (dict): Character set mapping dictionary

    Returns:
    dict: Dictionary with escaped string values

    NOTE(review): the escaping example on this page passes ``conn.converter``
    as ``dict2``, which suggests a type-conversion mapping rather than a
    character-set table -- confirm and reword the parameter description.
    """
def escape_sequence(seq, dict):
    """
    Escape all string values in a sequence.

    Parameters:
    - seq (sequence): Sequence with values to escape
    - dict (dict): Character set mapping dictionary

    Returns:
    tuple: Tuple with escaped string values
    """
def escape_string(s):
    """
    Escape string for SQL (deprecated - use escape instead).

    Parameters:
    - s (str): String to escape

    Returns:
    str: Escaped string
    """
def string_literal(obj):
    """
    Convert Python object to SQL string literal.

    Parameters:
    - obj: Python object to convert

    Returns:
    str: SQL string literal representation
    """
def get_client_info():
    """
    Get MySQL client library version information.

    Returns:
    str: Client library version string
    """

Functions for managing embedded MySQL server instances.
def server_init(args):
    """
    Initialize embedded MySQL server.

    Parameters:
    - args (list): Server initialization arguments

    Returns:
    int: 0 on success, non-zero on error
    """
def server_end():
    """
    Shutdown embedded MySQL server.
    """
def thread_safe():
    """
    Check if MySQL client library is thread-safe.

    Returns:
    bool: True if thread-safe, False otherwise
    """

Constants and version information from the _mysql module.
NULL: object # Special NULL value representation
version_info: tuple # Module version information tuple (major, minor, patch)Direct connection object with C API methods for maximum performance and control.
class connection:
    """
    Low-level connection object from _mysql module.

    NOTE(review): several methods below are documented as returning 0 on
    success and non-zero on error. Upstream ``_mysql`` is believed to return
    None on success and raise an exception on failure instead -- confirm
    against the installed version before branching on these return values.
    """

    def affected_rows(self):
        """
        Get number of rows affected by last statement.

        Returns:
        int: Number of affected rows
        """

    def autocommit(self, flag=None):
        """
        Set or get autocommit mode.

        Parameters:
        - flag (bool, optional): True to enable, False to disable, None to query

        Returns:
        bool: Current autocommit status (when flag is None)
        """

    def get_autocommit(self):
        """
        Get current autocommit status.

        Returns:
        bool: True if autocommit enabled, False otherwise
        """

    def commit(self):
        """Commit current transaction."""

    def rollback(self):
        """Rollback current transaction."""

    def close(self):
        """Close database connection and free resources."""

    def query(self, s):
        """
        Execute SQL query directly.

        Parameters:
        - s (str): SQL query string

        Returns:
        int: 0 on success, non-zero on error
        """

    def store_result(self):
        """
        Store complete result set in memory.

        Returns:
        _mysql.result: Result object or None if no result set
        """

    def use_result(self):
        """
        Initialize result set retrieval (streaming mode).

        Returns:
        _mysql.result: Result object or None if no result set
        """

    def next_result(self):
        """
        Move to next result set (for multiple result sets).

        Returns:
        int: 0 if more results, -1 if no more results, >0 on error
        """

    def character_set_name(self):
        """
        Get current character set name.

        Returns:
        str: Character set name
        """

    def get_host_info(self):
        """
        Get host connection information.

        Returns:
        str: Host connection description
        """

    def get_proto_info(self):
        """
        Get protocol version information.

        Returns:
        int: Protocol version number
        """

    def get_server_info(self):
        """
        Get server version information.

        Returns:
        str: Server version string
        """

    def info(self):
        """
        Get information about last query.

        Returns:
        str: Query information string or None
        """

    def insert_id(self):
        """
        Get last AUTO_INCREMENT value inserted.

        Returns:
        int: Last insert ID
        """

    def kill(self, pid):
        """
        Kill MySQL server thread.

        Parameters:
        - pid (int): Thread ID to kill

        Returns:
        int: 0 on success, non-zero on error
        """

    def ping(self):
        """
        Check if connection is alive.

        Returns:
        int: 0 if connection alive, non-zero if dead
        """

    def select_db(self, db):
        """
        Select database for connection.

        Parameters:
        - db (str): Database name

        Returns:
        int: 0 on success, non-zero on error
        """

    def shutdown(self):
        """
        Shutdown MySQL server.

        Returns:
        int: 0 on success, non-zero on error
        """

    def stat(self):
        """
        Get server statistics.

        Returns:
        str: Server status string
        """

    def thread_id(self):
        """
        Get connection thread ID.

        Returns:
        int: Thread ID
        """

    def warning_count(self):
        """
        Get number of warnings from last operation.

        Returns:
        int: Warning count
        """

Result object for direct access to query results with C API performance.
class result:
    """Low-level result object from _mysql module."""

    def data_seek(self, n):
        """
        Seek to specific row number in result set.

        Parameters:
        - n (int): Row number to seek to (0-based)
        """

    def row_seek(self, offset):
        """
        Seek to row using row offset.

        Parameters:
        - offset: Row offset object from row_tell()

        Returns:
        offset: Previous row offset

        NOTE(review): the navigation listing and example further down this
        page treat ``offset`` as an integer number of rows relative to the
        current position (e.g. ``row_seek(-2)``) -- the two descriptions
        need to be reconciled.
        """

    def row_tell(self):
        """
        Get current row position.

        Returns:
        offset: Current row offset object

        NOTE(review): the navigation listing further down this page documents
        the return value as an int row number -- confirm which is correct.
        """

    def describe(self):
        """
        Get result set field descriptions.

        Returns:
        tuple: Tuple of field description tuples
        """

    def fetch_row(self, maxrows=1, how=1):
        """
        Fetch rows from result set.

        Parameters:
        - maxrows (int): Maximum number of rows to fetch
          (the examples on this page pass 0 to mean "all rows")
        - how (int): Row format (1=tuple, 2=dict)

        Returns:
        tuple: Tuple of rows in requested format

        NOTE(review): upstream MySQLdb is believed to document ``how`` as
        0=tuples (the default), 1=dicts, 2=dicts keyed by table.column --
        confirm the mapping and the default shown in this signature.
        """

    def field_flags(self):
        """
        Get field flags for all columns.

        Returns:
        tuple: Tuple of field flag integers
        """

    def num_fields(self):
        """
        Get number of fields in result set.

        Returns:
        int: Number of fields
        """

    def num_rows(self):
        """
        Get number of rows in result set.

        Returns:
        int: Number of rows
        """

import _mysql
# Example: run a single SELECT through the low-level API and print each row.

# Create low-level connection
conn = _mysql.connect(host="localhost", user="user", passwd="pass", db="test")
try:
    # Execute query directly
    conn.query("SELECT id, name FROM users WHERE active = 1")
    # Get result set (buffered fully on the client)
    result = conn.store_result()
    if result:
        # Fetch all rows as tuples; maxrows=0 means all rows.
        # The query selects exactly two columns, so each row unpacks cleanly.
        for user_id, name in result.fetch_row(maxrows=0):
            print(f"User ID {user_id}: {name}")
finally:
    # Release the connection even if the query or the fetch raises
    conn.close()

import _mysql
# Example: time a ping round-trip and print connection details.
import time  # required for the timing below; the original example never imported it

conn = _mysql.connect(host="localhost", user="user", passwd="pass", db="test")
try:
    # Check connection performance.
    # perf_counter() is monotonic and high-resolution, so it is the right
    # clock for measuring a short interval (time.time() can jump).
    start_time = time.perf_counter()
    result = conn.ping()
    ping_time = time.perf_counter() - start_time
    # A falsy status means success: 0 per the documented contract, or None on
    # builds that raise on failure instead of returning a status code.
    if not result:
        print(f"Connection alive, ping time: {ping_time:.4f}s")
    else:
        print("Connection dead")
    # Get detailed connection info
    print(f"Server info: {conn.get_server_info()}")
    print(f"Host info: {conn.get_host_info()}")
    print(f"Protocol: {conn.get_proto_info()}")
    print(f"Thread ID: {conn.thread_id()}")
finally:
    # Always release the connection, even if one of the calls above raises
    conn.close()

import _mysql
# Example: run several INSERTs in one transaction, rolling back on any failure.
conn = _mysql.connect(host="localhost", user="user", passwd="pass", db="test")
try:
    # Begin transaction
    conn.autocommit(False)
    # Process multiple queries efficiently
    queries = [
        "INSERT INTO log (message) VALUES ('Batch 1')",
        "INSERT INTO log (message) VALUES ('Batch 2')",
        "INSERT INTO log (message) VALUES ('Batch 3')",
    ]
    for query in queries:
        # Only a truthy status is an error. Testing truthiness (rather than
        # `!= 0`) also accepts a None return from builds that raise on
        # failure instead of returning a status code.
        if conn.query(query):
            raise RuntimeError(f"Query failed: {query}")
    # Commit batch
    conn.commit()
    print(f"Processed {len(queries)} queries successfully")
except Exception as e:
    # Top-level boundary of the example: undo the partial batch, then report
    conn.rollback()
    print(f"Batch failed: {e}")
finally:
    conn.close()

import _mysql
# Example: escape a single string and a dictionary of values.

# Escape potentially dangerous strings
user_input = "'; DROP TABLE users; --"
escaped = _mysql.escape_string(user_input)
print(f"Original: {user_input}")
print(f"Escaped: {escaped}")
# Escape dictionary values
data = {
    'name': "O'Connor",
    'comment': 'Said "Hello world!"',
    'sql_injection': "'; DELETE FROM users; --",
}
# Note: escape_dict needs connection context for proper character set handling
conn = _mysql.connect(host="localhost", user="user", passwd="pass", db="test")
try:
    escaped_data = _mysql.escape_dict(data, conn.converter)
    print(f"Escaped data: {escaped_data}")
finally:
    # Close the connection even if escaping raises
    conn.close()

import _mysql
# Example: inspect result-set metadata and fetch rows in two formats.
conn = _mysql.connect(host="localhost", user="user", passwd="pass", db="test")
try:
    conn.query("SELECT id, name, email, created_at FROM users")
    result = conn.store_result()
    if result:
        # Get result metadata
        print(f"Fields: {result.num_fields()}")
        print(f"Rows: {result.num_rows()}")
        # Get field descriptions. Only the first two entries of each
        # description (name and type code) are used here; the remaining
        # entries are ignored rather than bound to unused, possibly
        # mislabelled names.
        descriptions = result.describe()
        for i, (field_name, field_type, *_) in enumerate(descriptions):
            print(f"Field {i}: {field_name} (type: {field_type})")
        # Fetch rows with different formats
        result.data_seek(0)  # Reset to beginning
        # Fetch as tuples
        tuple_rows = result.fetch_row(maxrows=2, how=1)
        print(f"Tuple format: {tuple_rows}")
        # Fetch as dictionaries
        dict_rows = result.fetch_row(maxrows=2, how=2)
        print(f"Dictionary format: {dict_rows}")
finally:
    # Release the connection even if a query or fetch raises
    conn.close()

import _mysql
# Initialize embedded server
server_args = [
"embedded_server",
"--datadir=/var/lib/mysql",
"--language=/usr/share/mysql/english"
]
result = _mysql.server_init(server_args)
if result != 0:
print("Failed to initialize embedded server")
exit(1)
try:
# Connect to embedded server
conn = _mysql.connect(db="test")
# Use embedded server normally
conn.query("SHOW TABLES")
result = conn.store_result()
if result:
tables = result.fetch_row(maxrows=0)
for table in tables:
print(f"Table: {table[0]}")
conn.close()
finally:
# Always shutdown embedded server
_mysql.server_end()Advanced methods for result set navigation and manipulation.
class result:
    """
    Low-level result object from _mysql module (navigation methods only).

    The class is exposed as ``_mysql.result``; a dotted name is not valid in
    a ``class`` statement, so the header uses the bare name.

    NOTE(review): the earlier ``result`` listing on this page describes
    ``row_seek``/``row_tell`` in terms of an opaque offset object, while this
    listing uses integer row counts -- confirm which is correct.
    """

    def data_seek(self, n):
        """
        Seek to specific row number in result set.

        Parameters:
        - n (int): Row number to seek to (0-based)
        """

    def row_seek(self, offset):
        """
        Seek by offset from current position in result set.

        Parameters:
        - offset (int): Number of rows to move (can be negative)
        """

    def row_tell(self):
        """
        Get current row position in result set.

        Returns:
        int: Current row number (0-based)
        """

import _mysql
# Example: move around a buffered result set with data_seek / row_seek.
conn = _mysql.connect(host="localhost", user="user", passwd="pass", db="test")
try:
    conn.query("SELECT id, name FROM users ORDER BY id")
    result = conn.store_result()
    if result:
        # Get current position
        pos = result.row_tell()
        print(f"Current position: {pos}")
        # Seek to specific row
        result.data_seek(5)
        row = result.fetch_row()
        if row:
            print(f"Row 5: {row[0]}")
        # Seek relative to current position. Fetching row 5 left the cursor
        # on row 6, so moving back 2 rows lands on row 4 (not row 3).
        result.row_seek(-2)  # Go back 2 rows
        row = result.fetch_row()
        if row:
            print(f"Row 4: {row[0]}")
        # Get new position
        pos = result.row_tell()
        print(f"New position: {pos}")
finally:
    # Release the connection even if a seek or fetch raises
    conn.close()

import _mysql
import threading
def worker():
    """Worker function for threading test.

    Opens its own connection, asks the server for the connection ID and
    prints it tagged with the current thread's name. Any failure is reported
    on stdout instead of propagating, so one failing thread does not produce
    an unhandled-exception traceback.
    """
    thread_name = threading.current_thread().name
    try:
        conn = _mysql.connect(host="localhost", user="user", passwd="pass", db="test")
        try:
            conn.query("SELECT CONNECTION_ID()")
            result = conn.store_result()
            if result:
                # fetch_row() returns a tuple of rows; take column 0 of row 0
                connection_id = result.fetch_row()[0][0]
                print(f"Thread {thread_name}: Connection ID {connection_id}")
        finally:
            # Close the connection even when the query or fetch fails
            conn.close()
    except Exception as e:
        # Thread entry point: report the error rather than let it escape
        print(f"Thread {thread_name}: Error {e}")
# Check if library is thread-safe
if _mysql.thread_safe():
print("MySQL library is thread-safe")
# Create multiple threads
threads = []
for i in range(3):
t = threading.Thread(target=worker, name=f"Worker-{i}")
threads.append(t)
t.start()
# Wait for all threads
for t in threads:
t.join()
else:
print("MySQL library is NOT thread-safe")Install with Tessl CLI
npx tessl i tessl/pypi-mysql-python