tessl/pypi-apache-airflow-providers-mysql

Apache Airflow provider package enabling MySQL database integration with hooks, operators, and transfer functionality for data workflows.

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/apache-airflow-providers-mysql@6.3.x

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-providers-mysql@6.3.0


Apache Airflow MySQL Provider

Provider package for Apache Airflow that enables comprehensive MySQL database integration within data workflows. This package provides hooks for database connections, transfer operators for moving data between MySQL and other systems, and asset URI handling for MySQL and MariaDB schemes.

Package Information

  • Package Name: apache-airflow-providers-mysql
  • Language: Python
  • Installation: pip install apache-airflow-providers-mysql

Core Imports

from airflow.providers.mysql.hooks.mysql import MySqlHook

For transfer operations:

from airflow.providers.mysql.transfers.s3_to_mysql import S3ToMySqlOperator
from airflow.providers.mysql.transfers.vertica_to_mysql import VerticaToMySqlOperator
from airflow.providers.mysql.transfers.presto_to_mysql import PrestoToMySqlOperator
from airflow.providers.mysql.transfers.trino_to_mysql import TrinoToMySqlOperator

For asset handling:

from airflow.providers.mysql.assets.mysql import sanitize_uri

Basic Usage

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.providers.mysql.transfers.s3_to_mysql import S3ToMySqlOperator
from datetime import datetime, timedelta

# Define default DAG arguments
default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

# Create DAG
dag = DAG(
    'mysql_data_pipeline',
    default_args=default_args,
    description='MySQL data processing pipeline',
    schedule=timedelta(days=1),  # 'schedule_interval' is deprecated in favour of 'schedule' (Airflow 2.4+)
    catchup=False
)

# Use MySqlHook for direct database operations
def query_mysql_data():
    hook = MySqlHook(mysql_conn_id='mysql_default')
    records = hook.get_records('SELECT * FROM users WHERE created_date >= %s', (datetime.now().date(),))
    return records

# Wrap the callable in a task so it actually runs as part of the DAG
query_task = PythonOperator(
    task_id='query_mysql_data',
    python_callable=query_mysql_data,
    dag=dag
)

# Use S3ToMySqlOperator for data transfer
s3_to_mysql_task = S3ToMySqlOperator(
    task_id='load_data_from_s3',
    s3_source_key='data/users.csv',
    mysql_table='staging.users',
    mysql_conn_id='mysql_default',
    aws_conn_id='aws_default',
    dag=dag
)

# Load from S3 first, then query the refreshed table
s3_to_mysql_task >> query_task

Architecture

The MySQL provider package follows Airflow's provider pattern with three main components:

  • Hooks: Low-level interfaces for database connections and operations (MySqlHook)
  • Operators: High-level task definitions for data transfers between systems
  • Assets: URI handlers for MySQL/MariaDB dataset tracking and lineage

The package supports multiple MySQL client libraries (mysqlclient, mysql-connector-python, aiomysql) with automatic client selection and configuration, providing flexibility for different deployment environments and requirements.
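
For example (a minimal sketch; the connection ID and query are illustrative), the client library is selected per connection via the client key in the connection's extra field, so switching drivers requires no code changes:

from airflow.providers.mysql.hooks.mysql import MySqlHook

# The hook resolves the driver from the connection's extra field,
# e.g. extra = {"client": "mysql-connector-python"}
hook = MySqlHook(mysql_conn_id='mysql_default')
conn = hook.get_conn()  # DB-API connection from the selected client
cursor = conn.cursor()
cursor.execute('SELECT VERSION()')
print(cursor.fetchone())
cursor.close()
conn.close()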

Capabilities

Database Operations

Core MySQL database connectivity and operations through MySqlHook, supporting multiple MySQL clients, connection management, bulk operations, and AWS IAM authentication.

class MySqlHook(DbApiHook):
    def __init__(self, schema=None, local_infile=False, init_command=None, **kwargs): ...
    def get_conn(self): ...
    def bulk_load(self, table: str, tmp_file: str) -> None: ...
    def bulk_dump(self, table: str, tmp_file: str) -> None: ...
    def get_uri(self) -> str: ...
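
A hedged sketch of the bulk APIs (table and file paths are illustrative): bulk_load streams a local tab-delimited file into a table via LOAD DATA LOCAL INFILE, while bulk_dump writes a table out to a file on the database server:

from airflow.providers.mysql.hooks.mysql import MySqlHook

# local_infile=True enables LOAD DATA LOCAL INFILE for bulk_load;
# the MySQL server must also permit it (local_infile=1)
hook = MySqlHook(mysql_conn_id='mysql_default', local_infile=True)

hook.bulk_load(table='staging_users', tmp_file='/tmp/users.tsv')       # file -> table
hook.bulk_dump(table='staging_users', tmp_file='/tmp/users_dump.tsv')  # table -> server-side file

print(hook.get_uri())  # SQLAlchemy-style URI for the configured connection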


Data Transfer Operations

Transfer operators for moving data from various source systems (S3, Vertica, Presto, Trino) into MySQL tables, with support for bulk loading and transformation options.

class S3ToMySqlOperator(BaseOperator): ...
class VerticaToMySqlOperator(BaseOperator): ...
class PrestoToMySqlOperator(BaseOperator): ...
class TrinoToMySqlOperator(BaseOperator): ...
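
As an illustrative sketch (the source query, target table, and connection IDs are assumptions), a Trino-to-MySQL transfer runs a query against Trino and inserts the results into a MySQL table, optionally executing a preoperator statement first:

from airflow.providers.mysql.transfers.trino_to_mysql import TrinoToMySqlOperator

load_counts = TrinoToMySqlOperator(
    task_id='trino_to_mysql',
    sql='SELECT city, count(*) AS cnt FROM hive.default.events GROUP BY city',
    mysql_table='reporting.event_counts',
    mysql_preoperator='TRUNCATE TABLE reporting.event_counts',  # optional SQL run before the load
    trino_conn_id='trino_default',
    mysql_conn_id='mysql_default',
)  # attach to a DAG with dag=... or inside a `with DAG(...)` block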


Asset URI Handling

URI sanitization and validation for MySQL and MariaDB assets, ensuring proper format and default port assignment for dataset tracking and lineage.

def sanitize_uri(uri: SplitResult) -> SplitResult: ...
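
A small usage sketch (the host and names are illustrative): asset URIs take the form mysql://host[:port]/database/table, and a missing port is normalized to the default 3306:

from urllib.parse import urlsplit

from airflow.providers.mysql.assets.mysql import sanitize_uri

normalized = sanitize_uri(urlsplit('mysql://db.example.com/sales/orders'))
print(normalized.geturl())  # mysql://db.example.com:3306/sales/orders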


Connection Configuration

MySQL connections are configured through Airflow's connection management system with the connection type mysql. The provider supports multiple MySQL client libraries and authentication methods including AWS IAM.

Connection Parameters

# Connection configuration options in extra field
{
    "charset": "utf8",           # Character set
    "cursor": "SSCursor",        # Cursor type (SSCursor, DictCursor, SSDictCursor)
    "ssl": {...},                # SSL configuration dictionary
    "ssl_mode": "REQUIRED",      # SSL mode
    "unix_socket": "/path/to/socket",  # Unix socket path
    "client": "mysqlclient",     # MySQL client library
    "iam": true,                 # Enable AWS IAM authentication
    "aws_conn_id": "aws_default" # AWS connection for IAM auth
}
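
One way to register such a connection programmatically (a sketch; host and credentials are placeholders, and the Airflow UI or the airflow connections CLI work equally well) is an AIRFLOW_CONN_<CONN_ID> environment variable built from a Connection object:

import json
import os

from airflow.models.connection import Connection

conn = Connection(
    conn_id='mysql_default',
    conn_type='mysql',
    host='localhost',
    login='user',
    password='secret',
    schema='mydb',
    extra=json.dumps({'charset': 'utf8', 'client': 'mysqlclient', 'cursor': 'DictCursor'}),
)

# Airflow resolves AIRFLOW_CONN_<CONN_ID> environment variables at runtime
os.environ[f'AIRFLOW_CONN_{conn.conn_id.upper()}'] = conn.get_uri()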

Error Handling

The package includes comprehensive error handling for connection failures, authentication issues, and data transfer problems. Common exceptions include connection timeouts, authentication failures, and data format errors.

# Common exception types
RuntimeError  # Missing MySQL client libraries
ValueError    # Invalid table names or URI formats
AirflowOptionalProviderFeatureException  # Missing optional dependencies
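
A defensive-usage sketch based on the exception types above (the query and connection ID are illustrative):

from airflow.providers.mysql.hooks.mysql import MySqlHook

def fetch_user_count() -> int:
    hook = MySqlHook(mysql_conn_id='mysql_default')
    try:
        return hook.get_first('SELECT COUNT(*) FROM users')[0]
    except RuntimeError:
        # Raised when the configured MySQL client library is not installed
        raise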