
tessl/pypi-apache-airflow-providers-mysql

Apache Airflow provider package enabling MySQL database integration with hooks, operators, and transfer functionality for data workflows.


Database Operations

Core MySQL connectivity and query operations provided by MySqlHook: connection management, query execution, bulk loading and dumping, and support for two MySQL client libraries (mysqlclient and mysql-connector-python).

Capabilities

MySQL Hook

The MySqlHook class extends Airflow's DbApiHook with MySQL-specific behavior. It can connect through either mysqlclient or mysql-connector-python, and can authenticate with a password or an AWS IAM token.

class MySqlHook(DbApiHook):
    """
    Interact with MySQL databases.
    
    Attributes:
    - conn_name_attr: "mysql_conn_id"
    - default_conn_name: "mysql_default" 
    - conn_type: "mysql"
    - hook_name: "MySQL"
    - supports_autocommit: True
    """
    
    def __init__(
        self,
        *args,
        schema: str | None = None,
        local_infile: bool = False,
        init_command: str | None = None,
        **kwargs,
    ) -> None:
        """
        Initialize MySQL hook.
        
        Parameters:
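        - schema: Database (schema) to connect to; overrides the schema set on the Airflow connection
        - local_infile: Enable the client-side local_infile option required by LOAD DATA LOCAL INFILE (default: False)
        - init_command: SQL statement to execute when the connection is established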
        """

Connection Management

Connections are built from an Airflow connection (mysql_conn_id). The "client" key in the connection extra selects which MySQL client library is used.

def get_conn(self) -> MySQLConnectionTypes:
    """
    Get connection to a MySQL database.
    
    Builds the connection arguments from the Airflow connection and opens a
    connection with the client library named in the "client" extra:
    mysqlclient (default) or mysql-connector-python.
    
    Returns:
    MySQL connection object (MySQLdb or mysql.connector connection)
    
    Raises:
    RuntimeError: If mysqlclient is selected but not installed
    ValueError: If the "client" extra names an unknown client library
    AirflowOptionalProviderFeatureException: If mysql-connector-python is selected but not installed
    """

def get_uri(self) -> str:
    """
    Get URI for MySQL connection.
    
    Builds a connection URI from the connection parameters; the scheme
    depends on the selected client library.
    
    Returns:
    Connection URI string: mysql:// for mysqlclient, mysql+mysqlconnector:// for mysql-connector-python
    """

Autocommit Control

Manage transaction autocommit behavior across different MySQL client libraries.

def set_autocommit(self, conn: MySQLConnectionTypes, autocommit: bool) -> None:
    """
    Set autocommit mode for MySQL connection.
    
    Handles the API difference between the client libraries: mysqlclient
    exposes autocommit() as a method, while mysql-connector-python exposes
    autocommit as a property.
    
    Parameters:
    - conn: MySQL connection object
    - autocommit: Enable/disable autocommit
    """

def get_autocommit(self, conn: MySQLConnectionTypes) -> bool:
    """
    Get current autocommit setting for MySQL connection.
    
    Parameters:
    - conn: MySQL connection object
    
    Returns:
    Current autocommit setting (True/False)
    """

Bulk Data Operations

Load and dump large datasets with MySQL's native bulk statements, LOAD DATA LOCAL INFILE and SELECT ... INTO OUTFILE.

def bulk_load(self, table: str, tmp_file: str) -> None:
    """
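    Load a tab-delimited file into a database table using LOAD DATA LOCAL INFILE.
    
    Requires local_infile=True on the hook and local_infile enabled on the
    MySQL server.
    
    Parameters:
    - table: Target table name (validated: only letters, digits, underscores, and dots are accepted)
    - tmp_file: Path to the tab-delimited file on the Airflow worker
    
    Raises:
    ValueError: If table name contains invalid characters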
    """

def bulk_dump(self, table: str, tmp_file: str) -> None:
    """
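    Dump a database table into a tab-delimited file using SELECT INTO OUTFILE.
    
    The file is written by the MySQL server on the server's filesystem, not on
    the Airflow worker. The MySQL user needs the FILE privilege, and the path
    must be permitted by the server's secure_file_priv setting.
    
    Parameters:
    - table: Source table name (validated for safety)
    - tmp_file: Output file path on the MySQL server host (the file must not already exist)
    
    Raises:
    ValueError: If table name contains invalid characters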
    """

def bulk_load_custom(
    self,
    table: str,
    tmp_file: str,
    duplicate_key_handling: str = "IGNORE",
    extra_options: str = ""
) -> None:
    """
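    Load data using LOAD DATA LOCAL INFILE with configurable duplicate-key
    handling and extra clauses.
    
    Warning: the MySQL documentation describes LOAD DATA LOCAL as a security
    risk. It only works when local_infile is enabled on both the client
    (local_infile=True on the hook) and the MySQL server.
    
    Parameters:
    - table: Target table name
    - tmp_file: Path to data file
    - duplicate_key_handling: "IGNORE" (discard rows that duplicate an existing unique key) or "REPLACE" (replace the existing rows)
    - extra_options: Additional clauses appended to the LOAD DATA statement (e.g. FIELDS or LINES options); pass only trusted values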
    """

AWS IAM Authentication

Support for AWS IAM database authentication for secure, token-based MySQL connections.

def get_iam_token(self, conn: Connection) -> tuple[str, int]:
    """
    Retrieve temporary password for AWS IAM authentication to MySQL.
    
    Uses the AWS RDS generate_db_auth_token API, through the Amazon provider
    (apache-airflow-providers-amazon), to create the temporary password.
    
    Parameters:
    - conn: Airflow connection with IAM configuration
    
    Returns:
    Tuple of (temporary_password, port)
    
    Configuration in connection extra:
    {"iam": true, "aws_conn_id": "aws_default"}
    """

OpenLineage Integration

Data lineage and metadata support for OpenLineage tracking systems.

def get_openlineage_database_info(self, connection) -> DatabaseInfo:
    """
    Return MySQL-specific information for OpenLineage data lineage.
    
    Parameters:
    - connection: Database connection
    
    Returns:
    DatabaseInfo object with MySQL schema and authority information
    """

def get_openlineage_database_dialect(self, _) -> str:
    """
    Return database dialect identifier.
    
    Returns:
    "mysql"
    """

def get_openlineage_default_schema(self) -> None:
    """
    Return the default schema. MySQL treats a schema as a synonym for a
    database, so no separate default schema is reported.
    
    Returns:
    None
    """

Data Serialization

Handle data type conversion for MySQL database operations.

@staticmethod
def _serialize_cell(cell: object, conn: Connection | None = None) -> Any:
    """
    Convert argument to database literal.
    
    MySQLdb handles serialization automatically, so this method
    returns the cell unchanged.
    
    Parameters:
    - cell: Data to serialize
    - conn: Database connection (unused)
    
    Returns:
    Unchanged cell value
    """

Usage Examples

Basic Database Connection

from airflow.providers.mysql.hooks.mysql import MySqlHook

# Create hook with default connection
hook = MySqlHook(mysql_conn_id='mysql_default')

# Execute a query and fetch the results
records = hook.get_records('SELECT * FROM users WHERE active = %s', parameters=(True,))

# Execute a single statement with bound parameters
user_id = 42  # example value
hook.run('UPDATE users SET last_login = NOW() WHERE id = %s', parameters=(user_id,))

Bulk Data Loading

# Bulk loading needs LOCAL INFILE enabled on the client (and on the server)
hook = MySqlHook(mysql_conn_id='mysql_default', local_infile=True)

# Simple bulk load (tab-delimited)
hook.bulk_load('staging_table', '/tmp/data.tsv')

# Custom bulk load with duplicate handling
hook.bulk_load_custom(
    table='users',
    tmp_file='/tmp/users.csv',
    duplicate_key_handling='REPLACE',
    extra_options="FIELDS TERMINATED BY ',' ENCLOSED BY '\"'"  # comma-separated, double-quoted fields
)
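bulk_load expects a tab-delimited file in LOAD DATA's default format: fields separated by tabs, rows ending with a newline, backslash as the escape character, and backslash followed by N meaning NULL. A minimal sketch of writing such a file from Python rows; write_tsv is a hypothetical helper, and escaping backslash, tab, and newline as \\, \t, and \n relies on LOAD DATA decoding those sequences on input.

```python
from typing import Iterable, Optional, Sequence


def write_tsv(rows: Iterable[Sequence[Optional[object]]], path: str) -> None:
    """Hypothetical helper: write rows in LOAD DATA's default tab-delimited format."""
    with open(path, "w", encoding="utf-8", newline="") as f:
        for row in rows:
            fields = []
            for value in row:
                if value is None:
                    fields.append("\\N")  # read back as SQL NULL
                    continue
                text = str(value)
                # Escape the backslash first, then the field and line terminators.
                text = text.replace("\\", "\\\\").replace("\t", "\\t").replace("\n", "\\n")
                fields.append(text)
            f.write("\t".join(fields) + "\n")
```

The resulting file can then be passed as tmp_file to hook.bulk_load.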

AWS IAM Authentication

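# Configure the Airflow connection for IAM authentication
# (requires the apache-airflow-providers-amazon package)
# Connection extra: {"iam": true, "aws_conn_id": "aws_default"}

hook = MySqlHook(mysql_conn_id='mysql_iam_conn')
connection = hook.get_conn()  # Authenticates with a temporary IAM token instead of a stored password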

Connection URI Generation

hook = MySqlHook(mysql_conn_id='mysql_default')
uri = hook.get_uri()  # Returns: mysql://user:pass@host:port/database

Type Definitions

# MySQL connection type union
MySQLConnectionTypes = Union[MySQLdbConnection, MySQLConnectionAbstract]

# Keys read from the Airflow connection extra (JSON) and their value types
ConnectionExtra = {
    "charset": str,              # Character encoding (e.g., "utf8")
    "cursor": str,               # Cursor type ("SSCursor", "DictCursor", "SSDictCursor")
    "ssl": dict,                 # SSL configuration dictionary
    "ssl_mode": str,             # SSL mode ("REQUIRED", "PREFERRED", etc.)
    "unix_socket": str,          # Unix socket path
    "client": str,               # Client library ("mysqlclient", "mysql-connector-python")
    "iam": bool,                 # Enable AWS IAM authentication
    "aws_conn_id": str           # AWS connection ID for IAM
}
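The key/type table above can double as a lightweight check on a connection's extra JSON before a DAG runs. A minimal sketch: check_extra is hypothetical, keys outside the list are reported for review rather than treated as definitely invalid, and the case-insensitive cursor comparison is an assumption.

```python
import json
from typing import Any, Dict, List

# Expected Python type for each key listed in ConnectionExtra.
EXTRA_TYPES: Dict[str, type] = {
    "charset": str,
    "cursor": str,
    "ssl": dict,
    "ssl_mode": str,
    "unix_socket": str,
    "client": str,
    "iam": bool,
    "aws_conn_id": str,
}
KNOWN_CLIENTS = {"mysqlclient", "mysql-connector-python"}
KNOWN_CURSORS = {"sscursor", "dictcursor", "ssdictcursor"}


def check_extra(extra_json: str) -> List[str]:
    """Hypothetical validator: return human-readable problems in a connection extra."""
    extras: Dict[str, Any] = json.loads(extra_json or "{}")
    problems: List[str] = []
    for key, value in extras.items():
        expected = EXTRA_TYPES.get(key)
        if expected is None:
            problems.append(f"{key}: not a documented key")
        elif not isinstance(value, expected):
            problems.append(f"{key}: expected {expected.__name__}")
    client = extras.get("client", "mysqlclient")
    if isinstance(client, str) and client not in KNOWN_CLIENTS:
        problems.append(f"client: unknown value {client!r}")
    cursor = extras.get("cursor")
    if isinstance(cursor, str) and cursor.lower() not in KNOWN_CURSORS:
        problems.append(f"cursor: unknown value {cursor!r}")
    return problems
```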
