Apache Airflow provider package enabling MySQL database integration with hooks, operators, and transfer functionality for data workflows.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Core MySQL database connectivity and operations through MySqlHook. This hook provides comprehensive database interaction capabilities including connection management, query execution, bulk operations, and support for multiple MySQL client libraries.
The MySqlHook class extends Airflow's DbApiHook to provide MySQL-specific functionality with support for multiple client libraries and authentication methods.
class MySqlHook(DbApiHook):
"""
Interact with MySQL databases.
Attributes:
- conn_name_attr: "mysql_conn_id"
- default_conn_name: "mysql_default"
- conn_type: "mysql"
- hook_name: "MySQL"
- supports_autocommit: True
"""
def __init__(
self,
*args,
schema: str = None,
local_infile: bool = False,
init_command: str = None,
**kwargs
):
"""
Initialize MySQL hook.
Parameters:
- schema: MySQL database schema to connect to
- local_infile: Enable local_infile MySQL feature (default: False)
- init_command: Initial command to issue upon connection
"""

Establish and manage connections to MySQL databases with support for multiple client libraries and authentication methods.
def get_conn(self) -> MySQLConnectionTypes:
"""
Get connection to a MySQL database.
Establishes connection by extracting configuration from Airflow connection.
Supports mysqlclient (default) and mysql-connector-python libraries.
Returns:
MySQL connection object (MySQLdb or mysql.connector connection)
Raises:
RuntimeError: If required MySQL client library is not installed
ValueError: If unknown MySQL client name is provided
AirflowOptionalProviderFeatureException: If optional dependency missing
"""
def get_uri(self) -> str:
"""
Get URI for MySQL connection.
Generates connection URI based on client library and connection parameters.
Returns:
Connection URI string (mysql:// or mysql+mysqlconnector://)
"""

Manage transaction autocommit behavior across different MySQL client libraries.
def set_autocommit(self, conn: MySQLConnectionTypes, autocommit: bool) -> None:
"""
Set autocommit mode for MySQL connection.
Handles differences between mysqlclient (uses method) and
mysql-connector-python (uses property) libraries.
Parameters:
- conn: MySQL connection object
- autocommit: Enable/disable autocommit
"""
def get_autocommit(self, conn: MySQLConnectionTypes) -> bool:
"""
Get current autocommit setting for MySQL connection.
Parameters:
- conn: MySQL connection object
Returns:
Current autocommit setting (True/False)
"""

Efficient data loading and dumping operations for large datasets using MySQL's native bulk operations.
def bulk_load(self, table: str, tmp_file: str) -> None:
"""
Load tab-delimited file into database table using LOAD DATA LOCAL INFILE.
Parameters:
- table: Target table name (validated for safety)
- tmp_file: Path to tab-delimited file
Raises:
ValueError: If table name contains invalid characters
"""
def bulk_dump(self, table: str, tmp_file: str) -> None:
"""
Dump database table into tab-delimited file using SELECT INTO OUTFILE.
Parameters:
- table: Source table name (validated for safety)
- tmp_file: Output file path
Raises:
ValueError: If table name contains invalid characters
"""
def bulk_load_custom(
self,
table: str,
tmp_file: str,
duplicate_key_handling: str = "IGNORE",
extra_options: str = ""
) -> None:
"""
Load data with configurable options using LOAD DATA LOCAL INFILE.
Warning: This function has security implications according to MySQL docs.
Parameters:
- table: Target table name
- tmp_file: Path to data file
- duplicate_key_handling: "IGNORE" or "REPLACE" for duplicate handling
- extra_options: Additional SQL options for LOAD DATA statement
"""

Support for AWS IAM database authentication for secure, token-based MySQL connections.
def get_iam_token(self, conn: Connection) -> tuple[str, int]:
"""
Retrieve temporary password for AWS IAM authentication to MySQL.
Uses AWS RDS generate_db_auth_token to create temporary password.
Parameters:
- conn: Airflow connection with IAM configuration
Returns:
Tuple of (temporary_password, port)
Configuration in connection extra:
{"iam": true, "aws_conn_id": "aws_default"}
"""

Data lineage and metadata support for OpenLineage tracking systems.
def get_openlineage_database_info(self, connection) -> DatabaseInfo:
"""
Return MySQL-specific information for OpenLineage data lineage.
Parameters:
- connection: Database connection
Returns:
DatabaseInfo object with MySQL schema and authority information
"""
def get_openlineage_database_dialect(self, _) -> str:
"""
Return database dialect identifier.
Returns:
"mysql"
"""
def get_openlineage_default_schema(self) -> None:
"""
Return default schema (MySQL has no schema concept).
Returns:
None
"""

Handle data type conversion for MySQL database operations.
@staticmethod
def _serialize_cell(cell: object, conn: Connection = None) -> Any:
"""
Convert argument to database literal.
MySQLdb handles serialization automatically, so this method
returns the cell unchanged.
Parameters:
- cell: Data to serialize
- conn: Database connection (unused)
Returns:
Unchanged cell value
"""

from airflow.providers.mysql.hooks.mysql import MySqlHook
# Create hook with default connection
hook = MySqlHook(mysql_conn_id='mysql_default')
# Execute query and fetch results
records = hook.get_records('SELECT * FROM users WHERE active = %s', (True,))
# Execute single query
hook.run('UPDATE users SET last_login = NOW() WHERE id = %s', parameters=(user_id,))

# Load data from CSV file
hook = MySqlHook(mysql_conn_id='mysql_default', local_infile=True)
# Simple bulk load (tab-delimited)
hook.bulk_load('staging_table', '/tmp/data.tsv')
# Custom bulk load with duplicate handling
hook.bulk_load_custom(
table='users',
tmp_file='/tmp/users.csv',
duplicate_key_handling='REPLACE',
extra_options='FIELDS TERMINATED BY "," ENCLOSED BY "\""'
)

# Configure connection with IAM authentication
# Connection extra: {"iam": true, "aws_conn_id": "aws_default"}
hook = MySqlHook(mysql_conn_id='mysql_iam_conn')
connection = hook.get_conn() # Uses temporary IAM token

hook = MySqlHook(mysql_conn_id='mysql_default')
uri = hook.get_uri() # Returns: mysql://user:pass@host:port/database

# MySQL connection type union
MySQLConnectionTypes = Union[MySQLdbConnection, MySQLConnectionAbstract]
# Connection extra configuration
ConnectionExtra = {
"charset": str, # Character encoding (e.g., "utf8")
"cursor": str, # Cursor type ("SSCursor", "DictCursor", "SSDictCursor")
"ssl": dict, # SSL configuration dictionary
"ssl_mode": str, # SSL mode ("REQUIRED", "PREFERRED", etc.)
"unix_socket": str, # Unix socket path
"client": str, # Client library ("mysqlclient", "mysql-connector-python")
"iam": bool, # Enable AWS IAM authentication
"aws_conn_id": str # AWS connection ID for IAM
}

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-mysql